From 45f1a2a740250882bb2bc9912146dae763c92946 Mon Sep 17 00:00:00 2001 From: Oleg Kalnichevski Date: Sun, 5 Nov 2017 13:42:26 +0100 Subject: [PATCH] Improved internal client exchange handling code --- .../async/InternalAsyncEntityProducer.java | 98 -------- .../impl/async/InternalHttpAsyncClient.java | 229 +++++++++++------- 2 files changed, 142 insertions(+), 185 deletions(-) delete mode 100644 httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAsyncEntityProducer.java diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAsyncEntityProducer.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAsyncEntityProducer.java deleted file mode 100644 index 4c067f26f..000000000 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAsyncEntityProducer.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * ==================================================================== - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * ==================================================================== - * - * This software consists of voluntary contributions made by many - * individuals on behalf of the Apache Software Foundation. For more - * information on the Apache Software Foundation, please see - * . - * - */ - -package org.apache.hc.client5.http.impl.async; - -import java.io.IOException; -import java.util.Set; - -import org.apache.hc.core5.http.EntityDetails; -import org.apache.hc.core5.http.nio.AsyncDataProducer; -import org.apache.hc.core5.http.nio.AsyncEntityProducer; -import org.apache.hc.core5.http.nio.DataStreamChannel; - -final class InternalAsyncEntityProducer implements AsyncEntityProducer { - - private final AsyncDataProducer dataProducer; - private final EntityDetails entityDetails; - - InternalAsyncEntityProducer(final AsyncDataProducer dataProducer, final EntityDetails entityDetails) { - this.dataProducer = dataProducer; - this.entityDetails = entityDetails; - } - - @Override - public void releaseResources() { - dataProducer.releaseResources(); - } - - @Override - public void failed(final Exception cause) { - dataProducer.releaseResources(); - } - - @Override - public long getContentLength() { - return entityDetails.getContentLength(); - } - - @Override - public String getContentType() { - return entityDetails.getContentType(); - } - - @Override - public String getContentEncoding() { - return entityDetails.getContentEncoding(); - } - - @Override - public boolean isRepeatable() { - return false; - } - - @Override - public boolean isChunked() { - return entityDetails.isChunked(); - } - - @Override - public Set getTrailerNames() { - return entityDetails.getTrailerNames(); - } - - @Override - public int available() { - return dataProducer.available(); - } - - @Override - public void produce(final DataStreamChannel channel) throws IOException { - dataProducer.produce(channel); - } - -} \ No newline at end of file diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java index 431111ca2..02b4d9e26 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java @@ -29,8 +29,10 @@ package org.apache.hc.client5.http.impl.async; import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hc.client5.http.HttpRoute; import org.apache.hc.client5.http.async.AsyncExecCallback; @@ -54,15 +56,16 @@ import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpRequest; import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.HttpVersion; import org.apache.hc.core5.http.ProtocolVersion; import org.apache.hc.core5.http.config.Lookup; -import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; import org.apache.hc.core5.http.nio.AsyncDataConsumer; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; import org.apache.hc.core5.http.nio.AsyncRequestProducer; import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.apache.hc.core5.http.nio.DataStreamChannel; import org.apache.hc.core5.http.nio.RequestChannel; -import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler; import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http2.HttpVersionPolicy; import org.apache.hc.core5.reactor.DefaultConnectingIOReactor; @@ -139,70 +142,6 @@ class InternalHttpAsyncClient extends AbstractHttpAsyncClientBase { } } - private void executeChain( - final String exchangeId, - final AsyncExecChainElement execChain, - final HttpRoute route, - final HttpRequest request, - final EntityDetails entityDetails, - final AsyncClientExchangeHandler exchangeHandler, - final HttpClientContext clientContext, - final AsyncExecRuntime execRuntime) throws IOException, HttpException { - - if (log.isDebugEnabled()) { - log.debug(exchangeId + ": preparing request execution"); - } - - final ProtocolVersion protocolVersion = clientContext.getProtocolVersion(); - if (route.isTunnelled() && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) { - throw new HttpException("HTTP/2 tunneling not supported"); - } - - setupContext(clientContext); - - final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, clientContext, execRuntime); - execChain.execute( - RequestCopier.INSTANCE.copy(request), - entityDetails != null ? new InternalAsyncEntityProducer(exchangeHandler, entityDetails) : null, - scope, - new AsyncExecCallback() { - - @Override - public AsyncDataConsumer handleResponse( - final HttpResponse response, - final EntityDetails entityDetails) throws HttpException, IOException { - exchangeHandler.consumeResponse(response, entityDetails); - return exchangeHandler; - } - - @Override - public void completed() { - if (log.isDebugEnabled()) { - log.debug(exchangeId + ": message exchange successfully completed"); - } - try { - exchangeHandler.releaseResources(); - } finally { - execRuntime.releaseConnection(); - } - } - - @Override - public void failed(final Exception cause) { - if (log.isDebugEnabled()) { - log.debug(exchangeId + ": request failed: " + cause.getMessage()); - } - try { - exchangeHandler.failed(cause); - exchangeHandler.releaseResources(); - } finally { - execRuntime.discardConnection(); - } - } - - }); - } - @Override public Future execute( final AsyncRequestProducer requestProducer, @@ -222,25 +161,7 @@ class InternalHttpAsyncClient extends AbstractHttpAsyncClientBase { clientContext.setRequestConfig(requestConfig); } - final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(requestProducer, responseConsumer, new FutureCallback() { - - @Override - public void completed(final T result) { - future.completed(result); - } - - @Override - public void failed(final Exception ex) { - future.failed(ex); - } - - @Override - public void cancelled() { - future.cancel(); - } - - }); - exchangeHandler.produceRequest(new RequestChannel() { + requestProducer.sendRequest(new RequestChannel() { @Override public void sendRequest( @@ -251,11 +172,145 @@ class InternalHttpAsyncClient extends AbstractHttpAsyncClientBase { final HttpRoute route = routePlanner.determineRoute(target, clientContext); final String exchangeId = String.format("ex-%08X", ExecSupport.getNextExecNumber()); final AsyncExecRuntime execRuntime = new AsyncExecRuntimeImpl(log, connmgr, getConnectionInitiator(), versionPolicy); - executeChain(exchangeId, execChain, route, request, entityDetails, exchangeHandler, clientContext, execRuntime); + if (log.isDebugEnabled()) { + log.debug(exchangeId + ": preparing request execution"); + } + + final ProtocolVersion protocolVersion = clientContext.getProtocolVersion(); + if (route.isTunnelled() && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) { + throw new HttpException("HTTP/2 tunneling not supported"); + } + + setupContext(clientContext); + + final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, clientContext, execRuntime); + final AtomicBoolean outputTerminated = new AtomicBoolean(false); + execChain.execute( + RequestCopier.INSTANCE.copy(request), + entityDetails != null ? new AsyncEntityProducer() { + + @Override + public void releaseResources() { + requestProducer.releaseResources(); + } + + @Override + public void failed(final Exception cause) { + requestProducer.failed(cause); + } + + @Override + public boolean isRepeatable() { + return false; + } + + @Override + public long getContentLength() { + return entityDetails.getContentLength(); + } + + @Override + public String getContentType() { + return entityDetails.getContentType(); + } + + @Override + public String getContentEncoding() { + return entityDetails.getContentEncoding(); + } + + @Override + public boolean isChunked() { + return entityDetails.isChunked(); + } + + @Override + public Set getTrailerNames() { + return entityDetails.getTrailerNames(); + } + + @Override + public int available() { + return requestProducer.available(); + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + if (outputTerminated.get()) { + channel.endStream(); + return; + } + requestProducer.produce(channel); + } + + } : null, + scope, + new AsyncExecCallback() { + + @Override + public AsyncDataConsumer handleResponse( + final HttpResponse response, + final EntityDetails entityDetails) throws HttpException, IOException { + if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) { + outputTerminated.set(true); + requestProducer.releaseResources(); + } + responseConsumer.consumeResponse(response, entityDetails, new FutureCallback() { + + @Override + public void completed(final T result) { + future.completed(result); + } + + @Override + public void failed(final Exception ex) { + future.failed(ex); + } + + @Override + public void cancelled() { + future.cancel(); + } + + }); + return responseConsumer; + } + + @Override + public void completed() { + if (log.isDebugEnabled()) { + log.debug(exchangeId + ": message exchange successfully completed"); + } + try { + responseConsumer.releaseResources(); + requestProducer.releaseResources(); + } finally { + execRuntime.releaseConnection(); + } + } + + @Override + public void failed(final Exception cause) { + if (log.isDebugEnabled()) { + log.debug(exchangeId + ": request failed: " + cause.getMessage()); + } + try { + try { + future.failed(cause); + responseConsumer.failed(cause); + } finally { + responseConsumer.releaseResources(); + requestProducer.releaseResources(); + } + } finally { + execRuntime.discardConnection(); + } + } + + }); } }); - } catch (final HttpException | IOException ex) { future.failed(ex); }