forward cancellation in direct druid client

This commit is contained in:
Xavier Léauté 2014-06-02 17:40:35 -07:00
parent d2c729adec
commit d01f272a7a
5 changed files with 127 additions and 20 deletions

View File

@ -78,7 +78,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
<version>0.9.5</version>
<version>0.9.6</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>

View File

@ -32,10 +32,9 @@ import io.druid.client.selector.TierSelectorStrategy;
import io.druid.concurrent.Execs;
import io.druid.guice.annotations.Client;
import io.druid.query.DataSource;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.TableDataSource;
import io.druid.query.QueryWatcher;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline;
@ -60,6 +59,7 @@ public class BrokerServerView implements TimelineServerView
private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines;
private final QueryToolChestWarehouse warehouse;
private final QueryWatcher queryWatcher;
private final ObjectMapper smileMapper;
private final HttpClient httpClient;
private final ServerInventoryView baseView;
@ -68,6 +68,7 @@ public class BrokerServerView implements TimelineServerView
@Inject
public BrokerServerView(
QueryToolChestWarehouse warehouse,
QueryWatcher queryWatcher,
ObjectMapper smileMapper,
@Client HttpClient httpClient,
ServerInventoryView baseView,
@ -75,6 +76,7 @@ public class BrokerServerView implements TimelineServerView
)
{
this.warehouse = warehouse;
this.queryWatcher = queryWatcher;
this.smileMapper = smileMapper;
this.httpClient = httpClient;
this.baseView = baseView;
@ -154,7 +156,7 @@ public class BrokerServerView implements TimelineServerView
private DirectDruidClient makeDirectClient(DruidServer server)
{
return new DirectDruidClient(warehouse, smileMapper, httpClient, server.getHost());
return new DirectDruidClient(warehouse, queryWatcher, smileMapper, httpClient, server.getHost());
}
private QueryableDruidServer removeServer(DruidServer server)

View File

@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
@ -43,12 +44,15 @@ import com.metamx.http.client.HttpClient;
import com.metamx.http.client.io.AppendableByteArrayInputStream;
import com.metamx.http.client.response.ClientResponse;
import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.query.aggregation.MetricManipulatorFns;
import org.jboss.netty.handler.codec.http.HttpChunk;
@ -61,6 +65,7 @@ import java.io.InputStream;
import java.net.URL;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
@ -74,6 +79,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
private static final Map<Class<? extends Query>, Pair<JavaType, JavaType>> typesMap = Maps.newConcurrentMap();
private final QueryToolChestWarehouse warehouse;
private final QueryWatcher queryWatcher;
private final ObjectMapper objectMapper;
private final HttpClient httpClient;
private final String host;
@ -83,12 +89,14 @@ public class DirectDruidClient<T> implements QueryRunner<T>
public DirectDruidClient(
QueryToolChestWarehouse warehouse,
QueryWatcher queryWatcher,
ObjectMapper objectMapper,
HttpClient httpClient,
String host
)
{
this.warehouse = warehouse;
this.queryWatcher = queryWatcher;
this.objectMapper = objectMapper;
this.httpClient = httpClient;
this.host = host;
@ -103,7 +111,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(Query<T> query)
public Sequence<T> run(final Query<T> query)
{
QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
boolean isBySegment = query.getContextBySegment(false);
@ -128,6 +136,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
final ListenableFuture<InputStream> future;
final String url = String.format("http://%s/druid/v2/", host);
final String cancelUrl = String.format("http://%s/druid/v2/%s", host, query.getId());
try {
log.debug("Querying url[%s]", url);
@ -175,6 +184,9 @@ public class DirectDruidClient<T> implements QueryRunner<T>
}
}
);
queryWatcher.registerQuery(query, future);
openConnections.getAndIncrement();
Futures.addCallback(
future, new FutureCallback<InputStream>()
@ -189,6 +201,27 @@ public class DirectDruidClient<T> implements QueryRunner<T>
public void onFailure(Throwable t)
{
openConnections.getAndDecrement();
if (future.isCancelled()) {
// forward the cancellation to underlying queriable node
try {
StatusResponseHolder res = httpClient
.delete(new URL(cancelUrl))
.setContent(objectMapper.writeValueAsBytes(query))
.setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? "application/smile" : "application/json")
.go(new StatusResponseHandler(Charsets.UTF_8))
.get();
if (res.getStatus().getCode() >= 500) {
throw new RE(
"Error cancelling query[%s]: queriable node returned status[%d] [%s].",
res.getStatus().getCode(),
res.getStatus().getReasonPhrase()
);
}
}
catch (IOException | ExecutionException | InterruptedException e) {
Throwables.propagate(e);
}
}
}
}
);
@ -197,7 +230,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
throw Throwables.propagate(e);
}
Sequence<T> retVal = new BaseSequence<T, JsonParserIterator<T>>(
Sequence<T> retVal = new BaseSequence<>(
new BaseSequence.IteratorMaker<T, JsonParserIterator<T>>()
{
@Override
@ -296,14 +329,11 @@ public class DirectDruidClient<T> implements QueryRunner<T>
objectCodec = jp.getCodec();
}
}
catch (IOException e) {
catch (IOException | InterruptedException | ExecutionException e) {
throw new RE(e, "Failure getting results from[%s]", url);
}
catch (InterruptedException e) {
throw new RE(e, "Failure getting results from[%s]", url);
}
catch (ExecutionException e) {
throw new RE(e, "Failure getting results from[%s]", url);
catch (CancellationException e) {
throw new QueryInterruptedException();
}
}
}

View File

@ -73,7 +73,6 @@ import java.util.UUID;
public class QueryResource
{
private static final EmittingLogger log = new EmittingLogger(QueryResource.class);
private static final Charset UTF8 = Charset.forName("UTF-8");
private static final Joiner COMMA_JOIN = Joiner.on(",");
public static final String APPLICATION_SMILE = "application/smile";
public static final String APPLICATION_JSON = "application/json";

View File

@ -21,18 +21,24 @@ package io.druid.client;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.RequestBuilder;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.client.selector.ConnectionCountServerSelectorStrategy;
import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryWatcher;
import io.druid.query.ReflectionQueryToolChestWarehouse;
import io.druid.query.Result;
import io.druid.query.timeboundary.TimeBoundaryQuery;
@ -41,11 +47,13 @@ import io.druid.timeline.partition.NoneShardSpec;
import junit.framework.Assert;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
@ -54,17 +62,22 @@ import java.util.List;
public class DirectDruidClientTest
{
private HttpClient httpClient;
@Before
public void setUp() throws Exception
public static final QueryWatcher DUMMY_WATCHER = new QueryWatcher()
{
httpClient = EasyMock.createMock(HttpClient.class);
}
@Override
public void registerQuery(Query query, ListenableFuture future)
{
}
};
@Rule
public ExpectedException thrown = ExpectedException.none();
@Test
public void testRun() throws Exception
{
HttpClient httpClient = EasyMock.createMock(HttpClient.class);
RequestBuilder requestBuilder = new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com"));
EasyMock.expect(httpClient.post(EasyMock.<URL>anyObject())).andReturn(requestBuilder).atLeastOnce();
@ -93,12 +106,14 @@ public class DirectDruidClientTest
DirectDruidClient client1 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
DUMMY_WATCHER,
new DefaultObjectMapper(),
httpClient,
"foo"
);
DirectDruidClient client2 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
DUMMY_WATCHER,
new DefaultObjectMapper(),
httpClient,
"foo2"
@ -149,4 +164,65 @@ public class DirectDruidClientTest
EasyMock.verify(httpClient);
}
@Test
public void testCancel() throws Exception
{
HttpClient httpClient = EasyMock.createStrictMock(HttpClient.class);
EasyMock.expect(httpClient.post(EasyMock.<URL>anyObject())).andReturn(
new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com"))
).once();
ListenableFuture<Object> cancelledFuture = Futures.immediateCancelledFuture();
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(cancelledFuture).once();
EasyMock.expect(httpClient.delete(EasyMock.<URL>anyObject()))
.andReturn(new RequestBuilder(httpClient, HttpMethod.DELETE, new URL("http://foo.com/delete")))
.once();
SettableFuture<Object> cancellationFuture = SettableFuture.create();
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(cancellationFuture).once();
EasyMock.replay(httpClient);
final ServerSelector serverSelector = new ServerSelector(
new DataSegment(
"test",
new Interval("2013-01-01/2013-01-02"),
new DateTime("2013-01-01").toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
0,
0L
),
new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy())
);
DirectDruidClient client1 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
DUMMY_WATCHER,
new DefaultObjectMapper(),
httpClient,
"foo"
);
QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0),
client1
);
serverSelector.addServer(queryableDruidServer1);
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled")));
Sequence results = client1.run(query);
Assert.assertEquals(0, client1.getNumOpenConnections());
thrown.expect(QueryInterruptedException.class);
Assert.assertTrue(Sequences.toList(results, Lists.newArrayList()).isEmpty());
EasyMock.verify(httpClient);
}
}