New non-blocking HttpAsyncClient APIs

git-svn-id: https://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk@1784140 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Oleg Kalnichevski 2017-02-23 14:33:47 +00:00
parent 113d40ef39
commit d1c104a89e
37 changed files with 4840 additions and 11 deletions

View File

@ -140,11 +140,20 @@ SSLSessionCacheTimeout 300
# General setup for the virtual host
Protocols http/1.1
Protocols h2 http/1.1
<IfModule http2_module>
LogLevel http2:info
H2Push on
H2Direct on
</IfModule>
ServerAdmin dev@hc.apache.org
ServerName localhost:8443
ErrorLog "/usr/local/apache2/logs/error_log"
TransferLog "/usr/local/apache2/logs/access_log"
ErrorLog /proc/self/fd/2
TransferLog /proc/self/fd/2
DocumentRoot "/var/httpd/www"
<Directory "/var/httpd/www">

View File

@ -0,0 +1,198 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.testing.external;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLContext;
import org.apache.hc.client5.http.async.AsyncClientEndpoint;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
import org.apache.hc.client5.http.auth.AuthScope;
import org.apache.hc.client5.http.auth.Credentials;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.impl.sync.BasicCredentialsProvider;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.ssl.SSLUpgradeStrategy;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.ssl.SSLContexts;
import org.apache.hc.core5.util.TextUtils;
public class HttpAsyncClientCompatibilityTest {
public static void main(final String... args) throws Exception {
final HttpAsyncClientCompatibilityTest[] tests = new HttpAsyncClientCompatibilityTest[] {
new HttpAsyncClientCompatibilityTest(
HttpVersion.HTTP_1_1, new HttpHost("localhost", 8080, "http"), null, null),
new HttpAsyncClientCompatibilityTest(
HttpVersion.HTTP_2_0, new HttpHost("localhost", 8080, "http"), null, null),
new HttpAsyncClientCompatibilityTest(
HttpVersion.HTTP_1_1, new HttpHost("localhost", 8443, "https"), null, null),
new HttpAsyncClientCompatibilityTest(
HttpVersion.HTTP_2_0, new HttpHost("localhost", 8443, "https"), null, null)
};
for (final HttpAsyncClientCompatibilityTest test: tests) {
try {
test.execute();
} finally {
test.shutdown();
}
}
}
private final HttpVersion protocolVersion;
private final HttpHost target;
private final HttpHost proxy;
private final BasicCredentialsProvider credentialsProvider;
private final PoolingAsyncClientConnectionManager connManager;
private final CloseableHttpAsyncClient client;
HttpAsyncClientCompatibilityTest(
final HttpVersion protocolVersion,
final HttpHost target,
final HttpHost proxy,
final Credentials proxyCreds) throws Exception {
this.protocolVersion = protocolVersion;
this.target = target;
this.proxy = proxy;
this.credentialsProvider = new BasicCredentialsProvider();
final RequestConfig requestConfig = RequestConfig.custom()
.setProxy(proxy)
.build();
if (proxy != null && proxyCreds != null) {
this.credentialsProvider.setCredentials(new AuthScope(proxy), proxyCreds);
}
final SSLContext sslContext = SSLContexts.custom()
.loadTrustMaterial(getClass().getResource("/test-ca.keystore"), "nopassword".toCharArray()).build();
this.connManager = PoolingAsyncClientConnectionManagerBuilder.create()
.setTlsStrategy(new SSLUpgradeStrategy(sslContext))
.build();
this.client = HttpAsyncClients.custom()
.setProtocolVersion(this.protocolVersion)
.setH2Config(H2Config.custom()
.setSettingAckNeeded(false)
.build())
.setConnectionManager(this.connManager)
.setDefaultRequestConfig(requestConfig)
.build();
}
void shutdown() throws Exception {
client.close();
}
enum TestResult {OK, NOK};
private void logResult(final TestResult result, final HttpRequest request, final String message) {
final StringBuilder buf = new StringBuilder();
buf.append(result);
if (buf.length() == 2) {
buf.append(" ");
}
buf.append(": ").append(protocolVersion).append(" ").append(target);
if (proxy != null) {
buf.append(" via ").append(proxy);
}
buf.append(": ");
buf.append(request.getMethod()).append(" ").append(request.getRequestUri());
if (message != null && !TextUtils.isBlank(message)) {
buf.append(" -> ").append(message);
}
System.out.println(buf.toString());
}
void execute() throws Exception {
client.start();
// Initial ping
{
final HttpClientContext context = HttpClientContext.create();
context.setCredentialsProvider(credentialsProvider);
final SimpleHttpRequest options = new SimpleHttpRequest("OPTIONS", target, "*", null, null);
final Future<SimpleHttpResponse> future = client.execute(new SimpleRequestProducer(options), new SimpleResponseConsumer(), null);
try {
final SimpleHttpResponse response = future.get(5, TimeUnit.SECONDS);
final int code = response.getCode();
if (code == HttpStatus.SC_OK) {
logResult(TestResult.OK, options, Objects.toString(response.getFirstHeader("server")));
} else {
logResult(TestResult.NOK, options, "(status " + code + ")");
}
} catch (ExecutionException ex) {
final Throwable cause = ex.getCause();
logResult(TestResult.NOK, options, "(" + cause.getMessage() + ")");
} catch (TimeoutException ex) {
logResult(TestResult.NOK, options, "(time out)");
}
}
// Basic GET requests
{
final HttpClientContext context = HttpClientContext.create();
context.setCredentialsProvider(credentialsProvider);
final Future<AsyncClientEndpoint> leaseFuture = client.lease(target, null);
final AsyncClientEndpoint clientEndpoint = leaseFuture.get(20, TimeUnit.SECONDS);
final String[] requestUris = new String[] {"/", "/news.html", "/status.html"};
for (String requestUri: requestUris) {
final SimpleHttpRequest httpGet = new SimpleHttpRequest("GET", target, requestUri, null, null);
final Future<SimpleHttpResponse> future = clientEndpoint.execute(new SimpleRequestProducer(httpGet), new SimpleResponseConsumer(), null);
try {
final SimpleHttpResponse response = future.get(5, TimeUnit.SECONDS);
final int code = response.getCode();
if (code == HttpStatus.SC_OK) {
logResult(TestResult.OK, httpGet, "200");
} else {
logResult(TestResult.NOK, httpGet, "(status " + code + ")");
}
} catch (ExecutionException ex) {
final Throwable cause = ex.getCause();
logResult(TestResult.NOK, httpGet, "(" + cause.getMessage() + ")");
} catch (TimeoutException ex) {
logResult(TestResult.NOK, httpGet, "(time out)");
}
}
}
}
}

View File

@ -25,7 +25,10 @@
<Root level="warn">
<AppenderRef ref="STDOUT"/>
</Root>
<Logger name="org.apache.hc.client5.http.headers" level="debug"/>
<Logger name="org.apache.hc.core5.reactor" level="warn"/>
<Logger name="org.apache.hc.client5.http" level="warn"/>
<Logger name="org.apache.hc.client5.http.wire" level="error"/>
<Logger name="org.apache.hc.client5.http2" level="warn"/>
<Logger name="org.apache.hc.client5.http2.frame.payload" level="error"/>
</Loggers>
</Configuration>

View File

@ -0,0 +1,124 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.examples;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.reactor.IOReactorConfig;
/**
* Example demonstrating how to evict expired and idle connections
* from the connection pool.
*/
public class AsyncClientConnectionEviction {
public static void main(String[] args) throws Exception {
IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setConnectTimeout(5000)
.setSoTimeout(5000)
.build();
CloseableHttpAsyncClient client = HttpAsyncClients.custom()
.setIOReactorConfig(ioReactorConfig)
.evictExpiredConnections()
.evictIdleConnections(10, TimeUnit.SECONDS)
.build();
client.start();
HttpHost target = new HttpHost("httpbin.org");
final SimpleHttpRequest request = new SimpleHttpRequest("GET", target, "/", null, null);
final Future<SimpleHttpResponse> future1 = client.execute(
new SimpleRequestProducer(request),
new SimpleResponseConsumer(),
new FutureCallback<SimpleHttpResponse>() {
@Override
public void completed(final SimpleHttpResponse response) {
System.out.println(request.getRequestUri() + "->" + response.getCode());
System.out.println(response.getBody());
}
@Override
public void failed(final Exception ex) {
System.out.println(request.getRequestUri() + "->" + ex);
}
@Override
public void cancelled() {
System.out.println(request.getRequestUri() + " cancelled");
}
});
future1.get();
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
// Previous connection should get evicted from the pool by now
final Future<SimpleHttpResponse> future2 = client.execute(
new SimpleRequestProducer(request),
new SimpleResponseConsumer(),
new FutureCallback<SimpleHttpResponse>() {
@Override
public void completed(final SimpleHttpResponse response) {
System.out.println(request.getRequestUri() + "->" + response.getCode());
System.out.println(response.getBody());
}
@Override
public void failed(final Exception ex) {
System.out.println(request.getRequestUri() + "->" + ex);
}
@Override
public void cancelled() {
System.out.println(request.getRequestUri() + " cancelled");
}
});
future2.get();
System.out.println("Shutting down");
client.shutdown(5, TimeUnit.SECONDS);
}
}

View File

@ -0,0 +1,108 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.examples;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hc.client5.http.async.AsyncClientEndpoint;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.reactor.IOReactorConfig;
/**
* This example demonstrates pipelined execution of multiple HTTP/1.1 message exchanges.
*/
public class AsyncClientHttp1Pipelining {
public static void main(String[] args) throws Exception {
IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setConnectTimeout(5000)
.setSoTimeout(5000)
.build();
CloseableHttpAsyncClient client = HttpAsyncClients.custom()
.setProtocolVersion(HttpVersion.HTTP_1_1)
.setIOReactorConfig(ioReactorConfig)
.build();
client.start();
HttpHost target = new HttpHost("httpbin.org");
Future<AsyncClientEndpoint> leaseFuture = client.lease(target, null);
AsyncClientEndpoint endpoint = leaseFuture.get(30, TimeUnit.SECONDS);
try {
String[] requestUris = new String[] {"/", "/ip", "/user-agent", "/headers"};
final CountDownLatch latch = new CountDownLatch(requestUris.length);
for (final String requestUri: requestUris) {
SimpleHttpRequest request = new SimpleHttpRequest("GET", target, requestUri, null, null);
endpoint.execute(
new SimpleRequestProducer(request),
new SimpleResponseConsumer(),
new FutureCallback<SimpleHttpResponse>() {
@Override
public void completed(final SimpleHttpResponse response) {
latch.countDown();
System.out.println(requestUri + "->" + response.getCode());
System.out.println(response.getBody());
}
@Override
public void failed(final Exception ex) {
latch.countDown();
System.out.println(requestUri + "->" + ex);
}
@Override
public void cancelled() {
latch.countDown();
System.out.println(requestUri + " cancelled");
}
});
}
latch.await();
} finally {
endpoint.releaseAndReuse();
}
System.out.println("Shutting down");
client.shutdown(5, TimeUnit.SECONDS);
}
}

View File

@ -0,0 +1,109 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.examples;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hc.client5.http.async.AsyncClientEndpoint;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.reactor.IOReactorConfig;
/**
* This example demonstrates concurrent (multiplexed) execution of multiple
* HTTP/2 message exchanges.
*/
public class AsyncClientHttp2Multiplexing {
public static void main(String[] args) throws Exception {
IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setConnectTimeout(5000)
.setSoTimeout(5000)
.build();
CloseableHttpAsyncClient client = HttpAsyncClients.custom()
.setProtocolVersion(HttpVersion.HTTP_1_1)
.setIOReactorConfig(ioReactorConfig)
.build();
client.start();
HttpHost target = new HttpHost("http2bin.org");
Future<AsyncClientEndpoint> leaseFuture = client.lease(target, null);
AsyncClientEndpoint endpoint = leaseFuture.get(30, TimeUnit.SECONDS);
try {
String[] requestUris = new String[] {"/", "/ip", "/user-agent", "/headers"};
final CountDownLatch latch = new CountDownLatch(requestUris.length);
for (final String requestUri: requestUris) {
SimpleHttpRequest request = new SimpleHttpRequest("GET", target, requestUri, null, null);
endpoint.execute(
new SimpleRequestProducer(request),
new SimpleResponseConsumer(),
new FutureCallback<SimpleHttpResponse>() {
@Override
public void completed(final SimpleHttpResponse response) {
latch.countDown();
System.out.println(requestUri + "->" + response.getCode());
System.out.println(response.getBody());
}
@Override
public void failed(final Exception ex) {
latch.countDown();
System.out.println(requestUri + "->" + ex);
}
@Override
public void cancelled() {
latch.countDown();
System.out.println(requestUri + " cancelled");
}
});
}
latch.await();
} finally {
endpoint.releaseAndReuse();
}
System.out.println("Shutting down");
client.shutdown(5, TimeUnit.SECONDS);
}
}

View File

