JsonParserIterator.init future timeout (#8550)

* add timeout support for JsonParserIterator init future

* add queryId

* should be less than 1

* fix

* fix npe

* fix lgtm

* adjust exception, nullable

* fix test

* refactor

* revert queryId change

* add log.warn to tie exception to json parser iterator
This commit is contained in:
Clint Wylie 2019-09-26 17:13:37 -07:00 committed by Jihoon Son
parent 7f2b6577ef
commit 7781820dea
4 changed files with 250 additions and 163 deletions

View File

@ -163,7 +163,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
throw new QueryInterruptedException(e);
}
catch (TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
log.warn("Query timeout, cancelling pending results for query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException(e);
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeoutException;
@ -57,10 +58,10 @@ public class QueryInterruptedException extends RuntimeException
@JsonCreator
public QueryInterruptedException(
@JsonProperty("error") String errorCode,
@JsonProperty("error") @Nullable String errorCode,
@JsonProperty("errorMessage") String errorMessage,
@JsonProperty("errorClass") String errorClass,
@JsonProperty("host") String host
@JsonProperty("errorClass") @Nullable String errorClass,
@JsonProperty("host") @Nullable String host
)
{
super(errorMessage);
@ -88,6 +89,7 @@ public class QueryInterruptedException extends RuntimeException
this.host = host;
}
@Nullable
@JsonProperty("error")
public String getErrorCode()
{
@ -144,6 +146,7 @@ public class QueryInterruptedException extends RuntimeException
}
}
@Nullable
private static String getErrorClassFromThrowable(Throwable e)
{
if (e instanceof QueryInterruptedException) {
@ -155,6 +158,7 @@ public class QueryInterruptedException extends RuntimeException
}
}
@Nullable
private static String getHostFromThrowable(Throwable e)
{
if (e instanceof QueryInterruptedException) {

View File

@ -26,12 +26,15 @@ import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletResponse;
import java.io.Closeable;
import java.io.IOException;
@ -40,24 +43,30 @@ import java.util.Iterator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class JsonParserIterator<T> implements Iterator<T>, Closeable
{
private static final Logger LOG = new Logger(JsonParserIterator.class);
private JsonParser jp;
private ObjectCodec objectCodec;
private final JavaType typeRef;
private final Future<InputStream> future;
private final Query<T> query;
private final String url;
private final String host;
private final ObjectMapper objectMapper;
private final BytesAccumulatingResponseHandler responseHandler;
private final boolean hasTimeout;
private final long timeoutAt;
private final String queryId;
public JsonParserIterator(
JavaType typeRef,
Future<InputStream> future,
String url,
Query<T> query,
@Nullable Query<T> query,
String host,
ObjectMapper objectMapper,
BytesAccumulatingResponseHandler responseHandler
@ -66,11 +75,18 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
this.typeRef = typeRef;
this.future = future;
this.url = url;
this.query = query;
jp = null;
if (query != null) {
this.timeoutAt = query.<Long>getContextValue(DirectDruidClient.QUERY_FAIL_TIME, -1L);
this.queryId = query.getId();
} else {
this.timeoutAt = -1;
this.queryId = null;
}
this.jp = null;
this.host = host;
this.objectMapper = objectMapper;
this.responseHandler = responseHandler;
this.hasTimeout = timeoutAt > -1;
}
@Override
@ -114,49 +130,47 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
{
if (jp == null) {
try {
InputStream is = future.get();
long timeLeftMillis = timeoutAt - System.currentTimeMillis();
if (hasTimeout && timeLeftMillis < 1) {
throw new TimeoutException(StringUtils.format("url[%s] timed out", url));
}
InputStream is = hasTimeout
? future.get(timeLeftMillis, TimeUnit.MILLISECONDS)
: future.get();
if (responseHandler != null && responseHandler.getStatus() != HttpServletResponse.SC_OK) {
throw new RE(
interruptQuery(
new RE(
"Unexpected response status [%s] description [%s] from request url[%s]",
responseHandler.getStatus(),
responseHandler.getDescription(),
url
)
);
}
if (is == null) {
throw new QueryInterruptedException(
new ResourceLimitExceededException(
"query[%s] url[%s] timed out or max bytes limit reached.",
query.getId(),
url
),
host
);
} else {
if (is != null) {
jp = objectMapper.getFactory().createParser(is);
} else {
interruptQuery(
new ResourceLimitExceededException(
"url[%s] timed out or max bytes limit reached.",
url
)
);
}
final JsonToken nextToken = jp.nextToken();
if (nextToken == JsonToken.START_OBJECT) {
QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class);
throw new QueryInterruptedException(cause, host);
} else if (nextToken != JsonToken.START_ARRAY) {
throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url);
} else {
if (nextToken == JsonToken.START_ARRAY) {
jp.nextToken();
objectCodec = jp.getCodec();
}
}
catch (IOException | InterruptedException | ExecutionException e) {
throw new RE(
e,
"Failure getting results for query[%s] url[%s] because of [%s]",
query == null ? null : query.getId(),
url,
e.getMessage()
} else if (nextToken == JsonToken.START_OBJECT) {
interruptQuery(jp.getCodec().readValue(jp, QueryInterruptedException.class));
} else {
interruptQuery(
new IAE("Next token wasn't a START_ARRAY, was[%s] from url[%s]", jp.getCurrentToken(), url)
);
}
catch (CancellationException e) {
throw new QueryInterruptedException(e, host);
}
catch (IOException | InterruptedException | ExecutionException | CancellationException | TimeoutException e) {
interruptQuery(e);
}
}
}
@ -168,5 +182,11 @@ public class JsonParserIterator<T> implements Iterator<T>, Closeable
jp.close();
}
}
private void interruptQuery(Exception cause)
{
LOG.warn(cause, "Query [%s] to host [%s] interrupted", queryId, host);
throw new QueryInterruptedException(cause, host);
}
}

View File

@ -54,10 +54,14 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
@ -65,11 +69,62 @@ import java.util.List;
public class DirectDruidClientTest
{
private final String hostName = "localhost:8080";
private final DataSegment dataSegment = new DataSegment(
"test",
Intervals.of("2013-01-01/2013-01-02"),
DateTimes.of("2013-01-01").toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
0,
0L
);
private ServerSelector serverSelector;
private HttpClient httpClient;
private DirectDruidClient client;
private QueryableDruidServer queryableDruidServer;
@Before
public void setup()
{
httpClient = EasyMock.createMock(HttpClient.class);
serverSelector = new ServerSelector(
dataSegment,
new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy())
);
client = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
new DefaultObjectMapper(),
httpClient,
"http",
hostName,
new NoopServiceEmitter()
);
queryableDruidServer = new QueryableDruidServer(
new DruidServer(
"test1",
"localhost",
null,
0,
ServerType.HISTORICAL,
DruidServer.DEFAULT_TIER,
0
),
client
);
serverSelector.addServerAndUpdateSegment(queryableDruidServer, serverSelector.getSegment());
}
@Test
public void testRun() throws Exception
{
HttpClient httpClient = EasyMock.createMock(HttpClient.class);
final URL url = new URL("http://foo/druid/v2/");
final URL url = new URL(StringUtils.format("http://%s/druid/v2/", hostName));
SettableFuture<InputStream> futureResult = SettableFuture.create();
Capture<Request> capturedRequest = EasyMock.newCapture();
@ -106,30 +161,6 @@ public class DirectDruidClientTest
EasyMock.replay(httpClient);
final ServerSelector serverSelector = new ServerSelector(
new DataSegment(
"test",
Intervals.of("2013-01-01/2013-01-02"),
DateTimes.of("2013-01-01").toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
0,
0L
),
new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy())
);
DirectDruidClient client1 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
new DefaultObjectMapper(),
httpClient,
"http",
"foo",
new NoopServiceEmitter()
);
DirectDruidClient client2 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
@ -140,37 +171,40 @@ public class DirectDruidClientTest
new NoopServiceEmitter()
);
QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
new DruidServer("test1", "localhost", null, 0, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0),
client1
);
serverSelector.addServerAndUpdateSegment(queryableDruidServer1, serverSelector.getSegment());
QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(
new DruidServer("test1", "localhost", null, 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0),
new DruidServer(
"test1",
"localhost",
null,
0,
ServerType.HISTORICAL,
DruidServer.DEFAULT_TIER,
0
),
client2
);
serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment());
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE));
Sequence s1 = client1.run(QueryPlus.wrap(query));
Sequence s1 = client.run(QueryPlus.wrap(query));
Assert.assertTrue(capturedRequest.hasCaptured());
Assert.assertEquals(url, capturedRequest.getValue().getUrl());
Assert.assertEquals(HttpMethod.POST, capturedRequest.getValue().getMethod());
Assert.assertEquals(1, client1.getNumOpenConnections());
Assert.assertEquals(1, client.getNumOpenConnections());
// simulate read timeout
client1.run(QueryPlus.wrap(query));
Assert.assertEquals(2, client1.getNumOpenConnections());
client.run(QueryPlus.wrap(query));
Assert.assertEquals(2, client.getNumOpenConnections());
futureException.setException(new ReadTimeoutException());
Assert.assertEquals(1, client1.getNumOpenConnections());
Assert.assertEquals(1, client.getNumOpenConnections());
// subsequent connections should work
client1.run(QueryPlus.wrap(query));
client1.run(QueryPlus.wrap(query));
client1.run(QueryPlus.wrap(query));
client.run(QueryPlus.wrap(query));
client.run(QueryPlus.wrap(query));
client.run(QueryPlus.wrap(query));
Assert.assertTrue(client1.getNumOpenConnections() == 4);
Assert.assertTrue(client.getNumOpenConnections() == 4);
// produce result for first connection
futureResult.set(
@ -181,14 +215,14 @@ public class DirectDruidClientTest
List<Result> results = s1.toList();
Assert.assertEquals(1, results.size());
Assert.assertEquals(DateTimes.of("2014-01-01T01:02:03Z"), results.get(0).getTimestamp());
Assert.assertEquals(3, client1.getNumOpenConnections());
Assert.assertEquals(3, client.getNumOpenConnections());
client2.run(QueryPlus.wrap(query));
client2.run(QueryPlus.wrap(query));
Assert.assertTrue(client2.getNumOpenConnections() == 2);
Assert.assertEquals(2, client2.getNumOpenConnections());
Assert.assertTrue(serverSelector.pick() == queryableDruidServer2);
Assert.assertEquals(serverSelector.pick(), queryableDruidServer2);
EasyMock.verify(httpClient);
}
@ -196,8 +230,6 @@ public class DirectDruidClientTest
@Test
public void testCancel()
{
HttpClient httpClient = EasyMock.createStrictMock(HttpClient.class);
Capture<Request> capturedRequest = EasyMock.newCapture();
ListenableFuture<Object> cancelledFuture = Futures.immediateCancelledFuture();
SettableFuture<Object> cancellationFuture = SettableFuture.create();
@ -224,43 +256,13 @@ public class DirectDruidClientTest
EasyMock.replay(httpClient);
final ServerSelector serverSelector = new ServerSelector(
new DataSegment(
"test",
Intervals.of("2013-01-01/2013-01-02"),
DateTimes.of("2013-01-01").toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
0,
0L
),
new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy())
);
DirectDruidClient client1 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
new DefaultObjectMapper(),
httpClient,
"http",
"foo",
new NoopServiceEmitter()
);
QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
new DruidServer("test1", "localhost", null, 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0),
client1
);
serverSelector.addServerAndUpdateSegment(queryableDruidServer1, serverSelector.getSegment());
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE));
cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled")));
Sequence results = client1.run(QueryPlus.wrap(query));
Sequence results = client.run(QueryPlus.wrap(query));
Assert.assertEquals(HttpMethod.DELETE, capturedRequest.getValue().getMethod());
Assert.assertEquals(0, client1.getNumOpenConnections());
Assert.assertEquals(0, client.getNumOpenConnections());
QueryInterruptedException exception = null;
@ -278,11 +280,11 @@ public class DirectDruidClientTest
@Test
public void testQueryInterruptionExceptionLogMessage()
{
HttpClient httpClient = EasyMock.createMock(HttpClient.class);
SettableFuture<Object> interruptionFuture = SettableFuture.create();
Capture<Request> capturedRequest = EasyMock.newCapture();
String hostName = "localhost:8080";
EasyMock.expect(
final String hostName = "localhost:8080";
EasyMock
.expect(
httpClient.go(
EasyMock.capture(capturedRequest),
EasyMock.<HttpResponseHandler>anyObject(),
@ -294,39 +296,7 @@ public class DirectDruidClientTest
EasyMock.replay(httpClient);
DataSegment dataSegment = new DataSegment(
"test",
Intervals.of("2013-01-01/2013-01-02"),
DateTimes.of("2013-01-01").toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
0,
0L
);
final ServerSelector serverSelector = new ServerSelector(
dataSegment,
new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy())
);
DirectDruidClient client1 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
new DefaultObjectMapper(),
httpClient,
"http",
hostName,
new NoopServiceEmitter()
);
QueryableDruidServer queryableDruidServer = new QueryableDruidServer(
new DruidServer("test1", hostName, null, 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0),
client1
);
serverSelector.addServerAndUpdateSegment(queryableDruidServer, dataSegment);
// test error
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE));
interruptionFuture.set(
@ -334,7 +304,7 @@ public class DirectDruidClientTest
StringUtils.toUtf8("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}")
)
);
Sequence results = client1.run(QueryPlus.wrap(query));
Sequence results = client.run(QueryPlus.wrap(query));
QueryInterruptedException actualException = null;
try {
@ -349,4 +319,97 @@ public class DirectDruidClientTest
Assert.assertEquals(hostName, actualException.getHost());
EasyMock.verify(httpClient);
}
@Test
public void testQueryTimeoutBeforeFuture() throws IOException, InterruptedException
{
SettableFuture<Object> timeoutFuture = SettableFuture.create();
Capture<Request> capturedRequest = EasyMock.newCapture();
final String queryId = "timeout-before-future";
EasyMock
.expect(
httpClient.go(
EasyMock.capture(capturedRequest),
EasyMock.<HttpResponseHandler>anyObject(),
EasyMock.anyObject(Duration.class)
)
)
.andReturn(timeoutFuture)
.anyTimes();
EasyMock.replay(httpClient);
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
query = query.withOverriddenContext(
ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 250, "queryId", queryId)
);
Sequence results = client.run(QueryPlus.wrap(query));
// incomplete result set
PipedInputStream in = new PipedInputStream();
final PipedOutputStream out = new PipedOutputStream(in);
timeoutFuture.set(
in
);
QueryInterruptedException actualException = null;
try {
out.write(StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\"}"));
Thread.sleep(250);
out.write(StringUtils.toUtf8("]"));
out.close();
results.toList();
}
catch (QueryInterruptedException e) {
actualException = e;
}
Assert.assertNotNull(actualException);
Assert.assertEquals("Query timeout", actualException.getErrorCode());
Assert.assertEquals("url[http://localhost:8080/druid/v2/] timed out", actualException.getMessage());
Assert.assertEquals(hostName, actualException.getHost());
EasyMock.verify(httpClient);
}
@Test
public void testQueryTimeoutFromFuture()
{
SettableFuture<Object> noFuture = SettableFuture.create();
Capture<Request> capturedRequest = EasyMock.newCapture();
final String queryId = "never-ending-future";
EasyMock
.expect(
httpClient.go(
EasyMock.capture(capturedRequest),
EasyMock.<HttpResponseHandler>anyObject(),
EasyMock.anyObject(Duration.class)
)
)
.andReturn(noFuture)
.anyTimes();
EasyMock.replay(httpClient);
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
query = query.withOverriddenContext(
ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 500, "queryId", queryId)
);
Sequence results = client.run(QueryPlus.wrap(query));
QueryInterruptedException actualException = null;
try {
results.toList();
}
catch (QueryInterruptedException e) {
actualException = e;
}
Assert.assertNotNull(actualException);
Assert.assertEquals("Query timeout", actualException.getErrorCode());
Assert.assertEquals("Timeout waiting for task.", actualException.getMessage());
Assert.assertEquals(hostName, actualException.getHost());
EasyMock.verify(httpClient);
}
}