< prev index next >

src/java.net.http/share/classes/jdk/internal/net/http/LineSubscriberAdapter.java

Print this page

        

*** 1,7 **** /* ! * Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this --- 1,7 ---- /* ! * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this
*** 38,48 **** import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; - import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import jdk.internal.net.http.common.Demand; import java.net.http.HttpResponse.BodySubscriber; --- 38,47 ----
*** 55,65 **** private final CompletableFuture<R> cf = new MinimalFuture<>(); private final S subscriber; private final Function<? super S, ? extends R> finisher; private final Charset charset; private final String eol; - private final AtomicBoolean subscribed = new AtomicBoolean(); private volatile LineSubscription downstream; private LineSubscriberAdapter(S subscriber, Function<? super S, ? extends R> finisher, Charset charset, --- 54,63 ----
*** 72,108 **** this.eol = eol; } @Override public void onSubscribe(Subscription subscription) { - Objects.requireNonNull(subscription); - if (!subscribed.compareAndSet(false, true)) { - subscription.cancel(); - return; - } - downstream = LineSubscription.create(subscription, charset, eol, subscriber, cf); subscriber.onSubscribe(downstream); } @Override public void onNext(List<ByteBuffer> item) { - Objects.requireNonNull(item); try { downstream.submit(item); } catch (Throwable t) { onError(t); } } @Override public void onError(Throwable throwable) { - Objects.requireNonNull(throwable); try { downstream.signalError(throwable); } finally { cf.completeExceptionally(throwable); } --- 70,98 ----
< prev index next >