< prev index next >

src/java.net.http/share/classes/jdk/internal/net/http/LineSubscriberAdapter.java

Print this page


   1 /*
   2  * Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.nio.ByteBuffer;
  29 import java.nio.CharBuffer;
  30 import java.nio.charset.CharacterCodingException;
  31 import java.nio.charset.Charset;
  32 import java.nio.charset.CharsetDecoder;
  33 import java.nio.charset.CoderResult;
  34 import java.nio.charset.CodingErrorAction;
  35 import java.util.List;
  36 import java.util.Objects;
  37 import java.util.concurrent.CompletableFuture;
  38 import java.util.concurrent.CompletionStage;
  39 import java.util.concurrent.ConcurrentLinkedDeque;
  40 import java.util.concurrent.Flow;
  41 import java.util.concurrent.Flow.Subscriber;
  42 import java.util.concurrent.Flow.Subscription;
  43 import java.util.concurrent.atomic.AtomicBoolean;
  44 import java.util.concurrent.atomic.AtomicLong;
  45 import java.util.concurrent.atomic.AtomicReference;
  46 import java.util.function.Function;
  47 import jdk.internal.net.http.common.Demand;
  48 import java.net.http.HttpResponse.BodySubscriber;
  49 import jdk.internal.net.http.common.MinimalFuture;
  50 import jdk.internal.net.http.common.SequentialScheduler;
  51 
  52 /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber<String>}. */
  53 public final class LineSubscriberAdapter<S extends Subscriber<? super String>,R>
  54         implements BodySubscriber<R> {
  55     private final CompletableFuture<R> cf = new MinimalFuture<>(); // future handed out by getBody()
  56     private final S subscriber; // wrapped line subscriber; non-null (checked in the constructor)
  57     private final Function<? super S, ? extends R> finisher; // applied to the subscriber in onComplete() to produce the body value
  58     private final Charset charset; // forwarded to LineSubscription.create; presumably the decoding charset - confirm there
  59     private final String eol; // line separator forwarded to LineSubscription.create; may be null, never empty
  60     private final AtomicBoolean subscribed = new AtomicBoolean(); // flipped to true by the first onSubscribe() call
  61     private volatile LineSubscription downstream; // assigned in onSubscribe(); still null before that call
  62 
  63     private LineSubscriberAdapter(S subscriber, // private constructor; no caller is visible in this excerpt
  64                                   Function<? super S, ? extends R> finisher,
  65                                   Charset charset,
  66                                   String eol) {
  67         if (eol != null && eol.isEmpty()) // a null eol is accepted; only the empty string is rejected
  68             throw new IllegalArgumentException("empty line separator"); // note: thrown before the null checks below run
  69         this.subscriber = Objects.requireNonNull(subscriber); // NullPointerException if null
  70         this.finisher = Objects.requireNonNull(finisher); // NullPointerException if null
  71         this.charset = Objects.requireNonNull(charset); // NullPointerException if null
  72         this.eol = eol;
  73     }
  74 
  75     @Override // accepts the upstream subscription and connects the wrapped subscriber to it
  76     public void onSubscribe(Subscription subscription) {
  77         Objects.requireNonNull(subscription); // fail fast with NullPointerException on a null subscription
  78         if (!subscribed.compareAndSet(false, true)) { // only the first caller flips the flag; everyone else lands here
  79             subscription.cancel(); // a repeated subscription is cancelled ...
  80             return; // ... and otherwise ignored: downstream is left untouched
  81         }
  82 
  83         downstream = LineSubscription.create(subscription, // build the subscription object given to the wrapped subscriber
  84                                              charset,
  85                                              eol,
  86                                              subscriber,
  87                                              cf); // cf is shared with the LineSubscription - presumably so it can complete it too; confirm there
  88         subscriber.onSubscribe(downstream); // the wrapped subscriber sees the LineSubscription, not the raw upstream subscription
  89     }
  90 
  91     @Override // forwards a batch of byte buffers to the downstream LineSubscription
  92     public void onNext(List<ByteBuffer> item) {
  93         Objects.requireNonNull(item); // outside the try: a null item throws to the caller instead of going through onError
  94         try {
  95             downstream.submit(item); // NOTE(review): downstream is null if onSubscribe has not run yet - the resulting NPE is caught below
  96         } catch (Throwable t) {
  97             onError(t); // any failure while submitting is turned into an error signal
  98         }
  99     }
 100 
 101     @Override // propagates a failure downstream and fails the body future
 102     public void onError(Throwable throwable) {
 103         Objects.requireNonNull(throwable); // NullPointerException on a null cause, before anything is signalled
 104         try {
 105             downstream.signalError(throwable); // NOTE(review): throws NPE if downstream is still null; the finally block still runs
 106         } finally {
 107             cf.completeExceptionally(throwable); // always attempted, even when signalError itself throws
 108         }
 109     }
 110 
 111     @Override // signals completion downstream and completes the body future with the finisher's result
 112     public void onComplete() {
 113         try {
 114             downstream.signalComplete(); // NOTE(review): throws NPE if downstream is still null; the finally block still runs
 115         } finally {
 116             cf.complete(finisher.apply(subscriber)); // NOTE(review): if finisher.apply throws, cf.complete is never reached here - confirm cf gets completed elsewhere
 117         }
 118     }
 119 
 120     @Override // exposes the body as a CompletionStage
 121     public CompletionStage<R> getBody() {
 122         return cf; // the same future that onComplete()/onError() complete and that is passed to LineSubscription.create
 123     }


   1 /*
   2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.nio.ByteBuffer;
  29 import java.nio.CharBuffer;
  30 import java.nio.charset.CharacterCodingException;
  31 import java.nio.charset.Charset;
  32 import java.nio.charset.CharsetDecoder;
  33 import java.nio.charset.CoderResult;
  34 import java.nio.charset.CodingErrorAction;
  35 import java.util.List;
  36 import java.util.Objects;
  37 import java.util.concurrent.CompletableFuture;
  38 import java.util.concurrent.CompletionStage;
  39 import java.util.concurrent.ConcurrentLinkedDeque;
  40 import java.util.concurrent.Flow;
  41 import java.util.concurrent.Flow.Subscriber;
  42 import java.util.concurrent.Flow.Subscription;

  43 import java.util.concurrent.atomic.AtomicLong;
  44 import java.util.concurrent.atomic.AtomicReference;
  45 import java.util.function.Function;
  46 import jdk.internal.net.http.common.Demand;
  47 import java.net.http.HttpResponse.BodySubscriber;
  48 import jdk.internal.net.http.common.MinimalFuture;
  49 import jdk.internal.net.http.common.SequentialScheduler;
  50 
  51 /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber<String>}. */
  52 public final class LineSubscriberAdapter<S extends Subscriber<? super String>,R>
  53         implements BodySubscriber<R> {
  54     private final CompletableFuture<R> cf = new MinimalFuture<>(); // future handed out by getBody()
  55     private final S subscriber; // wrapped line subscriber; non-null (checked in the constructor)
  56     private final Function<? super S, ? extends R> finisher; // applied to the subscriber in onComplete() to produce the body value
  57     private final Charset charset; // forwarded to LineSubscription.create; presumably the decoding charset - confirm there
  58     private final String eol; // line separator forwarded to LineSubscription.create; may be null, never empty

  59     private volatile LineSubscription downstream; // assigned in onSubscribe(); still null before that call
  60 
  61     private LineSubscriberAdapter(S subscriber, // private constructor; no caller is visible in this excerpt
  62                                   Function<? super S, ? extends R> finisher,
  63                                   Charset charset,
  64                                   String eol) {
  65         if (eol != null && eol.isEmpty()) // a null eol is accepted; only the empty string is rejected
  66             throw new IllegalArgumentException("empty line separator"); // note: thrown before the null checks below run
  67         this.subscriber = Objects.requireNonNull(subscriber); // NullPointerException if null
  68         this.finisher = Objects.requireNonNull(finisher); // NullPointerException if null
  69         this.charset = Objects.requireNonNull(charset); // NullPointerException if null
  70         this.eol = eol;
  71     }
  72 
  73     @Override // accepts the upstream subscription and connects the wrapped subscriber to it
  74     public void onSubscribe(Subscription subscription) { // NOTE(review): no null check and no guard against a repeated call in this version






  75         downstream = LineSubscription.create(subscription, // every call replaces downstream with a freshly created LineSubscription
  76                                              charset,
  77                                              eol,
  78                                              subscriber,
  79                                              cf); // cf is shared with the LineSubscription - presumably so it can complete it too; confirm there
  80         subscriber.onSubscribe(downstream); // the wrapped subscriber sees the LineSubscription, not the raw upstream subscription
  81     }
  82 
  83     @Override // forwards a batch of byte buffers to the downstream LineSubscription
  84     public void onNext(List<ByteBuffer> item) { // NOTE(review): item is not null-checked here; a null goes straight to submit()

  85         try {
  86             downstream.submit(item); // NOTE(review): downstream is null if onSubscribe has not run yet - the resulting NPE is caught below
  87         } catch (Throwable t) {
  88             onError(t); // any failure while submitting is turned into an error signal
  89         }
  90     }
  91 
  92     @Override // propagates a failure downstream and fails the body future
  93     public void onError(Throwable throwable) { // NOTE(review): throwable is not null-checked in this version

  94         try {
  95             downstream.signalError(throwable); // NOTE(review): throws NPE if downstream is still null; the finally block still runs
  96         } finally {
  97             cf.completeExceptionally(throwable); // always attempted, even when signalError itself throws
  98         }
  99     }
 100 
 101     @Override // signals completion downstream and completes the body future with the finisher's result
 102     public void onComplete() {
 103         try {
 104             downstream.signalComplete(); // NOTE(review): throws NPE if downstream is still null; the finally block still runs
 105         } finally {
 106             cf.complete(finisher.apply(subscriber)); // NOTE(review): if finisher.apply throws, cf.complete is never reached here - confirm cf gets completed elsewhere
 107         }
 108     }
 109 
 110     @Override // exposes the body as a CompletionStage
 111     public CompletionStage<R> getBody() {
 112         return cf; // the same future that onComplete()/onError() complete and that is passed to LineSubscription.create
 113     }


< prev index next >