1 /*
   2  * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.File;
  29 import java.io.FileInputStream;
  30 import java.io.FileNotFoundException;
  31 import java.io.FilePermission;
  32 import java.io.IOException;
  33 import java.io.InputStream;
  34 import java.io.UncheckedIOException;
  35 import java.nio.ByteBuffer;
  36 import java.nio.charset.Charset;
  37 import java.nio.file.Files;
  38 import java.nio.file.Path;
  39 import java.security.AccessControlContext;
  40 import java.security.AccessController;
  41 import java.security.PrivilegedAction;
  42 import java.security.PrivilegedActionException;
  43 import java.security.PrivilegedExceptionAction;
  44 import java.util.ArrayList;
  45 import java.util.Collections;
  46 import java.util.Iterator;
  47 import java.util.List;
  48 import java.util.NoSuchElementException;
  49 import java.util.Objects;
  50 import java.util.concurrent.ConcurrentLinkedQueue;
  51 import java.util.concurrent.Flow;
  52 import java.util.concurrent.Flow.Publisher;
  53 import java.util.function.Supplier;
  54 import java.net.http.HttpRequest.BodyPublisher;
  55 import jdk.internal.net.http.common.Utils;
  56 
  57 public final class RequestPublishers {
  58 
  59     private RequestPublishers() { }
  60 
  61     public static class ByteArrayPublisher implements BodyPublisher {
  62         private volatile Flow.Publisher<ByteBuffer> delegate;
  63         private final int length;
  64         private final byte[] content;
  65         private final int offset;
  66         private final int bufSize;
  67 
  68         public ByteArrayPublisher(byte[] content) {
  69             this(content, 0, content.length);
  70         }
  71 
  72         public ByteArrayPublisher(byte[] content, int offset, int length) {
  73             this(content, offset, length, Utils.BUFSIZE);
  74         }
  75 
  76         /* bufSize exposed for testing purposes */
  77         ByteArrayPublisher(byte[] content, int offset, int length, int bufSize) {
  78             this.content = content;
  79             this.offset = offset;
  80             this.length = length;
  81             this.bufSize = bufSize;
  82         }
  83 
  84         List<ByteBuffer> copy(byte[] content, int offset, int length) {
  85             List<ByteBuffer> bufs = new ArrayList<>();
  86             while (length > 0) {
  87                 ByteBuffer b = ByteBuffer.allocate(Math.min(bufSize, length));
  88                 int max = b.capacity();
  89                 int tocopy = Math.min(max, length);
  90                 b.put(content, offset, tocopy);
  91                 offset += tocopy;
  92                 length -= tocopy;
  93                 b.flip();
  94                 bufs.add(b);
  95             }
  96             return bufs;
  97         }
  98 
  99         @Override
 100         public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
 101             List<ByteBuffer> copy = copy(content, offset, length);
 102             this.delegate = new PullPublisher<>(copy);
 103             delegate.subscribe(subscriber);
 104         }
 105 
 106         @Override
 107         public long contentLength() {
 108             return length;
 109         }
 110     }
 111 
 112     // This implementation has lots of room for improvement.
 113     public static class IterablePublisher implements BodyPublisher {
 114         private volatile Flow.Publisher<ByteBuffer> delegate;
 115         private final Iterable<byte[]> content;
 116         private volatile long contentLength;
 117 
 118         public IterablePublisher(Iterable<byte[]> content) {
 119             this.content = Objects.requireNonNull(content);
 120         }
 121 
 122         // The ByteBufferIterator will iterate over the byte[] arrays in
 123         // the content one at the time.
 124         //
 125         class ByteBufferIterator implements Iterator<ByteBuffer> {
 126             final ConcurrentLinkedQueue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
 127             final Iterator<byte[]> iterator = content.iterator();
 128             @Override
 129             public boolean hasNext() {
 130                 return !buffers.isEmpty() || iterator.hasNext();
 131             }
 132 
 133             @Override
 134             public ByteBuffer next() {
 135                 ByteBuffer buffer = buffers.poll();
 136                 while (buffer == null) {
 137                     copy();
 138                     buffer = buffers.poll();
 139                 }
 140                 return buffer;
 141             }
 142 
 143             ByteBuffer getBuffer() {
 144                 return Utils.getBuffer();
 145             }
 146 
 147             void copy() {
 148                 byte[] bytes = iterator.next();
 149                 int length = bytes.length;
 150                 if (length == 0 && iterator.hasNext()) {
 151                     // avoid inserting empty buffers, except
 152                     // if that's the last.
 153                     return;
 154                 }
 155                 int offset = 0;
 156                 do {
 157                     ByteBuffer b = getBuffer();
 158                     int max = b.capacity();
 159 
 160                     int tocopy = Math.min(max, length);
 161                     b.put(bytes, offset, tocopy);
 162                     offset += tocopy;
 163                     length -= tocopy;
 164                     b.flip();
 165                     buffers.add(b);
 166                 } while (length > 0);
 167             }
 168         }
 169 
 170         public Iterator<ByteBuffer> iterator() {
 171             return new ByteBufferIterator();
 172         }
 173 
 174         @Override
 175         public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
 176             Iterable<ByteBuffer> iterable = this::iterator;
 177             this.delegate = new PullPublisher<>(iterable);
 178             delegate.subscribe(subscriber);
 179         }
 180 
 181         static long computeLength(Iterable<byte[]> bytes) {
 182             long len = 0;
 183             for (byte[] b : bytes) {
 184                 len = Math.addExact(len, (long)b.length);
 185             }
 186             return len;
 187         }
 188 
 189         @Override
 190         public long contentLength() {
 191             if (contentLength == 0) {
 192                 synchronized(this) {
 193                     if (contentLength == 0) {
 194                         contentLength = computeLength(content);
 195                     }
 196                 }
 197             }
 198             return contentLength;
 199         }
 200     }
 201 
 202     public static class StringPublisher extends ByteArrayPublisher {
 203         public StringPublisher(String content, Charset charset) {
 204             super(content.getBytes(charset));
 205         }
 206     }
 207 
 208     public static class EmptyPublisher implements BodyPublisher {
 209         private final Flow.Publisher<ByteBuffer> delegate =
 210                 new PullPublisher<ByteBuffer>(Collections.emptyList(), null);
 211 
 212         @Override
 213         public long contentLength() {
 214             return 0;
 215         }
 216 
 217         @Override
 218         public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
 219             delegate.subscribe(subscriber);
 220         }
 221     }
 222 
 223     /**
 224      * Publishes the content of a given file.
 225      *
 226      * Privileged actions are performed within a limited doPrivileged that only
 227      * asserts the specific, read, file permission that was checked during the
 228      * construction of this FilePublisher.
 229      */
 230     public static class FilePublisher implements BodyPublisher  {
 231 
 232         private static final FilePermission[] EMPTY_FILE_PERMISSIONS = new FilePermission[0];
 233 
 234         private final File file;
 235         private final FilePermission[] filePermissions;
 236 
 237         private static String pathForSecurityCheck(Path path) {
 238             return path.toFile().getPath();
 239         }
 240 
 241         /**
 242          * Factory for creating FilePublisher.
 243          *
 244          * Permission checks are performed here before construction of the
 245          * FilePublisher. Permission checking and construction are deliberately
 246          * and tightly co-located.
 247          */
 248         public static FilePublisher create(Path path) throws FileNotFoundException {
 249             FilePermission filePermission = null;
 250             SecurityManager sm = System.getSecurityManager();
 251             if (sm != null) {
 252                 String fn = pathForSecurityCheck(path);
 253                 FilePermission readPermission = new FilePermission(fn, "read");
 254                 sm.checkPermission(readPermission);
 255                 filePermission = readPermission;
 256             }
 257 
 258             // existence check must be after permission checks
 259             if (Files.notExists(path))
 260                 throw new FileNotFoundException(path + " not found");
 261 
 262             return new FilePublisher(path, filePermission);
 263         }
 264 
 265         private FilePublisher(Path name, FilePermission filePermission) {
 266             assert filePermission != null ? filePermission.getActions().equals("read") : true;
 267             file = name.toFile();
 268             this.filePermissions = filePermission == null ? EMPTY_FILE_PERMISSIONS
 269                     : new FilePermission[] { filePermission };
 270         }
 271 
 272         @Override
 273         public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
 274             InputStream is;
 275             if (System.getSecurityManager() == null) {
 276                 try {
 277                     is = new FileInputStream(file);
 278                 } catch (IOException ioe) {
 279                     throw new UncheckedIOException(ioe);
 280                 }
 281             } else {
 282                 try {
 283                     PrivilegedExceptionAction<FileInputStream> pa =
 284                             () -> new FileInputStream(file);
 285                     is = AccessController.doPrivileged(pa, null, filePermissions);
 286                 } catch (PrivilegedActionException pae) {
 287                     throw new UncheckedIOException((IOException) pae.getCause());
 288                 }
 289             }
 290             PullPublisher<ByteBuffer> publisher =
 291                     new PullPublisher<>(() -> new StreamIterator(is));
 292             publisher.subscribe(subscriber);
 293         }
 294 
 295         @Override
 296         public long contentLength() {
 297             if (System.getSecurityManager() == null) {
 298                 return file.length();
 299             } else {
 300                 PrivilegedAction<Long> pa = () -> file.length();
 301                 return AccessController.doPrivileged(pa, null, filePermissions);
 302             }
 303         }
 304     }
 305 
 306     /**
 307      * Reads one buffer ahead all the time, blocking in hasNext()
 308      */
 309     public static class StreamIterator implements Iterator<ByteBuffer> {
 310         final InputStream is;
 311         final Supplier<? extends ByteBuffer> bufSupplier;
 312         volatile ByteBuffer nextBuffer;
 313         volatile boolean need2Read = true;
 314         volatile boolean haveNext;
 315 
 316         StreamIterator(InputStream is) {
 317             this(is, Utils::getBuffer);
 318         }
 319 
 320         StreamIterator(InputStream is, Supplier<? extends ByteBuffer> bufSupplier) {
 321             this.is = is;
 322             this.bufSupplier = bufSupplier;
 323         }
 324 
 325 //        Throwable error() {
 326 //            return error;
 327 //        }
 328 
 329         private int read() {
 330             nextBuffer = bufSupplier.get();
 331             nextBuffer.clear();
 332             byte[] buf = nextBuffer.array();
 333             int offset = nextBuffer.arrayOffset();
 334             int cap = nextBuffer.capacity();
 335             try {
 336                 int n = is.read(buf, offset, cap);
 337                 if (n == -1) {
 338                     is.close();
 339                     return -1;
 340                 }
 341                 //flip
 342                 nextBuffer.limit(n);
 343                 nextBuffer.position(0);
 344                 return n;
 345             } catch (IOException ex) {
 346                 return -1;
 347             }
 348         }
 349 
 350         @Override
 351         public synchronized boolean hasNext() {
 352             if (need2Read) {
 353                 haveNext = read() != -1;
 354                 if (haveNext) {
 355                     need2Read = false;
 356                 }
 357                 return haveNext;
 358             }
 359             return haveNext;
 360         }
 361 
 362         @Override
 363         public synchronized ByteBuffer next() {
 364             if (!hasNext()) {
 365                 throw new NoSuchElementException();
 366             }
 367             need2Read = true;
 368             return nextBuffer;
 369         }
 370 
 371     }
 372 
 373     public static class InputStreamPublisher implements BodyPublisher {
 374         private final Supplier<? extends InputStream> streamSupplier;
 375 
 376         public InputStreamPublisher(Supplier<? extends InputStream> streamSupplier) {
 377             this.streamSupplier = Objects.requireNonNull(streamSupplier);
 378         }
 379 
 380         @Override
 381         public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
 382             PullPublisher<ByteBuffer> publisher;
 383             InputStream is = streamSupplier.get();
 384             if (is == null) {
 385                 Throwable t = new IOException("streamSupplier returned null");
 386                 publisher = new PullPublisher<>(null, t);
 387             } else  {
 388                 publisher = new PullPublisher<>(iterableOf(is), null);
 389             }
 390             publisher.subscribe(subscriber);
 391         }
 392 
 393         protected Iterable<ByteBuffer> iterableOf(InputStream is) {
 394             return () -> new StreamIterator(is);
 395         }
 396 
 397         @Override
 398         public long contentLength() {
 399             return -1;
 400         }
 401     }
 402 
 403     public static final class PublisherAdapter implements BodyPublisher {
 404 
 405         private final Publisher<? extends ByteBuffer> publisher;
 406         private final long contentLength;
 407 
 408         public PublisherAdapter(Publisher<? extends ByteBuffer> publisher,
 409                          long contentLength) {
 410             this.publisher = Objects.requireNonNull(publisher);
 411             this.contentLength = contentLength;
 412         }
 413 
 414         @Override
 415         public final long contentLength() {
 416             return contentLength;
 417         }
 418 
 419         @Override
 420         public final void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
 421             publisher.subscribe(subscriber);
 422         }
 423     }
 424 }