From d01f272a7ab71ab7ba565493404f8330af3e774b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Mon, 2 Jun 2014 17:40:35 -0700 Subject: [PATCH] forward cancellation in direct druid client --- pom.xml | 2 +- .../io/druid/client/BrokerServerView.java | 8 +- .../io/druid/client/DirectDruidClient.java | 46 ++++++++-- .../java/io/druid/server/QueryResource.java | 1 - .../druid/client/DirectDruidClientTest.java | 90 +++++++++++++++++-- 5 files changed, 127 insertions(+), 20 deletions(-) diff --git a/pom.xml b/pom.xml index 2116d7f617f..2a3af54886e 100644 --- a/pom.xml +++ b/pom.xml @@ -78,7 +78,7 @@ com.metamx http-client - 0.9.5 + 0.9.6 com.metamx diff --git a/server/src/main/java/io/druid/client/BrokerServerView.java b/server/src/main/java/io/druid/client/BrokerServerView.java index 57663154156..0070622ac85 100644 --- a/server/src/main/java/io/druid/client/BrokerServerView.java +++ b/server/src/main/java/io/druid/client/BrokerServerView.java @@ -32,10 +32,9 @@ import io.druid.client.selector.TierSelectorStrategy; import io.druid.concurrent.Execs; import io.druid.guice.annotations.Client; import io.druid.query.DataSource; -import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChestWarehouse; -import io.druid.query.TableDataSource; +import io.druid.query.QueryWatcher; import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; import io.druid.timeline.VersionedIntervalTimeline; @@ -60,6 +59,7 @@ public class BrokerServerView implements TimelineServerView private final Map> timelines; private final QueryToolChestWarehouse warehouse; + private final QueryWatcher queryWatcher; private final ObjectMapper smileMapper; private final HttpClient httpClient; private final ServerInventoryView baseView; @@ -68,6 +68,7 @@ public class BrokerServerView implements TimelineServerView @Inject public BrokerServerView( QueryToolChestWarehouse warehouse, + QueryWatcher queryWatcher, ObjectMapper smileMapper, @Client HttpClient httpClient, ServerInventoryView baseView, @@ -75,6 +76,7 @@ public class BrokerServerView implements TimelineServerView ) { this.warehouse = warehouse; + this.queryWatcher = queryWatcher; this.smileMapper = smileMapper; this.httpClient = httpClient; this.baseView = baseView; @@ -154,7 +156,7 @@ public class BrokerServerView implements TimelineServerView private DirectDruidClient makeDirectClient(DruidServer server) { - return new DirectDruidClient(warehouse, smileMapper, httpClient, server.getHost()); + return new DirectDruidClient(warehouse, queryWatcher, smileMapper, httpClient, server.getHost()); } private QueryableDruidServer removeServer(DruidServer server) diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index c08cd9e2bd4..34584ee2395 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.type.TypeFactory; import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.common.base.Charsets; import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.google.common.io.Closeables; @@ -43,12 +44,15 @@ import com.metamx.http.client.HttpClient; import com.metamx.http.client.io.AppendableByteArrayInputStream; import com.metamx.http.client.response.ClientResponse; import com.metamx.http.client.response.InputStreamResponseHandler; +import com.metamx.http.client.response.StatusResponseHandler; +import com.metamx.http.client.response.StatusResponseHolder; import io.druid.query.BySegmentResultValueClass; import io.druid.query.Query; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; +import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.query.aggregation.MetricManipulatorFns; import org.jboss.netty.handler.codec.http.HttpChunk; @@ -61,6 +65,7 @@ import java.io.InputStream; import java.net.URL; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; @@ -74,6 +79,7 @@ public class DirectDruidClient implements QueryRunner private static final Map, Pair> typesMap = Maps.newConcurrentMap(); private final QueryToolChestWarehouse warehouse; + private final QueryWatcher queryWatcher; private final ObjectMapper objectMapper; private final HttpClient httpClient; private final String host; @@ -83,12 +89,14 @@ public class DirectDruidClient implements QueryRunner public DirectDruidClient( QueryToolChestWarehouse warehouse, + QueryWatcher queryWatcher, ObjectMapper objectMapper, HttpClient httpClient, String host ) { this.warehouse = warehouse; + this.queryWatcher = queryWatcher; this.objectMapper = objectMapper; this.httpClient = httpClient; this.host = host; @@ -103,7 +111,7 @@ public class DirectDruidClient implements QueryRunner } @Override - public Sequence run(Query query) + public Sequence run(final Query query) { QueryToolChest> toolChest = warehouse.getToolChest(query); boolean isBySegment = query.getContextBySegment(false); @@ -128,6 +136,7 @@ public class DirectDruidClient implements QueryRunner final ListenableFuture future; final String url = String.format("http://%s/druid/v2/", host); + final String cancelUrl = String.format("http://%s/druid/v2/%s", host, query.getId()); try { log.debug("Querying url[%s]", url); @@ -175,6 +184,9 @@ public class DirectDruidClient implements QueryRunner } } ); + + queryWatcher.registerQuery(query, future); + openConnections.getAndIncrement(); Futures.addCallback( future, new FutureCallback() @@ -189,6 +201,27 @@ public class DirectDruidClient implements QueryRunner public void onFailure(Throwable t) { openConnections.getAndDecrement(); + if (future.isCancelled()) { + // forward the cancellation to underlying queriable node + try { + StatusResponseHolder res = httpClient + .delete(new URL(cancelUrl)) + .setContent(objectMapper.writeValueAsBytes(query)) + .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? "application/smile" : "application/json") + .go(new StatusResponseHandler(Charsets.UTF_8)) + .get(); + if (res.getStatus().getCode() >= 500) { + throw new RE( + "Error cancelling query[%s]: queriable node returned status[%d] [%s].", + res.getStatus().getCode(), + res.getStatus().getReasonPhrase() + ); + } + } + catch (IOException | ExecutionException | InterruptedException e) { + Throwables.propagate(e); + } + } } } ); @@ -197,7 +230,7 @@ public class DirectDruidClient implements QueryRunner throw Throwables.propagate(e); } - Sequence retVal = new BaseSequence>( + Sequence retVal = new BaseSequence<>( new BaseSequence.IteratorMaker>() { @Override @@ -296,14 +329,11 @@ public class DirectDruidClient implements QueryRunner objectCodec = jp.getCodec(); } } - catch (IOException e) { + catch (IOException | InterruptedException | ExecutionException e) { throw new RE(e, "Failure getting results from[%s]", url); } - catch (InterruptedException e) { - throw new RE(e, "Failure getting results from[%s]", url); - } - catch (ExecutionException e) { - throw new RE(e, "Failure getting results from[%s]", url); + catch (CancellationException e) { + throw new QueryInterruptedException(); } } } diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 1e6ea06607f..c97657b4e35 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -73,7 +73,6 @@ import java.util.UUID; public class QueryResource { private static final EmittingLogger log = new EmittingLogger(QueryResource.class); - private static final Charset UTF8 = Charset.forName("UTF-8"); private static final Joiner COMMA_JOIN = Joiner.on(","); public static final String APPLICATION_SMILE = "application/smile"; public static final String APPLICATION_JSON = "application/json"; diff --git a/server/src/test/java/io/druid/client/DirectDruidClientTest.java b/server/src/test/java/io/druid/client/DirectDruidClientTest.java index aba91657686..84a80058d35 100644 --- a/server/src/test/java/io/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/io/druid/client/DirectDruidClientTest.java @@ -21,18 +21,24 @@ package io.druid.client; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.http.client.HttpClient; import com.metamx.http.client.Request; import com.metamx.http.client.RequestBuilder; +import com.metamx.http.client.response.StatusResponseHolder; import io.druid.client.selector.ConnectionCountServerSelectorStrategy; import io.druid.client.selector.HighestPriorityTierSelectorStrategy; import io.druid.client.selector.QueryableDruidServer; import io.druid.client.selector.ServerSelector; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.Druids; +import io.druid.query.Query; +import io.druid.query.QueryInterruptedException; +import io.druid.query.QueryWatcher; import io.druid.query.ReflectionQueryToolChestWarehouse; import io.druid.query.Result; import io.druid.query.timeboundary.TimeBoundaryQuery; @@ -41,11 +47,13 @@ import io.druid.timeline.partition.NoneShardSpec; import junit.framework.Assert; import org.easymock.EasyMock; import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.timeout.ReadTimeoutException; import org.joda.time.DateTime; import org.joda.time.Interval; -import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -54,17 +62,22 @@ import java.util.List; public class DirectDruidClientTest { - private HttpClient httpClient; - - @Before - public void setUp() throws Exception + public static final QueryWatcher DUMMY_WATCHER = new QueryWatcher() { - httpClient = EasyMock.createMock(HttpClient.class); - } + @Override + public void registerQuery(Query query, ListenableFuture future) + { + + } + }; + + @Rule + public ExpectedException thrown = ExpectedException.none(); @Test public void testRun() throws Exception { + HttpClient httpClient = EasyMock.createMock(HttpClient.class); RequestBuilder requestBuilder = new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com")); EasyMock.expect(httpClient.post(EasyMock.anyObject())).andReturn(requestBuilder).atLeastOnce(); @@ -93,12 +106,14 @@ public class DirectDruidClientTest DirectDruidClient client1 = new DirectDruidClient( new ReflectionQueryToolChestWarehouse(), + DUMMY_WATCHER, new DefaultObjectMapper(), httpClient, "foo" ); DirectDruidClient client2 = new DirectDruidClient( new ReflectionQueryToolChestWarehouse(), + DUMMY_WATCHER, new DefaultObjectMapper(), httpClient, "foo2" @@ -149,4 +164,65 @@ public class DirectDruidClientTest EasyMock.verify(httpClient); } + + @Test + public void testCancel() throws Exception + { + HttpClient httpClient = EasyMock.createStrictMock(HttpClient.class); + EasyMock.expect(httpClient.post(EasyMock.anyObject())).andReturn( + new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com")) + ).once(); + + ListenableFuture cancelledFuture = Futures.immediateCancelledFuture(); + EasyMock.expect(httpClient.go(EasyMock.anyObject())).andReturn(cancelledFuture).once(); + + EasyMock.expect(httpClient.delete(EasyMock.anyObject())) + .andReturn(new RequestBuilder(httpClient, HttpMethod.DELETE, new URL("http://foo.com/delete"))) + .once(); + SettableFuture cancellationFuture = SettableFuture.create(); + EasyMock.expect(httpClient.go(EasyMock.anyObject())).andReturn(cancellationFuture).once(); + + EasyMock.replay(httpClient); + + final ServerSelector serverSelector = new ServerSelector( + new DataSegment( + "test", + new Interval("2013-01-01/2013-01-02"), + new DateTime("2013-01-01").toString(), + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + new NoneShardSpec(), + 0, + 0L + ), + new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()) + ); + + DirectDruidClient client1 = new DirectDruidClient( + new ReflectionQueryToolChestWarehouse(), + DUMMY_WATCHER, + new DefaultObjectMapper(), + httpClient, + "foo" + ); + + QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer( + new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0), + client1 + ); + serverSelector.addServer(queryableDruidServer1); + + TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); + + cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled"))); + Sequence results = client1.run(query); + Assert.assertEquals(0, client1.getNumOpenConnections()); + + + thrown.expect(QueryInterruptedException.class); + Assert.assertTrue(Sequences.toList(results, Lists.newArrayList()).isEmpty()); + + EasyMock.verify(httpClient); + } }