Make SyncResponseListener safer
Throw explicit IllegalStateException in unexpected situations, like where both response and exception are set, or when both are unset. Add unit test for SyncResponseListener.
This commit is contained in:
parent
175c327e17
commit
37e075a506
|
@ -61,6 +61,7 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* Client that connects to an elasticsearch cluster through http.
|
||||
|
@ -199,7 +200,7 @@ public final class RestClient implements Closeable {
|
|||
public Response performRequest(String method, String endpoint, Map<String, String> params,
|
||||
HttpEntity entity, HttpAsyncResponseConsumer<HttpResponse> responseConsumer,
|
||||
Header... headers) throws IOException {
|
||||
SyncResponseListener listener = new SyncResponseListener();
|
||||
SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
|
||||
performRequest(method, endpoint, params, entity, responseConsumer, listener, headers);
|
||||
return listener.get();
|
||||
}
|
||||
|
@ -525,42 +526,72 @@ public final class RestClient implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
private static class SyncResponseListener implements ResponseListener {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
volatile Response response;
|
||||
volatile Exception exception;
|
||||
static class SyncResponseListener implements ResponseListener {
|
||||
private final CountDownLatch latch = new CountDownLatch(1);
|
||||
private final AtomicReference<Response> response = new AtomicReference<>();
|
||||
private final AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
|
||||
private final long timeout;
|
||||
|
||||
SyncResponseListener(long timeout) {
|
||||
assert timeout > 0;
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuccess(Response response) {
|
||||
this.response = response;
|
||||
Objects.requireNonNull(response, "response must not be null");
|
||||
boolean wasResponseNull = this.response.compareAndSet(null, response);
|
||||
if (wasResponseNull == false) {
|
||||
throw new IllegalStateException("response is already set");
|
||||
}
|
||||
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception exception) {
|
||||
this.exception = exception;
|
||||
Objects.requireNonNull(exception, "exception must not be null");
|
||||
boolean wasExceptionNull = this.exception.compareAndSet(null, exception);
|
||||
if (wasExceptionNull == false) {
|
||||
throw new IllegalStateException("exception is already set");
|
||||
}
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
Response get() throws IOException {
|
||||
try {
|
||||
latch.await();
|
||||
//providing timeout is just a safety measure to prevent everlasting waits
|
||||
//the different client timeouts should already do their jobs
|
||||
if (latch.await(timeout, TimeUnit.MILLISECONDS) == false) {
|
||||
throw new IOException("listener timeout after waiting for [" + timeout + "] ms");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("thread waiting for the response was interrupted", e);
|
||||
}
|
||||
if (response != null) {
|
||||
assert exception == null;
|
||||
return response;
|
||||
|
||||
Exception exception = this.exception.get();
|
||||
Response response = this.response.get();
|
||||
if (exception != null) {
|
||||
if (response != null) {
|
||||
IllegalStateException e = new IllegalStateException("response and exception are unexpectedly set at the same time");
|
||||
e.addSuppressed(exception);
|
||||
throw e;
|
||||
}
|
||||
//try and leave the exception untouched as much as possible but we don't want to just add throws Exception clause everywhere
|
||||
if (exception instanceof IOException) {
|
||||
throw (IOException) exception;
|
||||
}
|
||||
if (exception instanceof RuntimeException){
|
||||
throw (RuntimeException) exception;
|
||||
}
|
||||
throw new RuntimeException("error while performing request", exception);
|
||||
}
|
||||
assert exception != null;
|
||||
//try and leave the exception untouched as much as possible but we don't want to just add throws Exception clause everywhere
|
||||
if (exception instanceof IOException) {
|
||||
throw (IOException) exception;
|
||||
|
||||
if (response == null) {
|
||||
throw new IllegalStateException("response not set and no exception caught either");
|
||||
}
|
||||
if (exception instanceof RuntimeException){
|
||||
throw (RuntimeException) exception;
|
||||
}
|
||||
throw new IOException("error while performing request", exception);
|
||||
return response;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,172 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.client;
|
||||
|
||||
import org.apache.http.HttpHost;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.ProtocolVersion;
|
||||
import org.apache.http.RequestLine;
|
||||
import org.apache.http.StatusLine;
|
||||
import org.apache.http.message.BasicHttpResponse;
|
||||
import org.apache.http.message.BasicRequestLine;
|
||||
import org.apache.http.message.BasicStatusLine;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class SyncResponseListenerTests extends RestClientTestCase {
|
||||
|
||||
public void testOnSuccessNullResponse() {
|
||||
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
|
||||
try {
|
||||
syncResponseListener.onSuccess(null);
|
||||
fail("onSuccess should have failed");
|
||||
} catch(NullPointerException e) {
|
||||
assertEquals("response must not be null", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void testOnFailureNullException() {
|
||||
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
|
||||
try {
|
||||
syncResponseListener.onFailure(null);
|
||||
fail("onFailure should have failed");
|
||||
} catch(NullPointerException e) {
|
||||
assertEquals("exception must not be null", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void testOnSuccess() throws Exception {
|
||||
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
|
||||
Response mockResponse = mockResponse();
|
||||
syncResponseListener.onSuccess(mockResponse);
|
||||
Response response = syncResponseListener.get();
|
||||
assertSame(response, mockResponse);
|
||||
|
||||
try {
|
||||
syncResponseListener.onSuccess(mockResponse);
|
||||
fail("get should have failed");
|
||||
} catch(IllegalStateException e) {
|
||||
assertEquals(e.getMessage(), "response is already set");
|
||||
}
|
||||
response = syncResponseListener.get();
|
||||
assertSame(response, mockResponse);
|
||||
|
||||
RuntimeException runtimeException = new RuntimeException("test");
|
||||
syncResponseListener.onFailure(runtimeException);
|
||||
try {
|
||||
syncResponseListener.get();
|
||||
fail("get should have failed");
|
||||
} catch(IllegalStateException e) {
|
||||
assertEquals("response and exception are unexpectedly set at the same time", e.getMessage());
|
||||
assertNotNull(e.getSuppressed());
|
||||
assertEquals(1, e.getSuppressed().length);
|
||||
assertSame(runtimeException, e.getSuppressed()[0]);
|
||||
}
|
||||
}
|
||||
|
||||
public void testOnFailure() throws Exception {
|
||||
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
|
||||
RuntimeException firstException = new RuntimeException("first-test");
|
||||
syncResponseListener.onFailure(firstException);
|
||||
try {
|
||||
syncResponseListener.get();
|
||||
fail("get should have failed");
|
||||
} catch(RuntimeException e) {
|
||||
assertSame(firstException, e);
|
||||
}
|
||||
|
||||
RuntimeException secondException = new RuntimeException("second-test");
|
||||
try {
|
||||
syncResponseListener.onFailure(secondException);
|
||||
} catch(IllegalStateException e) {
|
||||
assertEquals(e.getMessage(), "exception is already set");
|
||||
}
|
||||
try {
|
||||
syncResponseListener.get();
|
||||
fail("get should have failed");
|
||||
} catch(RuntimeException e) {
|
||||
assertSame(firstException, e);
|
||||
}
|
||||
|
||||
Response response = mockResponse();
|
||||
syncResponseListener.onSuccess(response);
|
||||
try {
|
||||
syncResponseListener.get();
|
||||
fail("get should have failed");
|
||||
} catch(IllegalStateException e) {
|
||||
assertEquals("response and exception are unexpectedly set at the same time", e.getMessage());
|
||||
assertNotNull(e.getSuppressed());
|
||||
assertEquals(1, e.getSuppressed().length);
|
||||
assertSame(firstException, e.getSuppressed()[0]);
|
||||
}
|
||||
}
|
||||
|
||||
public void testRuntimeExceptionIsNotWrapped() throws Exception {
|
||||
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
|
||||
RuntimeException runtimeException = new RuntimeException();
|
||||
syncResponseListener.onFailure(runtimeException);
|
||||
try {
|
||||
syncResponseListener.get();
|
||||
fail("get should have failed");
|
||||
} catch(RuntimeException e) {
|
||||
assertSame(runtimeException, e);
|
||||
}
|
||||
}
|
||||
|
||||
public void testIOExceptionIsNotWrapped() throws Exception {
|
||||
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
|
||||
IOException ioException = new IOException();
|
||||
syncResponseListener.onFailure(ioException);
|
||||
try {
|
||||
syncResponseListener.get();
|
||||
fail("get should have failed");
|
||||
} catch(IOException e) {
|
||||
assertSame(ioException, e);
|
||||
}
|
||||
}
|
||||
|
||||
public void testExceptionIsWrapped() throws Exception {
|
||||
RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000);
|
||||
//we just need any checked exception
|
||||
URISyntaxException exception = new URISyntaxException("test", "test");
|
||||
syncResponseListener.onFailure(exception);
|
||||
try {
|
||||
syncResponseListener.get();
|
||||
fail("get should have failed");
|
||||
} catch(RuntimeException e) {
|
||||
assertEquals("error while performing request", e.getMessage());
|
||||
assertSame(exception, e.getCause());
|
||||
}
|
||||
}
|
||||
|
||||
private static Response mockResponse() {
|
||||
ProtocolVersion protocolVersion = new ProtocolVersion("HTTP", 1, 1);
|
||||
RequestLine requestLine = new BasicRequestLine("GET", "/", protocolVersion);
|
||||
StatusLine statusLine = new BasicStatusLine(protocolVersion, 200, "OK");
|
||||
HttpResponse httpResponse = new BasicHttpResponse(statusLine);
|
||||
return new Response(requestLine, new HttpHost("localhost", 9200), httpResponse);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue