1 /*
   2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.nio.ByteBuffer;
  29 import java.nio.CharBuffer;
  30 import java.nio.charset.CharacterCodingException;
  31 import java.nio.charset.Charset;
  32 import java.nio.charset.CharsetDecoder;
  33 import java.nio.charset.CoderResult;
  34 import java.nio.charset.CodingErrorAction;
  35 import java.util.List;
  36 import java.util.Objects;
  37 import java.util.concurrent.CompletableFuture;
  38 import java.util.concurrent.CompletionStage;
  39 import java.util.concurrent.ConcurrentLinkedDeque;
  40 import java.util.concurrent.Flow;
  41 import java.util.concurrent.Flow.Subscriber;
  42 import java.util.concurrent.Flow.Subscription;
  43 import java.util.concurrent.atomic.AtomicLong;
  44 import java.util.concurrent.atomic.AtomicReference;
  45 import java.util.function.Function;
  46 import jdk.internal.net.http.common.Demand;
  47 import java.net.http.HttpResponse.BodySubscriber;
  48 import jdk.internal.net.http.common.MinimalFuture;
  49 import jdk.internal.net.http.common.SequentialScheduler;
  50 
  51 /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber<String>}. */
  52 public final class LineSubscriberAdapter<S extends Subscriber<? super String>,R>
  53         implements BodySubscriber<R> {
  54     private final CompletableFuture<R> cf = new MinimalFuture<>();
  55     private final S subscriber;
  56     private final Function<? super S, ? extends R> finisher;
  57     private final Charset charset;
  58     private final String eol;
  59     private volatile LineSubscription downstream;
  60 
  61     private LineSubscriberAdapter(S subscriber,
  62                                   Function<? super S, ? extends R> finisher,
  63                                   Charset charset,
  64                                   String eol) {
  65         if (eol != null && eol.isEmpty())
  66             throw new IllegalArgumentException("empty line separator");
  67         this.subscriber = Objects.requireNonNull(subscriber);
  68         this.finisher = Objects.requireNonNull(finisher);
  69         this.charset = Objects.requireNonNull(charset);
  70         this.eol = eol;
  71     }
  72 
  73     @Override
  74     public void onSubscribe(Subscription subscription) {
  75         downstream = LineSubscription.create(subscription,
  76                                              charset,
  77                                              eol,
  78                                              subscriber,
  79                                              cf);
  80         subscriber.onSubscribe(downstream);
  81     }
  82 
  83     @Override
  84     public void onNext(List<ByteBuffer> item) {
  85         try {
  86             downstream.submit(item);
  87         } catch (Throwable t) {
  88             onError(t);
  89         }
  90     }
  91 
  92     @Override
  93     public void onError(Throwable throwable) {
  94         try {
  95             downstream.signalError(throwable);
  96         } finally {
  97             cf.completeExceptionally(throwable);
  98         }
  99     }
 100 
 101     @Override
 102     public void onComplete() {
 103         try {
 104             downstream.signalComplete();
 105         } finally {
 106             cf.complete(finisher.apply(subscriber));
 107         }
 108     }
 109 
 110     @Override
 111     public CompletionStage<R> getBody() {
 112         return cf;
 113     }
 114 
 115     public static <S extends Subscriber<? super String>, R> LineSubscriberAdapter<S, R>
 116     create(S subscriber, Function<? super S, ? extends R> finisher, Charset charset, String eol)
 117     {
 118         if (eol != null && eol.isEmpty())
 119             throw new IllegalArgumentException("empty line separator");
 120         return new LineSubscriberAdapter<>(Objects.requireNonNull(subscriber),
 121                 Objects.requireNonNull(finisher),
 122                 Objects.requireNonNull(charset),
 123                 eol);
 124     }
 125 
 126     static final class LineSubscription implements Flow.Subscription {
 127         final Flow.Subscription upstreamSubscription;
 128         final CharsetDecoder decoder;
 129         final String newline;
 130         final Demand downstreamDemand;
 131         final ConcurrentLinkedDeque<ByteBuffer> queue;
 132         final SequentialScheduler scheduler;
 133         final Flow.Subscriber<? super String> upstream;
 134         final CompletableFuture<?> cf;
 135         private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
 136         private final AtomicLong demanded = new AtomicLong();
 137         private volatile boolean completed;
 138         private volatile boolean cancelled;
 139 
 140         private final char[] chars = new char[1024];
 141         private final ByteBuffer leftover = ByteBuffer.wrap(new byte[64]);
 142         private final CharBuffer buffer = CharBuffer.wrap(chars);
 143         private final StringBuilder builder = new StringBuilder();
 144         private String nextLine;
 145 
 146         private LineSubscription(Flow.Subscription s,
 147                                  CharsetDecoder dec,
 148                                  String separator,
 149                                  Flow.Subscriber<? super String> subscriber,
 150                                  CompletableFuture<?> completion) {
 151             downstreamDemand = new Demand();
 152             queue = new ConcurrentLinkedDeque<>();
 153             upstreamSubscription = Objects.requireNonNull(s);
 154             decoder = Objects.requireNonNull(dec);
 155             newline = separator;
 156             upstream = Objects.requireNonNull(subscriber);
 157             cf = Objects.requireNonNull(completion);
 158             scheduler = SequentialScheduler.synchronizedScheduler(this::loop);
 159         }
 160 
 161         @Override
 162         public void request(long n) {
 163             if (cancelled) return;
 164             if (downstreamDemand.increase(n)) {
 165                 scheduler.runOrSchedule();
 166             }
 167         }
 168 
 169         @Override
 170         public void cancel() {
 171             cancelled = true;
 172             upstreamSubscription.cancel();
 173         }
 174 
 175         public void submit(List<ByteBuffer> list) {
 176             queue.addAll(list);
 177             demanded.decrementAndGet();
 178             scheduler.runOrSchedule();
 179         }
 180 
 181         public void signalComplete() {
 182             completed = true;
 183             scheduler.runOrSchedule();
 184         }
 185 
 186         public void signalError(Throwable error) {
 187             if (errorRef.compareAndSet(null,
 188                     Objects.requireNonNull(error))) {
 189                 scheduler.runOrSchedule();
 190             }
 191         }
 192 
 193         // This method looks at whether some bytes where left over (in leftover)
 194         // from decoding the previous buffer when the previous buffer was in
 195         // underflow. If so, it takes bytes one by one from the new buffer 'in'
 196         // and combines them with the leftover bytes until 'in' is exhausted or a
 197         // character was produced in 'out', resolving the previous underflow.
 198         // Returns true if the buffer is still in underflow, false otherwise.
 199         // However, in both situation some chars might have been produced in 'out'.
 200         private boolean isUnderFlow(ByteBuffer in, CharBuffer out, boolean endOfInput)
 201                 throws CharacterCodingException {
 202             int limit = leftover.position();
 203             if (limit == 0) {
 204                 // no leftover
 205                 return false;
 206             } else {
 207                 CoderResult res = null;
 208                 while (in.hasRemaining()) {
 209                     leftover.position(limit);
 210                     leftover.limit(++limit);
 211                     leftover.put(in.get());
 212                     leftover.position(0);
 213                     res = decoder.decode(leftover, out,
 214                             endOfInput && !in.hasRemaining());
 215                     int remaining = leftover.remaining();
 216                     if (remaining > 0) {
 217                         assert leftover.position() == 0;
 218                         leftover.position(remaining);
 219                     } else {
 220                         leftover.position(0);
 221                     }
 222                     leftover.limit(leftover.capacity());
 223                     if (res.isUnderflow() && remaining > 0 && in.hasRemaining()) {
 224                         continue;
 225                     }
 226                     if (res.isError()) {
 227                         res.throwException();
 228                     }
 229                     assert !res.isOverflow();
 230                     return false;
 231                 }
 232                 return !endOfInput;
 233             }
 234         }
 235 
 236         // extract characters from start to end and remove them from
 237         // the StringBuilder
 238         private static String take(StringBuilder b, int start, int end) {
 239             assert start == 0;
 240             String line;
 241             if (end == start) return "";
 242             line = b.substring(start, end);
 243             b.delete(start, end);
 244             return line;
 245         }
 246 
 247         // finds end of line, returns -1 if not found, or the position after
 248         // the line delimiter if found, removing the delimiter in the process.
 249         private static int endOfLine(StringBuilder b, String eol, boolean endOfInput) {
 250             int len = b.length();
 251             if (eol != null) { // delimiter explicitly specified
 252                 int i = b.indexOf(eol);
 253                 if (i >= 0) {
 254                     // remove the delimiter and returns the position
 255                     // of the char after it.
 256                     b.delete(i, i + eol.length());
 257                     return i;
 258                 }
 259             } else { // no delimiter specified, behaves as BufferedReader::readLine
 260                 boolean crfound = false;
 261                 for (int i = 0; i < len; i++) {
 262                     char c = b.charAt(i);
 263                     if (c == '\n') {
 264                         // '\n' or '\r\n' found.
 265                         // remove the delimiter and returns the position
 266                         // of the char after it.
 267                         b.delete(crfound ? i - 1 : i, i + 1);
 268                         return crfound ? i - 1 : i;
 269                     } else if (crfound) {
 270                         // previous char was '\r', c != '\n'
 271                         assert i != 0;
 272                         // remove the delimiter and returns the position
 273                         // of the char after it.
 274                         b.delete(i - 1, i);
 275                         return i - 1;
 276                     }
 277                     crfound = c == '\r';
 278                 }
 279                 if (crfound && endOfInput) {
 280                     // remove the delimiter and returns the position
 281                     // of the char after it.
 282                     b.delete(len - 1, len);
 283                     return len - 1;
 284                 }
 285             }
 286             return endOfInput && len > 0 ? len : -1;
 287         }
 288 
 289         // Looks at whether the StringBuilder contains a line.
 290         // Returns null if more character are needed.
 291         private static String nextLine(StringBuilder b, String eol, boolean endOfInput) {
 292             int next = endOfLine(b, eol, endOfInput);
 293             return (next > -1) ? take(b, 0, next) : null;
 294         }
 295 
 296         // Attempts to read the next line. Returns the next line if
 297         // the delimiter was found, null otherwise. The delimiters are
 298         // consumed.
 299         private String nextLine()
 300                 throws CharacterCodingException {
 301             assert nextLine == null;
 302             LINES:
 303             while (nextLine == null) {
 304                 boolean endOfInput = completed && queue.isEmpty();
 305                 nextLine = nextLine(builder, newline,
 306                         endOfInput && leftover.position() == 0);
 307                 if (nextLine != null) return nextLine;
 308                 ByteBuffer b;
 309                 BUFFERS:
 310                 while ((b = queue.peek()) != null) {
 311                     if (!b.hasRemaining()) {
 312                         queue.poll();
 313                         continue BUFFERS;
 314                     }
 315                     BYTES:
 316                     while (b.hasRemaining()) {
 317                         buffer.position(0);
 318                         buffer.limit(buffer.capacity());
 319                         boolean endofInput = completed && queue.size() <= 1;
 320                         if (isUnderFlow(b, buffer, endofInput)) {
 321                             assert !b.hasRemaining();
 322                             if (buffer.position() > 0) {
 323                                 buffer.flip();
 324                                 builder.append(buffer);
 325                             }
 326                             continue BUFFERS;
 327                         }
 328                         CoderResult res = decoder.decode(b, buffer, endofInput);
 329                         if (res.isError()) res.throwException();
 330                         if (buffer.position() > 0) {
 331                             buffer.flip();
 332                             builder.append(buffer);
 333                             continue LINES;
 334                         }
 335                         if (res.isUnderflow() && b.hasRemaining()) {
 336                             //System.out.println("underflow: adding " + b.remaining() + " bytes");
 337                             leftover.put(b);
 338                             assert !b.hasRemaining();
 339                             continue BUFFERS;
 340                         }
 341                     }
 342                 }
 343 
 344                 assert queue.isEmpty();
 345                 if (endOfInput) {
 346                     // Time to cleanup: there may be some undecoded leftover bytes
 347                     // We need to flush them out.
 348                     // The decoder has been configured to replace malformed/unmappable
 349                     // chars with some replacement, in order to behave like
 350                     // InputStreamReader.
 351                     leftover.flip();
 352                     buffer.position(0);
 353                     buffer.limit(buffer.capacity());
 354 
 355                     // decode() must be called just before flush, even if there
 356                     // is nothing to decode. We must do this even if leftover
 357                     // has no remaining bytes.
 358                     CoderResult res = decoder.decode(leftover, buffer, endOfInput);
 359                     if (buffer.position() > 0) {
 360                         buffer.flip();
 361                         builder.append(buffer);
 362                     }
 363                     if (res.isError()) res.throwException();
 364 
 365                     // Now call decoder.flush()
 366                     buffer.position(0);
 367                     buffer.limit(buffer.capacity());
 368                     res = decoder.flush(buffer);
 369                     if (buffer.position() > 0) {
 370                         buffer.flip();
 371                         builder.append(buffer);
 372                     }
 373                     if (res.isError()) res.throwException();
 374 
 375                     // It's possible that we reach here twice - just for the
 376                     // purpose of checking that no bytes were left over, so
 377                     // we reset leftover/decoder to make the function reentrant.
 378                     leftover.position(0);
 379                     leftover.limit(leftover.capacity());
 380                     decoder.reset();
 381 
 382                     // if some chars were produced then this call will
 383                     // return them.
 384                     return nextLine = nextLine(builder, newline, endOfInput);
 385                 }
 386                 return null;
 387             }
 388             return null;
 389         }
 390 
 391         // The main sequential scheduler loop.
 392         private void loop() {
 393             try {
 394                 while (!cancelled) {
 395                     Throwable error = errorRef.get();
 396                     if (error != null) {
 397                         cancelled = true;
 398                         scheduler.stop();
 399                         upstream.onError(error);
 400                         cf.completeExceptionally(error);
 401                         return;
 402                     }
 403                     if (nextLine == null) nextLine = nextLine();
 404                     if (nextLine == null) {
 405                         if (completed) {
 406                             scheduler.stop();
 407                             if (leftover.position() != 0) {
 408                                 // Underflow: not all bytes could be
 409                                 // decoded, but no more bytes will be coming.
 410                                 // This should not happen as we should already
 411                                 // have got a MalformedInputException, or
 412                                 // replaced the unmappable chars.
 413                                 errorRef.compareAndSet(null,
 414                                         new IllegalStateException(
 415                                                 "premature end of input ("
 416                                                         + leftover.position()
 417                                                         + " undecoded bytes)"));
 418                                 continue;
 419                             } else {
 420                                 upstream.onComplete();
 421                             }
 422                             return;
 423                         } else if (demanded.get() == 0
 424                                 && !downstreamDemand.isFulfilled()) {
 425                             long incr = Math.max(1, downstreamDemand.get());
 426                             demanded.addAndGet(incr);
 427                             upstreamSubscription.request(incr);
 428                             continue;
 429                         } else return;
 430                     }
 431                     assert nextLine != null;
 432                     assert newline != null && !nextLine.endsWith(newline)
 433                             || !nextLine.endsWith("\n") || !nextLine.endsWith("\r");
 434                     if (downstreamDemand.tryDecrement()) {
 435                         String forward = nextLine;
 436                         nextLine = null;
 437                         upstream.onNext(forward);
 438                     } else return; // no demand: come back later
 439                 }
 440             } catch (Throwable t) {
 441                 try {
 442                     upstreamSubscription.cancel();
 443                 } finally {
 444                     signalError(t);
 445                 }
 446             }
 447         }
 448 
 449         static LineSubscription create(Flow.Subscription s,
 450                                        Charset charset,
 451                                        String lineSeparator,
 452                                        Flow.Subscriber<? super String> upstream,
 453                                        CompletableFuture<?> cf) {
 454             return new LineSubscription(Objects.requireNonNull(s),
 455                     Objects.requireNonNull(charset).newDecoder()
 456                             // use the same decoder configuration than
 457                             // java.io.InputStreamReader
 458                             .onMalformedInput(CodingErrorAction.REPLACE)
 459                             .onUnmappableCharacter(CodingErrorAction.REPLACE),
 460                     lineSeparator,
 461                     Objects.requireNonNull(upstream),
 462                     Objects.requireNonNull(cf));
 463         }
 464     }
 465 }
 466