Client side APIs for HTTP/2 server push handling

git-svn-id: https://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk@1784909 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Oleg Kalnichevski 2017-03-01 10:47:22 +00:00
parent 191183bf75
commit 26cfea101f
17 changed files with 662 additions and 112 deletions

View File

@ -40,6 +40,7 @@ import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.reactor.IOReactorConfig;
/**
@ -55,9 +56,14 @@ public class AsyncClientHttp2Multiplexing {
.setSoTimeout(5000)
.build();
H2Config h2Config = H2Config.custom()
.setPushEnabled(false)
.build();
CloseableHttpAsyncClient client = HttpAsyncClients.custom()
.setProtocolVersion(HttpVersion.HTTP_1_1)
.setIOReactorConfig(ioReactorConfig)
.setProtocolVersion(HttpVersion.HTTP_2)
.setH2Config(h2Config)
.build();
client.start();

View File

@ -0,0 +1,160 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.examples;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hc.client5.http.async.methods.AbstractBinPushConsumer;
import org.apache.hc.client5.http.async.methods.AbstractCharResponseConsumer;
import org.apache.hc.client5.http.async.methods.AsyncRequestBuilder;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.message.StatusLine;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.reactor.IOReactorConfig;
/**
* This example demonstrates handling of HTTP/2 message exchanges pushed by the server.
*/
public class AsyncClientHttp2ServerPush {
public static void main(String[] args) throws Exception {
IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setConnectTimeout(5000)
.setSoTimeout(5000)
.build();
H2Config h2Config = H2Config.custom()
.setPushEnabled(true)
.build();
CloseableHttpAsyncClient client = HttpAsyncClients.custom()
.setIOReactorConfig(ioReactorConfig)
.setProtocolVersion(HttpVersion.HTTP_2)
.setH2Config(h2Config)
.build();
client.start();
client.register("*", new Supplier<AsyncPushConsumer>() {
@Override
public AsyncPushConsumer get() {
return new AbstractBinPushConsumer() {
@Override
protected void start(
final HttpRequest promise,
final HttpResponse response,
final ContentType contentType) throws HttpException, IOException {
System.out.println(promise.getPath() + " (push)->" + new StatusLine(response));
}
@Override
protected int capacity() {
return Integer.MAX_VALUE;
}
@Override
protected void data(final ByteBuffer data, final boolean endOfStream) throws IOException {
}
@Override
protected void completed() {
}
@Override
public void failed(final Exception cause) {
System.out.println("(push)->" + cause);
}
@Override
public void releaseResources() {
}
};
}
});
final HttpHost target = new HttpHost("http2bin.org");
final String requestURI = "/";
Future<Void> future = client.execute(
AsyncRequestBuilder.get(target, requestURI).build(),
new AbstractCharResponseConsumer<Void>() {
@Override
protected void start(
final HttpResponse response,
final ContentType contentType) throws HttpException, IOException {
System.out.println(requestURI + "->" + new StatusLine(response));
}
@Override
protected int capacity() {
return Integer.MAX_VALUE;
}
@Override
protected void data(final CharBuffer data, final boolean endOfStream) throws IOException {
}
@Override
protected Void buildResult() throws IOException {
return null;
}
@Override
public void failed(final Exception cause) {
System.out.println(requestURI + "->" + cause);
}
@Override
public void releaseResources() {
}
}, null);
future.get();
System.out.println("Shutting down");
client.shutdown(5, TimeUnit.SECONDS);
}
}

View File

@ -91,7 +91,7 @@ public class AsyncClientHttpExchangeStreaming {
}
@Override
protected Void getResult() {
protected Void buildResult() throws IOException {
return null;
}
@ -100,6 +100,10 @@ public class AsyncClientHttpExchangeStreaming {
System.out.println(requestUri + "->" + cause);
}
@Override
public void releaseResources() {
}
}, null);
future.get();
}

View File

