64 import sun.nio.ch.IOUtil;
65 import sun.nio.ch.Net;
66 import sun.nio.ch.SelChImpl;
67 import sun.nio.ch.SelectionKeyImpl;
68 import sun.nio.ch.Util;
69 import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
70 import static sun.nio.ch.sctp.ResultContainer.*;
71
72 /**
73 * An implementation of SctpMultiChannel
74 */
75 public class SctpMultiChannelImpl extends SctpMultiChannel
76 implements SelChImpl
77 {
78 private static final JavaNioAccess NIO_ACCESS = SharedSecrets.getJavaNioAccess();
79
80 private final FileDescriptor fd;
81
82 private final int fdVal;
83
84 /* IDs of the native threads doing sends and receives, for signalling */
85 private volatile long receiverThread;
86 private volatile long senderThread;
87
88 /* Lock held by current receiving thread */
89 private final Object receiveLock = new Object();
90
91 /* Lock held by current sending thread */
92 private final Object sendLock = new Object();
93
94 /* Lock held by any thread that modifies the state fields declared below.
95 * DO NOT invoke a blocking I/O operation while holding this lock! */
96 private final Object stateLock = new Object();
97
98 private enum ChannelState {
99 UNINITIALIZED,
100 KILLPENDING,
101 KILLED,
102 }
103
104 /* -- The following fields are protected by stateLock -- */
105 private ChannelState state = ChannelState.UNINITIALIZED;
106
248
249 return Collections.unmodifiableSet(associationMap.keySet());
250 }
251 }
252
253 private boolean isBound() {
254 synchronized (stateLock) {
255 return port != -1;
256 }
257 }
258
259 private void ensureOpen() throws IOException {
260 synchronized (stateLock) {
261 if (!isOpen())
262 throw new ClosedChannelException();
263 }
264 }
265
266 private void receiverCleanup() throws IOException {
267 synchronized (stateLock) {
268 receiverThread = 0;
269 if (state == ChannelState.KILLPENDING)
270 kill();
271 }
272 }
273
274 private void senderCleanup() throws IOException {
275 synchronized (stateLock) {
276 senderThread = 0;
277 if (state == ChannelState.KILLPENDING)
278 kill();
279 }
280 }
281
282 @Override
283 protected void implConfigureBlocking(boolean block) throws IOException {
284 IOUtil.configureBlocking(fd, block);
285 }
286
287 @Override
288 public void implCloseSelectableChannel() throws IOException {
289 synchronized (stateLock) {
290 if (state != ChannelState.KILLED)
291 SctpNet.preClose(fdVal);
292
293 if (receiverThread != 0)
294 NativeThread.signal(receiverThread);
295
296 if (senderThread != 0)
297 NativeThread.signal(senderThread);
298
299 if (!isRegistered())
300 kill();
301 }
302 }
303
304 @Override
305 public FileDescriptor getFD() {
306 return fd;
307 }
308
309 @Override
310 public int getFDVal() {
311 return fdVal;
312 }
313
314 /**
315 * Translates native poll revent ops into a ready operation ops
316 */
361 if ((ops & SelectionKey.OP_READ) != 0)
362 newOps |= Net.POLLIN;
363 if ((ops & SelectionKey.OP_WRITE) != 0)
364 newOps |= Net.POLLOUT;
365 return newOps;
366 }
367
368 @Override
369 public void kill() throws IOException {
370 synchronized (stateLock) {
371 if (state == ChannelState.KILLED)
372 return;
373 if (state == ChannelState.UNINITIALIZED) {
374 state = ChannelState.KILLED;
375 SctpNet.close(fdVal);
376 return;
377 }
378 assert !isOpen() && !isRegistered();
379
380 /* Postpone the kill if there is a thread sending or receiving. */
381 if (receiverThread == 0 && senderThread == 0) {
382 state = ChannelState.KILLED;
383 SctpNet.close(fdVal);
384 } else {
385 state = ChannelState.KILLPENDING;
386 }
387 }
388 }
389
390 @Override
391 public <T> SctpMultiChannel setOption(SctpSocketOption<T> name,
392 T value,
393 Association association)
394 throws IOException {
395 if (name == null)
396 throw new NullPointerException();
397 if (!(supportedOptions().contains(name)))
398 throw new UnsupportedOperationException("'" + name + "' not supported");
399
400 synchronized (stateLock) {
401 if (association != null && (name.equals(SCTP_PRIMARY_ADDR) ||
467 throw new IllegalReceiveException(
468 "cannot invoke receive from handler");
469 receiveInvoked.set(Boolean.TRUE);
470
471 try {
472 ResultContainer resultContainer = new ResultContainer();
473 do {
474 resultContainer.clear();
475 synchronized (receiveLock) {
476 ensureOpen();
477 if (!isBound())
478 throw new NotYetBoundException();
479
480 int n = 0;
481 try {
482 begin();
483
484 synchronized (stateLock) {
485 if(!isOpen())
486 return null;
487 receiverThread = NativeThread.current();
488 }
489
490 do {
491 n = receive(fdVal, buffer, resultContainer);
492 } while ((n == IOStatus.INTERRUPTED) && isOpen());
493
494 } finally {
495 receiverCleanup();
496 end((n > 0) || (n == IOStatus.UNAVAILABLE));
497 assert IOStatus.check(n);
498 }
499
500 if (!resultContainer.isNotification()) {
501 /* message or nothing */
502 if (resultContainer.hasSomething()) {
503 /* Set the association before returning */
504 MessageInfoImpl info =
505 resultContainer.getMessageInfo();
506 info.setAssociation(lookupAssociation(info.
507 associationID()));
748 throw new IllegalArgumentException("buffer cannot be null");
749
750 if (messageInfo == null)
751 throw new IllegalArgumentException("messageInfo cannot be null");
752
753 synchronized (sendLock) {
754 ensureOpen();
755
756 if (!isBound())
757 bind(null, 0);
758
759 int n = 0;
760 try {
761 int assocId = -1;
762 SocketAddress address = null;
763 begin();
764
765 synchronized (stateLock) {
766 if(!isOpen())
767 return 0;
768 senderThread = NativeThread.current();
769
770 /* Determine what address or association to send to */
771 Association assoc = messageInfo.association();
772 InetSocketAddress addr = (InetSocketAddress)messageInfo.address();
773 if (assoc != null) {
774 checkAssociation(assoc);
775 checkStreamNumber(assoc, messageInfo.streamNumber());
776 assocId = assoc.associationID();
777 /* have we also got a preferred address */
778 if (addr != null) {
779 if (!assoc.equals(addressMap.get(addr)))
780 throw new IllegalArgumentException("given preferred address is not part of this association");
781 address = addr;
782 }
783 } else if (addr != null) {
784 address = addr;
785 Association association = addressMap.get(addr);
786 if (association != null) {
787 checkStreamNumber(association, messageInfo.streamNumber());
788 assocId = association.associationID();
|
64 import sun.nio.ch.IOUtil;
65 import sun.nio.ch.Net;
66 import sun.nio.ch.SelChImpl;
67 import sun.nio.ch.SelectionKeyImpl;
68 import sun.nio.ch.Util;
69 import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
70 import static sun.nio.ch.sctp.ResultContainer.*;
71
72 /**
73 * An implementation of SctpMultiChannel
74 */
75 public class SctpMultiChannelImpl extends SctpMultiChannel
76 implements SelChImpl
77 {
78 private static final JavaNioAccess NIO_ACCESS = SharedSecrets.getJavaNioAccess();
79
80 private final FileDescriptor fd;
81
82 private final int fdVal;
83
84 /* Threads doing sends and receives, for signalling */
85 private volatile Thread receiverThread;
86 private volatile Thread senderThread;
87
88 /* Lock held by current receiving thread */
89 private final Object receiveLock = new Object();
90
91 /* Lock held by current sending thread */
92 private final Object sendLock = new Object();
93
94 /* Lock held by any thread that modifies the state fields declared below.
95 * DO NOT invoke a blocking I/O operation while holding this lock! */
96 private final Object stateLock = new Object();
97
98 private enum ChannelState {
99 UNINITIALIZED,
100 KILLPENDING,
101 KILLED,
102 }
103
104 /* -- The following fields are protected by stateLock -- */
105 private ChannelState state = ChannelState.UNINITIALIZED;
106
248
249 return Collections.unmodifiableSet(associationMap.keySet());
250 }
251 }
252
253 private boolean isBound() {
254 synchronized (stateLock) {
255 return port != -1;
256 }
257 }
258
259 private void ensureOpen() throws IOException {
260 synchronized (stateLock) {
261 if (!isOpen())
262 throw new ClosedChannelException();
263 }
264 }
265
266 private void receiverCleanup() throws IOException {
267 synchronized (stateLock) {
268 receiverThread = null;
269 if (state == ChannelState.KILLPENDING)
270 kill();
271 }
272 }
273
274 private void senderCleanup() throws IOException {
275 synchronized (stateLock) {
276 senderThread = null;
277 if (state == ChannelState.KILLPENDING)
278 kill();
279 }
280 }
281
282 @Override
283 protected void implConfigureBlocking(boolean block) throws IOException {
284 IOUtil.configureBlocking(fd, block);
285 }
286
287 @Override
288 public void implCloseSelectableChannel() throws IOException {
289 synchronized (stateLock) {
290 if (state != ChannelState.KILLED)
291 SctpNet.preClose(fdVal);
292
293 if (receiverThread != null)
294 NativeThread.signal(receiverThread);
295
296 if (senderThread != null)
297 NativeThread.signal(senderThread);
298
299 if (!isRegistered())
300 kill();
301 }
302 }
303
304 @Override
305 public FileDescriptor getFD() {
306 return fd;
307 }
308
309 @Override
310 public int getFDVal() {
311 return fdVal;
312 }
313
314 /**
315 * Translates native poll revent ops into a ready operation ops
316 */
361 if ((ops & SelectionKey.OP_READ) != 0)
362 newOps |= Net.POLLIN;
363 if ((ops & SelectionKey.OP_WRITE) != 0)
364 newOps |= Net.POLLOUT;
365 return newOps;
366 }
367
368 @Override
369 public void kill() throws IOException {
370 synchronized (stateLock) {
371 if (state == ChannelState.KILLED)
372 return;
373 if (state == ChannelState.UNINITIALIZED) {
374 state = ChannelState.KILLED;
375 SctpNet.close(fdVal);
376 return;
377 }
378 assert !isOpen() && !isRegistered();
379
380 /* Postpone the kill if there is a thread sending or receiving. */
381 if (receiverThread == null && senderThread == null) {
382 state = ChannelState.KILLED;
383 SctpNet.close(fdVal);
384 } else {
385 state = ChannelState.KILLPENDING;
386 }
387 }
388 }
389
390 @Override
391 public <T> SctpMultiChannel setOption(SctpSocketOption<T> name,
392 T value,
393 Association association)
394 throws IOException {
395 if (name == null)
396 throw new NullPointerException();
397 if (!(supportedOptions().contains(name)))
398 throw new UnsupportedOperationException("'" + name + "' not supported");
399
400 synchronized (stateLock) {
401 if (association != null && (name.equals(SCTP_PRIMARY_ADDR) ||
467 throw new IllegalReceiveException(
468 "cannot invoke receive from handler");
469 receiveInvoked.set(Boolean.TRUE);
470
471 try {
472 ResultContainer resultContainer = new ResultContainer();
473 do {
474 resultContainer.clear();
475 synchronized (receiveLock) {
476 ensureOpen();
477 if (!isBound())
478 throw new NotYetBoundException();
479
480 int n = 0;
481 try {
482 begin();
483
484 synchronized (stateLock) {
485 if(!isOpen())
486 return null;
487 receiverThread = NativeThread.threadToSignal();
488 }
489
490 do {
491 n = receive(fdVal, buffer, resultContainer);
492 } while ((n == IOStatus.INTERRUPTED) && isOpen());
493
494 } finally {
495 receiverCleanup();
496 end((n > 0) || (n == IOStatus.UNAVAILABLE));
497 assert IOStatus.check(n);
498 }
499
500 if (!resultContainer.isNotification()) {
501 /* message or nothing */
502 if (resultContainer.hasSomething()) {
503 /* Set the association before returning */
504 MessageInfoImpl info =
505 resultContainer.getMessageInfo();
506 info.setAssociation(lookupAssociation(info.
507 associationID()));
748 throw new IllegalArgumentException("buffer cannot be null");
749
750 if (messageInfo == null)
751 throw new IllegalArgumentException("messageInfo cannot be null");
752
753 synchronized (sendLock) {
754 ensureOpen();
755
756 if (!isBound())
757 bind(null, 0);
758
759 int n = 0;
760 try {
761 int assocId = -1;
762 SocketAddress address = null;
763 begin();
764
765 synchronized (stateLock) {
766 if(!isOpen())
767 return 0;
768 senderThread = NativeThread.threadToSignal();
769
770 /* Determine what address or association to send to */
771 Association assoc = messageInfo.association();
772 InetSocketAddress addr = (InetSocketAddress)messageInfo.address();
773 if (assoc != null) {
774 checkAssociation(assoc);
775 checkStreamNumber(assoc, messageInfo.streamNumber());
776 assocId = assoc.associationID();
777 /* have we also got a preferred address */
778 if (addr != null) {
779 if (!assoc.equals(addressMap.get(addr)))
780 throw new IllegalArgumentException("given preferred address is not part of this association");
781 address = addr;
782 }
783 } else if (addr != null) {
784 address = addr;
785 Association association = addressMap.get(addr);
786 if (association != null) {
787 checkStreamNumber(association, messageInfo.streamNumber());
788 assocId = association.associationID();
|