@ -0,0 +1,94 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.examples;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.reactor.IOReactorConfig;
/**
* Example of asynchronous HTTP/1.1 request execution.
*/
public class AsyncClientHttpExchange {
public static void main(String[] args) throws Exception {
IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setConnectTimeout(5000)
.setSoTimeout(5000)
.build();
CloseableHttpAsyncClient client = HttpAsyncClients.custom()
.setIOReactorConfig(ioReactorConfig)
.build();
client.start();
HttpHost target = new HttpHost("httpbin.org");
String[] requestUris = new String[] {"/", "/ip", "/user-agent", "/headers"};
for (final String requestUri: requestUris) {
SimpleHttpRequest request = new SimpleHttpRequest("GET", target, requestUri, null, null);
Future<SimpleHttpResponse> future = client.execute(
new SimpleRequestProducer(request),
new SimpleResponseConsumer(),
new FutureCallback<SimpleHttpResponse>() {
@Override
public void completed(final SimpleHttpResponse response) {
System.out.println(requestUri + "->" + response.getCode());
System.out.println(response.getBody());
}
@Override
public void failed(final Exception ex) {
System.out.println(requestUri + "->" + ex);
}
@Override
public void cancelled() {
System.out.println(requestUri + " cancelled");
}
});
future.get();
}
System.out.println("Shutting down");
client.shutdown(5, TimeUnit.SECONDS);
}
}

View File

@ -0,0 +1,111 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.examples;
import java.io.IOException;
import java.nio.CharBuffer;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hc.client5.http.async.methods.AbstractCharResponseConsumer;
import org.apache.hc.client5.http.async.methods.AsyncRequestBuilder;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.message.StatusLine;
import org.apache.hc.core5.reactor.IOReactorConfig;
/**
* Example of asynchronous HTTP/1.1 request execution with response streaming.
*/
public class AsyncClientHttpExchangeStreaming {
public static void main(String[] args) throws Exception {
IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setConnectTimeout(5000)
.setSoTimeout(5000)
.build();
CloseableHttpAsyncClient client = HttpAsyncClients.custom()
.setIOReactorConfig(ioReactorConfig)
.build();
client.start();
HttpHost target = new HttpHost("httpbin.org");
String[] requestUris = new String[] {"/", "/ip", "/user-agent", "/headers"};
for (final String requestUri: requestUris) {
Future<Void> future = client.execute(
AsyncRequestBuilder.get(target, requestUri).build(),
new AbstractCharResponseConsumer<Void>() {
@Override
protected void start(
final HttpResponse response,
final ContentType contentType) throws HttpException, IOException {
System.out.println(requestUri + "->" + new StatusLine(response));
}
@Override
protected int capacity() {
return Integer.MAX_VALUE;
}
@Override
protected void data(final CharBuffer data, final boolean endOfStream) throws IOException {
while (data.hasRemaining()) {
System.out.print(data.get());
}
if (endOfStream) {
System.out.println();
}
}
@Override
protected Void getResult() {
return null;
}
@Override
public void failed(final Exception cause) {
System.out.println(requestUri + "->" + cause);
}
}, null);
future.get();
}
System.out.println("Shutting down");
client.shutdown(5, TimeUnit.SECONDS);
}
}

View File

@ -0,0 +1,178 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.async;
import java.util.concurrent.Future;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
/**
* Client endpoint leased from a connection manager.
* <p>
* Once the endpoint is no longer needed it MUST be released with {@link #releaseAndReuse()}
* or {@link #releaseAndDiscard()}.
*
* @since 5.0
*/
@Contract(threading = ThreadingBehavior.SAFE)
public abstract class AsyncClientEndpoint {
/**
* Initiates a message exchange using the given handler.
* <p>
* Once the endpoint is no longer needed it MUST be released with {@link #releaseAndReuse()}
* or {@link #releaseAndDiscard()}.
*/
public abstract void execute(AsyncClientExchangeHandler exchangeHandler, HttpContext context);
/**
* Releases the underlying connection back to the connection pool as re-usable.
*/
public abstract void releaseAndReuse();
/**
* Shuts down the underlying connection and removes it from the connection pool.
*/
public abstract void releaseAndDiscard();
/**
* Initiates message exchange using the given request producer and response consumer.
* <p>
* Once the endpoint is no longer needed it MUST be released with {@link #releaseAndReuse()}
* or {@link #releaseAndDiscard()}.
*/
public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final HttpContext context,
final FutureCallback<T> callback) {
final BasicFuture<T> future = new BasicFuture<>(callback);
execute(new BasicClientExchangeHandler<>(requestProducer, responseConsumer,
new FutureCallback<T>() {
@Override
public void completed(final T result) {
future.completed(result);
}
@Override
public void failed(final Exception ex) {
future.failed(ex);
}
@Override
public void cancelled() {
future.cancel();
}
}),
context != null ? context : HttpCoreContext.create());
return future;
}
/**
* Initiates a message exchange using the given request producer and response consumer.
* <p>
* Once the endpoint is no longer needed it MUST be released with {@link #releaseAndReuse()}
* or {@link #releaseAndDiscard()}.
*/
public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final FutureCallback<T> callback) {
return execute(requestProducer, responseConsumer, null, callback);
}
/**
* Initiates a message exchange using the given request producer and response consumer and
* automatically invokes {@link #releaseAndReuse()} upon its successful completion.
*/
public final <T> Future<T> executeAndRelease(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final HttpContext context,
final FutureCallback<T> callback) {
return execute(requestProducer, responseConsumer, context, new FutureCallback<T>() {
@Override
public void completed(final T result) {
try {
if (callback != null) {
callback.completed(result);
}
} finally {
releaseAndReuse();
}
}
@Override
public void failed(final Exception ex) {
try {
if (callback != null) {
callback.failed(ex);
}
} finally {
releaseAndDiscard();
}
}
@Override
public void cancelled() {
try {
if (callback != null) {
callback.cancelled();
}
} finally {
releaseAndDiscard();
}
}
});
}
/**
* Initiates a message exchange using the given request producer and response consumer and
* automatically invokes {@link #releaseAndReuse()} upon its successful completion.
*/
public final <T> Future<T> executeAndRelease(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final FutureCallback<T> callback) {
return executeAndRelease(requestProducer, responseConsumer, null, callback);
}
}

View File

@ -0,0 +1,82 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.async;
import java.util.concurrent.Future;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.protocol.HttpContext;
/**
* This interface represents only the most basic contract for HTTP request
* execution. It imposes no restrictions or particular details on the request
* execution process and leaves the specifics of state management,
* authentication and redirect handling up to individual implementations.
*
* @since 4.0
*/
public interface HttpAsyncClient {
/**
* Leases {@link AsyncClientEndpoint} for the given {@link HttpHost}.
* <p>
* The endpoint MUST be released back when no longer used by calling
* {@link AsyncClientEndpoint#releaseAndReuse()} or {@link AsyncClientEndpoint#releaseAndDiscard()}
*/
Future<AsyncClientEndpoint> lease(
HttpHost host,
HttpContext context,
FutureCallback<AsyncClientEndpoint> callback);
/**
* Initiates asynchronous HTTP request execution using the given context.
* <p>
* The request producer passed to this method will be used to generate
* a request message and stream out its content without buffering it
* in memory. The response consumer passed to this method will be used
* to process a response message without buffering its content in memory.
* <p>
* Please note it may be unsafe to interact with the context instance
* while the request is still being executed.
*
* @param <T> the result type of request execution.
* @param requestProducer request producer callback.
* @param responseConsumer response consumer callaback.
* @param context HTTP context
* @param callback future callback.
* @return future representing pending completion of the operation.
*/
<T> Future<T> execute(
AsyncRequestProducer requestProducer,
AsyncResponseConsumer<T> responseConsumer,
HttpContext context,
FutureCallback<T> callback);
}

View File

@ -0,0 +1,118 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.async.methods;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.UnsupportedCharsetException;
import java.util.List;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.util.Args;
public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncResponseConsumer<T> {
private final AsyncEntityConsumer<E> entityConsumer;
public AbstractAsyncResponseConsumer(final AsyncEntityConsumer<E> entityConsumer) {
Args.notNull(entityConsumer, "Entity consumer");
this.entityConsumer = entityConsumer;
}
protected abstract T buildResult(HttpResponse response, E entity, ContentType contentType);
@Override
public final void consumeResponse(
final HttpResponse response,
final EntityDetails entityDetails,
final FutureCallback<T> resultCallback) throws HttpException, IOException {
if (entityDetails != null) {
entityConsumer.streamStart(entityDetails, new FutureCallback<E>() {
@Override
public void completed(final E result) {
final ContentType contentType;
try {
contentType = ContentType.parse(entityDetails.getContentType());
resultCallback.completed(buildResult(response, result, contentType));
} catch (UnsupportedCharsetException ex) {
resultCallback.failed(ex);
}
}
@Override
public void failed(final Exception ex) {
resultCallback.failed(ex);
}
@Override
public void cancelled() {
resultCallback.cancelled();
}
});
} else {
resultCallback.completed(buildResult(response, null, null));
entityConsumer.releaseResources();
}
}
@Override
public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
entityConsumer.updateCapacity(capacityChannel);
}
@Override
public final int consume(final ByteBuffer src) throws IOException {
return entityConsumer.consume(src);
}
@Override
public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
entityConsumer.streamEnd(trailers);
}
@Override
public final void failed(final Exception cause) {
releaseResources();
}
@Override
public final void releaseResources() {
entityConsumer.releaseResources();
}
}

View File

@ -0,0 +1,103 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.async.methods;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.UnsupportedCharsetException;
import java.util.List;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.CapacityChannel;
public abstract class AbstractBinResponseConsumer<T> implements AsyncResponseConsumer<T> {
private static final ByteBuffer EMPTY = ByteBuffer.wrap(new byte[0]);
private volatile FutureCallback<T> resultCallback;
protected abstract void start(HttpResponse response, ContentType contentType) throws HttpException, IOException;
protected abstract int capacity();
protected abstract void data(ByteBuffer data, boolean endOfStream) throws IOException;
protected abstract T getResult();
@Override
public final void consumeResponse(
final HttpResponse response,
final EntityDetails entityDetails,
final FutureCallback<T> resultCallback) throws HttpException, IOException {
this.resultCallback = resultCallback;
if (entityDetails != null) {
try {
final ContentType contentType = ContentType.parse(entityDetails.getContentType());
start(response, contentType);
} catch (UnsupportedCharsetException ex) {
throw new UnsupportedEncodingException(ex.getMessage());
}
} else {
start(response, null);
resultCallback.completed(getResult());
}
}
@Override
public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
capacityChannel.update(capacity());
}
@Override
public final int consume(final ByteBuffer src) throws IOException {
data(src, false);
return capacity();
}
@Override
public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
data(EMPTY, true);
resultCallback.completed(getResult());
}
@Override
public void failed(final Exception cause) {
}
@Override
public void releaseResources() {
}
}

View File

@ -0,0 +1,142 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.async.methods;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;
import java.nio.charset.UnsupportedCharsetException;
import java.util.List;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.util.Asserts;
public abstract class AbstractCharResponseConsumer<T> implements AsyncResponseConsumer<T> {
private static final ByteBuffer EMPTY_BIN = ByteBuffer.wrap(new byte[0]);
private static final CharBuffer EMPTY_CHAR = CharBuffer.wrap(new char[0]);
private final CharBuffer charbuf = CharBuffer.allocate(8192);
private volatile CharsetDecoder charsetDecoder;
private volatile FutureCallback<T> resultCallback;
protected abstract void start(HttpResponse response, ContentType contentType) throws HttpException, IOException;
protected abstract int capacity();
protected abstract void data(CharBuffer data, boolean endOfStream) throws IOException;
protected abstract T getResult();
@Override
public final void consumeResponse(
final HttpResponse response,
final EntityDetails entityDetails,
final FutureCallback<T> resultCallback) throws HttpException, IOException {
this.resultCallback = resultCallback;
if (entityDetails != null) {
ContentType contentType;
try {
contentType = ContentType.parse(entityDetails.getContentType());
} catch (UnsupportedCharsetException ex) {
throw new UnsupportedEncodingException(ex.getMessage());
}
Charset charset = contentType != null ? contentType.getCharset() : null;
if (charset == null) {
charset = StandardCharsets.US_ASCII;
}
charsetDecoder = charset.newDecoder();
start(response, contentType);
} else {
start(response, null);
resultCallback.completed(getResult());
}
}
@Override
public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
capacityChannel.update(capacity());
}
private void checkResult(final CoderResult result) throws IOException {
if (result.isError()) {
result.throwException();
}
}
private void doDecode() throws IOException {
charbuf.flip();
final int chunk = charbuf.remaining();
if (chunk > 0) {
data(charbuf, false);
}
charbuf.clear();
}
@Override
public final int consume(final ByteBuffer src) throws IOException {
Asserts.notNull(charsetDecoder, "Charset decoder");
while (src.hasRemaining()) {
checkResult(charsetDecoder.decode(src, charbuf, false));
doDecode();
}
return capacity();
}
@Override
public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
Asserts.notNull(charsetDecoder, "Charset decoder");
checkResult(charsetDecoder.decode(EMPTY_BIN, charbuf, true));
doDecode();
checkResult(charsetDecoder.flush(charbuf));
doDecode();
data(EMPTY_CHAR, true);
resultCallback.completed(getResult());
}
@Override
public void failed(final Exception cause) {
}
@Override
public void releaseResources() {
}
}

View File