@ -29,7 +29,9 @@ package org.apache.hc.client5.http.async;
import java.util.concurrent.Future;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.protocol.HttpContext;
@ -68,7 +70,7 @@ public interface HttpAsyncClient {
*
* @param <T> the result type of request execution.
* @param requestProducer request producer callback.
* @param responseConsumer response consumer callaback.
* @param responseConsumer response consumer callback.
* @param context HTTP context
* @param callback future callback.
* @return future representing pending completion of the operation.
@ -79,4 +81,15 @@ public interface HttpAsyncClient {
HttpContext context,
FutureCallback<T> callback);
/**
* Registers {@link AsyncPushConsumer} for the given host and the URI pattern.
*
* @param hostname the name of the host this consumer intended for.
* Can be {@code null} if applies to all hosts
* @param uriPattern URI request pattern
* @param supplier supplier that will be used to supply a consumer instance
* for the given combination of hostname and URI pattern.
*/
void register(String hostname, String uriPattern, Supplier<AsyncPushConsumer> supplier);
}

View File

@ -0,0 +1,65 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.async.methods;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.nio.AsyncDataConsumer;
import org.apache.hc.core5.http.nio.CapacityChannel;
public abstract class AbstractBinDataConsumer implements AsyncDataConsumer {
private static final ByteBuffer EMPTY = ByteBuffer.wrap(new byte[0]);
protected abstract int capacity();
protected abstract void data(ByteBuffer data, boolean endOfStream) throws IOException;
protected abstract void completed();
@Override
public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
capacityChannel.update(capacity());
}
@Override
public final int consume(final ByteBuffer src) throws IOException {
data(src, false);
return capacity();
}
@Override
public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
data(EMPTY, true);
completed();
}
}

View File

@ -0,0 +1,67 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.async.methods;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.UnsupportedCharsetException;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
public abstract class AbstractBinPushConsumer extends AbstractBinDataConsumer implements AsyncPushConsumer {
protected abstract void start(HttpRequest promise, HttpResponse response, ContentType contentType) throws HttpException, IOException;
@Override
public final void consumePromise(
final HttpRequest promise,
final HttpResponse response,
final EntityDetails entityDetails) throws HttpException, IOException {
if (entityDetails != null) {
final ContentType contentType;
try {
contentType = ContentType.parse(entityDetails.getContentType());
} catch (UnsupportedCharsetException ex) {
throw new UnsupportedEncodingException(ex.getMessage());
}
start(promise, response, contentType);
} else {
start(promise, response, null);
completed();
}
}
@Override
public void failed(final Exception cause) {
}
}

View File

@ -28,32 +28,22 @@ package org.apache.hc.client5.http.async.methods;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.UnsupportedCharsetException;
import java.util.List;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.CapacityChannel;
public abstract class AbstractBinResponseConsumer<T> implements AsyncResponseConsumer<T> {
private static final ByteBuffer EMPTY = ByteBuffer.wrap(new byte[0]);
public abstract class AbstractBinResponseConsumer<T> extends AbstractBinDataConsumer implements AsyncResponseConsumer<T> {
private volatile FutureCallback<T> resultCallback;
protected abstract void start(HttpResponse response, ContentType contentType) throws HttpException, IOException;
protected abstract int capacity();
protected abstract void data(ByteBuffer data, boolean endOfStream) throws IOException;
protected abstract T getResult();
protected abstract T buildResult();
@Override
public final void consumeResponse(
@ -70,34 +60,18 @@ public abstract class AbstractBinResponseConsumer<T> implements AsyncResponseCon
}
} else {
start(response, null);
resultCallback.completed(getResult());
completed();
}
}
@Override
public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
capacityChannel.update(capacity());
}
@Override
public final int consume(final ByteBuffer src) throws IOException {
data(src, false);
return capacity();
}
@Override
public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
data(EMPTY, true);
resultCallback.completed(getResult());
protected final void completed() {
resultCallback.completed(buildResult());
}
@Override
public void failed(final Exception cause) {
}
@Override
public void releaseResources() {
}
}

View File

