< prev index next >

src/jdk.sctp/unix/classes/sun/nio/ch/sctp/SctpMultiChannelImpl.java

Print this page

 64 import sun.nio.ch.IOUtil;
 65 import sun.nio.ch.Net;
 66 import sun.nio.ch.SelChImpl;
 67 import sun.nio.ch.SelectionKeyImpl;
 68 import sun.nio.ch.Util;
 69 import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
 70 import static sun.nio.ch.sctp.ResultContainer.*;
 71 
 72 /**
 73  * An implementation of SctpMultiChannel
 74  */
 75 public class SctpMultiChannelImpl extends SctpMultiChannel
 76     implements SelChImpl
 77 {
 78     private static final JavaNioAccess NIO_ACCESS = SharedSecrets.getJavaNioAccess();
 79 
 80     private final FileDescriptor fd;
 81 
 82     private final int fdVal;
 83 
 84     /* IDs of native threads doing sends and receives, for signalling */
 85     private volatile long receiverThread;
 86     private volatile long senderThread;
 87 
 88     /* Lock held by current receiving thread */
 89     private final Object receiveLock = new Object();
 90 
 91     /* Lock held by current sending thread */
 92     private final Object sendLock = new Object();
 93 
 94     /* Lock held by any thread that modifies the state fields declared below
 95      * DO NOT invoke a blocking I/O operation while holding this lock! */
 96     private final Object stateLock = new Object();
 97 
 98     private enum ChannelState {
 99         UNINITIALIZED,
100         KILLPENDING,
101         KILLED,
102     }
103 
104     /* -- The following fields are protected by stateLock -- */
105     private ChannelState state = ChannelState.UNINITIALIZED;
106 

248 
249             return Collections.unmodifiableSet(associationMap.keySet());
250         }
251     }
252 
253     private boolean isBound() {  // reports the channel as bound exactly when port != -1
254         synchronized (stateLock) {  // port is read while holding stateLock
255             return port != -1;  // presumably -1 marks "no local port assigned yet" -- confirm against bind()
256         }
257     }
258 
259     private void ensureOpen() throws IOException {  // guard used by I/O paths: fails fast on a closed channel
260         synchronized (stateLock) {
261             if (!isOpen())
262                 throw new ClosedChannelException();  // the only exception this method raises itself
263         }
264     }
265 
266     private void receiverCleanup() throws IOException {  // invoked from the finally block around the native receive loop
267         synchronized (stateLock) {
268             receiverThread = 0;  // 0 means "no thread is receiving"; close and kill() test for != 0 / == 0
269             if (state == ChannelState.KILLPENDING)  // a kill() was postponed because I/O was in flight
270                 kill();  // retry; kill() postpones again if a sender is still recorded
271         }
272     }
273 
274     private void senderCleanup() throws IOException {  // sender-side counterpart of receiverCleanup()
275         synchronized (stateLock) {
276             senderThread = 0;  // 0 means "no thread is sending"; close and kill() test for != 0 / == 0
277             if (state == ChannelState.KILLPENDING)  // a kill() was postponed because I/O was in flight
278                 kill();  // retry; kill() postpones again if a receiver is still recorded
279         }
280     }
281 
282     @Override
283     protected void implConfigureBlocking(boolean block) throws IOException {  // block: true for blocking mode, false for non-blocking
284         IOUtil.configureBlocking(fd, block);  // delegates to IOUtil for this channel's file descriptor
285     }
286 
287     @Override
288     public void implCloseSelectableChannel() throws IOException {  // close hook: pre-closes the socket, signals I/O threads, then kills if unregistered
289         synchronized (stateLock) {  // all steps below run while holding stateLock
290             if (state != ChannelState.KILLED)  // skip the pre-close when the descriptor was already closed by kill()
291                 SctpNet.preClose(fdVal);
292 
293             if (receiverThread != 0)  // a thread is recorded as receiving
294                 NativeThread.signal(receiverThread);  // presumably interrupts its native receive so it can observe the close -- confirm
295 
296             if (senderThread != 0)  // a thread is recorded as sending
297                 NativeThread.signal(senderThread);  // presumably interrupts its native send so it can observe the close -- confirm
298 
299             if (!isRegistered())  // NOTE(review): when still registered, kill() is presumably invoked later by the selector -- confirm
300                 kill();
301         }
302     }
303 
304     @Override
305     public FileDescriptor getFD() {  // accessor for the channel's FileDescriptor field
306         return fd;
307     }
308 
309     @Override
310     public int getFDVal() {  // accessor for the int descriptor value passed to the SctpNet/receive calls
311         return fdVal;
312     }
313 
314     /**
315      * Translates native poll revent ops into ready operation ops
316      */