@ -0,0 +1,445 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.async.methods;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http.message.BasicHttpRequest;
import org.apache.hc.core5.http.message.BasicNameValuePair;
import org.apache.hc.core5.http.message.HeaderGroup;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
import org.apache.hc.core5.net.URIBuilder;
import org.apache.hc.core5.net.URLEncodedUtils;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TextUtils;
/**
* Builder for {@link AsyncRequestProducer} instances.
* <p>
* Please note that this class treats parameters differently depending on composition
* of the request: if the request has a content entity explicitly set with
* {@link #setEntity(AsyncEntityProducer)} or it is not an entity enclosing method
* (such as POST or PUT), parameters will be added to the query component of the request URI.
* Otherwise, parameters will be added as a URL encoded entity.
*
* @since 5.0
*/
public class AsyncRequestBuilder {
private enum METHOD { GET, HEAD, POST, PUT, DELETE, PATCH, TRACE, OPTIONS }
private HttpHost host;
private String path;
private URI uri;
private String method;
private Charset charset;
private ProtocolVersion version;
private HeaderGroup headergroup;
private AsyncEntityProducer entityProducer;
private List<NameValuePair> parameters;
private RequestConfig config;
AsyncRequestBuilder() {
}
AsyncRequestBuilder(final String method) {
super();
this.method = method;
}
AsyncRequestBuilder(final METHOD method) {
this(method.name());
}
AsyncRequestBuilder(final String method, final URI uri) {
super();
this.method = method;
this.uri = uri;
}
AsyncRequestBuilder(final METHOD method, final HttpHost host, final String path) {
super();
this.method = method.name();
this.host = host;
this.path = path;
}
AsyncRequestBuilder(final METHOD method, final URI uri) {
this(method.name(), uri);
}
AsyncRequestBuilder(final METHOD method, final String uri) {
this(method.name(), uri != null ? URI.create(uri) : null);
}
AsyncRequestBuilder(final String method, final String uri) {
this(method, uri != null ? URI.create(uri) : null);
}
public static AsyncRequestBuilder create(final String method) {
Args.notBlank(method, "HTTP method");
return new AsyncRequestBuilder(method);
}
public static AsyncRequestBuilder get() {
return new AsyncRequestBuilder(METHOD.GET);
}
public static AsyncRequestBuilder get(final URI uri) {
return new AsyncRequestBuilder(METHOD.GET, uri);
}
public static AsyncRequestBuilder get(final String uri) {
return new AsyncRequestBuilder(METHOD.GET, uri);
}
public static AsyncRequestBuilder get(final HttpHost host, final String path) {
return new AsyncRequestBuilder(METHOD.GET, host, path);
}
public static AsyncRequestBuilder head() {
return new AsyncRequestBuilder(METHOD.HEAD);
}
public static AsyncRequestBuilder head(final URI uri) {
return new AsyncRequestBuilder(METHOD.HEAD, uri);
}
public static AsyncRequestBuilder head(final String uri) {
return new AsyncRequestBuilder(METHOD.HEAD, uri);
}
public static AsyncRequestBuilder head(final HttpHost host, final String path) {
return new AsyncRequestBuilder(METHOD.HEAD, host, path);
}
public static AsyncRequestBuilder patch() {
return new AsyncRequestBuilder(METHOD.PATCH);
}
public static AsyncRequestBuilder patch(final URI uri) {
return new AsyncRequestBuilder(METHOD.PATCH, uri);
}
public static AsyncRequestBuilder patch(final String uri) {
return new AsyncRequestBuilder(METHOD.PATCH, uri);
}
public static AsyncRequestBuilder patch(final HttpHost host, final String path) {
return new AsyncRequestBuilder(METHOD.PATCH, host, path);
}
public static AsyncRequestBuilder post() {
return new AsyncRequestBuilder(METHOD.POST);
}
public static AsyncRequestBuilder post(final URI uri) {
return new AsyncRequestBuilder(METHOD.POST, uri);
}
public static AsyncRequestBuilder post(final String uri) {
return new AsyncRequestBuilder(METHOD.POST, uri);
}
public static AsyncRequestBuilder post(final HttpHost host, final String path) {
return new AsyncRequestBuilder(METHOD.POST, host, path);
}
public static AsyncRequestBuilder put() {
return new AsyncRequestBuilder(METHOD.PUT);
}
public static AsyncRequestBuilder put(final URI uri) {
return new AsyncRequestBuilder(METHOD.PUT, uri);
}
public static AsyncRequestBuilder put(final String uri) {
return new AsyncRequestBuilder(METHOD.PUT, uri);
}
public static AsyncRequestBuilder put(final HttpHost host, final String path) {
return new AsyncRequestBuilder(METHOD.PUT, host, path);
}
public static AsyncRequestBuilder delete() {
return new AsyncRequestBuilder(METHOD.DELETE);
}
public static AsyncRequestBuilder delete(final URI uri) {
return new AsyncRequestBuilder(METHOD.DELETE, uri);
}
public static AsyncRequestBuilder delete(final String uri) {
return new AsyncRequestBuilder(METHOD.DELETE, uri);
}
public static AsyncRequestBuilder delete(final HttpHost host, final String path) {
return new AsyncRequestBuilder(METHOD.DELETE, host, path);
}
public static AsyncRequestBuilder trace() {
return new AsyncRequestBuilder(METHOD.TRACE);
}
public static AsyncRequestBuilder trace(final URI uri) {
return new AsyncRequestBuilder(METHOD.TRACE, uri);
}
public static AsyncRequestBuilder trace(final String uri) {
return new AsyncRequestBuilder(METHOD.TRACE, uri);
}
public static AsyncRequestBuilder trace(final HttpHost host, final String path) {
return new AsyncRequestBuilder(METHOD.TRACE, host, path);
}
public static AsyncRequestBuilder options() {
return new AsyncRequestBuilder(METHOD.OPTIONS);
}
public static AsyncRequestBuilder options(final URI uri) {
return new AsyncRequestBuilder(METHOD.OPTIONS, uri);
}
public static AsyncRequestBuilder options(final String uri) {
return new AsyncRequestBuilder(METHOD.OPTIONS, uri);
}
public static AsyncRequestBuilder options(final HttpHost host, final String path) {
return new AsyncRequestBuilder(METHOD.OPTIONS, host, path);
}
public AsyncRequestBuilder setCharset(final Charset charset) {
this.charset = charset;
return this;
}
public Charset getCharset() {
return charset;
}
public String getMethod() {
return method;
}
public URI getUri() {
return uri;
}
public AsyncRequestBuilder setUri(final URI uri) {
this.uri = uri;
this.host = null;
this.path = null;
return this;
}
public AsyncRequestBuilder setUri(final String uri) {
this.uri = uri != null ? URI.create(uri) : null;
this.host = null;
this.path = null;
return this;
}
public ProtocolVersion getVersion() {
return version;
}
public AsyncRequestBuilder setVersion(final ProtocolVersion version) {
this.version = version;
return this;
}
public Header getFirstHeader(final String name) {
return headergroup != null ? headergroup.getFirstHeader(name) : null;
}
public Header getLastHeader(final String name) {
return headergroup != null ? headergroup.getLastHeader(name) : null;
}
public Header[] getHeaders(final String name) {
return headergroup != null ? headergroup.getHeaders(name) : null;
}
public AsyncRequestBuilder addHeader(final Header header) {
if (headergroup == null) {
headergroup = new HeaderGroup();
}
headergroup.addHeader(header);
return this;
}
public AsyncRequestBuilder addHeader(final String name, final String value) {
if (headergroup == null) {
headergroup = new HeaderGroup();
}
this.headergroup.addHeader(new BasicHeader(name, value));
return this;
}
public AsyncRequestBuilder removeHeader(final Header header) {
if (headergroup == null) {
headergroup = new HeaderGroup();
}
headergroup.removeHeader(header);
return this;
}
public AsyncRequestBuilder removeHeaders(final String name) {
if (name == null || headergroup == null) {
return this;
}
for (final Iterator<Header> i = headergroup.headerIterator(); i.hasNext(); ) {
final Header header = i.next();
if (name.equalsIgnoreCase(header.getName())) {
i.remove();
}
}
return this;
}
public AsyncRequestBuilder setHeader(final Header header) {
if (headergroup == null) {
headergroup = new HeaderGroup();
}
this.headergroup.setHeader(header);
return this;
}
public AsyncRequestBuilder setHeader(final String name, final String value) {
if (headergroup == null) {
headergroup = new HeaderGroup();
}
this.headergroup.setHeader(new BasicHeader(name, value));
return this;
}
public List<NameValuePair> getParameters() {
return parameters != null ? new ArrayList<>(parameters) :
new ArrayList<NameValuePair>();
}
public AsyncRequestBuilder addParameter(final NameValuePair nvp) {
Args.notNull(nvp, "Name value pair");
if (parameters == null) {
parameters = new LinkedList<>();
}
parameters.add(nvp);
return this;
}
public AsyncRequestBuilder addParameter(final String name, final String value) {
return addParameter(new BasicNameValuePair(name, value));
}
public AsyncRequestBuilder addParameters(final NameValuePair... nvps) {
for (final NameValuePair nvp: nvps) {
addParameter(nvp);
}
return this;
}
public RequestConfig getConfig() {
return config;
}
public AsyncRequestBuilder setConfig(final RequestConfig config) {
this.config = config;
return this;
}
public AsyncEntityProducer getEntity() {
return entityProducer;
}
public AsyncRequestBuilder setEntity(final AsyncEntityProducer entityProducer) {
this.entityProducer = entityProducer;
return this;
}
public AsyncRequestBuilder setEntity(final String content, final ContentType contentType) {
this.entityProducer = new BasicAsyncEntityProducer(content, contentType);
return this;
}
public AsyncRequestBuilder setEntity(final byte[] content, final ContentType contentType) {
this.entityProducer = new BasicAsyncEntityProducer(content, contentType);
return this;
}
public AsyncRequestProducer build() {
AsyncEntityProducer entityProducerCopy = this.entityProducer;
if (parameters != null && !parameters.isEmpty()) {
if (entityProducerCopy == null && (METHOD.POST.name().equalsIgnoreCase(method)
|| METHOD.PUT.name().equalsIgnoreCase(method))) {
final String content = URLEncodedUtils.format(
parameters,
charset != null ? charset : ContentType.APPLICATION_FORM_URLENCODED.getCharset());
entityProducerCopy = new StringAsyncEntityProducer(
content,
ContentType.APPLICATION_FORM_URLENCODED);
} else {
try {
uri = new URIBuilder(uri)
.setCharset(this.charset)
.addParameters(parameters)
.build();
} catch (final URISyntaxException ex) {
// should never happen
}
}
}
final BasicHttpRequest request = host != null ?
new BasicHttpRequest(method, host, !TextUtils.isBlank(path) ? path : "/") :
new BasicHttpRequest(method, uri != null ? uri : URI.create("/"));
if (this.headergroup != null) {
request.setHeaders(this.headergroup.getAllHeaders());
}
if (version != null) {
request.setVersion(version);
}
return new DefaultAsyncRequestProducer(request, entityProducerCopy, config);
}
}

View File

@ -0,0 +1,96 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.async.methods;
import java.io.IOException;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.methods.Configurable;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.apache.hc.core5.util.Args;
public class DefaultAsyncRequestProducer implements AsyncRequestProducer, Configurable {
private final HttpRequest request;
private final AsyncEntityProducer entityProducer;
private final RequestConfig config;
public DefaultAsyncRequestProducer(final HttpRequest request, final AsyncEntityProducer entityProducer, final RequestConfig config) {
Args.notNull(request, "Request");
this.request = request;
this.entityProducer = entityProducer;
this.config = config;
}
public DefaultAsyncRequestProducer(final HttpRequest request, final AsyncEntityProducer entityProducer) {
this(request, entityProducer, null);
}
@Override
public RequestConfig getConfig() {
return config;
}
@Override
public final HttpRequest produceRequest() {
return request;
}
@Override
public final EntityDetails getEntityDetails() {
return entityProducer;
}
@Override
public final int available() {
return entityProducer != null ? entityProducer.available() : 0;
}
@Override
public final void produce(final DataStreamChannel channel) throws IOException {
if (entityProducer != null) {
entityProducer.produce(channel);
}
}
@Override
public final void failed(final Exception cause) {
releaseResources();
}
@Override
public final void releaseResources() {
if (entityProducer != null) {
entityProducer.releaseResources();
}
}
}

View File

@ -0,0 +1,75 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.async.methods;
import java.net.URI;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.message.BasicHttpRequest;
import org.apache.hc.core5.http.message.HttpRequestWrapper;
public final class SimpleHttpRequest extends HttpRequestWrapper {
private final String body;
private final ContentType contentType;
public SimpleHttpRequest(final HttpRequest head, final String body, final ContentType contentType) {
super(head);
this.body = body;
this.contentType = contentType;
}
public SimpleHttpRequest(
final String method,
final HttpHost host,
final String path,
final String body,
final ContentType contentType) {
super(new BasicHttpRequest(method, host, path));
this.body = body;
this.contentType = contentType;
}
public SimpleHttpRequest(final String method, final URI requestUri, final String body, final ContentType contentType) {
super(new BasicHttpRequest(method, requestUri));
this.body = body;
this.contentType = contentType;
}
public String getBody() {
return body;
}
public ContentType getContentType() {
return contentType;
}
}