@ -0,0 +1,100 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.async.methods;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.util.List;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.nio.AsyncDataConsumer;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.util.Asserts;
public abstract class AbstractCharDataConsumer implements AsyncDataConsumer {
private static final ByteBuffer EMPTY_BIN = ByteBuffer.wrap(new byte[0]);
private final CharBuffer charbuf = CharBuffer.allocate(8192);
private volatile CharsetDecoder charsetDecoder;
protected abstract int capacity();
protected abstract void data(CharBuffer data, boolean endOfStream) throws IOException;
protected abstract void completed() throws IOException;
protected final void setCharsetDecoder(final CharsetDecoder charsetDecoder) {
this.charsetDecoder = charsetDecoder;
}
@Override
public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
capacityChannel.update(capacity());
}
private void checkResult(final CoderResult result) throws IOException {
if (result.isError()) {
result.throwException();
}
}
private void doDecode(final boolean endOfStream) throws IOException {
charbuf.flip();
final int chunk = charbuf.remaining();
if (chunk > 0) {
data(charbuf, endOfStream);
}
charbuf.clear();
}
@Override
public final int consume(final ByteBuffer src) throws IOException {
Asserts.notNull(charsetDecoder, "Charset decoder");
while (src.hasRemaining()) {
checkResult(charsetDecoder.decode(src, charbuf, false));
doDecode(false);
}
return capacity();
}
@Override
public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
Asserts.notNull(charsetDecoder, "Charset decoder");
checkResult(charsetDecoder.decode(EMPTY_BIN, charbuf, true));
doDecode(false);
checkResult(charsetDecoder.flush(charbuf));
doDecode(true);
completed();
}
}

View File

@ -0,0 +1,74 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.async.methods;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.charset.UnsupportedCharsetException;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
public abstract class AbstractCharPushConsumer extends AbstractCharDataConsumer implements AsyncPushConsumer {
protected abstract void start(HttpRequest promise, HttpResponse response, ContentType contentType) throws HttpException, IOException;
@Override
public final void consumePromise(
final HttpRequest promise,
final HttpResponse response,
final EntityDetails entityDetails) throws HttpException, IOException {
if (entityDetails != null) {
final ContentType contentType;
try {
contentType = ContentType.parse(entityDetails.getContentType());
} catch (UnsupportedCharsetException ex) {
throw new UnsupportedEncodingException(ex.getMessage());
}
Charset charset = contentType != null ? contentType.getCharset() : null;
if (charset == null) {
charset = StandardCharsets.US_ASCII;
}
setCharsetDecoder(charset.newDecoder());
start(promise, response, contentType);
} else {
start(promise, response, null);
completed();
}
}
@Override
public void failed(final Exception cause) {
}
}

View File

