mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-22 21:05:23 +00:00
Add support for cancelling async requests in low-level REST client (#45379)
The low-level REST client exposes a `performRequestAsync` method that allows to send async requests, but today it does not expose the ability to cancel such requests. That is something that the underlying apache async http client supports, and it makes sense for us to expose. This commit adds a return value to the `performRequestAsync` method, which is backwards compatible. A `Cancellable` object gets returned, which exposes a `cancel` public method. When calling `cancel`, the on-going request associated with the returned `Cancellable` instance will be cancelled by calling its `abort` method. This works throughout multiple retries, though some special care was needed for the case where `cancel` is called between different attempts (when one attempt has failed and the consecutive one has not been sent yet). Note that cancelling a request on the client side does not automatically translate to cancelling the server side execution of it. That needs to be specifically implemented, which is on the work for the search API (see #43332). Relates to #44802
This commit is contained in:
parent
1c706656c2
commit
cfb186afaf
@ -0,0 +1,87 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.client;
|
||||
|
||||
import org.apache.http.client.methods.AbstractExecutionAwareRequest;
|
||||
import org.apache.http.client.methods.HttpRequestBase;
|
||||
|
||||
import java.util.concurrent.CancellationException;
|
||||
|
||||
/**
|
||||
* Represents an operation that can be cancelled.
|
||||
* Returned when executing async requests through {@link RestClient#performRequestAsync(Request, ResponseListener)}, so that the request
|
||||
* can be cancelled if needed. Cancelling a request will result in calling {@link AbstractExecutionAwareRequest#abort()} on the underlying
|
||||
* request object, which will in turn cancel its corresponding {@link java.util.concurrent.Future}.
|
||||
* Note that cancelling a request does not automatically translate to aborting its execution on the server side, which needs to be
|
||||
* specifically implemented in each API.
|
||||
*/
|
||||
public class Cancellable {
|
||||
|
||||
static final Cancellable NO_OP = new Cancellable(null) {
|
||||
@Override
|
||||
public void cancel() {
|
||||
}
|
||||
|
||||
@Override
|
||||
void runIfNotCancelled(Runnable runnable) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
|
||||
static Cancellable fromRequest(HttpRequestBase httpRequest) {
|
||||
return new Cancellable(httpRequest);
|
||||
}
|
||||
|
||||
private final HttpRequestBase httpRequest;
|
||||
|
||||
private Cancellable(HttpRequestBase httpRequest) {
|
||||
this.httpRequest = httpRequest;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancels the on-going request that is associated with the current instance of {@link Cancellable}.
|
||||
*
|
||||
*/
|
||||
public synchronized void cancel() {
|
||||
this.httpRequest.abort();
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes some arbitrary code iff the on-going request has not been cancelled, otherwise throws {@link CancellationException}.
|
||||
* This is needed to guarantee that cancelling a request works correctly even in case {@link #cancel()} is called between different
|
||||
* attempts of the same request. The low-level client reuses the same instance of the {@link AbstractExecutionAwareRequest} by calling
|
||||
* {@link AbstractExecutionAwareRequest#reset()} between subsequent retries. The {@link #cancel()} method can be called at anytime,
|
||||
* and we need to handle the case where it gets called while there is no request being executed as one attempt may have failed and
|
||||
* the subsequent attempt has not been started yet.
|
||||
* If the request has already been cancelled we don't go ahead with the next attempt, and artificially raise the
|
||||
* {@link CancellationException}, otherwise we run the provided {@link Runnable} which will reset the request and send the next attempt.
|
||||
* Note that this method must be synchronized as well as the {@link #cancel()} method, to prevent a request from being cancelled
|
||||
* when there is no future to cancel, which would make cancelling the request a no-op.
|
||||
*/
|
||||
synchronized void runIfNotCancelled(Runnable runnable) {
|
||||
if (this.httpRequest.isAborted()) {
|
||||
throw newCancellationException();
|
||||
}
|
||||
runnable.run();
|
||||
}
|
||||
|
||||
static CancellationException newCancellationException() {
|
||||
return new CancellationException("request was cancelled");
|
||||
}
|
||||
}
|
@ -277,60 +277,64 @@ public class RestClient implements Closeable {
|
||||
* @param responseListener the {@link ResponseListener} to notify when the
|
||||
* request is completed or fails
|
||||
*/
|
||||
public void performRequestAsync(Request request, ResponseListener responseListener) {
|
||||
public Cancellable performRequestAsync(Request request, ResponseListener responseListener) {
|
||||
try {
|
||||
FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener);
|
||||
InternalRequest internalRequest = new InternalRequest(request);
|
||||
performRequestAsync(nextNodes(), internalRequest, failureTrackingResponseListener);
|
||||
return internalRequest.cancellable;
|
||||
} catch (Exception e) {
|
||||
responseListener.onFailure(e);
|
||||
return Cancellable.NO_OP;
|
||||
}
|
||||
}
|
||||
|
||||
private void performRequestAsync(final NodeTuple<Iterator<Node>> nodeTuple,
|
||||
final InternalRequest request,
|
||||
final FailureTrackingResponseListener listener) {
|
||||
final RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache);
|
||||
client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, new FutureCallback<HttpResponse>() {
|
||||
@Override
|
||||
public void completed(HttpResponse httpResponse) {
|
||||
try {
|
||||
ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse);
|
||||
if (responseOrResponseException.responseException == null) {
|
||||
listener.onSuccess(responseOrResponseException.response);
|
||||
} else {
|
||||
request.cancellable.runIfNotCancelled(() -> {
|
||||
final RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache);
|
||||
client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, new FutureCallback<HttpResponse>() {
|
||||
@Override
|
||||
public void completed(HttpResponse httpResponse) {
|
||||
try {
|
||||
ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse);
|
||||
if (responseOrResponseException.responseException == null) {
|
||||
listener.onSuccess(responseOrResponseException.response);
|
||||
} else {
|
||||
if (nodeTuple.nodes.hasNext()) {
|
||||
listener.trackFailure(responseOrResponseException.responseException);
|
||||
performRequestAsync(nodeTuple, request, listener);
|
||||
} else {
|
||||
listener.onDefinitiveFailure(responseOrResponseException.responseException);
|
||||
}
|
||||
}
|
||||
} catch(Exception e) {
|
||||
listener.onDefinitiveFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Exception failure) {
|
||||
try {
|
||||
RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, failure);
|
||||
onFailure(context.node);
|
||||
if (nodeTuple.nodes.hasNext()) {
|
||||
listener.trackFailure(responseOrResponseException.responseException);
|
||||
listener.trackFailure(failure);
|
||||
performRequestAsync(nodeTuple, request, listener);
|
||||
} else {
|
||||
listener.onDefinitiveFailure(responseOrResponseException.responseException);
|
||||
listener.onDefinitiveFailure(failure);
|
||||
}
|
||||
} catch(Exception e) {
|
||||
listener.onDefinitiveFailure(e);
|
||||
}
|
||||
} catch(Exception e) {
|
||||
listener.onDefinitiveFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Exception failure) {
|
||||
try {
|
||||
RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, failure);
|
||||
onFailure(context.node);
|
||||
if (nodeTuple.nodes.hasNext()) {
|
||||
listener.trackFailure(failure);
|
||||
performRequestAsync(nodeTuple, request, listener);
|
||||
} else {
|
||||
listener.onDefinitiveFailure(failure);
|
||||
}
|
||||
} catch(Exception e) {
|
||||
listener.onDefinitiveFailure(e);
|
||||
@Override
|
||||
public void cancelled() {
|
||||
listener.onDefinitiveFailure(Cancellable.newCancellationException());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancelled() {
|
||||
listener.onDefinitiveFailure(new ExecutionException("request was cancelled", null));
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@ -651,19 +655,20 @@ public class RestClient implements Closeable {
|
||||
|
||||
private class InternalRequest {
|
||||
private final Request request;
|
||||
private final Map<String, String> params;
|
||||
private final Set<Integer> ignoreErrorCodes;
|
||||
private final HttpRequestBase httpRequest;
|
||||
private final Cancellable cancellable;
|
||||
private final WarningsHandler warningsHandler;
|
||||
|
||||
InternalRequest(Request request) {
|
||||
this.request = request;
|
||||
this.params = new HashMap<>(request.getParameters());
|
||||
Map<String, String> params = new HashMap<>(request.getParameters());
|
||||
//ignore is a special parameter supported by the clients, shouldn't be sent to es
|
||||
String ignoreString = params.remove("ignore");
|
||||
this.ignoreErrorCodes = getIgnoreErrorCodes(ignoreString, request.getMethod());
|
||||
URI uri = buildUri(pathPrefix, request.getEndpoint(), params);
|
||||
this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity());
|
||||
this.cancellable = Cancellable.fromRequest(httpRequest);
|
||||
setHeaders(httpRequest, request.getOptions().getHeaders());
|
||||
this.warningsHandler = request.getOptions().getWarningsHandler() == null ?
|
||||
RestClient.this.warningsHandler : request.getOptions().getWarningsHandler();
|
||||
|
@ -35,6 +35,7 @@ import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -42,7 +43,9 @@ import java.util.concurrent.TimeUnit;
|
||||
import static org.elasticsearch.client.RestClientTestUtil.getAllStatusCodes;
|
||||
import static org.elasticsearch.client.RestClientTestUtil.randomErrorNoRetryStatusCode;
|
||||
import static org.elasticsearch.client.RestClientTestUtil.randomOkStatusCode;
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
@ -52,6 +55,7 @@ import static org.junit.Assert.fail;
|
||||
*/
|
||||
public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
|
||||
|
||||
private static WaitForCancelHandler waitForCancelHandler;
|
||||
private static HttpServer[] httpServers;
|
||||
private static HttpHost[] httpHosts;
|
||||
private static boolean stoppedFirstHost = false;
|
||||
@ -94,9 +98,34 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
|
||||
for (int statusCode : getAllStatusCodes()) {
|
||||
httpServer.createContext(pathPrefix + "/" + statusCode, new ResponseHandler(statusCode));
|
||||
}
|
||||
waitForCancelHandler = new WaitForCancelHandler();
|
||||
httpServer.createContext(pathPrefix + "/wait", waitForCancelHandler);
|
||||
return httpServer;
|
||||
}
|
||||
|
||||
private static class WaitForCancelHandler implements HttpHandler {
|
||||
private CountDownLatch cancelHandlerLatch;
|
||||
|
||||
void reset() {
|
||||
cancelHandlerLatch = new CountDownLatch(1);
|
||||
}
|
||||
|
||||
void cancelDone() {
|
||||
cancelHandlerLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(HttpExchange exchange) throws IOException {
|
||||
try {
|
||||
cancelHandlerLatch.await();
|
||||
} catch (InterruptedException ignore) {
|
||||
} finally {
|
||||
exchange.sendResponseHeaders(200, 0);
|
||||
exchange.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class ResponseHandler implements HttpHandler {
|
||||
private final int statusCode;
|
||||
|
||||
@ -127,7 +156,7 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
|
||||
//verify that shutting down some hosts doesn't matter as long as one working host is left behind
|
||||
if (httpServers.length > 1 && randomBoolean()) {
|
||||
List<HttpServer> updatedHttpServers = new ArrayList<>(httpServers.length - 1);
|
||||
int nodeIndex = randomInt(httpServers.length - 1);
|
||||
int nodeIndex = randomIntBetween(0, httpServers.length - 1);
|
||||
if (0 == nodeIndex) {
|
||||
stoppedFirstHost = true;
|
||||
}
|
||||
@ -139,7 +168,7 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
|
||||
updatedHttpServers.add(httpServer);
|
||||
}
|
||||
}
|
||||
httpServers = updatedHttpServers.toArray(new HttpServer[updatedHttpServers.size()]);
|
||||
httpServers = updatedHttpServers.toArray(new HttpServer[0]);
|
||||
}
|
||||
}
|
||||
|
||||
@ -195,6 +224,40 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public void testCancelAsyncRequests() throws Exception {
|
||||
int numRequests = randomIntBetween(5, 20);
|
||||
final CountDownLatch latch = new CountDownLatch(numRequests);
|
||||
final List<Response> responses = new CopyOnWriteArrayList<>();
|
||||
final List<Exception> exceptions = new CopyOnWriteArrayList<>();
|
||||
for (int i = 0; i < numRequests; i++) {
|
||||
waitForCancelHandler.reset();
|
||||
final String method = RestClientTestUtil.randomHttpMethod(getRandom());
|
||||
//we don't test status codes that are subject to retries as they interfere with hosts being stopped
|
||||
final int statusCode = randomBoolean() ? randomOkStatusCode(getRandom()) : randomErrorNoRetryStatusCode(getRandom());
|
||||
Cancellable cancellable = restClient.performRequestAsync(new Request(method, "/" + statusCode), new ResponseListener() {
|
||||
@Override
|
||||
public void onSuccess(Response response) {
|
||||
responses.add(response);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception exception) {
|
||||
exceptions.add(exception);
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
cancellable.cancel();
|
||||
waitForCancelHandler.cancelDone();
|
||||
}
|
||||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
assertEquals(0, responses.size());
|
||||
assertEquals(numRequests, exceptions.size());
|
||||
for (Exception exception : exceptions) {
|
||||
assertThat(exception, instanceOf(CancellationException.class));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test host selector against a real server <strong>and</strong>
|
||||
* test what happens after calling
|
||||
@ -249,13 +312,10 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
|
||||
}
|
||||
|
||||
private NodeSelector firstPositionNodeSelector() {
|
||||
return new NodeSelector() {
|
||||
@Override
|
||||
public void select(Iterable<Node> nodes) {
|
||||
for (Iterator<Node> itr = nodes.iterator(); itr.hasNext();) {
|
||||
if (httpHosts[0] != itr.next().getHost()) {
|
||||
itr.remove();
|
||||
}
|
||||
return nodes -> {
|
||||
for (Iterator<Node> itr = nodes.iterator(); itr.hasNext();) {
|
||||
if (httpHosts[0] != itr.next().getHost()) {
|
||||
itr.remove();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -243,19 +243,16 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
|
||||
}
|
||||
|
||||
public void testNodeSelector() throws Exception {
|
||||
NodeSelector firstPositionOnly = new NodeSelector() {
|
||||
@Override
|
||||
public void select(Iterable<Node> restClientNodes) {
|
||||
boolean found = false;
|
||||
for (Iterator<Node> itr = restClientNodes.iterator(); itr.hasNext();) {
|
||||
if (nodes.get(0) == itr.next()) {
|
||||
found = true;
|
||||
} else {
|
||||
itr.remove();
|
||||
}
|
||||
NodeSelector firstPositionOnly = restClientNodes -> {
|
||||
boolean found = false;
|
||||
for (Iterator<Node> itr = restClientNodes.iterator(); itr.hasNext();) {
|
||||
if (nodes.get(0) == itr.next()) {
|
||||
found = true;
|
||||
} else {
|
||||
itr.remove();
|
||||
}
|
||||
assertTrue(found);
|
||||
}
|
||||
assertTrue(found);
|
||||
};
|
||||
RestClient restClient = createRestClient(firstPositionOnly);
|
||||
int rounds = between(1, 10);
|
||||
|
@ -26,11 +26,15 @@ import com.sun.net.httpserver.HttpServer;
|
||||
import org.apache.http.Consts;
|
||||
import org.apache.http.Header;
|
||||
import org.apache.http.HttpHost;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.auth.AuthScope;
|
||||
import org.apache.http.auth.UsernamePasswordCredentials;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.client.methods.HttpRequestBase;
|
||||
import org.apache.http.entity.ContentType;
|
||||
import org.apache.http.impl.client.BasicCredentialsProvider;
|
||||
import org.apache.http.impl.client.TargetAuthenticationStrategy;
|
||||
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
|
||||
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
|
||||
import org.apache.http.message.BasicHeader;
|
||||
import org.apache.http.nio.entity.NStringEntity;
|
||||
@ -49,16 +53,22 @@ import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.elasticsearch.client.RestClientTestUtil.getAllStatusCodes;
|
||||
import static org.elasticsearch.client.RestClientTestUtil.getHttpMethods;
|
||||
import static org.elasticsearch.client.RestClientTestUtil.randomHttpMethod;
|
||||
import static org.elasticsearch.client.RestClientTestUtil.randomStatusCode;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
@ -73,6 +83,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
|
||||
private RestClient restClient;
|
||||
private String pathPrefix;
|
||||
private Header[] defaultHeaders;
|
||||
private WaitForCancelHandler waitForCancelHandler;
|
||||
|
||||
@Before
|
||||
public void startHttpServer() throws Exception {
|
||||
@ -89,9 +100,31 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
|
||||
for (int statusCode : getAllStatusCodes()) {
|
||||
httpServer.createContext(pathPrefix + "/" + statusCode, new ResponseHandler(statusCode));
|
||||
}
|
||||
waitForCancelHandler = new WaitForCancelHandler();
|
||||
httpServer.createContext(pathPrefix + "/wait", waitForCancelHandler);
|
||||
return httpServer;
|
||||
}
|
||||
|
||||
private class WaitForCancelHandler implements HttpHandler {
|
||||
|
||||
private final CountDownLatch cancelHandlerLatch = new CountDownLatch(1);
|
||||
|
||||
void cancelDone() {
|
||||
cancelHandlerLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(HttpExchange exchange) throws IOException {
|
||||
try {
|
||||
cancelHandlerLatch.await();
|
||||
} catch (InterruptedException ignore) {
|
||||
} finally {
|
||||
exchange.sendResponseHeaders(200, 0);
|
||||
exchange.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class ResponseHandler implements HttpHandler {
|
||||
private final int statusCode;
|
||||
|
||||
@ -201,6 +234,75 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public void testCancelAsyncRequest() throws Exception {
|
||||
Request request = new Request(randomHttpMethod(getRandom()), "/wait");
|
||||
CountDownLatch requestLatch = new CountDownLatch(1);
|
||||
AtomicReference<Exception> error = new AtomicReference<>();
|
||||
Cancellable cancellable = restClient.performRequestAsync(request, new ResponseListener() {
|
||||
@Override
|
||||
public void onSuccess(Response response) {
|
||||
throw new AssertionError("onResponse called unexpectedly");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception exception) {
|
||||
error.set(exception);
|
||||
requestLatch.countDown();
|
||||
}
|
||||
});
|
||||
cancellable.cancel();
|
||||
waitForCancelHandler.cancelDone();
|
||||
assertTrue(requestLatch.await(5, TimeUnit.SECONDS));
|
||||
assertThat(error.get(), instanceOf(CancellationException.class));
|
||||
}
|
||||
|
||||
/**
|
||||
* This test verifies some assumptions that we rely upon around the way the async http client works when reusing the same request
|
||||
* throughout multiple retries, and the use of the {@link HttpRequestBase#abort()} method.
|
||||
*/
|
||||
public void testRequestResetAndAbort() throws Exception {
|
||||
try (CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create().build()) {
|
||||
client.start();
|
||||
HttpHost httpHost = new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort());
|
||||
HttpGet httpGet = new HttpGet(pathPrefix + "/200");
|
||||
|
||||
//calling abort before the request is sent is a no-op
|
||||
httpGet.abort();
|
||||
assertTrue(httpGet.isAborted());
|
||||
|
||||
{
|
||||
httpGet.reset();
|
||||
assertFalse(httpGet.isAborted());
|
||||
httpGet.abort();//this has no effect on the next call (although isAborted will return true until the next reset)
|
||||
Future<HttpResponse> future = client.execute(httpHost, httpGet, null);
|
||||
assertEquals(200, future.get().getStatusLine().getStatusCode());
|
||||
assertFalse(future.isCancelled());
|
||||
}
|
||||
{
|
||||
httpGet.reset();
|
||||
Future<HttpResponse> future = client.execute(httpHost, httpGet, null);
|
||||
assertFalse(httpGet.isAborted());
|
||||
httpGet.abort();
|
||||
assertTrue(httpGet.isAborted());
|
||||
try {
|
||||
assertTrue(future.isCancelled());
|
||||
future.get();
|
||||
throw new AssertionError("exception should have been thrown");
|
||||
} catch(CancellationException e) {
|
||||
//expected
|
||||
}
|
||||
}
|
||||
{
|
||||
httpGet.reset();
|
||||
assertFalse(httpGet.isAborted());
|
||||
Future<HttpResponse> future = client.execute(httpHost, httpGet, null);
|
||||
assertFalse(httpGet.isAborted());
|
||||
assertEquals(200, future.get().getStatusLine().getStatusCode());
|
||||
assertFalse(future.isCancelled());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* End to end test for headers. We test it explicitly against a real http client as there are different ways
|
||||
* to set/add headers to the {@link org.apache.http.client.HttpClient}.
|
||||
@ -356,7 +458,6 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
|
||||
assertThat(response200.getHeader("Authorization"), startsWith("Basic"));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void testUrlWithoutLeadingSlash() throws Exception {
|
||||
|
@ -52,7 +52,6 @@ import org.apache.http.util.EntityUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import javax.net.ssl.SSLHandshakeException;
|
||||
@ -68,7 +67,6 @@ import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
@ -126,30 +124,24 @@ public class RestClientSingleHostTests extends RestClientTestCase {
|
||||
static CloseableHttpAsyncClient mockHttpClient(final ExecutorService exec) {
|
||||
CloseableHttpAsyncClient httpClient = mock(CloseableHttpAsyncClient.class);
|
||||
when(httpClient.<HttpResponse>execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class),
|
||||
any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer<Future<HttpResponse>>() {
|
||||
@Override
|
||||
public Future<HttpResponse> answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||
any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer((Answer<Future<HttpResponse>>) invocationOnMock -> {
|
||||
final HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0];
|
||||
final FutureCallback<HttpResponse> futureCallback =
|
||||
(FutureCallback<HttpResponse>) invocationOnMock.getArguments()[3];
|
||||
// Call the callback asynchronous to better simulate how async http client works
|
||||
return exec.submit(new Callable<HttpResponse>() {
|
||||
@Override
|
||||
public HttpResponse call() throws Exception {
|
||||
if (futureCallback != null) {
|
||||
try {
|
||||
HttpResponse httpResponse = responseOrException(requestProducer);
|
||||
futureCallback.completed(httpResponse);
|
||||
} catch(Exception e) {
|
||||
futureCallback.failed(e);
|
||||
}
|
||||
return null;
|
||||
return exec.submit(() -> {
|
||||
if (futureCallback != null) {
|
||||
try {
|
||||
HttpResponse httpResponse = responseOrException(requestProducer);
|
||||
futureCallback.completed(httpResponse);
|
||||
} catch(Exception e) {
|
||||
futureCallback.failed(e);
|
||||
}
|
||||
return responseOrException(requestProducer);
|
||||
return null;
|
||||
}
|
||||
return responseOrException(requestProducer);
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
return httpClient;
|
||||
}
|
||||
|
||||
|
@ -36,6 +36,7 @@ import org.apache.http.nio.entity.NStringEntity;
|
||||
import org.apache.http.ssl.SSLContextBuilder;
|
||||
import org.apache.http.ssl.SSLContexts;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.elasticsearch.client.Cancellable;
|
||||
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
|
||||
import org.elasticsearch.client.Node;
|
||||
import org.elasticsearch.client.NodeSelector;
|
||||
@ -206,16 +207,17 @@ public class RestClientDocumentation {
|
||||
Request request = new Request(
|
||||
"GET", // <1>
|
||||
"/"); // <2>
|
||||
restClient.performRequestAsync(request, new ResponseListener() {
|
||||
@Override
|
||||
public void onSuccess(Response response) {
|
||||
// <3>
|
||||
}
|
||||
Cancellable cancellable = restClient.performRequestAsync(request,
|
||||
new ResponseListener() {
|
||||
@Override
|
||||
public void onSuccess(Response response) {
|
||||
// <3>
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception exception) {
|
||||
// <4>
|
||||
}
|
||||
@Override
|
||||
public void onFailure(Exception exception) {
|
||||
// <4>
|
||||
}
|
||||
});
|
||||
//end::rest-client-async
|
||||
}
|
||||
@ -271,6 +273,26 @@ public class RestClientDocumentation {
|
||||
latch.await();
|
||||
//end::rest-client-async-example
|
||||
}
|
||||
{
|
||||
//tag::rest-client-async-cancel
|
||||
Request request = new Request("GET", "/posts/_search");
|
||||
Cancellable cancellable = restClient.performRequestAsync(
|
||||
request,
|
||||
new ResponseListener() {
|
||||
@Override
|
||||
public void onSuccess(Response response) {
|
||||
// <1>
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception exception) {
|
||||
// <2>
|
||||
}
|
||||
}
|
||||
);
|
||||
cancellable.cancel();
|
||||
//end::rest-client-async-cancel
|
||||
}
|
||||
{
|
||||
//tag::rest-client-response2
|
||||
Response response = restClient.performRequest(new Request("GET", "/"));
|
||||
|
@ -224,7 +224,7 @@ Once the `RestClient` has been created, requests can be sent by calling either
|
||||
will block the calling thread and return the `Response` when the request is
|
||||
successful or throw an exception if it fails. `performRequestAsync` is
|
||||
asynchronous and accepts a `ResponseListener` argument that it calls with a
|
||||
`Response` when the request is successful or with an `Exception` if it4 fails.
|
||||
`Response` when the request is successful or with an `Exception` if it fails.
|
||||
|
||||
This is synchronous:
|
||||
|
||||
@ -329,6 +329,22 @@ include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-async-examp
|
||||
<2> Handle the returned exception, due to communication error or a response
|
||||
with status code that indicates an error
|
||||
|
||||
==== Cancelling asynchronous requests
|
||||
|
||||
The `performRequestAsync` method returns a `Cancellable` that exposes a single
|
||||
public method called `cancel`. Such method can be called to cancel the on-going
|
||||
request. Cancelling a request will result in aborting the http request through
|
||||
the underlying http client. On the server side, this does not automatically
|
||||
translate to the execution of that request being cancelled, which needs to be
|
||||
specifically implemented in the API itself.
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-async-cancel]
|
||||
--------------------------------------------------
|
||||
<1> Process the returned response, in case it was ready before the request got cancelled
|
||||
<2> Handle the returned exception, which will most likely be a `CancellationException` as the request got cancelled
|
||||
|
||||
[[java-rest-low-usage-responses]]
|
||||
=== Reading responses
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user