From 7781820dead727e84f1b61a0e6c219ec115862c1 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 26 Sep 2019 17:13:37 -0700 Subject: [PATCH] JsonParserIterator.init future timeout (#8550) * add timeout support for JsonParserIterator init future * add queryId * should be less than 1 * fix * fix npe * fix lgtm * adjust exception, nullable * fix test * refactor * revert queryId change * add log.warn to tie exception to json parser iterator --- .../query/ChainedExecutionQueryRunner.java | 2 +- .../query/QueryInterruptedException.java | 10 +- .../druid/client/JsonParserIterator.java | 94 +++--- .../druid/client/DirectDruidClientTest.java | 307 +++++++++++------- 4 files changed, 250 insertions(+), 163 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java index 08a0e4d53b9..0990c982517 100644 --- a/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java @@ -163,7 +163,7 @@ public class ChainedExecutionQueryRunner implements QueryRunner throw new QueryInterruptedException(e); } catch (TimeoutException e) { - log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); + log.warn("Query timeout, cancelling pending results for query id [%s]", query.getId()); futures.cancel(true); throw new QueryInterruptedException(e); } diff --git a/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java b/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java index c9b4cd0719e..bc2ffeb4c32 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java +++ b/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.java.util.common.StringUtils; +import javax.annotation.Nullable; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeoutException; @@ -57,10 +58,10 @@ public class QueryInterruptedException extends RuntimeException @JsonCreator public QueryInterruptedException( - @JsonProperty("error") String errorCode, + @JsonProperty("error") @Nullable String errorCode, @JsonProperty("errorMessage") String errorMessage, - @JsonProperty("errorClass") String errorClass, - @JsonProperty("host") String host + @JsonProperty("errorClass") @Nullable String errorClass, + @JsonProperty("host") @Nullable String host ) { super(errorMessage); @@ -88,6 +89,7 @@ public class QueryInterruptedException extends RuntimeException this.host = host; } + @Nullable @JsonProperty("error") public String getErrorCode() { @@ -144,6 +146,7 @@ public class QueryInterruptedException extends RuntimeException } } + @Nullable private static String getErrorClassFromThrowable(Throwable e) { if (e instanceof QueryInterruptedException) { @@ -155,6 +158,7 @@ public class QueryInterruptedException extends RuntimeException } } + @Nullable private static String getHostFromThrowable(Throwable e) { if (e instanceof QueryInterruptedException) { diff --git a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java index 3494c86d204..9478f246764 100644 --- a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java +++ b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java @@ -26,12 +26,15 @@ import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.CloseQuietly; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.Query; import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.ResourceLimitExceededException; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; +import javax.annotation.Nullable; import javax.servlet.http.HttpServletResponse; import java.io.Closeable; import java.io.IOException; @@ -40,24 +43,30 @@ import java.util.Iterator; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class JsonParserIterator implements Iterator, Closeable { + private static final Logger LOG = new Logger(JsonParserIterator.class); + private JsonParser jp; private ObjectCodec objectCodec; private final JavaType typeRef; private final Future future; - private final Query query; private final String url; private final String host; private final ObjectMapper objectMapper; private final BytesAccumulatingResponseHandler responseHandler; + private final boolean hasTimeout; + private final long timeoutAt; + private final String queryId; public JsonParserIterator( JavaType typeRef, Future future, String url, - Query query, + @Nullable Query query, String host, ObjectMapper objectMapper, BytesAccumulatingResponseHandler responseHandler @@ -66,11 +75,18 @@ public class JsonParserIterator implements Iterator, Closeable this.typeRef = typeRef; this.future = future; this.url = url; - this.query = query; - jp = null; + if (query != null) { + this.timeoutAt = query.getContextValue(DirectDruidClient.QUERY_FAIL_TIME, -1L); + this.queryId = query.getId(); + } else { + this.timeoutAt = -1; + this.queryId = null; + } + this.jp = null; this.host = host; this.objectMapper = objectMapper; this.responseHandler = responseHandler; + this.hasTimeout = timeoutAt > -1; } @Override @@ -114,49 +130,47 @@ public class JsonParserIterator implements Iterator, Closeable { if (jp == null) { try { - InputStream is = future.get(); + long timeLeftMillis = timeoutAt - System.currentTimeMillis(); + if (hasTimeout && timeLeftMillis < 1) { + throw new TimeoutException(StringUtils.format("url[%s] timed out", url)); + } + InputStream is = hasTimeout + ? future.get(timeLeftMillis, TimeUnit.MILLISECONDS) + : future.get(); if (responseHandler != null && responseHandler.getStatus() != HttpServletResponse.SC_OK) { - throw new RE( - "Unexpected response status [%s] description [%s] from request url [%s]", - responseHandler.getStatus(), - responseHandler.getDescription(), - url + interruptQuery( + new RE( + "Unexpected response status [%s] description [%s] from request url[%s]", + responseHandler.getStatus(), + responseHandler.getDescription(), + url + ) ); } - if (is == null) { - throw new QueryInterruptedException( - new ResourceLimitExceededException( - "query[%s] url[%s] timed out or max bytes limit reached.", - query.getId(), - url - ), - host - ); - } else { + if (is != null) { jp = objectMapper.getFactory().createParser(is); + } else { + interruptQuery( + new ResourceLimitExceededException( + "url[%s] timed out or max bytes limit reached.", + url + ) + ); } final JsonToken nextToken = jp.nextToken(); - if (nextToken == JsonToken.START_OBJECT) { - QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class); - throw new QueryInterruptedException(cause, host); - } else if (nextToken != JsonToken.START_ARRAY) { - throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url); - } else { + if (nextToken == JsonToken.START_ARRAY) { jp.nextToken(); objectCodec = jp.getCodec(); + } else if (nextToken == JsonToken.START_OBJECT) { + interruptQuery(jp.getCodec().readValue(jp, QueryInterruptedException.class)); + } else { + interruptQuery( + new IAE("Next token wasn't a START_ARRAY, was[%s] from url[%s]", jp.getCurrentToken(), url) + ); } } - catch (IOException | InterruptedException | ExecutionException e) { - throw new RE( - e, - "Failure getting results for query[%s] url[%s] because of [%s]", - query == null ? null : query.getId(), - url, - e.getMessage() - ); - } - catch (CancellationException e) { - throw new QueryInterruptedException(e, host); + catch (IOException | InterruptedException | ExecutionException | CancellationException | TimeoutException e) { + interruptQuery(e); } } } @@ -168,5 +182,11 @@ public class JsonParserIterator implements Iterator, Closeable jp.close(); } } + + private void interruptQuery(Exception cause) + { + LOG.warn(cause, "Query [%s] to host [%s] interrupted", queryId, host); + throw new QueryInterruptedException(cause, host); + } } diff --git a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java index e50fe6dffe3..caa549a5b14 100644 --- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java @@ -54,10 +54,14 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.timeout.ReadTimeoutException; import org.joda.time.Duration; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; import java.net.URL; import java.util.ArrayList; import java.util.HashMap; @@ -65,11 +69,62 @@ import java.util.List; public class DirectDruidClientTest { + private final String hostName = "localhost:8080"; + + private final DataSegment dataSegment = new DataSegment( + "test", + Intervals.of("2013-01-01/2013-01-02"), + DateTimes.of("2013-01-01").toString(), + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + 0, + 0L + ); + private ServerSelector serverSelector; + + private HttpClient httpClient; + private DirectDruidClient client; + private QueryableDruidServer queryableDruidServer; + + @Before + public void setup() + { + httpClient = EasyMock.createMock(HttpClient.class); + serverSelector = new ServerSelector( + dataSegment, + new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()) + ); + client = new DirectDruidClient( + new ReflectionQueryToolChestWarehouse(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER, + new DefaultObjectMapper(), + httpClient, + "http", + hostName, + new NoopServiceEmitter() + ); + queryableDruidServer = new QueryableDruidServer( + new DruidServer( + "test1", + "localhost", + null, + 0, + ServerType.HISTORICAL, + DruidServer.DEFAULT_TIER, + 0 + ), + client + ); + serverSelector.addServerAndUpdateSegment(queryableDruidServer, serverSelector.getSegment()); + } + + @Test public void testRun() throws Exception { - HttpClient httpClient = EasyMock.createMock(HttpClient.class); - final URL url = new URL("http://foo/druid/v2/"); + final URL url = new URL(StringUtils.format("http://%s/druid/v2/", hostName)); SettableFuture futureResult = SettableFuture.create(); Capture capturedRequest = EasyMock.newCapture(); @@ -106,30 +161,6 @@ public class DirectDruidClientTest EasyMock.replay(httpClient); - final ServerSelector serverSelector = new ServerSelector( - new DataSegment( - "test", - Intervals.of("2013-01-01/2013-01-02"), - DateTimes.of("2013-01-01").toString(), - new HashMap<>(), - new ArrayList<>(), - new ArrayList<>(), - NoneShardSpec.instance(), - 0, - 0L - ), - new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()) - ); - - DirectDruidClient client1 = new DirectDruidClient( - new ReflectionQueryToolChestWarehouse(), - QueryRunnerTestHelper.NOOP_QUERYWATCHER, - new DefaultObjectMapper(), - httpClient, - "http", - "foo", - new NoopServiceEmitter() - ); DirectDruidClient client2 = new DirectDruidClient( new ReflectionQueryToolChestWarehouse(), QueryRunnerTestHelper.NOOP_QUERYWATCHER, @@ -140,37 +171,40 @@ public class DirectDruidClientTest new NoopServiceEmitter() ); - QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer( - new DruidServer("test1", "localhost", null, 0, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0), - client1 - ); - serverSelector.addServerAndUpdateSegment(queryableDruidServer1, serverSelector.getSegment()); QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer( - new DruidServer("test1", "localhost", null, 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0), + new DruidServer( + "test1", + "localhost", + null, + 0, + ServerType.HISTORICAL, + DruidServer.DEFAULT_TIER, + 0 + ), client2 ); serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment()); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); - Sequence s1 = client1.run(QueryPlus.wrap(query)); + Sequence s1 = client.run(QueryPlus.wrap(query)); Assert.assertTrue(capturedRequest.hasCaptured()); Assert.assertEquals(url, capturedRequest.getValue().getUrl()); Assert.assertEquals(HttpMethod.POST, capturedRequest.getValue().getMethod()); - Assert.assertEquals(1, client1.getNumOpenConnections()); + Assert.assertEquals(1, client.getNumOpenConnections()); // simulate read timeout - client1.run(QueryPlus.wrap(query)); - Assert.assertEquals(2, client1.getNumOpenConnections()); + client.run(QueryPlus.wrap(query)); + Assert.assertEquals(2, client.getNumOpenConnections()); futureException.setException(new ReadTimeoutException()); - Assert.assertEquals(1, client1.getNumOpenConnections()); + Assert.assertEquals(1, client.getNumOpenConnections()); // subsequent connections should work - client1.run(QueryPlus.wrap(query)); - client1.run(QueryPlus.wrap(query)); - client1.run(QueryPlus.wrap(query)); + client.run(QueryPlus.wrap(query)); + client.run(QueryPlus.wrap(query)); + client.run(QueryPlus.wrap(query)); - Assert.assertTrue(client1.getNumOpenConnections() == 4); + Assert.assertTrue(client.getNumOpenConnections() == 4); // produce result for first connection futureResult.set( @@ -181,14 +215,14 @@ public class DirectDruidClientTest List results = s1.toList(); Assert.assertEquals(1, results.size()); Assert.assertEquals(DateTimes.of("2014-01-01T01:02:03Z"), results.get(0).getTimestamp()); - Assert.assertEquals(3, client1.getNumOpenConnections()); + Assert.assertEquals(3, client.getNumOpenConnections()); client2.run(QueryPlus.wrap(query)); client2.run(QueryPlus.wrap(query)); - Assert.assertTrue(client2.getNumOpenConnections() == 2); + Assert.assertEquals(2, client2.getNumOpenConnections()); - Assert.assertTrue(serverSelector.pick() == queryableDruidServer2); + Assert.assertEquals(serverSelector.pick(), queryableDruidServer2); EasyMock.verify(httpClient); } @@ -196,8 +230,6 @@ public class DirectDruidClientTest @Test public void testCancel() { - HttpClient httpClient = EasyMock.createStrictMock(HttpClient.class); - Capture capturedRequest = EasyMock.newCapture(); ListenableFuture cancelledFuture = Futures.immediateCancelledFuture(); SettableFuture cancellationFuture = SettableFuture.create(); @@ -224,43 +256,13 @@ public class DirectDruidClientTest EasyMock.replay(httpClient); - final ServerSelector serverSelector = new ServerSelector( - new DataSegment( - "test", - Intervals.of("2013-01-01/2013-01-02"), - DateTimes.of("2013-01-01").toString(), - new HashMap<>(), - new ArrayList<>(), - new ArrayList<>(), - NoneShardSpec.instance(), - 0, - 0L - ), - new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()) - ); - - DirectDruidClient client1 = new DirectDruidClient( - new ReflectionQueryToolChestWarehouse(), - QueryRunnerTestHelper.NOOP_QUERYWATCHER, - new DefaultObjectMapper(), - httpClient, - "http", - "foo", - new NoopServiceEmitter() - ); - - QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer( - new DruidServer("test1", "localhost", null, 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0), - client1 - ); - serverSelector.addServerAndUpdateSegment(queryableDruidServer1, serverSelector.getSegment()); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled"))); - Sequence results = client1.run(QueryPlus.wrap(query)); + Sequence results = client.run(QueryPlus.wrap(query)); Assert.assertEquals(HttpMethod.DELETE, capturedRequest.getValue().getMethod()); - Assert.assertEquals(0, client1.getNumOpenConnections()); + Assert.assertEquals(0, client.getNumOpenConnections()); QueryInterruptedException exception = null; @@ -278,55 +280,23 @@ public class DirectDruidClientTest @Test public void testQueryInterruptionExceptionLogMessage() { - HttpClient httpClient = EasyMock.createMock(HttpClient.class); SettableFuture interruptionFuture = SettableFuture.create(); Capture capturedRequest = EasyMock.newCapture(); - String hostName = "localhost:8080"; - EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.anyObject(), - EasyMock.anyObject(Duration.class) + final String hostName = "localhost:8080"; + EasyMock + .expect( + httpClient.go( + EasyMock.capture(capturedRequest), + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) + ) ) - ) - .andReturn(interruptionFuture) - .anyTimes(); + .andReturn(interruptionFuture) + .anyTimes(); EasyMock.replay(httpClient); - DataSegment dataSegment = new DataSegment( - "test", - Intervals.of("2013-01-01/2013-01-02"), - DateTimes.of("2013-01-01").toString(), - new HashMap<>(), - new ArrayList<>(), - new ArrayList<>(), - NoneShardSpec.instance(), - 0, - 0L - ); - final ServerSelector serverSelector = new ServerSelector( - dataSegment, - new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()) - ); - - DirectDruidClient client1 = new DirectDruidClient( - new ReflectionQueryToolChestWarehouse(), - QueryRunnerTestHelper.NOOP_QUERYWATCHER, - new DefaultObjectMapper(), - httpClient, - "http", - hostName, - new NoopServiceEmitter() - ); - - QueryableDruidServer queryableDruidServer = new QueryableDruidServer( - new DruidServer("test1", hostName, null, 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0), - client1 - ); - - serverSelector.addServerAndUpdateSegment(queryableDruidServer, dataSegment); - + // test error TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); interruptionFuture.set( @@ -334,7 +304,7 @@ public class DirectDruidClientTest StringUtils.toUtf8("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}") ) ); - Sequence results = client1.run(QueryPlus.wrap(query)); + Sequence results = client.run(QueryPlus.wrap(query)); QueryInterruptedException actualException = null; try { @@ -349,4 +319,97 @@ public class DirectDruidClientTest Assert.assertEquals(hostName, actualException.getHost()); EasyMock.verify(httpClient); } + + @Test + public void testQueryTimeoutBeforeFuture() throws IOException, InterruptedException + { + SettableFuture timeoutFuture = SettableFuture.create(); + Capture capturedRequest = EasyMock.newCapture(); + final String queryId = "timeout-before-future"; + + EasyMock + .expect( + httpClient.go( + EasyMock.capture(capturedRequest), + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) + ) + ) + .andReturn(timeoutFuture) + .anyTimes(); + + EasyMock.replay(httpClient); + + TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); + query = query.withOverriddenContext( + ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 250, "queryId", queryId) + ); + + Sequence results = client.run(QueryPlus.wrap(query)); + + // incomplete result set + PipedInputStream in = new PipedInputStream(); + final PipedOutputStream out = new PipedOutputStream(in); + timeoutFuture.set( + in + ); + + QueryInterruptedException actualException = null; + try { + out.write(StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\"}")); + Thread.sleep(250); + out.write(StringUtils.toUtf8("]")); + out.close(); + results.toList(); + } + catch (QueryInterruptedException e) { + actualException = e; + } + Assert.assertNotNull(actualException); + Assert.assertEquals("Query timeout", actualException.getErrorCode()); + Assert.assertEquals("url[http://localhost:8080/druid/v2/] timed out", actualException.getMessage()); + Assert.assertEquals(hostName, actualException.getHost()); + EasyMock.verify(httpClient); + } + + @Test + public void testQueryTimeoutFromFuture() + { + SettableFuture noFuture = SettableFuture.create(); + Capture capturedRequest = EasyMock.newCapture(); + final String queryId = "never-ending-future"; + + EasyMock + .expect( + httpClient.go( + EasyMock.capture(capturedRequest), + EasyMock.anyObject(), + EasyMock.anyObject(Duration.class) + ) + ) + .andReturn(noFuture) + .anyTimes(); + + EasyMock.replay(httpClient); + + TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); + query = query.withOverriddenContext( + ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 500, "queryId", queryId) + ); + + Sequence results = client.run(QueryPlus.wrap(query)); + + QueryInterruptedException actualException = null; + try { + results.toList(); + } + catch (QueryInterruptedException e) { + actualException = e; + } + Assert.assertNotNull(actualException); + Assert.assertEquals("Query timeout", actualException.getErrorCode()); + Assert.assertEquals("Timeout waiting for task.", actualException.getMessage()); + Assert.assertEquals(hostName, actualException.getHost()); + EasyMock.verify(httpClient); + } }