@ -28,42 +28,24 @@ package org.apache.hc.client5.http.async.methods;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;
import java.nio.charset.UnsupportedCharsetException;
import java.util.List;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.util.Asserts;
public abstract class AbstractCharResponseConsumer<T> implements AsyncResponseConsumer<T> {
public abstract class AbstractCharResponseConsumer<T> extends AbstractCharDataConsumer implements AsyncResponseConsumer<T> {
private static final ByteBuffer EMPTY_BIN = ByteBuffer.wrap(new byte[0]);
private static final CharBuffer EMPTY_CHAR = CharBuffer.wrap(new char[0]);
private final CharBuffer charbuf = CharBuffer.allocate(8192);
private volatile CharsetDecoder charsetDecoder;
private volatile FutureCallback<T> resultCallback;
protected abstract void start(HttpResponse response, ContentType contentType) throws HttpException, IOException;
protected abstract int capacity();
protected abstract void data(CharBuffer data, boolean endOfStream) throws IOException;
protected abstract T getResult();
protected abstract T buildResult() throws IOException;
@Override
public final void consumeResponse(
@ -72,7 +54,7 @@ public abstract class AbstractCharResponseConsumer<T> implements AsyncResponseCo
final FutureCallback<T> resultCallback) throws HttpException, IOException {
this.resultCallback = resultCallback;
if (entityDetails != null) {
ContentType contentType;
final ContentType contentType;
try {
contentType = ContentType.parse(entityDetails.getContentType());
} catch (UnsupportedCharsetException ex) {
@ -82,61 +64,21 @@ public abstract class AbstractCharResponseConsumer<T> implements AsyncResponseCo
if (charset == null) {
charset = StandardCharsets.US_ASCII;
}
charsetDecoder = charset.newDecoder();
setCharsetDecoder(charset.newDecoder());
start(response, contentType);
} else {
start(response, null);
resultCallback.completed(getResult());
completed();
}
}
@Override
public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
capacityChannel.update(capacity());
}
private void checkResult(final CoderResult result) throws IOException {
if (result.isError()) {
result.throwException();
}
}
private void doDecode() throws IOException {
charbuf.flip();
final int chunk = charbuf.remaining();
if (chunk > 0) {
data(charbuf, false);
}
charbuf.clear();
}
@Override
public final int consume(final ByteBuffer src) throws IOException {
Asserts.notNull(charsetDecoder, "Charset decoder");
while (src.hasRemaining()) {
checkResult(charsetDecoder.decode(src, charbuf, false));
doDecode();
}
return capacity();
}
@Override
public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
Asserts.notNull(charsetDecoder, "Charset decoder");
checkResult(charsetDecoder.decode(EMPTY_BIN, charbuf, true));
doDecode();
checkResult(charsetDecoder.flush(charbuf));
doDecode();
data(EMPTY_CHAR, true);
resultCallback.completed(getResult());
protected final void completed() throws IOException {
resultCallback.completed(buildResult());
}
@Override
public void failed(final Exception cause) {
}
@Override
public void releaseResources() {
}
}

View File

@ -34,7 +34,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.ExceptionListener;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
import org.apache.hc.core5.http.nio.command.ShutdownType;
import org.apache.hc.core5.reactor.ConnectionInitiator;
@ -54,6 +56,7 @@ abstract class AbstractHttpAsyncClientBase extends CloseableHttpAsyncClient {
final Logger log = LogManager.getLogger(getClass());
private final AsyncPushConsumerRegistry pushConsumerRegistry;
private final DefaultConnectingIOReactor ioReactor;
private final ExceptionListener exceptionListener;
private final ExecutorService executorService;
@ -61,6 +64,7 @@ abstract class AbstractHttpAsyncClientBase extends CloseableHttpAsyncClient {
public AbstractHttpAsyncClientBase(
final IOEventHandlerFactory eventHandlerFactory,
final AsyncPushConsumerRegistry pushConsumerRegistry,
final IOReactorConfig reactorConfig,
final ThreadFactory threadFactory,
final ThreadFactory workerThreadFactory) throws IOReactorException {
@ -77,6 +81,7 @@ abstract class AbstractHttpAsyncClientBase extends CloseableHttpAsyncClient {
}
});
this.pushConsumerRegistry = pushConsumerRegistry;
this.exceptionListener = new ExceptionListener() {
@Override
public void onError(final Exception ex) {
@ -97,15 +102,18 @@ abstract class AbstractHttpAsyncClientBase extends CloseableHttpAsyncClient {
try {
ioReactor.execute();
} catch (Exception ex) {
if (exceptionListener != null) {
exceptionListener.onError(ex);
}
exceptionListener.onError(ex);
}
}
});
}
}
@Override
public void register(final String hostname, final String uriPattern, final Supplier<AsyncPushConsumer> supplier) {
pushConsumerRegistry.register(hostname, uriPattern, supplier);
}
void ensureRunning() {
switch (status.get()) {
case READY:

View File

@ -0,0 +1,99 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.async;
import java.util.Locale;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.protocol.UriPatternMatcher;
import org.apache.hc.core5.net.URIAuthority;
import org.apache.hc.core5.util.Args;
class AsyncPushConsumerRegistry {
private final UriPatternMatcher<Supplier<AsyncPushConsumer>> primary;
private final ConcurrentMap<String, UriPatternMatcher<Supplier<AsyncPushConsumer>>> hostMap;
public AsyncPushConsumerRegistry() {
this.primary = new UriPatternMatcher<>();
this.hostMap = new ConcurrentHashMap<>();
}
private UriPatternMatcher<Supplier<AsyncPushConsumer>> getPatternMatcher(final String hostname) {
if (hostname == null) {
return primary;
}
final UriPatternMatcher<Supplier<AsyncPushConsumer>> hostMatcher = hostMap.get(hostname);
if (hostMatcher != null) {
return hostMatcher;
}
return primary;
}
public AsyncPushConsumer get(final HttpRequest request) throws HttpException {
Args.notNull(request, "Request");
final URIAuthority authority = request.getAuthority();
final String key = authority != null ? authority.getHostName().toLowerCase(Locale.ROOT) : null;
final UriPatternMatcher<Supplier<AsyncPushConsumer>> patternMatcher = getPatternMatcher(key);
if (patternMatcher == null) {
return null;
}
String path = request.getPath();
final int i = path.indexOf("?");
if (i != -1) {
path = path.substring(0, i);
}
final Supplier<AsyncPushConsumer> supplier = patternMatcher.lookup(path);
return supplier != null ? supplier.get() : null;
}
public void register(final String hostname, final String uriPattern, final Supplier<AsyncPushConsumer> supplier) {
Args.notBlank(uriPattern, "URI pattern");
Args.notNull(supplier, "Supplier");
if (hostname == null) {
primary.register(uriPattern, supplier);
} else {
final String key = hostname.toLowerCase(Locale.ROOT);
UriPatternMatcher<Supplier<AsyncPushConsumer>> matcher = hostMap.get(key);
if (matcher == null) {
final UriPatternMatcher<Supplier<AsyncPushConsumer>> newMatcher = new UriPatternMatcher<>();
matcher = hostMap.putIfAbsent(key, newMatcher);
if (matcher == null) {
matcher = newMatcher;
}
}
matcher.register(uriPattern, supplier);
}
}
}

View File

@ -37,7 +37,9 @@ import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.reactor.ExceptionEvent;
@ -76,4 +78,8 @@ public abstract class CloseableHttpAsyncClient implements HttpAsyncClient, Close
return execute(requestProducer, responseConsumer, HttpClientContext.create(), callback);
}
public final void register(final String uriPattern, final Supplier<AsyncPushConsumer> supplier) {
register(null, uriPattern, supplier);
}
}

