< prev index next >

src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java

Print this page

        

*** 123,155 **** return result; } @Override public void onSubscribe(Flow.Subscription subscription) { - Objects.requireNonNull(subscription); if (!subscribed.compareAndSet(false, true)) { subscription.cancel(); } else { this.subscription = subscription; subscription.request(1); } } @Override public void onNext(List<ByteBuffer> items) { - Objects.requireNonNull(items); for (ByteBuffer item : items) { byte[] buf = new byte[item.remaining()]; item.get(buf); consumer.accept(Optional.of(buf)); } subscription.request(1); } @Override public void onError(Throwable throwable) { - Objects.requireNonNull(throwable); result.completeExceptionally(throwable); } @Override public void onComplete() { --- 123,152 ----
*** 173,183 **** private final Path file; private final OpenOption[] options; private final FilePermission[] filePermissions; private final CompletableFuture<Path> result = new MinimalFuture<>(); - private final AtomicBoolean subscribed = new AtomicBoolean(); private volatile Flow.Subscription subscription; private volatile FileChannel out; private static final String pathForSecurityCheck(Path path) { return path.toFile().getPath(); --- 170,179 ----
*** 213,228 **** filePermissions == null ? EMPTY_FILE_PERMISSIONS : filePermissions; } @Override public void onSubscribe(Flow.Subscription subscription) { - Objects.requireNonNull(subscription); - if (!subscribed.compareAndSet(false, true)) { - subscription.cancel(); - return; - } - this.subscription = subscription; if (System.getSecurityManager() == null) { try { out = FileChannel.open(file, options); } catch (IOException ioe) { --- 209,218 ----
*** 478,488 **** return 1; } @Override public void onSubscribe(Flow.Subscription s) { - Objects.requireNonNull(s); try { if (!subscribed.compareAndSet(false, true)) { s.cancel(); } else { // check whether the stream is already closed. --- 468,477 ----
*** 609,619 **** this.result = result; } @Override public void onSubscribe(Flow.Subscription subscription) { - Objects.requireNonNull(subscription); if (!subscribed.compareAndSet(false, true)) { subscription.cancel(); } else { subscription.request(Long.MAX_VALUE); } --- 598,607 ----
*** 624,634 **** Objects.requireNonNull(items); } @Override public void onError(Throwable throwable) { - Objects.requireNonNull(throwable); cf.completeExceptionally(throwable); } @Override public void onComplete() { --- 612,621 ----
*** 918,942 **** subscriber.onError(new IllegalStateException( "This publisher has already one subscriber")); } } - private final AtomicBoolean subscribed = new AtomicBoolean(); - @Override public void onSubscribe(Flow.Subscription subscription) { ! Objects.requireNonNull(subscription); ! if (!subscribed.compareAndSet(false, true)) { ! subscription.cancel(); ! } else { ! subscriptionCF.complete(subscription); ! } } @Override public void onNext(List<ByteBuffer> item) { - Objects.requireNonNull(item); try { // cannot be called before onSubscribe() assert subscriptionCF.isDone(); SubscriberRef ref = subscriberRef.get(); // cannot be called before subscriber calls request(1) --- 905,921 ---- subscriber.onError(new IllegalStateException( "This publisher has already one subscriber")); } } @Override public void onSubscribe(Flow.Subscription subscription) { ! subscriptionCF.complete(subscription); } @Override public void onNext(List<ByteBuffer> item) { try { // cannot be called before onSubscribe() assert subscriptionCF.isDone(); SubscriberRef ref = subscriberRef.get(); // cannot be called before subscriber calls request(1)
*** 960,970 **** "onError called before onSubscribe", throwable); // onError can be called before request(1), and therefore can // be called before subscriberRef is set. signalError(throwable); - Objects.requireNonNull(throwable); } @Override public void onComplete() { // cannot be called before onSubscribe() --- 939,948 ----
< prev index next >