View File

@ -0,0 +1,74 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.async.methods;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.message.BasicHttpResponse;
import org.apache.hc.core5.http.message.HttpResponseWrapper;
public final class SimpleHttpResponse extends HttpResponseWrapper {
private final String body;
private final ContentType contentType;
public SimpleHttpResponse(
final HttpResponse head,
final String body,
final ContentType contentType) {
super(head);
this.body = body;
this.contentType = contentType;
}
public SimpleHttpResponse(
final int code,
final String reasonPhrase,
final String body,
final ContentType contentType) {
super(new BasicHttpResponse(code, reasonPhrase));
this.body = body;
this.contentType = contentType;
}
public SimpleHttpResponse(final int code, final String body, final ContentType contentType) {
super(new BasicHttpResponse(code));
this.body = body;
this.contentType = contentType;
}
public String getBody() {
return body;
}
public ContentType getContentType() {
return contentType;
}
}

View File

@ -0,0 +1,39 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.async.methods;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
import org.apache.hc.core5.util.Args;
public final class SimpleRequestProducer extends DefaultAsyncRequestProducer {
public SimpleRequestProducer(final SimpleHttpRequest request) {
super(Args.notNull(request, "Request"), request.getBody() != null ?
new StringAsyncEntityProducer(request.getBody(), request.getContentType()) : null);
}
}

View File

@ -0,0 +1,44 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.async.methods;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
public final class SimpleResponseConsumer extends AbstractAsyncResponseConsumer<SimpleHttpResponse, String> {
public SimpleResponseConsumer() {
super(new StringAsyncEntityConsumer());
}
@Override
protected SimpleHttpResponse buildResult(final HttpResponse response, final String entity, final ContentType contentType) {
return new SimpleHttpResponse(response, entity, contentType);
}
}

View File

@ -24,7 +24,7 @@
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.protocol;
package org.apache.hc.client5.http.impl;
import java.security.Principal;

View File

@ -25,12 +25,11 @@
*
*/
package org.apache.hc.client5.http.impl.sync;
package org.apache.hc.client5.http.impl;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.hc.client5.http.impl.DefaultThreadFactory;
import org.apache.hc.core5.pool.ConnPoolControl;
import org.apache.hc.core5.util.Args;

View File

@ -24,7 +24,7 @@
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.sync;
package org.apache.hc.client5.http.impl;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.protocol.UserTokenHandler;

View File

@ -0,0 +1,153 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.async;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.http.ExceptionListener;
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
import org.apache.hc.core5.http.nio.command.ShutdownType;
import org.apache.hc.core5.reactor.ConnectionInitiator;
import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
import org.apache.hc.core5.reactor.ExceptionEvent;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOReactorException;
import org.apache.hc.core5.reactor.IOReactorStatus;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
abstract class AbstractHttpAsyncClientBase extends CloseableHttpAsyncClient {
enum Status { READY, RUNNING, TERMINATED }
final Logger log = LogManager.getLogger(getClass());
private final DefaultConnectingIOReactor ioReactor;
private final ExceptionListener exceptionListener;
private final ExecutorService executorService;
private final AtomicReference<Status> status;
public AbstractHttpAsyncClientBase(
final IOEventHandlerFactory eventHandlerFactory,
final IOReactorConfig reactorConfig,
final ThreadFactory threadFactory,
final ThreadFactory workerThreadFactory) throws IOReactorException {
super();
this.ioReactor = new DefaultConnectingIOReactor(
eventHandlerFactory,
reactorConfig,
workerThreadFactory,
new Callback<IOSession>() {
@Override
public void execute(final IOSession ioSession) {
ioSession.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
}
});
this.exceptionListener = new ExceptionListener() {
@Override
public void onError(final Exception ex) {
log.error(ex.getMessage(), ex);
}
};
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
this.status = new AtomicReference<>(Status.READY);
}
@Override
public final void start() {
if (status.compareAndSet(Status.READY, Status.RUNNING)) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
ioReactor.execute();
} catch (Exception ex) {
if (exceptionListener != null) {
exceptionListener.onError(ex);
}
}
}
});
}
}
void ensureRunning() {
switch (status.get()) {
case READY:
throw new IllegalStateException("Client is not running");
case TERMINATED:
throw new IllegalStateException("Client has been terminated");
}
}
ConnectionInitiator getConnectionInitiator() {
return ioReactor;
}
@Override
public final IOReactorStatus getStatus() {
return ioReactor.getStatus();
}
@Override
public final List<ExceptionEvent> getAuditLog() {
return ioReactor.getAuditLog();
}
@Override
public final void awaitShutdown(final long deadline, final TimeUnit timeUnit) throws InterruptedException {
ioReactor.awaitShutdown(deadline, timeUnit);
}
@Override
public final void initiateShutdown() {
ioReactor.initiateShutdown();
}
@Override
public final void shutdown(final long graceTime, final TimeUnit timeUnit) {
ioReactor.initiateShutdown();
ioReactor.shutdown(graceTime, timeUnit);
}
@Override
public void close() {
shutdown(5, TimeUnit.SECONDS);
}
}

View File

@ -0,0 +1,79 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.async;
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hc.client5.http.async.AsyncClientEndpoint;
import org.apache.hc.client5.http.async.HttpAsyncClient;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.reactor.ExceptionEvent;
import org.apache.hc.core5.reactor.IOReactorStatus;
/**
* Base implementation of {@link HttpAsyncClient} that also implements {@link Closeable}.
*
* @since 4.0
*/
@Contract(threading = ThreadingBehavior.SAFE)
public abstract class CloseableHttpAsyncClient implements HttpAsyncClient, Closeable {
public abstract void start();
public abstract IOReactorStatus getStatus();
public abstract List<ExceptionEvent> getAuditLog();
public abstract void awaitShutdown(long deadline, TimeUnit timeUnit) throws InterruptedException;
public abstract void initiateShutdown();
public abstract void shutdown(long graceTime, TimeUnit timeUnit);
public final Future<AsyncClientEndpoint> lease(
final HttpHost host,
final FutureCallback<AsyncClientEndpoint> callback) {
return lease(host, HttpClientContext.create(), callback);
}
public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final FutureCallback<T> callback) {
return execute(requestProducer, responseConsumer, HttpClientContext.create(), callback);
}
}

View File

@ -0,0 +1,184 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.async;
import java.util.Iterator;
import org.apache.hc.client5.http.impl.ConnPoolSupport;
import org.apache.hc.client5.http.impl.logging.LoggingIOEventHandler;
import org.apache.hc.client5.http.impl.logging.LoggingIOSession;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.ConnectionReuseStrategy;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpConnection;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.config.ConnectionConfig;
import org.apache.hc.core5.http.config.H1Config;
import org.apache.hc.core5.http.impl.ConnectionListener;
import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
import org.apache.hc.core5.http.impl.Http1StreamListener;
import org.apache.hc.core5.http.impl.nio.ClientHttp1IOEventHandler;
import org.apache.hc.core5.http.impl.nio.ClientHttp1StreamDuplexer;
import org.apache.hc.core5.http.impl.nio.DefaultHttpRequestWriterFactory;
import org.apache.hc.core5.http.impl.nio.DefaultHttpResponseParserFactory;
import org.apache.hc.core5.http.message.RequestLine;
import org.apache.hc.core5.http.message.StatusLine;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.reactor.IOEventHandler;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.util.Args;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @since 5.0
*/
@Contract(threading = ThreadingBehavior.IMMUTABLE)
public class DefaultAsyncHttp1ClientEventHandlerFactory implements IOEventHandlerFactory {
private final Logger streamLog = LogManager.getLogger(ClientHttp1StreamDuplexer.class);
private final Logger wireLog = LogManager.getLogger("org.apache.hc.client5.http.wire");
private final Logger headerLog = LogManager.getLogger("org.apache.hc.client5.http.headers");
private final HttpProcessor httpProcessor;
private final H1Config h1Config;
private final ConnectionConfig connectionConfig;
private final ConnectionReuseStrategy connectionReuseStrategy;
DefaultAsyncHttp1ClientEventHandlerFactory(
final HttpProcessor httpProcessor,
final H1Config h1Config,
final ConnectionConfig connectionConfig,
final ConnectionReuseStrategy connectionReuseStrategy) {
this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
this.h1Config = h1Config != null ? h1Config : H1Config.DEFAULT;
this.connectionConfig = connectionConfig != null ? connectionConfig: ConnectionConfig.DEFAULT;
this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
DefaultConnectionReuseStrategy.INSTANCE;
}
@Override
public IOEventHandler createHandler(final IOSession ioSession) {
final Logger sessionLog = LogManager.getLogger(ioSession.getClass());
if (sessionLog.isDebugEnabled()
|| streamLog.isDebugEnabled()
|| wireLog.isDebugEnabled()
|| headerLog.isDebugEnabled()) {
final String id = ConnPoolSupport.getId(ioSession);
final ClientHttp1StreamDuplexer streamDuplexer = new ClientHttp1StreamDuplexer(
new LoggingIOSession(ioSession, id, sessionLog, wireLog),
httpProcessor,
h1Config,
connectionConfig,
connectionReuseStrategy,
DefaultHttpResponseParserFactory.INSTANCE.create(h1Config),
DefaultHttpRequestWriterFactory.INSTANCE.create(),
DefaultContentLengthStrategy.INSTANCE,
DefaultContentLengthStrategy.INSTANCE,
new ConnectionListener() {
@Override
public void onConnect(final HttpConnection connection) {
if (streamLog.isDebugEnabled()) {
streamLog.debug(id + ": " + connection + " connected");
}
}
@Override
public void onDisconnect(final HttpConnection connection) {
if (streamLog.isDebugEnabled()) {
streamLog.debug(id + ": " + connection + " disconnected");
}
}
@Override
public void onError(final HttpConnection connection, final Exception ex) {
if (ex instanceof ConnectionClosedException) {
return;
}
streamLog.error(id + ": " + ex.getMessage(), ex);
}
},
new Http1StreamListener() {
@Override
public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
if (headerLog.isDebugEnabled()) {
headerLog.debug(id + " >> " + new RequestLine(request));
for (final Iterator<Header> it = request.headerIterator(); it.hasNext(); ) {
headerLog.debug(id + " >> " + it.next());
}
}
}
@Override
public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
if (headerLog.isDebugEnabled()) {
headerLog.debug(id + " << " + new StatusLine(response));
for (final Iterator<Header> it = response.headerIterator(); it.hasNext(); ) {
headerLog.debug(id + " << " + it.next());
}
}
}
@Override
public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
if (streamLog.isDebugEnabled()) {
if (keepAlive) {
streamLog.debug(id + " Connection is kept alive");
} else {
streamLog.debug(id + " Connection is not kept alive");
}
}
}
});
return new LoggingIOEventHandler(new ClientHttp1IOEventHandler(streamDuplexer), id, sessionLog);
} else {
final ClientHttp1StreamDuplexer streamDuplexer = new ClientHttp1StreamDuplexer(
ioSession,
httpProcessor,
h1Config,
connectionConfig,
connectionReuseStrategy,
DefaultHttpResponseParserFactory.INSTANCE.create(h1Config),
DefaultHttpRequestWriterFactory.INSTANCE.create(),
DefaultContentLengthStrategy.INSTANCE,
DefaultContentLengthStrategy.INSTANCE,
null, null);
return new ClientHttp1IOEventHandler(streamDuplexer);
}
}
}

View File

