< prev index next >

src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java

Print this page

        

@@ -123,33 +123,30 @@
             return result;
         }
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
-            Objects.requireNonNull(subscription);
             if (!subscribed.compareAndSet(false, true)) {
                 subscription.cancel();
             } else {
                 this.subscription = subscription;
                 subscription.request(1);
             }
         }
 
         @Override
         public void onNext(List<ByteBuffer> items) {
-            Objects.requireNonNull(items);
             for (ByteBuffer item : items) {
                 byte[] buf = new byte[item.remaining()];
                 item.get(buf);
                 consumer.accept(Optional.of(buf));
             }
             subscription.request(1);
         }
 
         @Override
         public void onError(Throwable throwable) {
-            Objects.requireNonNull(throwable);
             result.completeExceptionally(throwable);
         }
 
         @Override
         public void onComplete() {

@@ -173,11 +170,10 @@
         private final Path file;
         private final OpenOption[] options;
         private final FilePermission[] filePermissions;
         private final CompletableFuture<Path> result = new MinimalFuture<>();
 
-        private final AtomicBoolean subscribed = new AtomicBoolean();
         private volatile Flow.Subscription subscription;
         private volatile FileChannel out;
 
         private static final String pathForSecurityCheck(Path path) {
             return path.toFile().getPath();

@@ -213,16 +209,10 @@
                     filePermissions == null ? EMPTY_FILE_PERMISSIONS : filePermissions;
         }
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
-            Objects.requireNonNull(subscription);
-            if (!subscribed.compareAndSet(false, true)) {
-                subscription.cancel();
-                return;
-            }
-
             this.subscription = subscription;
             if (System.getSecurityManager() == null) {
                 try {
                     out = FileChannel.open(file, options);
                 } catch (IOException ioe) {

@@ -478,11 +468,10 @@
             return 1;
         }
 
         @Override
         public void onSubscribe(Flow.Subscription s) {
-            Objects.requireNonNull(s);
             try {
                 if (!subscribed.compareAndSet(false, true)) {
                     s.cancel();
                 } else {
                     // check whether the stream is already closed.

@@ -609,11 +598,10 @@
             this.result = result;
         }
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
-            Objects.requireNonNull(subscription);
             if (!subscribed.compareAndSet(false, true)) {
                 subscription.cancel();
             } else {
                 subscription.request(Long.MAX_VALUE);
             }

@@ -624,11 +612,10 @@
             Objects.requireNonNull(items);
         }
 
         @Override
         public void onError(Throwable throwable) {
-            Objects.requireNonNull(throwable);
             cf.completeExceptionally(throwable);
         }
 
         @Override
         public void onComplete() {

@@ -918,25 +905,17 @@
                 subscriber.onError(new IllegalStateException(
                         "This publisher has already one subscriber"));
             }
         }
 
-        private final AtomicBoolean subscribed = new AtomicBoolean();
-
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
-            Objects.requireNonNull(subscription);
-            if (!subscribed.compareAndSet(false, true)) {
-                subscription.cancel();
-            } else {
-                subscriptionCF.complete(subscription);
-            }
+            subscriptionCF.complete(subscription);
         }
 
         @Override
         public void onNext(List<ByteBuffer> item) {
-            Objects.requireNonNull(item);
             try {
                 // cannot be called before onSubscribe()
                 assert subscriptionCF.isDone();
                 SubscriberRef ref = subscriberRef.get();
                 // cannot be called before subscriber calls request(1)

@@ -960,11 +939,10 @@
                     "onError called before onSubscribe",
                     throwable);
             // onError can be called before request(1), and therefore can
             // be called before subscriberRef is set.
             signalError(throwable);
-            Objects.requireNonNull(throwable);
         }
 
         @Override
         public void onComplete() {
             // cannot be called before onSubscribe()
< prev index next >