361         if ((ops & SelectionKey.OP_READ) != 0)
362             newOps |= Net.POLLIN;
363         if ((ops & SelectionKey.OP_WRITE) != 0)
364             newOps |= Net.POLLOUT;
365         return newOps;
366     }
367 
368     @Override
369     public void kill() throws IOException {  // closes the descriptor now, or records the close as pending while I/O is in flight
370         synchronized (stateLock) {
371             if (state == ChannelState.KILLED)  // already closed: repeated calls are no-ops
372                 return;
373             if (state == ChannelState.UNINITIALIZED) {  // initial state: close immediately, without checking for active threads
374                 state = ChannelState.KILLED;
375                 SctpNet.close(fdVal);
376                 return;
377             }
378             assert !isOpen() && !isRegistered();  // NOTE(review): only KILLPENDING reaches here, and no assignment of state outside kill() is visible in this excerpt -- confirm
379 
380             /* Postpone the kill if there is a thread sending or receiving. */
381             if (receiverThread == 0 && senderThread == 0) {  // no I/O thread recorded: safe to close
382                 state = ChannelState.KILLED;
383                 SctpNet.close(fdVal);
384             } else {
385                 state = ChannelState.KILLPENDING;  // receiverCleanup()/senderCleanup() call kill() again once they finish
386             }
387         }
388     }
389 
390     @Override
391     public <T> SctpMultiChannel setOption(SctpSocketOption<T> name,
392                                           T value,
393                                           Association association)
394             throws IOException {
395         if (name == null)
396             throw new NullPointerException();
397         if (!(supportedOptions().contains(name)))
398             throw new UnsupportedOperationException("'" + name + "' not supported");
399 
400         synchronized (stateLock) {
401             if (association != null && (name.equals(SCTP_PRIMARY_ADDR) ||

467             throw new IllegalReceiveException(
468                     "cannot invoke receive from handler");
469         receiveInvoked.set(Boolean.TRUE);
470 
471         try {
472             ResultContainer resultContainer = new ResultContainer();
473             do {
474                 resultContainer.clear();
475                 synchronized (receiveLock) {
476                     ensureOpen();
477                     if (!isBound())
478                         throw new NotYetBoundException();
479 
480                     int n = 0;
481                     try {
482                         begin();
483 
484                         synchronized (stateLock) {
485                             if(!isOpen())
486                                 return null;
487                             receiverThread = NativeThread.current();
488                         }
489 
490                         do {
491                             n = receive(fdVal, buffer, resultContainer);
492                         } while ((n == IOStatus.INTERRUPTED) && isOpen());
493 
494                     } finally {
495                         receiverCleanup();
496                         end((n > 0) || (n == IOStatus.UNAVAILABLE));
497                         assert IOStatus.check(n);
498                     }
499 
500                     if (!resultContainer.isNotification()) {
501                         /* message or nothing */
502                         if (resultContainer.hasSomething()) {
503                             /* Set the association before returning */
504                             MessageInfoImpl info =
505                                     resultContainer.getMessageInfo();
506                             info.setAssociation(lookupAssociation(info.
507                                     associationID()));

748             throw new IllegalArgumentException("buffer cannot be null");
749 
750         if (messageInfo == null)
751             throw new IllegalArgumentException("messageInfo cannot be null");
752 
753         synchronized (sendLock) {
754             ensureOpen();
755 
756             if (!isBound())
757                 bind(null, 0);
758 
759             int n = 0;
760             try {
761                 int assocId = -1;
762                 SocketAddress address = null;
763                 begin();
764 
765                 synchronized (stateLock) {
766                     if(!isOpen())
767                         return 0;
768                     senderThread = NativeThread.current();
769 
770                     /* Determine what address or association to send to */
771                     Association assoc = messageInfo.association();
772                     InetSocketAddress addr = (InetSocketAddress)messageInfo.address();
773                     if (assoc != null) {
774                         checkAssociation(assoc);
775                         checkStreamNumber(assoc, messageInfo.streamNumber());
776                         assocId = assoc.associationID();
777                         /* have we also got a preferred address */
778                         if (addr != null) {
779                             if (!assoc.equals(addressMap.get(addr)))
780                                 throw new IllegalArgumentException("given preferred address is not part of this association");
781                             address = addr;
782                         }
783                     } else if (addr != null) {
784                         address = addr;
785                         Association association = addressMap.get(addr);
786                         if (association != null) {
787                             checkStreamNumber(association, messageInfo.streamNumber());
788                             assocId = association.associationID();

 64 import sun.nio.ch.IOUtil;
 65 import sun.nio.ch.Net;
 66 import sun.nio.ch.SelChImpl;
 67 import sun.nio.ch.SelectionKeyImpl;
 68 import sun.nio.ch.Util;
 69 import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
 70 import static sun.nio.ch.sctp.ResultContainer.*;
 71 
 72 /**
 73  * An implementation of SctpMultiChannel
 74  */
 75 public class SctpMultiChannelImpl extends SctpMultiChannel
 76     implements SelChImpl
 77 {
 78     private static final JavaNioAccess NIO_ACCESS = SharedSecrets.getJavaNioAccess();
 79 
 80     private final FileDescriptor fd;
 81 
 82     private final int fdVal;
 83 
 84     /* Threads doing sends and receives, for signalling */
 85     private volatile Thread receiverThread;
 86     private volatile Thread senderThread;
 87 
 88     /* Lock held by current receiving thread */
 89     private final Object receiveLock = new Object();
 90 
 91     /* Lock held by current sending thread */
 92     private final Object sendLock = new Object();
 93 
 94     /* Lock held by any thread that modifies the state fields declared below
 95      * DO NOT invoke a blocking I/O operation while holding this lock! */
 96     private final Object stateLock = new Object();
 97 
 98     private enum ChannelState {
 99         UNINITIALIZED,
100         KILLPENDING,
101         KILLED,
102     }
103 
104     /* -- The following fields are protected by stateLock -- */
105     private ChannelState state = ChannelState.UNINITIALIZED;
106 

248 
249             return Collections.unmodifiableSet(associationMap.keySet());
250         }
251     }
252 
253     private boolean isBound() {  // reports the channel as bound exactly when port != -1
254         synchronized (stateLock) {  // port is read while holding stateLock
255             return port != -1;  // presumably -1 marks "no local port assigned yet" -- confirm against bind()
256         }
257     }
258 
259     private void ensureOpen() throws IOException {  // guard used by I/O paths: fails fast on a closed channel
260         synchronized (stateLock) {
261             if (!isOpen())
262                 throw new ClosedChannelException();  // the only exception this method raises itself
263         }
264     }
265 
266     private void receiverCleanup() throws IOException {  // invoked from the finally block around the native receive loop
267         synchronized (stateLock) {
268             receiverThread = null;  // null means "no thread is receiving"; close and kill() test for != null / == null
269             if (state == ChannelState.KILLPENDING)  // a kill() was postponed because I/O was in flight
270                 kill();  // retry; kill() postpones again if a sender is still recorded
271         }
272     }
273 
274     private void senderCleanup() throws IOException {  // sender-side counterpart of receiverCleanup()
275         synchronized (stateLock) {
276             senderThread = null;  // null means "no thread is sending"; close and kill() test for != null / == null
277             if (state == ChannelState.KILLPENDING)  // a kill() was postponed because I/O was in flight
278                 kill();  // retry; kill() postpones again if a receiver is still recorded
279         }
280     }
281 
282     @Override
283     protected void implConfigureBlocking(boolean block) throws IOException {  // block: true for blocking mode, false for non-blocking
284         IOUtil.configureBlocking(fd, block);  // delegates to IOUtil for this channel's file descriptor
285     }
286 
287     @Override
288     public void implCloseSelectableChannel() throws IOException {  // close hook: pre-closes the socket, signals I/O threads, then kills if unregistered
289         synchronized (stateLock) {  // all steps below run while holding stateLock
290             if (state != ChannelState.KILLED)  // skip the pre-close when the descriptor was already closed by kill()
291                 SctpNet.preClose(fdVal);
292 
293             if (receiverThread != null)  // a thread is recorded as receiving
294                 NativeThread.signal(receiverThread);  // presumably interrupts its native receive so it can observe the close -- confirm
295 
296             if (senderThread != null)  // a thread is recorded as sending
297                 NativeThread.signal(senderThread);  // presumably interrupts its native send so it can observe the close -- confirm
298 
299             if (!isRegistered())  // NOTE(review): when still registered, kill() is presumably invoked later by the selector -- confirm
300                 kill();
301         }
302     }
303 
304     @Override
305     public FileDescriptor getFD() {  // accessor for the channel's FileDescriptor field
306         return fd;
307     }
308 
309     @Override
310     public int getFDVal() {  // accessor for the int descriptor value passed to the SctpNet/receive calls
311         return fdVal;
312     }
313 
314     /**
315      * Translates native poll revent ops into ready operation ops
316      */

361         if ((ops & SelectionKey.OP_READ) != 0)
362             newOps |= Net.POLLIN;
363         if ((ops & SelectionKey.OP_WRITE) != 0)
364             newOps |= Net.POLLOUT;
365         return newOps;
366     }
367 
368     @Override
369     public void kill() throws IOException {  // closes the descriptor now, or records the close as pending while I/O is in flight
370         synchronized (stateLock) {
371             if (state == ChannelState.KILLED)  // already closed: repeated calls are no-ops
372                 return;
373             if (state == ChannelState.UNINITIALIZED) {  // initial state: close immediately, without checking for active threads
374                 state = ChannelState.KILLED;
375                 SctpNet.close(fdVal);
376                 return;
377             }
378             assert !isOpen() && !isRegistered();  // NOTE(review): only KILLPENDING reaches here, and no assignment of state outside kill() is visible in this excerpt -- confirm
379 
380             /* Postpone the kill if there is a thread sending or receiving. */
381             if (receiverThread == null && senderThread == null) {  // no I/O thread recorded: safe to close
382                 state = ChannelState.KILLED;
383                 SctpNet.close(fdVal);
384             } else {
385                 state = ChannelState.KILLPENDING;  // receiverCleanup()/senderCleanup() call kill() again once they finish
386             }
387         }
388     }
389 
390     @Override
391     public <T> SctpMultiChannel setOption(SctpSocketOption<T> name,
392                                           T value,
393                                           Association association)
394             throws IOException {
395         if (name == null)
396             throw new NullPointerException();
397         if (!(supportedOptions().contains(name)))
398             throw new UnsupportedOperationException("'" + name + "' not supported");
399 
400         synchronized (stateLock) {
401             if (association != null && (name.equals(SCTP_PRIMARY_ADDR) ||

467             throw new IllegalReceiveException(
468                     "cannot invoke receive from handler");
469         receiveInvoked.set(Boolean.TRUE);
470 
471         try {
472             ResultContainer resultContainer = new ResultContainer();
473             do {
474                 resultContainer.clear();
475                 synchronized (receiveLock) {
476                     ensureOpen();
477                     if (!isBound())
478                         throw new NotYetBoundException();
479 
480                     int n = 0;
481                     try {
482                         begin();
483 
484                         synchronized (stateLock) {
485                             if(!isOpen())
486                                 return null;
487                             receiverThread = NativeThread.threadToSignal();
488                         }
489 
490                         do {
491                             n = receive(fdVal, buffer, resultContainer);
492                         } while ((n == IOStatus.INTERRUPTED) && isOpen());
493 
494                     } finally {
495                         receiverCleanup();
496                         end((n > 0) || (n == IOStatus.UNAVAILABLE));
497                         assert IOStatus.check(n);
498                     }
499 
500                     if (!resultContainer.isNotification()) {
501                         /* message or nothing */
502                         if (resultContainer.hasSomething()) {
503                             /* Set the association before returning */
504                             MessageInfoImpl info =
505                                     resultContainer.getMessageInfo();
506                             info.setAssociation(lookupAssociation(info.
507                                     associationID()));

748             throw new IllegalArgumentException("buffer cannot be null");
749 
750         if (messageInfo == null)
751             throw new IllegalArgumentException("messageInfo cannot be null");
752 
753         synchronized (sendLock) {
754             ensureOpen();
755 
756             if (!isBound())
757                 bind(null, 0);
758 
759             int n = 0;
760             try {
761                 int assocId = -1;
762                 SocketAddress address = null;
763                 begin();
764 
765                 synchronized (stateLock) {
766                     if(!isOpen())
767                         return 0;
768                     senderThread = NativeThread.threadToSignal();
769 
770                     /* Determine what address or association to send to */
771                     Association assoc = messageInfo.association();
772                     InetSocketAddress addr = (InetSocketAddress)messageInfo.address();
773                     if (assoc != null) {
774                         checkAssociation(assoc);
775                         checkStreamNumber(assoc, messageInfo.streamNumber());
776                         assocId = assoc.associationID();
777                         /* have we also got a preferred address */
778                         if (addr != null) {
779                             if (!assoc.equals(addressMap.get(addr)))
780                                 throw new IllegalArgumentException("given preferred address is not part of this association");
781                             address = addr;
782                         }
783                     } else if (addr != null) {
784                         address = addr;
785                         Association association = addressMap.get(addr);
786                         if (association != null) {
787                             checkStreamNumber(association, messageInfo.streamNumber());
788                             assocId = association.associationID();
< prev index next >