@ -0,0 +1,217 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.async;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import org.apache.hc.client5.http.impl.ConnPoolSupport;
import org.apache.hc.client5.http.impl.logging.LogAppendable;
import org.apache.hc.client5.http.impl.logging.LoggingIOEventHandler;
import org.apache.hc.client5.http.impl.logging.LoggingIOSession;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpConnection;
import org.apache.hc.core5.http.impl.ConnectionListener;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.http2.frame.FramePrinter;
import org.apache.hc.core5.http2.frame.RawFrame;
import org.apache.hc.core5.http2.impl.nio.ClientHttpProtocolNegotiator;
import org.apache.hc.core5.http2.impl.nio.Http2StreamListener;
import org.apache.hc.core5.reactor.IOEventHandler;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.util.Args;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @since 5.0
*/
@Contract(threading = ThreadingBehavior.IMMUTABLE)
public class DefaultAsyncHttp2ClientEventHandlerFactory implements IOEventHandlerFactory {
private final Logger streamLog = LogManager.getLogger(ClientHttpProtocolNegotiator.class);
private final Logger wireLog = LogManager.getLogger("org.apache.hc.client5.http.wire");
private final Logger headerLog = LogManager.getLogger("org.apache.hc.client5.http.headers");
private final Logger frameLog = LogManager.getLogger("org.apache.hc.client5.http2.frame");
private final Logger framePayloadLog = LogManager.getLogger("org.apache.hc.client5.http2.frame.payload");
private final Logger flowCtrlLog = LogManager.getLogger("org.apache.hc.client5.http2.flow");
private final HttpProcessor httpProcessor;
private final HandlerFactory<AsyncPushConsumer> exchangeHandlerFactory;
private final Charset charset;
private final H2Config h2Config;
DefaultAsyncHttp2ClientEventHandlerFactory(
final HttpProcessor httpProcessor,
final HandlerFactory<AsyncPushConsumer> exchangeHandlerFactory,
final Charset charset,
final H2Config h2Config) {
this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
this.exchangeHandlerFactory = exchangeHandlerFactory;
this.charset = charset;
this.h2Config = h2Config;
}
@Override
public IOEventHandler createHandler(final IOSession ioSession) {
final Logger sessionLog = LogManager.getLogger(ioSession.getClass());
if (sessionLog.isDebugEnabled()
|| streamLog.isDebugEnabled()
|| wireLog.isDebugEnabled()
|| headerLog.isDebugEnabled()
|| frameLog.isDebugEnabled()
|| framePayloadLog.isDebugEnabled()
|| flowCtrlLog.isDebugEnabled()) {
final String id = ConnPoolSupport.getId(ioSession);
return new LoggingIOEventHandler(new DefaultAsyncHttpClientProtocolNegotiator(
new LoggingIOSession(ioSession, id, sessionLog, wireLog),
httpProcessor, exchangeHandlerFactory, charset, h2Config,
new ConnectionListener() {
@Override
public void onConnect(final HttpConnection connection) {
if (streamLog.isDebugEnabled()) {
streamLog.debug(id + ": " + connection + " connected");
}
}
@Override
public void onDisconnect(final HttpConnection connection) {
if (streamLog.isDebugEnabled()) {
streamLog.debug(id + ": " + connection + " disconnected");
}
}
@Override
public void onError(final HttpConnection connection, final Exception ex) {
if (ex instanceof ConnectionClosedException) {
return;
}
streamLog.error(id + ": " + ex.getMessage(), ex);
}
},
new Http2StreamListener() {
final FramePrinter framePrinter = new FramePrinter();
private void logFrameInfo(final String prefix, final RawFrame frame) {
try {
final LogAppendable logAppendable = new LogAppendable(frameLog, prefix);
framePrinter.printFrameInfo(frame, logAppendable);
logAppendable.flush();
} catch (IOException ignore) {
}
}
private void logFramePayload(final String prefix, final RawFrame frame) {
try {
final LogAppendable logAppendable = new LogAppendable(framePayloadLog, prefix);
framePrinter.printPayload(frame, logAppendable);
logAppendable.flush();
} catch (IOException ignore) {
}
}
private void logFlowControl(final String prefix, final int streamId, final int delta, final int actualSize) {
final StringBuilder buffer = new StringBuilder();
buffer.append(prefix).append(" stream ").append(streamId).append(" flow control " )
.append(delta).append(" -> ")
.append(actualSize);
flowCtrlLog.debug(buffer.toString());
}
@Override
public void onHeaderInput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
if (headerLog.isDebugEnabled()) {
for (int i = 0; i < headers.size(); i++) {
headerLog.debug(id + " << " + headers.get(i));
}
}
}
@Override
public void onHeaderOutput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
if (headerLog.isDebugEnabled()) {
for (int i = 0; i < headers.size(); i++) {
headerLog.debug(id + " >> " + headers.get(i));
}
}
}
@Override
public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) {
if (frameLog.isDebugEnabled()) {
logFrameInfo(id + " <<", frame);
}
if (framePayloadLog.isDebugEnabled()) {
logFramePayload(id + " <<", frame);
}
}
@Override
public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) {
if (frameLog.isDebugEnabled()) {
logFrameInfo(id + " >>", frame);
}
if (framePayloadLog.isDebugEnabled()) {
logFramePayload(id + " >>", frame);
}
}
@Override
public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
if (flowCtrlLog.isDebugEnabled()) {
logFlowControl(id + " <<", streamId, delta, actualSize);
}
}
@Override
public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
if (flowCtrlLog.isDebugEnabled()) {
logFlowControl(id + " >>", streamId, delta, actualSize);
}
}
}), id, streamLog);
} else {
return new DefaultAsyncHttpClientProtocolNegotiator(ioSession,
httpProcessor, exchangeHandlerFactory, charset, h2Config, null, null);
}
}
}

View File

@ -0,0 +1,67 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.async;
import java.nio.charset.Charset;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.impl.ConnectionListener;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.http2.impl.nio.ClientHttpProtocolNegotiator;
import org.apache.hc.core5.http2.impl.nio.Http2StreamListener;
import org.apache.hc.core5.reactor.IOSession;
/**
* @since 5.0
*/
@Contract(threading = ThreadingBehavior.IMMUTABLE)
public class DefaultAsyncHttpClientProtocolNegotiator extends ClientHttpProtocolNegotiator {
public DefaultAsyncHttpClientProtocolNegotiator(
final IOSession ioSession,
final HttpProcessor httpProcessor,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final Charset charset,
final H2Config h2Config,
final ConnectionListener connectionListener,
final Http2StreamListener streamListener) {
super(ioSession, httpProcessor, pushHandlerFactory, charset, h2Config, connectionListener, streamListener);
}
@Override
public ProtocolVersion getProtocolVersion() {
return HttpVersion.HTTP_2_0;
}
}

View File

@ -0,0 +1,571 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.async;
import java.io.Closeable;
import java.io.IOException;
import java.net.ProxySelector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
import org.apache.hc.client5.http.SchemePortResolver;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.DefaultConnectionKeepAliveStrategy;
import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
import org.apache.hc.client5.http.impl.DefaultThreadFactory;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.impl.DefaultUserTokenHandler;
import org.apache.hc.client5.http.impl.routing.DefaultProxyRoutePlanner;
import org.apache.hc.client5.http.impl.routing.DefaultRoutePlanner;
import org.apache.hc.client5.http.impl.routing.SystemDefaultRoutePlanner;
import org.apache.hc.client5.http.impl.IdleConnectionEvictor;
import org.apache.hc.client5.http.impl.NoopUserTokenHandler;
import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
import org.apache.hc.client5.http.protocol.RequestDefaultHeaders;
import org.apache.hc.client5.http.protocol.RequestExpectContinue;
import org.apache.hc.client5.http.protocol.UserTokenHandler;
import org.apache.hc.client5.http.routing.HttpRoutePlanner;
import org.apache.hc.core5.http.ConnectionReuseStrategy;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpRequestInterceptor;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpResponseInterceptor;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.config.ConnectionConfig;
import org.apache.hc.core5.http.config.H1Config;
import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.http.protocol.HttpProcessorBuilder;
import org.apache.hc.core5.http.protocol.RequestUserAgent;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.http2.protocol.H2RequestConnControl;
import org.apache.hc.core5.http2.protocol.H2RequestContent;
import org.apache.hc.core5.http2.protocol.H2RequestTargetHost;
import org.apache.hc.core5.pool.ConnPoolControl;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOReactorException;
import org.apache.hc.core5.util.VersionInfo;
/**
* Builder for {@link CloseableHttpAsyncClient} instances.
* <p>
* When a particular component is not explicitly set this class will
* use its default implementation. System properties will be taken
* into account when configuring the default implementations when
* {@link #useSystemProperties()} method is called prior to calling
* {@link #build()}.
* </p>
* <ul>
* <li>http.proxyHost</li>
* <li>http.proxyPort</li>
* <li>http.nonProxyHosts</li>
* <li>http.keepAlive</li>
* <li>http.agent</li>
* </ul>
* <p>
* Please note that some settings used by this class can be mutually
* exclusive and may not apply when building {@link CloseableHttpAsyncClient}
* instances.
* </p>
*
* @since 5.0
*/
public class HttpAsyncClientBuilder {
private HttpVersion protocolVersion;
private AsyncClientConnectionManager connManager;
private boolean connManagerShared;
private IOReactorConfig ioReactorConfig;
private H1Config h1Config;
private H2Config h2Config;
private SchemePortResolver schemePortResolver;
private ConnectionReuseStrategy reuseStrategy;
private ConnectionKeepAliveStrategy keepAliveStrategy;
private UserTokenHandler userTokenHandler;
private LinkedList<HttpRequestInterceptor> requestFirst;
private LinkedList<HttpRequestInterceptor> requestLast;
private LinkedList<HttpResponseInterceptor> responseFirst;
private LinkedList<HttpResponseInterceptor> responseLast;
private HttpRoutePlanner routePlanner;
private String userAgent;
private HttpHost proxy;
private Collection<? extends Header> defaultHeaders;
private RequestConfig defaultRequestConfig;
private boolean evictExpiredConnections;
private boolean evictIdleConnections;
private long maxIdleTime;
private TimeUnit maxIdleTimeUnit;
private boolean systemProperties;
private boolean connectionStateDisabled;
private List<Closeable> closeables;
public static HttpAsyncClientBuilder create() {
return new HttpAsyncClientBuilder();
}
protected HttpAsyncClientBuilder() {
super();
}
/**
* Sets HTTP protocol version.
*/
public final HttpAsyncClientBuilder setProtocolVersion(final HttpVersion protocolVersion) {
this.protocolVersion = protocolVersion;
return this;
}
/**
* Assigns {@link AsyncClientConnectionManager} instance.
*/
public final HttpAsyncClientBuilder setConnectionManager(final AsyncClientConnectionManager connManager) {
this.connManager = connManager;
return this;
}
/**
* Defines the connection manager is to be shared by multiple
* client instances.
* <p>
* If the connection manager is shared its life-cycle is expected
* to be managed by the caller and it will not be shut down
* if the client is closed.
*
* @param shared defines whether or not the connection manager can be shared
* by multiple clients.
*/
public final HttpAsyncClientBuilder setConnectionManagerShared(final boolean shared) {
this.connManagerShared = shared;
return this;
}
/**
* Sets {@link IOReactorConfig} configuration.
*/
public final HttpAsyncClientBuilder setIOReactorConfig(final IOReactorConfig ioReactorConfig) {
this.ioReactorConfig = ioReactorConfig;
return this;
}
/**
* Sets {@link H1Config} configuration.
* <p>
* Please note this setting applies only if {@link #setProtocolVersion(HttpVersion)} is
* set to {@code HTTP/1.1} or below.
*/
public final HttpAsyncClientBuilder setH1Config(final H1Config h1Config) {
this.h1Config = h1Config;
return this;
}
/**
* Sets {@link H2Config} configuration.
* <p>
* Please note this setting applies only if {@link #setProtocolVersion(HttpVersion)} is
* set to {@code HTTP/2} or above.
*/
public final HttpAsyncClientBuilder setH2Config(final H2Config h2Config) {
this.h2Config = h2Config;
return this;
}
/**
* Assigns {@link ConnectionReuseStrategy} instance.
* <p>
* Please note this setting applies only if {@link #setProtocolVersion(HttpVersion)} is
* set to {@code HTTP/1.1} or below.
*/
public final HttpAsyncClientBuilder setConnectionReuseStrategy(final ConnectionReuseStrategy reuseStrategy) {
this.reuseStrategy = reuseStrategy;
return this;
}
/**
* Assigns {@link ConnectionKeepAliveStrategy} instance.
*/
public final HttpAsyncClientBuilder setKeepAliveStrategy(final ConnectionKeepAliveStrategy keepAliveStrategy) {
this.keepAliveStrategy = keepAliveStrategy;
return this;
}
/**
* Assigns {@link UserTokenHandler} instance.
* <p>
* Please note this value can be overridden by the {@link #disableConnectionState()}
* method.
* </p>
*/
public final HttpAsyncClientBuilder setUserTokenHandler(final UserTokenHandler userTokenHandler) {
this.userTokenHandler = userTokenHandler;
return this;
}
/**
* Disables connection state tracking.
*/
public final HttpAsyncClientBuilder disableConnectionState() {
connectionStateDisabled = true;
return this;
}
/**
* Assigns {@link SchemePortResolver} instance.
*/
public final HttpAsyncClientBuilder setSchemePortResolver(final SchemePortResolver schemePortResolver) {
this.schemePortResolver = schemePortResolver;
return this;
}
/**
* Assigns {@code User-Agent} value.
*/
public final HttpAsyncClientBuilder setUserAgent(final String userAgent) {
this.userAgent = userAgent;
return this;
}
/**
* Assigns default request header values.
*/
public final HttpAsyncClientBuilder setDefaultHeaders(final Collection<? extends Header> defaultHeaders) {
this.defaultHeaders = defaultHeaders;
return this;
}
/**
* Adds this protocol interceptor to the head of the protocol processing list.
*/
public final HttpAsyncClientBuilder addInterceptorFirst(final HttpResponseInterceptor itcp) {
if (itcp == null) {
return this;
}
if (responseFirst == null) {
responseFirst = new LinkedList<>();
}
responseFirst.addFirst(itcp);
return this;
}
/**
* Adds this protocol interceptor to the tail of the protocol processing list.
*/
public final HttpAsyncClientBuilder addInterceptorLast(final HttpResponseInterceptor itcp) {
if (itcp == null) {
return this;
}
if (responseLast == null) {
responseLast = new LinkedList<>();
}
responseLast.addLast(itcp);
return this;
}
/**
* Adds this protocol interceptor to the head of the protocol processing list.
*/
public final HttpAsyncClientBuilder addInterceptorFirst(final HttpRequestInterceptor itcp) {
if (itcp == null) {
return this;
}
if (requestFirst == null) {
requestFirst = new LinkedList<>();
}
requestFirst.addFirst(itcp);
return this;
}
/**
* Adds this protocol interceptor to the tail of the protocol processing list.
*/
public final HttpAsyncClientBuilder addInterceptorLast(final HttpRequestInterceptor itcp) {
if (itcp == null) {
return this;
}
if (requestLast == null) {
requestLast = new LinkedList<>();
}
requestLast.addLast(itcp);
return this;
}
/**
* Assigns default proxy value.
* <p>
* Please note this value can be overridden by the {@link #setRoutePlanner(
* HttpRoutePlanner)} method.
*/
public final HttpAsyncClientBuilder setProxy(final HttpHost proxy) {
this.proxy = proxy;
return this;
}
/**
* Assigns {@link HttpRoutePlanner} instance.
*/
public final HttpAsyncClientBuilder setRoutePlanner(final HttpRoutePlanner routePlanner) {
this.routePlanner = routePlanner;
return this;
}
/**
* Assigns default {@link RequestConfig} instance which will be used
* for request execution if not explicitly set in the client execution
* context.
*/
public final HttpAsyncClientBuilder setDefaultRequestConfig(final RequestConfig config) {
this.defaultRequestConfig = config;
return this;
}
/**
* Use system properties when creating and configuring default
* implementations.
*/
public final HttpAsyncClientBuilder useSystemProperties() {
this.systemProperties = true;
return this;
}
/**
* Makes this instance of HttpClient proactively evict expired connections from the
* connection pool using a background thread.
* <p>
* One MUST explicitly close HttpClient with {@link CloseableHttpAsyncClient#close()} in order
* to stop and release the background thread.
* <p>
* Please note this method has no effect if the instance of HttpClient is configuted to
* use a shared connection manager.
*
* @see #setConnectionManagerShared(boolean)
* @see ConnPoolControl#closeExpired()
*/
public final HttpAsyncClientBuilder evictExpiredConnections() {
evictExpiredConnections = true;
return this;
}
/**
* Makes this instance of HttpClient proactively evict idle connections from the
* connection pool using a background thread.
* <p>
* One MUST explicitly close HttpClient with {@link CloseableHttpAsyncClient#close()}
* in order to stop and release the background thread.
* <p>
* Please note this method has no effect if the instance of HttpClient is configuted to
* use a shared connection manager.
*
* @see #setConnectionManagerShared(boolean)
* @see ConnPoolControl#closeIdle(long, TimeUnit)
*
* @param maxIdleTime maximum time persistent connections can stay idle while kept alive
* in the connection pool. Connections whose inactivity period exceeds this value will
* get closed and evicted from the pool.
* @param maxIdleTimeUnit time unit for the above parameter.
*/
public final HttpAsyncClientBuilder evictIdleConnections(final long maxIdleTime, final TimeUnit maxIdleTimeUnit) {
this.evictIdleConnections = true;
this.maxIdleTime = maxIdleTime;
this.maxIdleTimeUnit = maxIdleTimeUnit;
return this;
}
/**
* For internal use.
*/
protected void addCloseable(final Closeable closeable) {
if (closeable == null) {
return;
}
if (closeables == null) {
closeables = new ArrayList<>();
}
closeables.add(closeable);
}
public CloseableHttpAsyncClient build() {
AsyncClientConnectionManager connManagerCopy = this.connManager;
if (connManagerCopy == null) {
connManagerCopy = PoolingAsyncClientConnectionManagerBuilder.create().build();
}
ConnectionKeepAliveStrategy keepAliveStrategyCopy = this.keepAliveStrategy;
if (keepAliveStrategyCopy == null) {
keepAliveStrategyCopy = DefaultConnectionKeepAliveStrategy.INSTANCE;
}
UserTokenHandler userTokenHandlerCopy = this.userTokenHandler;
if (userTokenHandlerCopy == null) {
if (!connectionStateDisabled) {
userTokenHandlerCopy = DefaultUserTokenHandler.INSTANCE;
} else {
userTokenHandlerCopy = NoopUserTokenHandler.INSTANCE;
}
}
String userAgentCopy = this.userAgent;
if (userAgentCopy == null) {
if (systemProperties) {
userAgentCopy = System.getProperty("http.agent");
}
if (userAgentCopy == null) {
userAgentCopy = VersionInfo.getSoftwareInfo("Apache-HttpAsyncClient",
"org.apache.hc.client5", getClass());
}
}
final HttpProcessorBuilder b = HttpProcessorBuilder.create();
if (requestFirst != null) {
for (final HttpRequestInterceptor i: requestFirst) {
b.addFirst(i);
}
}
if (responseFirst != null) {
for (final HttpResponseInterceptor i: responseFirst) {
b.addFirst(i);
}
}
b.addAll(
new RequestDefaultHeaders(defaultHeaders),
new H2RequestContent(),
new H2RequestTargetHost(),
new H2RequestConnControl(),
new RequestUserAgent(userAgentCopy),
new RequestExpectContinue());
if (requestLast != null) {
for (final HttpRequestInterceptor i: requestLast) {
b.addLast(i);
}
}
if (responseLast != null) {
for (final HttpResponseInterceptor i: responseLast) {
b.addLast(i);
}
}
final HttpProcessor httpProcessor = b.build();
HttpRoutePlanner routePlannerCopy = this.routePlanner;
if (routePlannerCopy == null) {
SchemePortResolver schemePortResolverCopy = this.schemePortResolver;
if (schemePortResolverCopy == null) {
schemePortResolverCopy = DefaultSchemePortResolver.INSTANCE;
}
if (proxy != null) {
routePlannerCopy = new DefaultProxyRoutePlanner(proxy, schemePortResolverCopy);
} else if (systemProperties) {
routePlannerCopy = new SystemDefaultRoutePlanner(
schemePortResolverCopy, ProxySelector.getDefault());
} else {
routePlannerCopy = new DefaultRoutePlanner(schemePortResolverCopy);
}
}
List<Closeable> closeablesCopy = closeables != null ? new ArrayList<>(closeables) : null;
if (!this.connManagerShared) {
if (closeablesCopy == null) {
closeablesCopy = new ArrayList<>(1);
}
if (evictExpiredConnections || evictIdleConnections) {
if (connManagerCopy instanceof ConnPoolControl) {
final IdleConnectionEvictor connectionEvictor = new IdleConnectionEvictor((ConnPoolControl<?>) connManagerCopy,
maxIdleTime > 0 ? maxIdleTime : 10, maxIdleTimeUnit != null ? maxIdleTimeUnit : TimeUnit.SECONDS);
closeablesCopy.add(new Closeable() {
@Override
public void close() throws IOException {
connectionEvictor.shutdown();
}
});
connectionEvictor.start();
}
}
closeablesCopy.add(connManagerCopy);
}
final IOEventHandlerFactory ioEventHandlerFactory;
if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2)) {
ioEventHandlerFactory = new DefaultAsyncHttp2ClientEventHandlerFactory(
httpProcessor,
null,
StandardCharsets.US_ASCII,
h2Config != null ? h2Config : H2Config.DEFAULT);
} else {
ConnectionReuseStrategy reuseStrategyCopy = this.reuseStrategy;
if (reuseStrategyCopy == null) {
if (systemProperties) {
final String s = System.getProperty("http.keepAlive", "true");
if ("true".equalsIgnoreCase(s)) {
reuseStrategyCopy = DefaultConnectionReuseStrategy.INSTANCE;
} else {
reuseStrategyCopy = new ConnectionReuseStrategy() {
@Override
public boolean keepAlive(
final HttpRequest request, final HttpResponse response, final HttpContext context) {
return false;
}
};
}
} else {
reuseStrategyCopy = DefaultConnectionReuseStrategy.INSTANCE;
}
}
ioEventHandlerFactory = new DefaultAsyncHttp1ClientEventHandlerFactory(
httpProcessor,
h1Config != null ? h1Config : H1Config.DEFAULT,
ConnectionConfig.DEFAULT,
reuseStrategyCopy);
}
try {
return new InternalHttpAsyncClient(
ioEventHandlerFactory,
ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT,
new DefaultThreadFactory("httpclient-main", true),
new DefaultThreadFactory("httpclient-dispatch", true),
connManagerCopy,
routePlannerCopy,
keepAliveStrategyCopy,
userTokenHandlerCopy,
defaultRequestConfig,
closeablesCopy);
} catch (IOReactorException ex) {
throw new IllegalStateException(ex.getMessage(), ex);
}
}
}

View File

@ -0,0 +1,189 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.async;
import java.nio.charset.StandardCharsets;
import org.apache.hc.client5.http.impl.DefaultThreadFactory;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.config.ConnectionConfig;
import org.apache.hc.core5.http.config.H1Config;
import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.http.protocol.RequestUserAgent;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.http2.protocol.H2RequestConnControl;
import org.apache.hc.core5.http2.protocol.H2RequestContent;
import org.apache.hc.core5.http2.protocol.H2RequestTargetHost;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOReactorException;
import org.apache.hc.core5.util.VersionInfo;
/**
* Factory methods for {@link CloseableHttpAsyncClient} instances.
*
* @since 5.0
*/
public class HttpAsyncClients {
private HttpAsyncClients() {
super();
}
/**
* Creates builder object for construction of custom
* {@link CloseableHttpAsyncClient} instances.
*/
public static HttpAsyncClientBuilder custom() {
return HttpAsyncClientBuilder.create();
}
/**
* Creates HTTP/1.1 {@link CloseableHttpAsyncClient} instance with default
* configuration.
*/
public static CloseableHttpAsyncClient createDefault() {
return HttpAsyncClientBuilder.create().build();
}
/**
* Creates HTTP/2 {@link CloseableHttpAsyncClient} instance with the given
* configuration.
*/
public static CloseableHttpAsyncClient createDefault(final H2Config config) {
return HttpAsyncClientBuilder.create()
.setProtocolVersion(HttpVersion.HTTP_2)
.setH2Config(config)
.build();
}
/**
* Creates HTTP/1.1 {@link CloseableHttpAsyncClient} instance with default
* configuration and system properties.
*/
public static CloseableHttpAsyncClient createSystem() {
return HttpAsyncClientBuilder.create().useSystemProperties().build();
}
/**
* Creates HTTP/2 {@link CloseableHttpAsyncClient} instance with the given
* configuration and system properties.
*/
public static CloseableHttpAsyncClient createSystem(final H2Config config) {
return HttpAsyncClientBuilder.create()
.useSystemProperties()
.setProtocolVersion(HttpVersion.HTTP_2)
.setH2Config(config)
.build();
}
private static HttpProcessor createMinimalProtocolProcessor() {
return new DefaultHttpProcessor(
new H2RequestContent(),
new H2RequestTargetHost(),
new H2RequestConnControl(),
new RequestUserAgent(VersionInfo.getSoftwareInfo(
"Apache-HttpAsyncClient", "org.apache.hc.client5", HttpAsyncClients.class)));
}
private static MinimalHttpAsyncClient createMinimalImpl(
final IOEventHandlerFactory eventHandlerFactory,
final AsyncClientConnectionManager connmgr) {
try {
return new MinimalHttpAsyncClient(
eventHandlerFactory,
IOReactorConfig.DEFAULT,
new DefaultThreadFactory("httpclient-main", true),
new DefaultThreadFactory("httpclient-dispatch", true),
connmgr);
} catch (IOReactorException ex) {
throw new IllegalStateException(ex.getMessage(), ex);
}
}
private static MinimalHttpAsyncClient createMinimalImpl(
final H1Config h1Config,
final AsyncClientConnectionManager connmgr) {
return createMinimalImpl(new DefaultAsyncHttp1ClientEventHandlerFactory(
createMinimalProtocolProcessor(),
h1Config,
ConnectionConfig.DEFAULT,
DefaultConnectionReuseStrategy.INSTANCE),
connmgr);
}
private static MinimalHttpAsyncClient createMinimalImpl(
final H2Config h2Config,
final AsyncClientConnectionManager connmgr) {
return createMinimalImpl(new DefaultAsyncHttp2ClientEventHandlerFactory(
createMinimalProtocolProcessor(),
null,
StandardCharsets.US_ASCII,
h2Config),
connmgr);
}
/**
* Creates {@link CloseableHttpAsyncClient} instance that provides
* essential HTTP/1.1 message transport only.
*/
public static CloseableHttpAsyncClient createMinimal() {
return createMinimalImpl(H1Config.DEFAULT, PoolingAsyncClientConnectionManagerBuilder.create().build());
}
/**
* Creates {@link CloseableHttpAsyncClient} instance that provides
* essential HTTP/1.1 message transport only.
*/
public static CloseableHttpAsyncClient createMinimal(final AsyncClientConnectionManager connManager) {
return createMinimalImpl(H1Config.DEFAULT, connManager);
}
/**
* Creates {@link CloseableHttpAsyncClient} instance that provides
* essential HTTP/2 message transport only.
*/
public static CloseableHttpAsyncClient createMinimal(final H2Config h2Config) {
return createMinimal(h2Config, PoolingAsyncClientConnectionManagerBuilder.create().build());
}
/**
* Creates {@link CloseableHttpAsyncClient} instance that provides
* essential HTTP/2 message transport only.
*/
public static CloseableHttpAsyncClient createMinimal(
final H2Config h2Config,
final AsyncClientConnectionManager connManager) {
return createMinimalImpl(h2Config, connManager);
}
}

View File

@ -0,0 +1,437 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.async;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.async.AsyncClientEndpoint;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.ConnPoolSupport;
import org.apache.hc.client5.http.methods.Configurable;
import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.protocol.UserTokenHandler;
import org.apache.hc.client5.http.routing.HttpRoutePlanner;
import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.message.RequestLine;
import org.apache.hc.core5.http.message.StatusLine;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOReactorException;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Asserts;
class InternalHttpAsyncClient extends AbstractHttpAsyncClientBase {
private final static AtomicLong COUNT = new AtomicLong(0);
private final AsyncClientConnectionManager connmgr;
private final HttpRoutePlanner routePlanner;
private final ConnectionKeepAliveStrategy keepAliveStrategy;
private final UserTokenHandler userTokenHandler;
private final RequestConfig defaultConfig;
private final List<Closeable> closeables;
public InternalHttpAsyncClient(
final IOEventHandlerFactory eventHandlerFactory,
final IOReactorConfig reactorConfig,
final ThreadFactory threadFactory,
final ThreadFactory workerThreadFactory,
final AsyncClientConnectionManager connmgr,
final HttpRoutePlanner routePlanner,
final ConnectionKeepAliveStrategy keepAliveStrategy,
final UserTokenHandler userTokenHandler,
final RequestConfig defaultConfig,
final List<Closeable> closeables) throws IOReactorException {
super(eventHandlerFactory, reactorConfig, threadFactory, workerThreadFactory);
this.connmgr = connmgr;
this.routePlanner = routePlanner;
this.keepAliveStrategy = keepAliveStrategy;
this.userTokenHandler = userTokenHandler;
this.defaultConfig = defaultConfig;
this.closeables = closeables;
}
@Override
public void close() {
super.close();
if (closeables != null) {
for (final Closeable closeable: closeables) {
try {
closeable.close();
} catch (final IOException ex) {
log.error(ex.getMessage(), ex);
}
}
}
}
private void leaseEndpoint(
final HttpRoute route,
final Object userToken,
final HttpClientContext clientContext,
final FutureCallback<AsyncConnectionEndpoint> callback) {
final RequestConfig requestConfig = clientContext.getRequestConfig();
connmgr.lease(route, userToken, requestConfig.getConnectTimeout(), TimeUnit.MILLISECONDS,
new FutureCallback<AsyncConnectionEndpoint>() {
@Override
public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
if (connectionEndpoint.isConnected()) {
callback.completed(connectionEndpoint);
} else {
connmgr.connect(
connectionEndpoint,
getConnectionInitiator(),
requestConfig.getConnectTimeout(), TimeUnit.MILLISECONDS,
clientContext,
new FutureCallback<AsyncConnectionEndpoint>() {
@Override
public void completed(final AsyncConnectionEndpoint result) {
callback.completed(result);
}
@Override
public void failed(final Exception ex) {
callback.failed(ex);
}
@Override
public void cancelled() {
callback.cancelled();
}
});
}
}
@Override
public void failed(final Exception ex) {
callback.failed(ex);
}
@Override
public void cancelled() {
callback.cancelled();
}
});
}
@Override
public Future<AsyncClientEndpoint> lease(
final HttpHost host,
final HttpContext context,
final FutureCallback<AsyncClientEndpoint> callback) {
Args.notNull(host, "Host");
Args.notNull(context, "HTTP context");
ensureRunning();
final HttpClientContext clientContext = HttpClientContext.adapt(context);
final BasicFuture<AsyncClientEndpoint> future = new BasicFuture<>(callback);
try {
final HttpRoute route = routePlanner.determineRoute(host, clientContext);
final Object userToken = clientContext.getUserToken();
leaseEndpoint(route, userToken, clientContext, new FutureCallback<AsyncConnectionEndpoint>() {
@Override
public void completed(final AsyncConnectionEndpoint result) {
future.completed(new InternalAsyncClientEndpoint(route, result));
}
@Override
public void failed(final Exception ex) {
future.failed(ex);
}
@Override
public void cancelled() {
future.cancel(true);
}
});
} catch (HttpException ex) {
future.failed(ex);
}
return future;
}
@Override
public <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final HttpContext context,
final FutureCallback<T> callback) {
ensureRunning();
final BasicFuture<T> future = new BasicFuture<>(callback);
try {
final HttpClientContext clientContext = HttpClientContext.adapt(context);
final HttpRequest request = requestProducer.produceRequest();
RequestConfig requestConfig = null;
if (requestProducer instanceof Configurable) {
requestConfig = ((Configurable) requestProducer).getConfig();
}
if (requestConfig != null) {
clientContext.setRequestConfig(requestConfig);
}
final HttpHost target = routePlanner.determineTargetHost(request, clientContext);
final HttpRoute route = routePlanner.determineRoute(target, clientContext);
final Object userToken = clientContext.getUserToken();
leaseEndpoint(route, userToken, clientContext, new FutureCallback<AsyncConnectionEndpoint>() {
@Override
public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
final InternalAsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(route, connectionEndpoint);
endpoint.executeAndRelease(requestProducer, responseConsumer, clientContext, new FutureCallback<T>() {
@Override
public void completed(final T result) {
future.completed(result);
}
@Override
public void failed(final Exception ex) {
future.failed(ex);
}
@Override
public void cancelled() {
future.cancel();
}
});
}
@Override
public void failed(final Exception ex) {
future.failed(ex);
}
@Override
public void cancelled() {
future.cancel();
}
});
} catch (HttpException ex) {
future.failed(ex);
}
return future;
}
private void setupContext(final HttpClientContext context) {
if (context.getAttribute(HttpClientContext.REQUEST_CONFIG) == null) {
context.setAttribute(HttpClientContext.REQUEST_CONFIG, defaultConfig);
}
}
private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
private final HttpRoute route;
private final AsyncConnectionEndpoint connectionEndpoint;
private final AtomicBoolean reusable;
private final AtomicReference<Object> userTokenRef;
private final AtomicLong keepAlive;
private final AtomicBoolean released;
InternalAsyncClientEndpoint(final HttpRoute route, final AsyncConnectionEndpoint connectionEndpoint) {
this.route = route;
this.connectionEndpoint = connectionEndpoint;
this.reusable = new AtomicBoolean(true);
this.keepAlive = new AtomicLong(Long.MAX_VALUE);
this.userTokenRef = new AtomicReference<>(null);
this.released = new AtomicBoolean(false);
}
@Override
public void execute(final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) {
Asserts.check(!released.get(), ConnPoolSupport.getId(connectionEndpoint) + " endpoint has already been released");
final HttpClientContext clientContext = HttpClientContext.adapt(context);
setupContext(clientContext);
connectionEndpoint.execute(new AsyncClientExchangeHandler() {
private final String id = Long.toString(COUNT.incrementAndGet());
void updateState() {
reusable.set(true);
Object userToken = clientContext.getUserToken();
if (userToken == null) {
userToken = userTokenHandler.getUserToken(route, context);
context.setAttribute(HttpClientContext.USER_TOKEN, userToken);
}
userTokenRef.set(userToken);
}
public void produceRequest(
final RequestChannel channel) throws HttpException, IOException {
exchangeHandler.produceRequest(log.isDebugEnabled() ? new RequestChannel() {
@Override
public void sendRequest(
final HttpRequest request,
final EntityDetails entityDetails) throws HttpException, IOException {
if (log.isDebugEnabled()) {
log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id + ": request " + new RequestLine(request));
}
channel.sendRequest(request, entityDetails);
}
} : channel);
}
public int available() {
return exchangeHandler.available();
}
public void produce(final DataStreamChannel channel) throws IOException {
exchangeHandler.produce(channel);
}
public void consumeResponse(
final HttpResponse response,
final EntityDetails entityDetails) throws HttpException, IOException {
if (log.isDebugEnabled()) {
log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id + ": response " + new StatusLine(response));
}
exchangeHandler.consumeResponse(response, entityDetails);
keepAlive.set(keepAliveStrategy.getKeepAliveDuration(response, context));
if (entityDetails == null) {
updateState();
if (log.isDebugEnabled()) {
log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id + ": completed");
}
}
}
public void consumeInformation(final HttpResponse response) throws HttpException, IOException {
if (log.isDebugEnabled()) {
log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id + ": intermediate response " + new StatusLine(response));
}
exchangeHandler.consumeInformation(response);
}
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
exchangeHandler.updateCapacity(capacityChannel);
}
public int consume(final ByteBuffer src) throws IOException {
return exchangeHandler.consume(src);
}
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
if (log.isDebugEnabled()) {
log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id + ": completed");
}
exchangeHandler.streamEnd(trailers);
updateState();
}
public void failed(final Exception cause) {
if (log.isDebugEnabled()) {
log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id + ": failed", cause);
}
reusable.set(false);
exchangeHandler.failed(cause);
}
public void cancel() {
if (log.isDebugEnabled()) {
log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id + ": cancelled");
}
reusable.set(false);
exchangeHandler.cancel();
}
public void releaseResources() {
exchangeHandler.releaseResources();
}
}, clientContext);
}
private void closeEndpoint() {
try {
connectionEndpoint.close();
} catch (IOException ex) {
log.debug("I/O error closing connection endpoint: " + ex.getMessage(), ex);
}
}
@Override
public void releaseAndReuse() {
if (released.compareAndSet(false, true)) {
if (!reusable.get()) {
closeEndpoint();
connmgr.release(connectionEndpoint, null, -1L, TimeUnit.MILLISECONDS);
} else {
connmgr.release(connectionEndpoint, userTokenRef.get(), keepAlive.get(), TimeUnit.MILLISECONDS);
}
}
}
@Override
public void releaseAndDiscard() {
if (released.compareAndSet(false, true)) {
closeEndpoint();
connmgr.release(connectionEndpoint, null, -1L, TimeUnit.MILLISECONDS);
}
}
}
}

View File

@ -0,0 +1,275 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.async;
import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.async.AsyncClientEndpoint;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.ComplexFuture;
import org.apache.hc.client5.http.methods.Configurable;
import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOReactorException;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Asserts;
class MinimalHttpAsyncClient extends AbstractHttpAsyncClientBase {
private final AsyncClientConnectionManager connmgr;
public MinimalHttpAsyncClient(
final IOEventHandlerFactory eventHandlerFactory,
final IOReactorConfig reactorConfig,
final ThreadFactory threadFactory,
final ThreadFactory workerThreadFactory,
final AsyncClientConnectionManager connmgr) throws IOReactorException {
super(eventHandlerFactory, reactorConfig, threadFactory, workerThreadFactory);
this.connmgr = connmgr;
}
private Future<AsyncConnectionEndpoint> leaseEndpoint(
final HttpHost host,
final int connectTimeout,
final HttpClientContext clientContext,
final FutureCallback<AsyncConnectionEndpoint> callback) {
final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
final Future<AsyncConnectionEndpoint> leaseFuture = connmgr.lease(
new HttpRoute(host), null,
connectTimeout, TimeUnit.MILLISECONDS,
new FutureCallback<AsyncConnectionEndpoint>() {
@Override
public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
if (connectionEndpoint.isConnected()) {
resultFuture.completed(connectionEndpoint);
} else {
final Future<AsyncConnectionEndpoint> connectFuture = connmgr.connect(
connectionEndpoint,
getConnectionInitiator(),
connectTimeout, TimeUnit.MILLISECONDS,
clientContext,
new FutureCallback<AsyncConnectionEndpoint>() {
@Override
public void completed(final AsyncConnectionEndpoint result) {
resultFuture.completed(result);
}
@Override
public void failed(final Exception ex) {
resultFuture.failed(ex);
}
@Override
public void cancelled() {
resultFuture.cancel(true);
}
});
resultFuture.setDependency(connectFuture);
}
}
@Override
public void failed(final Exception ex) {
callback.failed(ex);
}
@Override
public void cancelled() {
callback.cancelled();
}
});
resultFuture.setDependency(leaseFuture);
return resultFuture;
}
@Override
public Future<AsyncClientEndpoint> lease(
final HttpHost host,
final HttpContext context,
final FutureCallback<AsyncClientEndpoint> callback) {
Args.notNull(host, "Host");
Args.notNull(context, "HTTP context");
ensureRunning();
final HttpClientContext clientContext = HttpClientContext.adapt(context);
final RequestConfig requestConfig = clientContext.getRequestConfig();
final BasicFuture<AsyncClientEndpoint> future = new BasicFuture<>(callback);
leaseEndpoint(host, requestConfig.getConnectTimeout(), clientContext,
new FutureCallback<AsyncConnectionEndpoint>() {
@Override
public void completed(final AsyncConnectionEndpoint result) {
future.completed(new InternalAsyncClientEndpoint(result));
}
@Override
public void failed(final Exception ex) {
future.failed(ex);
}
@Override
public void cancelled() {
future.cancel(true);
}
});
return future;
}
@Override
public <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final HttpContext context,
final FutureCallback<T> callback) {
ensureRunning();
final HttpRequest request = requestProducer.produceRequest();
final HttpHost target = new HttpHost(request.getAuthority(), request.getScheme());
final HttpClientContext clientContext = HttpClientContext.adapt(context);
RequestConfig requestConfig = null;
if (requestProducer instanceof Configurable) {
requestConfig = ((Configurable) requestProducer).getConfig();
}
if (requestConfig != null) {
clientContext.setRequestConfig(requestConfig);
} else {
requestConfig = clientContext.getRequestConfig();
}
final ComplexFuture<T> resultFuture = new ComplexFuture<>(callback);
final Future<AsyncConnectionEndpoint> leaseFuture = leaseEndpoint(target, requestConfig.getConnectTimeout(), clientContext,
new FutureCallback<AsyncConnectionEndpoint>() {
@Override
public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
final InternalAsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(connectionEndpoint);
endpoint.executeAndRelease(requestProducer, responseConsumer, clientContext, new FutureCallback<T>() {
@Override
public void completed(final T result) {
resultFuture.completed(result);
}
@Override
public void failed(final Exception ex) {
resultFuture.failed(ex);
}
@Override
public void cancelled() {
resultFuture.cancel();
}
});
resultFuture.setDependency(new Cancellable() {
@Override
public boolean cancel() {
final boolean active = !endpoint.isReleased();
endpoint.releaseAndDiscard();
return active;
}
});
}
@Override
public void failed(final Exception ex) {
resultFuture.failed(ex);
}
@Override
public void cancelled() {
resultFuture.cancel();
}
});
resultFuture.setDependency(leaseFuture);
return resultFuture;
}
private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
private final AsyncConnectionEndpoint connectionEndpoint;
private final AtomicBoolean released;
InternalAsyncClientEndpoint(final AsyncConnectionEndpoint connectionEndpoint) {
this.connectionEndpoint = connectionEndpoint;
this.released = new AtomicBoolean(false);
}
boolean isReleased() {
return released.get();
}
@Override
public void execute(final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) {
Asserts.check(!released.get(), "Endpoint has already been released");
connectionEndpoint.execute(exchangeHandler, context);
}
@Override
public void releaseAndReuse() {
if (released.compareAndSet(false, true)) {
connmgr.release(connectionEndpoint, null, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
}
@Override
public void releaseAndDiscard() {
if (released.compareAndSet(false, true)) {
try {
connectionEndpoint.close();
} catch (IOException ignore) {
}
connmgr.release(connectionEndpoint, null, -1L, TimeUnit.MILLISECONDS);
}
}
}
}

View File

@ -0,0 +1,78 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.logging;
import java.io.IOException;
import org.apache.logging.log4j.Logger;
public final class LogAppendable implements Appendable {
private final Logger log;
private final String prefix;
private final StringBuilder buffer;
public LogAppendable(final Logger log, final String prefix) {
this.log = log;
this.prefix = prefix;
this.buffer = new StringBuilder();
}
@Override
public Appendable append(final CharSequence text) throws IOException {
return append(text, 0, text.length());
}
@Override
public Appendable append(final CharSequence text, final int start, final int end) throws IOException {
for (int i = start; i < end; i++) {
append(text.charAt(i));
}
return this;
}
@Override
public Appendable append(final char ch) throws IOException {
if (ch == '\n') {
log.debug(prefix + " " + buffer.toString());
buffer.setLength(0);
} else if (ch != '\r') {
buffer.append(ch);
}
return this;
}
public void flush() {
if (buffer.length() > 0) {
log.debug(prefix + " " + buffer.toString());
buffer.setLength(0);
}
}
}

View File

@ -0,0 +1,154 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.logging;
import java.io.IOException;
import java.net.SocketAddress;
import org.apache.hc.core5.http.HttpConnectionMetrics;
import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.impl.nio.HttpConnectionEventHandler;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.logging.log4j.Logger;
public class LoggingIOEventHandler implements HttpConnectionEventHandler {
private final HttpConnectionEventHandler handler;
private final String id;
private final Logger log;
public LoggingIOEventHandler(
final HttpConnectionEventHandler handler,
final String id,
final Logger log) {
super();
this.handler = handler;
this.id = id;
this.log = log;
}
@Override
public void connected(final IOSession session) {
if (log.isDebugEnabled()) {
log.debug(id + " " + session + " connected");
}
handler.connected(session);
}
@Override
public void inputReady(final IOSession session) {
if (log.isDebugEnabled()) {
log.debug(id + " " + session + " input ready");
}
handler.inputReady(session);
}
@Override
public void outputReady(final IOSession session) {
if (log.isDebugEnabled()) {
log.debug(id + " " + session + " output ready");
}
handler.outputReady(session);
}
@Override
public void timeout(final IOSession session) {
if (log.isDebugEnabled()) {
log.debug(id + " " + session + " timeout");
}
handler.timeout(session);
}
@Override
public void exception(final IOSession session, final Exception cause) {
handler.exception(session, cause);
}
@Override
public void disconnected(final IOSession session) {
if (log.isDebugEnabled()) {
log.debug(id + " " + session + " disconnected");
}
handler.disconnected(session);
}
@Override
public HttpConnectionMetrics getMetrics() {
return handler.getMetrics();
}
@Override
public void setSocketTimeout(final int timeout) {
if (log.isDebugEnabled()) {
log.debug(id + " set timeout " + timeout);
}
handler.setSocketTimeout(timeout);
}
@Override
public int getSocketTimeout() {
return handler.getSocketTimeout();
}
@Override
public ProtocolVersion getProtocolVersion() {
return handler.getProtocolVersion();
}
@Override
public SocketAddress getRemoteAddress() {
return handler.getRemoteAddress();
}
@Override
public SocketAddress getLocalAddress() {
return handler.getLocalAddress();
}
@Override
public boolean isOpen() {
return handler.isOpen();
}
@Override
public void close() throws IOException {
if (log.isDebugEnabled()) {
log.debug(id + " close");
}
handler.close();
}
@Override
public void shutdown() throws IOException {
if (log.isDebugEnabled()) {
log.debug(id + " shutdown");
}
handler.shutdown();
}
}

View File

@ -0,0 +1,272 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.logging;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import org.apache.hc.core5.reactor.Command;
import org.apache.hc.core5.reactor.IOEventHandler;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.ssl.SSLBufferManagement;
import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
import org.apache.logging.log4j.Logger;
public class LoggingIOSession implements IOSession, TransportSecurityLayer {
private final Logger log;
private final Wire wirelog;
private final String id;
private final IOSession session;
private final ByteChannel channel;
public LoggingIOSession(final IOSession session, final String id, final Logger log, final Logger wirelog) {
super();
this.session = session;
this.id = id;
this.log = log;
this.wirelog = new Wire(wirelog, this.id);
this.channel = new LoggingByteChannel();
}
public LoggingIOSession(final IOSession session, final String id, final Logger log) {
this(session, id, log, null);
}
@Override
public void addLast(final Command command) {
this.session.addLast(command);
}
@Override
public void addFirst(final Command command) {
this.session.addFirst(command);
}
@Override
public Command getCommand() {
return this.session.getCommand();
}
@Override
public ByteChannel channel() {
return this.channel;
}
@Override
public SocketAddress getLocalAddress() {
return this.session.getLocalAddress();
}
@Override
public SocketAddress getRemoteAddress() {
return this.session.getRemoteAddress();
}
@Override
public int getEventMask() {
return this.session.getEventMask();
}
private static String formatOps(final int ops) {
final StringBuilder buffer = new StringBuilder(6);
buffer.append('[');
if ((ops & SelectionKey.OP_READ) > 0) {
buffer.append('r');
}
if ((ops & SelectionKey.OP_WRITE) > 0) {
buffer.append('w');
}
if ((ops & SelectionKey.OP_ACCEPT) > 0) {
buffer.append('a');
}
if ((ops & SelectionKey.OP_CONNECT) > 0) {
buffer.append('c');
}
buffer.append(']');
return buffer.toString();
}
@Override
public void setEventMask(final int ops) {
this.session.setEventMask(ops);
if (this.log.isDebugEnabled()) {
this.log.debug(this.id + " " + this.session + ": Event mask set " + formatOps(ops));
}
}
@Override
public void setEvent(final int op) {
this.session.setEvent(op);
if (this.log.isDebugEnabled()) {
this.log.debug(this.id + " " + this.session + ": Event set " + formatOps(op));
}
}
@Override
public void clearEvent(final int op) {
this.session.clearEvent(op);
if (this.log.isDebugEnabled()) {
this.log.debug(this.id + " " + this.session + ": Event cleared " + formatOps(op));
}
}
@Override
public void close() {
if (this.log.isDebugEnabled()) {
this.log.debug(this.id + " " + this.session + ": Close");
}
this.session.close();
}
@Override
public int getStatus() {
return this.session.getStatus();
}
@Override
public boolean isClosed() {
return this.session.isClosed();
}
@Override
public void shutdown() {
if (this.log.isDebugEnabled()) {
this.log.debug(this.id + " " + this.session + ": Shutdown");
}
this.session.shutdown();
}
@Override
public int getSocketTimeout() {
return this.session.getSocketTimeout();
}
@Override
public void setSocketTimeout(final int timeout) {
if (this.log.isDebugEnabled()) {
this.log.debug(this.id + " " + this.session + ": Set timeout " + timeout);
}
this.session.setSocketTimeout(timeout);
}
@Override
public IOEventHandler getHandler() {
return this.session.getHandler();
}
@Override
public void setHandler(final IOEventHandler handler) {
this.session.setHandler(handler);
}
@Override
public void start(
final SSLContext sslContext,
final SSLBufferManagement sslBufferManagement,
final SSLSessionInitializer initializer,
final SSLSessionVerifier verifier) throws UnsupportedOperationException {
if (session instanceof TransportSecurityLayer) {
((TransportSecurityLayer) session).start(sslContext, sslBufferManagement, initializer, verifier);
} else {
throw new UnsupportedOperationException();
}
}
@Override
public SSLSession getSSLSession() {
if (session instanceof TransportSecurityLayer) {
return ((TransportSecurityLayer) session).getSSLSession();
} else {
return null;
}
}
@Override
public String toString() {
return this.id + " " + this.session.toString();
}
class LoggingByteChannel implements ByteChannel {
@Override
public int read(final ByteBuffer dst) throws IOException {
final int bytesRead = session.channel().read(dst);
if (log.isDebugEnabled()) {
log.debug(id + " " + session + ": " + bytesRead + " bytes read");
}
if (bytesRead > 0 && wirelog.isEnabled()) {
final ByteBuffer b = dst.duplicate();
final int p = b.position();
b.limit(p);
b.position(p - bytesRead);
wirelog.input(b);
}
return bytesRead;
}
@Override
public int write(final ByteBuffer src) throws IOException {
final int byteWritten = session.channel().write(src);
if (log.isDebugEnabled()) {
log.debug(id + " " + session + ": " + byteWritten + " bytes written");
}
if (byteWritten > 0 && wirelog.isEnabled()) {
final ByteBuffer b = src.duplicate();
final int p = b.position();
b.limit(p);
b.position(p - byteWritten);
wirelog.output(b);
}
return byteWritten;
}
@Override
public void close() throws IOException {
if (log.isDebugEnabled()) {
log.debug(id + " " + session + ": Channel close");
}
session.channel().close();
}
@Override
public boolean isOpen() {
return session.channel().isOpen();
}
}
}

View File

@ -51,6 +51,8 @@ import org.apache.hc.client5.http.cookie.CookieStore;
import org.apache.hc.client5.http.entity.InputStreamFactory;
import org.apache.hc.client5.http.impl.DefaultConnectionKeepAliveStrategy;
import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
import org.apache.hc.client5.http.impl.IdleConnectionEvictor;
import org.apache.hc.client5.http.impl.NoopUserTokenHandler;
import org.apache.hc.client5.http.impl.auth.BasicSchemeFactory;
import org.apache.hc.client5.http.impl.auth.DigestSchemeFactory;
import org.apache.hc.client5.http.impl.auth.KerberosSchemeFactory;
@ -60,7 +62,7 @@ import org.apache.hc.client5.http.impl.auth.SystemDefaultCredentialsProvider;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.client5.http.impl.protocol.DefaultAuthenticationStrategy;
import org.apache.hc.client5.http.impl.protocol.DefaultRedirectStrategy;
import org.apache.hc.client5.http.impl.protocol.DefaultUserTokenHandler;
import org.apache.hc.client5.http.impl.DefaultUserTokenHandler;
import org.apache.hc.client5.http.impl.routing.DefaultProxyRoutePlanner;
import org.apache.hc.client5.http.impl.routing.DefaultRoutePlanner;
import org.apache.hc.client5.http.impl.routing.SystemDefaultRoutePlanner;

View File

@ -25,7 +25,7 @@
*
*/
package org.apache.hc.client5.http.impl.sync;
package org.apache.hc.client5.http.impl;
import java.util.concurrent.TimeUnit;

View File

@ -30,7 +30,7 @@ package org.apache.hc.client5.http.impl.integration;
import java.util.concurrent.TimeUnit;
import org.apache.hc.client5.http.impl.sync.CloseableHttpClient;
import org.apache.hc.client5.http.impl.sync.IdleConnectionEvictor;
import org.apache.hc.client5.http.impl.IdleConnectionEvictor;
import org.apache.hc.client5.http.localserver.LocalServerTestBase;
import org.apache.hc.client5.http.methods.HttpGet;
import org.apache.hc.client5.http.methods.HttpUriRequest;