Improved internal client exchange handling code
This commit is contained in:
parent
389272de3a
commit
45f1a2a740
|
@ -1,98 +0,0 @@
|
|||
/*
|
||||
* ====================================================================
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
* ====================================================================
|
||||
*
|
||||
* This software consists of voluntary contributions made by many
|
||||
* individuals on behalf of the Apache Software Foundation. For more
|
||||
* information on the Apache Software Foundation, please see
|
||||
* <http://www.apache.org/>.
|
||||
*
|
||||
*/
|
||||
|
||||
package org.apache.hc.client5.http.impl.async;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hc.core5.http.EntityDetails;
|
||||
import org.apache.hc.core5.http.nio.AsyncDataProducer;
|
||||
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
|
||||
import org.apache.hc.core5.http.nio.DataStreamChannel;
|
||||
|
||||
final class InternalAsyncEntityProducer implements AsyncEntityProducer {
|
||||
|
||||
private final AsyncDataProducer dataProducer;
|
||||
private final EntityDetails entityDetails;
|
||||
|
||||
InternalAsyncEntityProducer(final AsyncDataProducer dataProducer, final EntityDetails entityDetails) {
|
||||
this.dataProducer = dataProducer;
|
||||
this.entityDetails = entityDetails;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void releaseResources() {
|
||||
dataProducer.releaseResources();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(final Exception cause) {
|
||||
dataProducer.releaseResources();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getContentLength() {
|
||||
return entityDetails.getContentLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getContentType() {
|
||||
return entityDetails.getContentType();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getContentEncoding() {
|
||||
return entityDetails.getContentEncoding();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRepeatable() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isChunked() {
|
||||
return entityDetails.isChunked();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getTrailerNames() {
|
||||
return entityDetails.getTrailerNames();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int available() {
|
||||
return dataProducer.available();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void produce(final DataStreamChannel channel) throws IOException {
|
||||
dataProducer.produce(channel);
|
||||
}
|
||||
|
||||
}
|
|
@ -29,8 +29,10 @@ package org.apache.hc.client5.http.impl.async;
|
|||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hc.client5.http.HttpRoute;
|
||||
import org.apache.hc.client5.http.async.AsyncExecCallback;
|
||||
|
@ -54,15 +56,16 @@ import org.apache.hc.core5.http.HttpException;
|
|||
import org.apache.hc.core5.http.HttpHost;
|
||||
import org.apache.hc.core5.http.HttpRequest;
|
||||
import org.apache.hc.core5.http.HttpResponse;
|
||||
import org.apache.hc.core5.http.HttpStatus;
|
||||
import org.apache.hc.core5.http.HttpVersion;
|
||||
import org.apache.hc.core5.http.ProtocolVersion;
|
||||
import org.apache.hc.core5.http.config.Lookup;
|
||||
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
|
||||
import org.apache.hc.core5.http.nio.AsyncDataConsumer;
|
||||
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
|
||||
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
|
||||
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
|
||||
import org.apache.hc.core5.http.nio.DataStreamChannel;
|
||||
import org.apache.hc.core5.http.nio.RequestChannel;
|
||||
import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
|
||||
import org.apache.hc.core5.http.protocol.HttpContext;
|
||||
import org.apache.hc.core5.http2.HttpVersionPolicy;
|
||||
import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
|
||||
|
@ -139,70 +142,6 @@ class InternalHttpAsyncClient extends AbstractHttpAsyncClientBase {
|
|||
}
|
||||
}
|
||||
|
||||
private void executeChain(
|
||||
final String exchangeId,
|
||||
final AsyncExecChainElement execChain,
|
||||
final HttpRoute route,
|
||||
final HttpRequest request,
|
||||
final EntityDetails entityDetails,
|
||||
final AsyncClientExchangeHandler exchangeHandler,
|
||||
final HttpClientContext clientContext,
|
||||
final AsyncExecRuntime execRuntime) throws IOException, HttpException {
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(exchangeId + ": preparing request execution");
|
||||
}
|
||||
|
||||
final ProtocolVersion protocolVersion = clientContext.getProtocolVersion();
|
||||
if (route.isTunnelled() && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
|
||||
throw new HttpException("HTTP/2 tunneling not supported");
|
||||
}
|
||||
|
||||
setupContext(clientContext);
|
||||
|
||||
final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, clientContext, execRuntime);
|
||||
execChain.execute(
|
||||
RequestCopier.INSTANCE.copy(request),
|
||||
entityDetails != null ? new InternalAsyncEntityProducer(exchangeHandler, entityDetails) : null,
|
||||
scope,
|
||||
new AsyncExecCallback() {
|
||||
|
||||
@Override
|
||||
public AsyncDataConsumer handleResponse(
|
||||
final HttpResponse response,
|
||||
final EntityDetails entityDetails) throws HttpException, IOException {
|
||||
exchangeHandler.consumeResponse(response, entityDetails);
|
||||
return exchangeHandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void completed() {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(exchangeId + ": message exchange successfully completed");
|
||||
}
|
||||
try {
|
||||
exchangeHandler.releaseResources();
|
||||
} finally {
|
||||
execRuntime.releaseConnection();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(final Exception cause) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(exchangeId + ": request failed: " + cause.getMessage());
|
||||
}
|
||||
try {
|
||||
exchangeHandler.failed(cause);
|
||||
exchangeHandler.releaseResources();
|
||||
} finally {
|
||||
execRuntime.discardConnection();
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Future<T> execute(
|
||||
final AsyncRequestProducer requestProducer,
|
||||
|
@ -222,7 +161,101 @@ class InternalHttpAsyncClient extends AbstractHttpAsyncClientBase {
|
|||
clientContext.setRequestConfig(requestConfig);
|
||||
}
|
||||
|
||||
final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(requestProducer, responseConsumer, new FutureCallback<T>() {
|
||||
requestProducer.sendRequest(new RequestChannel() {
|
||||
|
||||
@Override
|
||||
public void sendRequest(
|
||||
final HttpRequest request,
|
||||
final EntityDetails entityDetails) throws HttpException, IOException {
|
||||
|
||||
final HttpHost target = routePlanner.determineTargetHost(request, clientContext);
|
||||
final HttpRoute route = routePlanner.determineRoute(target, clientContext);
|
||||
final String exchangeId = String.format("ex-%08X", ExecSupport.getNextExecNumber());
|
||||
final AsyncExecRuntime execRuntime = new AsyncExecRuntimeImpl(log, connmgr, getConnectionInitiator(), versionPolicy);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(exchangeId + ": preparing request execution");
|
||||
}
|
||||
|
||||
final ProtocolVersion protocolVersion = clientContext.getProtocolVersion();
|
||||
if (route.isTunnelled() && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
|
||||
throw new HttpException("HTTP/2 tunneling not supported");
|
||||
}
|
||||
|
||||
setupContext(clientContext);
|
||||
|
||||
final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, clientContext, execRuntime);
|
||||
final AtomicBoolean outputTerminated = new AtomicBoolean(false);
|
||||
execChain.execute(
|
||||
RequestCopier.INSTANCE.copy(request),
|
||||
entityDetails != null ? new AsyncEntityProducer() {
|
||||
|
||||
@Override
|
||||
public void releaseResources() {
|
||||
requestProducer.releaseResources();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(final Exception cause) {
|
||||
requestProducer.failed(cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRepeatable() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getContentLength() {
|
||||
return entityDetails.getContentLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getContentType() {
|
||||
return entityDetails.getContentType();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getContentEncoding() {
|
||||
return entityDetails.getContentEncoding();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isChunked() {
|
||||
return entityDetails.isChunked();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getTrailerNames() {
|
||||
return entityDetails.getTrailerNames();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int available() {
|
||||
return requestProducer.available();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void produce(final DataStreamChannel channel) throws IOException {
|
||||
if (outputTerminated.get()) {
|
||||
channel.endStream();
|
||||
return;
|
||||
}
|
||||
requestProducer.produce(channel);
|
||||
}
|
||||
|
||||
} : null,
|
||||
scope,
|
||||
new AsyncExecCallback() {
|
||||
|
||||
@Override
|
||||
public AsyncDataConsumer handleResponse(
|
||||
final HttpResponse response,
|
||||
final EntityDetails entityDetails) throws HttpException, IOException {
|
||||
if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
|
||||
outputTerminated.set(true);
|
||||
requestProducer.releaseResources();
|
||||
}
|
||||
responseConsumer.consumeResponse(response, entityDetails, new FutureCallback<T>() {
|
||||
|
||||
@Override
|
||||
public void completed(final T result) {
|
||||
|
@ -240,22 +273,44 @@ class InternalHttpAsyncClient extends AbstractHttpAsyncClientBase {
|
|||
}
|
||||
|
||||
});
|
||||
exchangeHandler.produceRequest(new RequestChannel() {
|
||||
return responseConsumer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendRequest(
|
||||
final HttpRequest request,
|
||||
final EntityDetails entityDetails) throws HttpException, IOException {
|
||||
public void completed() {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(exchangeId + ": message exchange successfully completed");
|
||||
}
|
||||
try {
|
||||
responseConsumer.releaseResources();
|
||||
requestProducer.releaseResources();
|
||||
} finally {
|
||||
execRuntime.releaseConnection();
|
||||
}
|
||||
}
|
||||
|
||||
final HttpHost target = routePlanner.determineTargetHost(request, clientContext);
|
||||
final HttpRoute route = routePlanner.determineRoute(target, clientContext);
|
||||
final String exchangeId = String.format("ex-%08X", ExecSupport.getNextExecNumber());
|
||||
final AsyncExecRuntime execRuntime = new AsyncExecRuntimeImpl(log, connmgr, getConnectionInitiator(), versionPolicy);
|
||||
executeChain(exchangeId, execChain, route, request, entityDetails, exchangeHandler, clientContext, execRuntime);
|
||||
@Override
|
||||
public void failed(final Exception cause) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(exchangeId + ": request failed: " + cause.getMessage());
|
||||
}
|
||||
try {
|
||||
try {
|
||||
future.failed(cause);
|
||||
responseConsumer.failed(cause);
|
||||
} finally {
|
||||
responseConsumer.releaseResources();
|
||||
requestProducer.releaseResources();
|
||||
}
|
||||
} finally {
|
||||
execRuntime.discardConnection();
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
});
|
||||
} catch (final HttpException | IOException ex) {
|
||||
future.failed(ex);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue