< prev index next >

src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java

Print this page




 108         }
 109     }
 110 
 111     public static class ConsumerSubscriber implements TrustedSubscriber<Void> {
 112         private final Consumer<Optional<byte[]>> consumer;
 113         private Flow.Subscription subscription;
 114         private final CompletableFuture<Void> result = new MinimalFuture<>();
 115         private final AtomicBoolean subscribed = new AtomicBoolean();
 116 
 117         public ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) {
 118             this.consumer = Objects.requireNonNull(consumer);
 119         }
 120 
 121         @Override
 122         public CompletionStage<Void> getBody() {
 123             return result;
 124         }
 125 
 126         @Override
 127         public void onSubscribe(Flow.Subscription subscription) {
 128             Objects.requireNonNull(subscription);
 129             if (!subscribed.compareAndSet(false, true)) {
 130                 subscription.cancel();
 131             } else {
 132                 this.subscription = subscription;
 133                 subscription.request(1);
 134             }
 135         }
 136 
 137         @Override
 138         public void onNext(List<ByteBuffer> items) {
 139             Objects.requireNonNull(items);
 140             for (ByteBuffer item : items) {
 141                 byte[] buf = new byte[item.remaining()];
 142                 item.get(buf);
 143                 consumer.accept(Optional.of(buf));
 144             }
 145             subscription.request(1);
 146         }
 147 
 148         @Override
 149         public void onError(Throwable throwable) {
 150             Objects.requireNonNull(throwable);
 151             result.completeExceptionally(throwable);
 152         }
 153 
 154         @Override
 155         public void onComplete() {
 156             consumer.accept(Optional.empty());
 157             result.complete(null);
 158         }
 159 
 160     }
 161 
 162     /**
 163      * A Subscriber that writes the flow of data to a given file.
 164      *
 165      * Privileged actions are performed within a limited doPrivileged that only
 166      * asserts the specific, write, file permissions that were checked during
 167      * the construction of this PathSubscriber.
 168      */
 169     public static class PathSubscriber implements TrustedSubscriber<Path> {
 170 
 171         private static final FilePermission[] EMPTY_FILE_PERMISSIONS = new FilePermission[0];
 172 
 173         private final Path file;
 174         private final OpenOption[] options;
 175         private final FilePermission[] filePermissions;
 176         private final CompletableFuture<Path> result = new MinimalFuture<>();
 177 
 178         private final AtomicBoolean subscribed = new AtomicBoolean();
 179         private volatile Flow.Subscription subscription;
 180         private volatile FileChannel out;
 181 
 182         private static final String pathForSecurityCheck(Path path) {
 183             return path.toFile().getPath();
 184         }
 185 
 186         /**
 187          * Factory for creating PathSubscriber.
 188          *
 189          * Permission checks are performed here before construction of the
 190          * PathSubscriber. Permission checking and construction are deliberately
 191          * and tightly co-located.
 192          */
 193         public static PathSubscriber create(Path file,
 194                                             List<OpenOption> options) {
 195             FilePermission filePermission = null;
 196             SecurityManager sm = System.getSecurityManager();
 197             if (sm != null) {
 198                 String fn = pathForSecurityCheck(file);
 199                 FilePermission writePermission = new FilePermission(fn, "write");
 200                 sm.checkPermission(writePermission);
 201                 filePermission = writePermission;
 202             }
 203             return new PathSubscriber(file, options, filePermission);
 204         }
 205 
 206         // pp so handler implementations in the same package can construct
 207         /*package-private*/ PathSubscriber(Path file,
 208                                            List<OpenOption> options,
 209                                            FilePermission... filePermissions) {
 210             this.file = file;
 211             this.options = options.stream().toArray(OpenOption[]::new);
 212             this.filePermissions =
 213                     filePermissions == null ? EMPTY_FILE_PERMISSIONS : filePermissions;
 214         }
 215 
 216         @Override
 217         public void onSubscribe(Flow.Subscription subscription) {
 218             Objects.requireNonNull(subscription);
 219             if (!subscribed.compareAndSet(false, true)) {
 220                 subscription.cancel();
 221                 return;
 222             }
 223 
 224             this.subscription = subscription;
 225             if (System.getSecurityManager() == null) {
 226                 try {
 227                     out = FileChannel.open(file, options);
 228                 } catch (IOException ioe) {
 229                     result.completeExceptionally(ioe);
 230                     return;
 231                 }
 232             } else {
 233                 try {
 234                     PrivilegedExceptionAction<FileChannel> pa =
 235                             () -> FileChannel.open(file, options);
 236                     out = AccessController.doPrivileged(pa, null, filePermissions);
 237                 } catch (PrivilegedActionException pae) {
 238                     Throwable t = pae.getCause() != null ? pae.getCause() : pae;
 239                     result.completeExceptionally(t);
 240                     subscription.cancel();
 241                     return;
 242                 }
 243             }


 463         @Override
 464         public int available() throws IOException {
 465             // best effort: returns the number of remaining bytes in
 466             // the current buffer if any, or 1 if the current buffer
 467             // is null or empty but the queue or current buffer list
 468             // are not empty. Returns 0 otherwise.
 469             if (closed) return 0;
 470             int available = 0;
 471             ByteBuffer current = currentBuffer;
 472             if (current == LAST_BUFFER) return 0;
 473             if (current != null) available = current.remaining();
 474             if (available != 0) return available;
 475             Iterator<?> iterator = currentListItr;
 476             if (iterator != null && iterator.hasNext()) return 1;
 477             if (buffers.isEmpty()) return 0;
 478             return 1;
 479         }
 480 
 481         @Override
 482         public void onSubscribe(Flow.Subscription s) {
 483             Objects.requireNonNull(s);
 484             try {
 485                 if (!subscribed.compareAndSet(false, true)) {
 486                     s.cancel();
 487                 } else {
 488                     // check whether the stream is already closed.
 489                     // if so, we should cancel the subscription
 490                     // immediately.
 491                     boolean closed;
 492                     synchronized (this) {
 493                         closed = this.closed;
 494                         if (!closed) {
 495                             this.subscription = s;
 496                         }
 497                     }
 498                     if (closed) {
 499                         s.cancel();
 500                         return;
 501                     }
 502                     assert buffers.remainingCapacity() > 1; // should contain at least 2
 503                     if (debug.on())


 594                 return new BufferedReader(new InputStreamReader(stream, charset))
 595                             .lines().onClose(() -> Utils.close(stream));
 596             }, true);
 597     }
 598 
 599     /**
 600      * Currently this consumes all of the data and ignores it
 601      */
 602     public static class NullSubscriber<T> implements TrustedSubscriber<T> {
 603 
 604         private final CompletableFuture<T> cf = new MinimalFuture<>();
 605         private final Optional<T> result;
 606         private final AtomicBoolean subscribed = new AtomicBoolean();
 607 
 608         public NullSubscriber(Optional<T> result) {
 609             this.result = result;
 610         }
 611 
 612         @Override
 613         public void onSubscribe(Flow.Subscription subscription) {
 614             Objects.requireNonNull(subscription);
 615             if (!subscribed.compareAndSet(false, true)) {
 616                 subscription.cancel();
 617             } else {
 618                 subscription.request(Long.MAX_VALUE);
 619             }
 620         }
 621 
 622         @Override
 623         public void onNext(List<ByteBuffer> items) {
 624             Objects.requireNonNull(items);
 625         }
 626 
 627         @Override
 628         public void onError(Throwable throwable) {
 629             Objects.requireNonNull(throwable);
 630             cf.completeExceptionally(throwable);
 631         }
 632 
 633         @Override
 634         public void onComplete() {
 635             if (result.isPresent()) {
 636                 cf.complete(result.get());
 637             } else {
 638                 cf.complete(null);
 639             }
 640         }
 641 
 642         @Override
 643         public CompletionStage<T> getBody() {
 644             return cf;
 645         }
 646     }
 647 
 648     /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */
 649     public static final class SubscriberAdapter<S extends Subscriber<? super List<ByteBuffer>>,R>


 903                         subscribedCF.complete(ref);
 904                     } catch (Throwable t) {
 905                         if (Log.errors()) {
 906                             Log.logError("Failed to call onSubscribe: " +
 907                                     "cancelling subscription: " + t);
 908                             Log.logError(t);
 909                         }
 910                         subscription.cancel();
 911                     }
 912                 });
 913             } else {
 914                 subscriber.onSubscribe(new Flow.Subscription() {
 915                     @Override public void request(long n) { }
 916                     @Override public void cancel() { }
 917                 });
 918                 subscriber.onError(new IllegalStateException(
 919                         "This publisher has already one subscriber"));
 920             }
 921         }
 922 
 923         private final AtomicBoolean subscribed = new AtomicBoolean();
 924 
 925         @Override
 926         public void onSubscribe(Flow.Subscription subscription) {
 927             Objects.requireNonNull(subscription);
 928             if (!subscribed.compareAndSet(false, true)) {
 929                 subscription.cancel();
 930             } else {
 931                 subscriptionCF.complete(subscription);
 932             }
 933         }
 934 
 935         @Override
 936         public void onNext(List<ByteBuffer> item) {
 937             Objects.requireNonNull(item);
 938             try {
 939                 // cannot be called before onSubscribe()
 940                 assert subscriptionCF.isDone();
 941                 SubscriberRef ref = subscriberRef.get();
 942                 // cannot be called before subscriber calls request(1)
 943                 assert ref != null;
 944                 Flow.Subscriber<? super List<ByteBuffer>>
 945                         subscriber = ref.get();
 946                 if (subscriber != null) {
 947                     // may be null if subscription was cancelled.
 948                     subscriber.onNext(item);
 949                 }
 950             } catch (Throwable err) {
 951                 signalError(err);
 952                 subscriptionCF.thenAccept(s -> s.cancel());
 953             }
 954         }
 955 
 956         @Override
 957         public void onError(Throwable throwable) {
 958             // cannot be called before onSubscribe();
 959             assert suppress(subscriptionCF.isDone(),
 960                     "onError called before onSubscribe",
 961                     throwable);
 962             // onError can be called before request(1), and therefore can
 963             // be called before subscriberRef is set.
 964             signalError(throwable);
 965             Objects.requireNonNull(throwable);
 966         }
 967 
 968         @Override
 969         public void onComplete() {
 970             // cannot be called before onSubscribe()
 971             if (!subscriptionCF.isDone()) {
 972                 signalError(new InternalError(
 973                         "onComplete called before onSubscribed"));
 974             } else {
 975                 // onComplete can be called before request(1),
 976                 // and therefore can be called before subscriberRef
 977                 // is set.
 978                 signalComplete();
 979             }
 980         }
 981 
 982         @Override
 983         public CompletionStage<Flow.Publisher<List<ByteBuffer>>> getBody() {
 984             return body;
 985         }




 108         }
 109     }
 110 
 111     public static class ConsumerSubscriber implements TrustedSubscriber<Void> {
 112         private final Consumer<Optional<byte[]>> consumer;
 113         private Flow.Subscription subscription;
 114         private final CompletableFuture<Void> result = new MinimalFuture<>();
 115         private final AtomicBoolean subscribed = new AtomicBoolean();
 116 
 117         public ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) {
 118             this.consumer = Objects.requireNonNull(consumer);
 119         }
 120 
 121         @Override
 122         public CompletionStage<Void> getBody() {
 123             return result;
 124         }
 125 
 126         @Override
 127         public void onSubscribe(Flow.Subscription subscription) {

 128             if (!subscribed.compareAndSet(false, true)) {
 129                 subscription.cancel();
 130             } else {
 131                 this.subscription = subscription;
 132                 subscription.request(1);
 133             }
 134         }
 135 
 136         @Override
 137         public void onNext(List<ByteBuffer> items) {

 138             for (ByteBuffer item : items) {
 139                 byte[] buf = new byte[item.remaining()];
 140                 item.get(buf);
 141                 consumer.accept(Optional.of(buf));
 142             }
 143             subscription.request(1);
 144         }
 145 
 146         @Override
 147         public void onError(Throwable throwable) {

 148             result.completeExceptionally(throwable);
 149         }
 150 
 151         @Override
 152         public void onComplete() {
 153             consumer.accept(Optional.empty());
 154             result.complete(null);
 155         }
 156 
 157     }
 158 
 159     /**
 160      * A Subscriber that writes the flow of data to a given file.
 161      *
 162      * Privileged actions are performed within a limited doPrivileged that only
 163      * asserts the specific, write, file permissions that were checked during
 164      * the construction of this PathSubscriber.
 165      */
 166     public static class PathSubscriber implements TrustedSubscriber<Path> {
 167 
 168         private static final FilePermission[] EMPTY_FILE_PERMISSIONS = new FilePermission[0];
 169 
 170         private final Path file;
 171         private final OpenOption[] options;
 172         private final FilePermission[] filePermissions;
 173         private final CompletableFuture<Path> result = new MinimalFuture<>();
 174 

 175         private volatile Flow.Subscription subscription;
 176         private volatile FileChannel out;
 177 
 178         private static final String pathForSecurityCheck(Path path) {
 179             return path.toFile().getPath();
 180         }
 181 
 182         /**
 183          * Factory for creating PathSubscriber.
 184          *
 185          * Permission checks are performed here before construction of the
 186          * PathSubscriber. Permission checking and construction are deliberately
 187          * and tightly co-located.
 188          */
 189         public static PathSubscriber create(Path file,
 190                                             List<OpenOption> options) {
 191             FilePermission filePermission = null;
 192             SecurityManager sm = System.getSecurityManager();
 193             if (sm != null) {
 194                 String fn = pathForSecurityCheck(file);
 195                 FilePermission writePermission = new FilePermission(fn, "write");
 196                 sm.checkPermission(writePermission);
 197                 filePermission = writePermission;
 198             }
 199             return new PathSubscriber(file, options, filePermission);
 200         }
 201 
 202         // pp so handler implementations in the same package can construct
 203         /*package-private*/ PathSubscriber(Path file,
 204                                            List<OpenOption> options,
 205                                            FilePermission... filePermissions) {
 206             this.file = file;
 207             this.options = options.stream().toArray(OpenOption[]::new);
 208             this.filePermissions =
 209                     filePermissions == null ? EMPTY_FILE_PERMISSIONS : filePermissions;
 210         }
 211 
 212         @Override
 213         public void onSubscribe(Flow.Subscription subscription) {






 214             this.subscription = subscription;
 215             if (System.getSecurityManager() == null) {
 216                 try {
 217                     out = FileChannel.open(file, options);
 218                 } catch (IOException ioe) {
 219                     result.completeExceptionally(ioe);
 220                     return;
 221                 }
 222             } else {
 223                 try {
 224                     PrivilegedExceptionAction<FileChannel> pa =
 225                             () -> FileChannel.open(file, options);
 226                     out = AccessController.doPrivileged(pa, null, filePermissions);
 227                 } catch (PrivilegedActionException pae) {
 228                     Throwable t = pae.getCause() != null ? pae.getCause() : pae;
 229                     result.completeExceptionally(t);
 230                     subscription.cancel();
 231                     return;
 232                 }
 233             }


 453         @Override
 454         public int available() throws IOException {
 455             // best effort: returns the number of remaining bytes in
 456             // the current buffer if any, or 1 if the current buffer
 457             // is null or empty but the queue or current buffer list
 458             // are not empty. Returns 0 otherwise.
 459             if (closed) return 0;
 460             int available = 0;
 461             ByteBuffer current = currentBuffer;
 462             if (current == LAST_BUFFER) return 0;
 463             if (current != null) available = current.remaining();
 464             if (available != 0) return available;
 465             Iterator<?> iterator = currentListItr;
 466             if (iterator != null && iterator.hasNext()) return 1;
 467             if (buffers.isEmpty()) return 0;
 468             return 1;
 469         }
 470 
 471         @Override
 472         public void onSubscribe(Flow.Subscription s) {

 473             try {
 474                 if (!subscribed.compareAndSet(false, true)) {
 475                     s.cancel();
 476                 } else {
 477                     // check whether the stream is already closed.
 478                     // if so, we should cancel the subscription
 479                     // immediately.
 480                     boolean closed;
 481                     synchronized (this) {
 482                         closed = this.closed;
 483                         if (!closed) {
 484                             this.subscription = s;
 485                         }
 486                     }
 487                     if (closed) {
 488                         s.cancel();
 489                         return;
 490                     }
 491                     assert buffers.remainingCapacity() > 1; // should contain at least 2
 492                     if (debug.on())


 583                 return new BufferedReader(new InputStreamReader(stream, charset))
 584                             .lines().onClose(() -> Utils.close(stream));
 585             }, true);
 586     }
 587 
 588     /**
 589      * Currently this consumes all of the data and ignores it
 590      */
 591     public static class NullSubscriber<T> implements TrustedSubscriber<T> {
 592 
 593         private final CompletableFuture<T> cf = new MinimalFuture<>();
 594         private final Optional<T> result;
 595         private final AtomicBoolean subscribed = new AtomicBoolean();
 596 
 597         public NullSubscriber(Optional<T> result) {
 598             this.result = result;
 599         }
 600 
 601         @Override
 602         public void onSubscribe(Flow.Subscription subscription) {

 603             if (!subscribed.compareAndSet(false, true)) {
 604                 subscription.cancel();
 605             } else {
 606                 subscription.request(Long.MAX_VALUE);
 607             }
 608         }
 609 
 610         @Override
 611         public void onNext(List<ByteBuffer> items) {
 612             Objects.requireNonNull(items);
 613         }
 614 
 615         @Override
 616         public void onError(Throwable throwable) {

 617             cf.completeExceptionally(throwable);
 618         }
 619 
 620         @Override
 621         public void onComplete() {
 622             if (result.isPresent()) {
 623                 cf.complete(result.get());
 624             } else {
 625                 cf.complete(null);
 626             }
 627         }
 628 
 629         @Override
 630         public CompletionStage<T> getBody() {
 631             return cf;
 632         }
 633     }
 634 
 635     /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */
 636     public static final class SubscriberAdapter<S extends Subscriber<? super List<ByteBuffer>>,R>


 890                         subscribedCF.complete(ref);
 891                     } catch (Throwable t) {
 892                         if (Log.errors()) {
 893                             Log.logError("Failed to call onSubscribe: " +
 894                                     "cancelling subscription: " + t);
 895                             Log.logError(t);
 896                         }
 897                         subscription.cancel();
 898                     }
 899                 });
 900             } else {
 901                 subscriber.onSubscribe(new Flow.Subscription() {
 902                     @Override public void request(long n) { }
 903                     @Override public void cancel() { }
 904                 });
 905                 subscriber.onError(new IllegalStateException(
 906                         "This publisher has already one subscriber"));
 907             }
 908         }
 909 


 910         @Override
 911         public void onSubscribe(Flow.Subscription subscription) {
 912             subscriptionCF.complete(subscription);





 913         }
 914 
 915         @Override
 916         public void onNext(List<ByteBuffer> item) {

 917             try {
 918                 // cannot be called before onSubscribe()
 919                 assert subscriptionCF.isDone();
 920                 SubscriberRef ref = subscriberRef.get();
 921                 // cannot be called before subscriber calls request(1)
 922                 assert ref != null;
 923                 Flow.Subscriber<? super List<ByteBuffer>>
 924                         subscriber = ref.get();
 925                 if (subscriber != null) {
 926                     // may be null if subscription was cancelled.
 927                     subscriber.onNext(item);
 928                 }
 929             } catch (Throwable err) {
 930                 signalError(err);
 931                 subscriptionCF.thenAccept(s -> s.cancel());
 932             }
 933         }
 934 
 935         @Override
 936         public void onError(Throwable throwable) {
 937             // cannot be called before onSubscribe();
 938             assert suppress(subscriptionCF.isDone(),
 939                     "onError called before onSubscribe",
 940                     throwable);
 941             // onError can be called before request(1), and therefore can
 942             // be called before subscriberRef is set.
 943             signalError(throwable);

 944         }
 945 
 946         @Override
 947         public void onComplete() {
 948             // cannot be called before onSubscribe()
 949             if (!subscriptionCF.isDone()) {
 950                 signalError(new InternalError(
 951                         "onComplete called before onSubscribed"));
 952             } else {
 953                 // onComplete can be called before request(1),
 954                 // and therefore can be called before subscriberRef
 955                 // is set.
 956                 signalComplete();
 957             }
 958         }
 959 
 960         @Override
 961         public CompletionStage<Flow.Publisher<List<ByteBuffer>>> getBody() {
 962             return body;
 963         }


< prev index next >