View File

@ -43,13 +43,13 @@ import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.DefaultConnectionKeepAliveStrategy;
import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
import org.apache.hc.client5.http.impl.DefaultThreadFactory;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.impl.DefaultUserTokenHandler;
import org.apache.hc.client5.http.impl.IdleConnectionEvictor;
import org.apache.hc.client5.http.impl.NoopUserTokenHandler;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.impl.routing.DefaultProxyRoutePlanner;
import org.apache.hc.client5.http.impl.routing.DefaultRoutePlanner;
import org.apache.hc.client5.http.impl.routing.SystemDefaultRoutePlanner;
import org.apache.hc.client5.http.impl.IdleConnectionEvictor;
import org.apache.hc.client5.http.impl.NoopUserTokenHandler;
import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
import org.apache.hc.client5.http.protocol.RequestDefaultHeaders;
import org.apache.hc.client5.http.protocol.RequestExpectContinue;
@ -57,6 +57,7 @@ import org.apache.hc.client5.http.protocol.UserTokenHandler;
import org.apache.hc.client5.http.routing.HttpRoutePlanner;
import org.apache.hc.core5.http.ConnectionReuseStrategy;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpRequestInterceptor;
@ -66,6 +67,8 @@ import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.config.ConnectionConfig;
import org.apache.hc.core5.http.config.H1Config;
import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.http.protocol.HttpProcessorBuilder;
@ -518,11 +521,19 @@ public class HttpAsyncClientBuilder {
}
closeablesCopy.add(connManagerCopy);
}
final AsyncPushConsumerRegistry pushConsumerRegistry = new AsyncPushConsumerRegistry();
final IOEventHandlerFactory ioEventHandlerFactory;
if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2)) {
ioEventHandlerFactory = new DefaultAsyncHttp2ClientEventHandlerFactory(
httpProcessor,
null,
new HandlerFactory<AsyncPushConsumer>() {
@Override
public AsyncPushConsumer create(final HttpRequest request) throws HttpException {
return pushConsumerRegistry.get(request);
}
},
StandardCharsets.US_ASCII,
h2Config != null ? h2Config : H2Config.DEFAULT);
} else {
@ -554,6 +565,7 @@ public class HttpAsyncClientBuilder {
try {
return new InternalHttpAsyncClient(
ioEventHandlerFactory,
pushConsumerRegistry,
ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT,
new DefaultThreadFactory("httpclient-main", true),
new DefaultThreadFactory("httpclient-dispatch", true),

View File

@ -32,10 +32,14 @@ import java.nio.charset.StandardCharsets;
import org.apache.hc.client5.http.impl.DefaultThreadFactory;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.config.ConnectionConfig;
import org.apache.hc.core5.http.config.H1Config;
import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.http.protocol.RequestUserAgent;
@ -117,10 +121,12 @@ public class HttpAsyncClients {
private static MinimalHttpAsyncClient createMinimalImpl(
final IOEventHandlerFactory eventHandlerFactory,
final AsyncPushConsumerRegistry pushConsumerRegistry,
final AsyncClientConnectionManager connmgr) {
try {
return new MinimalHttpAsyncClient(
eventHandlerFactory,
pushConsumerRegistry,
IOReactorConfig.DEFAULT,
new DefaultThreadFactory("httpclient-main", true),
new DefaultThreadFactory("httpclient-dispatch", true),
@ -133,22 +139,34 @@ public class HttpAsyncClients {
private static MinimalHttpAsyncClient createMinimalImpl(
final H1Config h1Config,
final AsyncClientConnectionManager connmgr) {
return createMinimalImpl(new DefaultAsyncHttp1ClientEventHandlerFactory(
return createMinimalImpl(
new DefaultAsyncHttp1ClientEventHandlerFactory(
createMinimalProtocolProcessor(),
h1Config,
ConnectionConfig.DEFAULT,
DefaultConnectionReuseStrategy.INSTANCE),
new AsyncPushConsumerRegistry(),
connmgr);
}
private static MinimalHttpAsyncClient createMinimalImpl(
final H2Config h2Config,
final AsyncClientConnectionManager connmgr) {
return createMinimalImpl(new DefaultAsyncHttp2ClientEventHandlerFactory(
final AsyncPushConsumerRegistry pushConsumerRegistry = new AsyncPushConsumerRegistry();
return createMinimalImpl(
new DefaultAsyncHttp2ClientEventHandlerFactory(
createMinimalProtocolProcessor(),
null,
new HandlerFactory<AsyncPushConsumer>() {
@Override
public AsyncPushConsumer create(final HttpRequest request) throws HttpException {
return pushConsumerRegistry.get(request);
}
},
StandardCharsets.US_ASCII,
h2Config),
pushConsumerRegistry,
connmgr);
}

View File

@ -82,8 +82,9 @@ class InternalHttpAsyncClient extends AbstractHttpAsyncClientBase {
private final RequestConfig defaultConfig;
private final List<Closeable> closeables;
public InternalHttpAsyncClient(
InternalHttpAsyncClient(
final IOEventHandlerFactory eventHandlerFactory,
final AsyncPushConsumerRegistry pushConsumerRegistry,
final IOReactorConfig reactorConfig,
final ThreadFactory threadFactory,
final ThreadFactory workerThreadFactory,
@ -93,7 +94,7 @@ class InternalHttpAsyncClient extends AbstractHttpAsyncClientBase {
final UserTokenHandler userTokenHandler,
final RequestConfig defaultConfig,
final List<Closeable> closeables) throws IOReactorException {
super(eventHandlerFactory, reactorConfig, threadFactory, workerThreadFactory);
super(eventHandlerFactory, pushConsumerRegistry, reactorConfig, threadFactory, workerThreadFactory);
this.connmgr = connmgr;
this.routePlanner = routePlanner;
this.keepAliveStrategy = keepAliveStrategy;

View File

@ -61,11 +61,12 @@ class MinimalHttpAsyncClient extends AbstractHttpAsyncClientBase {
public MinimalHttpAsyncClient(
final IOEventHandlerFactory eventHandlerFactory,
final AsyncPushConsumerRegistry pushConsumerRegistry,
final IOReactorConfig reactorConfig,
final ThreadFactory threadFactory,
final ThreadFactory workerThreadFactory,
final AsyncClientConnectionManager connmgr) throws IOReactorException {
super(eventHandlerFactory, reactorConfig, threadFactory, workerThreadFactory);
super(eventHandlerFactory, pushConsumerRegistry, reactorConfig, threadFactory, workerThreadFactory);
this.connmgr = connmgr;
}