Merge pull request #1106 from druid-io/update-http-client

Update http client to 1.0.0
This commit is contained in:
Xavier Léauté 2015-02-10 13:37:27 -08:00
commit 31e248736d
17 changed files with 230 additions and 131 deletions

View File

@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.client.selector.Server;
@ -32,6 +33,7 @@ import io.druid.indexing.common.RetryPolicy;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.task.Task;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration;
import javax.ws.rs.core.MediaType;
@ -92,10 +94,11 @@ public class RemoteTaskActionClient implements TaskActionClient
log.info("Submitting action for task[%s] to overlord[%s]: %s", task.getId(), serviceUri, taskAction);
try {
response = httpClient.post(serviceUri.toURL())
.setContent(MediaType.APPLICATION_JSON, dataToSend)
.go(new StatusResponseHandler(Charsets.UTF_8))
.get();
response = httpClient.go(
new Request(HttpMethod.POST, serviceUri.toURL())
.setContent(MediaType.APPLICATION_JSON, dataToSend),
new StatusResponseHandler(Charsets.UTF_8)
).get();
}
catch (Exception e) {
Throwables.propagateIfInstanceOf(e.getCause(), IOException.class);

View File

@ -40,6 +40,7 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
@ -61,6 +62,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
@ -352,9 +354,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
try {
final URL url = makeWorkerURL(zkWorker.getWorker(), String.format("/task/%s/shutdown", taskId));
final StatusResponseHolder response = httpClient.post(url)
.go(RESPONSE_HANDLER)
.get();
final StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.POST, url),
RESPONSE_HANDLER
).get();
log.info(
"Sent shutdown message to worker: %s, status %s, response: %s",
@ -391,9 +394,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
public InputStream openStream() throws IOException
{
try {
return httpClient.get(url)
.go(new InputStreamResponseHandler())
.get();
return httpClient.go(
new Request(HttpMethod.GET, url),
new InputStreamResponseHandler()
).get();
}
catch (InterruptedException e) {
throw Throwables.propagate(e);

View File

@ -24,10 +24,12 @@ import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.guice.annotations.Global;
import io.druid.testing.IntegrationTestingConfig;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import java.net.URL;
@ -61,11 +63,17 @@ public class ClientInfoResourceTestClient
);
}
public List<String> getDimensions(String dataSource, String interval){
public List<String> getDimensions(String dataSource, String interval)
{
try {
StatusResponseHolder response = httpClient.get(new URL(String.format("%s/%s/dimensions?interval=%s", getBrokerURL(), dataSource, interval)))
.go(responseHandler)
.get();
StatusResponseHolder response = httpClient.go(
new Request(
HttpMethod.GET,
new URL(String.format("%s/%s/dimensions?interval=%s", getBrokerURL(), dataSource, interval))
),
responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while querying[%s] status[%s] content[%s]",

View File

@ -25,7 +25,7 @@ import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.RequestBuilder;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.guice.annotations.Global;
@ -120,12 +120,10 @@ public class CoordinatorResourceTestClient
private StatusResponseHolder makeRequest(HttpMethod method, String url)
{
try {
StatusResponseHolder response = new RequestBuilder(
this.httpClient,
method, new URL(url)
)
.go(responseHandler)
.get();
StatusResponseHolder response = httpClient.go(
new Request(method, new URL(url)),
responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while making request to url[%s] status[%s] content[%s]",

View File

@ -23,8 +23,10 @@ import com.google.api.client.util.Charsets;
import com.google.common.base.Throwables;
import com.metamx.common.ISE;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import javax.ws.rs.core.MediaType;
@ -71,13 +73,16 @@ public class EventReceiverFirehoseTestClient
public int postEvents(Collection<Map<String, Object>> events)
{
try {
StatusResponseHolder response = httpClient.post(new URL(getURL()))
.setContent(
StatusResponseHolder response = httpClient.go(
new Request(
HttpMethod.POST, new URL(getURL())
).setContent(
MediaType.APPLICATION_JSON,
this.jsonMapper.writeValueAsBytes(events)
)
.go(responseHandler)
.get();
),
responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while posting events to url[%s] status[%s] content[%s]",

View File

@ -25,6 +25,7 @@ import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.guice.annotations.Global;
@ -32,6 +33,7 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.testing.IntegrationTestingConfig;
import io.druid.testing.utils.RetryUtil;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import java.net.URL;
@ -81,13 +83,14 @@ public class OverlordResourceTestClient
public String submitTask(String task)
{
try {
StatusResponseHolder response = httpClient.post(new URL(getIndexerURL() + "task"))
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.POST, new URL(getIndexerURL() + "task"))
.setContent(
"application/json",
task.getBytes()
)
.go(responseHandler)
.get();
),
responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while submitting task to indexer response [%s %s]",
@ -194,9 +197,7 @@ public class OverlordResourceTestClient
{
try {
StatusResponseHolder response = this.httpClient
.get(new URL(url))
.go(responseHandler)
.get();
.go(new Request(HttpMethod.GET, new URL(url)), responseHandler).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE("Error while making request to indexer [%s %s]", response.getStatus(), response.getContent());
}

View File

@ -25,11 +25,13 @@ import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.guice.annotations.Global;
import io.druid.query.Query;
import io.druid.testing.IntegrationTestingConfig;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import java.net.URL;
@ -67,13 +69,14 @@ public class QueryResourceTestClient
public List<Map<String, Object>> query(Query query)
{
try {
StatusResponseHolder response = httpClient.post(new URL(getBrokerURL()))
.setContent(
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.POST, new URL(getBrokerURL())).setContent(
"application/json",
jsonMapper.writeValueAsBytes(query)
)
.go(responseHandler)
.get();
), responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while querying[%s] status[%s] content[%s]",

View File

@ -24,12 +24,14 @@ import com.google.inject.Key;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.guice.annotations.Global;
import io.druid.testing.IntegrationTestingConfig;
import io.druid.testing.guice.DruidTestModuleFactory;
import io.druid.testing.utils.RetryUtil;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.testng.internal.IConfiguration;
import org.testng.internal.annotations.IAnnotationFinder;
@ -119,16 +121,19 @@ public class DruidTestRunnerFactory implements ITestRunnerFactory
public Boolean call() throws Exception
{
try {
StatusResponseHolder response = client.get(
StatusResponseHolder response = client.go(
new Request(
HttpMethod.GET,
new URL(
String.format(
"http://%s/status",
host
)
)
)
.go(handler)
.get();
),
handler
).get();
System.out.println(response.getStatus() + response.getContent());
if (response.getStatus().equals(HttpResponseStatus.OK)) {
return true;

View File

@ -40,7 +40,7 @@
<metamx.java-util.version>0.26.14</metamx.java-util.version>
<apache.curator.version>2.7.0</apache.curator.version>
<jetty.version>9.2.5.v20141112</jetty.version>
<druid.api.version>0.3.2</druid.api.version>
<druid.api.version>0.3.3</druid.api.version>
<jackson.version>2.4.4</jackson.version>
<log4j.version>2.1</log4j.version>
</properties>
@ -79,12 +79,12 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
<version>0.2.13</version>
<version>0.3.0</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
<version>0.9.12</version>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
@ -99,7 +99,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>server-metrics</artifactId>
<version>0.0.9</version>
<version>0.0.10</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>

View File

@ -28,6 +28,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -171,7 +172,7 @@ public class BenchmarkIndexibleWrites extends AbstractBenchmark
private final Integer totalIndexSize = 1<<20;
@BenchmarkOptions(warmupRounds = 100, benchmarkRounds = 100, clock = Clock.REAL_TIME, callgc = true)
@Test
@Ignore @Test
/**
* CALLEN - 2015-01-15 - OSX - Java 1.7.0_71-b14
BenchmarkIndexibleWrites.testConcurrentWrites[0]: [measured 100 out of 200 rounds, threads: 1 (sequential)]
@ -230,7 +231,7 @@ public class BenchmarkIndexibleWrites extends AbstractBenchmark
*/
@BenchmarkOptions(warmupRounds = 100, benchmarkRounds = 100, clock = Clock.REAL_TIME, callgc = true)
@Test
@Ignore @Test
public void testConcurrentReads() throws ExecutionException, InterruptedException
{
final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(

View File

@ -53,6 +53,7 @@ import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.Segment;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -191,7 +192,6 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
}
}
@Parameterized.Parameters
public static Collection<Object[]> getParameters()
{
@ -221,7 +221,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
return new MapBasedInputRow(timestamp, dimensionList, builder.build());
}
@Test
@Ignore @Test
@BenchmarkOptions(callgc = true, clock = Clock.REAL_TIME, warmupRounds = 10, benchmarkRounds = 20)
public void testConcurrentAddRead()
throws InterruptedException, ExecutionException, NoSuchMethodException, IllegalAccessException,

View File

@ -42,6 +42,7 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.ClientResponse;
import com.metamx.http.client.response.HttpResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
@ -59,6 +60,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse;
import javax.ws.rs.core.MediaType;
@ -300,14 +302,17 @@ public class DirectDruidClient<T> implements QueryRunner<T>
}
}
};
future = httpClient
.post(new URL(url))
.setContent(objectMapper.writeValueAsBytes(query))
future = httpClient.go(
new Request(
HttpMethod.POST,
new URL(url)
).setContent(objectMapper.writeValueAsBytes(query))
.setHeader(
HttpHeaders.Names.CONTENT_TYPE,
isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON
)
.go(responseHandler);
),
responseHandler
);
queryWatcher.registerQuery(query, future);
@ -328,15 +333,19 @@ public class DirectDruidClient<T> implements QueryRunner<T>
if (future.isCancelled()) {
// forward the cancellation to underlying queriable node
try {
StatusResponseHolder res = httpClient
.delete(new URL(cancelUrl))
.setContent(objectMapper.writeValueAsBytes(query))
StatusResponseHolder res = httpClient.go(
new Request(
HttpMethod.DELETE,
new URL(cancelUrl)
).setContent(objectMapper.writeValueAsBytes(query))
.setHeader(
HttpHeaders.Names.CONTENT_TYPE,
isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON
)
.go(new StatusResponseHandler(Charsets.UTF_8))
.get();
isSmile
? SmileMediaTypes.APPLICATION_JACKSON_SMILE
: MediaType.APPLICATION_JSON
),
new StatusResponseHandler(Charsets.UTF_8)
).get();
if (res.getStatus().getCode() >= 500) {
throw new RE(
"Error cancelling query[%s]: queriable node returned status[%d] [%s].",

View File

@ -23,11 +23,13 @@ import com.google.inject.Inject;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.InputStreamResponseHandler;
import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.annotations.Global;
import io.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Interval;
import javax.ws.rs.core.MediaType;
@ -92,10 +94,13 @@ public class IndexingServiceClient
private InputStream runQuery(Object queryObject)
{
try {
return client.post(new URL(String.format("%s/task", baseUrl())))
.setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(queryObject))
.go(RESPONSE_HANDLER)
.get();
return client.go(
new Request(
HttpMethod.POST,
new URL(String.format("%s/task", baseUrl()))
).setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(queryObject)),
RESPONSE_HANDLER
).get();
}
catch (Exception e) {
throw Throwables.propagate(e);

View File

@ -25,6 +25,7 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.client.selector.Server;
@ -34,6 +35,7 @@ import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.SegmentDescriptor;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Interval;
import javax.ws.rs.core.MediaType;
@ -99,13 +101,16 @@ public class BridgeQuerySegmentWalker implements QuerySegmentWalker
brokerSelector.pick().getHost()
);
StatusResponseHolder response = httpClient.post(new URL(url))
.setContent(
StatusResponseHolder response = httpClient.go(
new Request(
HttpMethod.POST,
new URL(url)
).setContent(
MediaType.APPLICATION_JSON,
jsonMapper.writeValueAsBytes(query)
)
.go(responseHandler)
.get();
),
responseHandler
).get();
List<T> results = jsonMapper.readValue(
response.getContent(), new TypeReference<List<T>>()

View File

@ -28,6 +28,7 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.FullResponseHandler;
import com.metamx.http.client.response.FullResponseHolder;
import io.druid.client.selector.Server;
@ -37,6 +38,7 @@ import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Json;
import io.druid.server.coordinator.rules.Rule;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.Duration;
@ -145,16 +147,24 @@ public class CoordinatorRuleManager
return;
}
FullResponseHolder response = httpClient.get(new URL(url))
.go(responseHandler)
.get();
FullResponseHolder response = httpClient.go(
new Request(
HttpMethod.GET,
new URL(url)
),
responseHandler
).get();
if (response.getStatus().equals(HttpResponseStatus.FOUND)) {
url = response.getResponse().getHeader("Location");
log.info("Redirecting rule request to [%s]", url);
response = httpClient.get(new URL(url))
.go(responseHandler)
.get();
response = httpClient.go(
new Request(
HttpMethod.GET,
new URL(url)
),
responseHandler
).get();
}
ConcurrentHashMap<String, List<Rule>> newRules = new ConcurrentHashMap<>(

View File

@ -26,7 +26,7 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.RequestBuilder;
import com.metamx.http.client.response.HttpResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.client.selector.ConnectionCountServerSelectorStrategy;
import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
@ -41,6 +41,7 @@ import io.druid.query.Result;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
@ -62,15 +63,38 @@ public class DirectDruidClientTest
public void testRun() throws Exception
{
HttpClient httpClient = EasyMock.createMock(HttpClient.class);
RequestBuilder requestBuilder = new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com"));
EasyMock.expect(httpClient.post(EasyMock.<URL>anyObject())).andReturn(requestBuilder).atLeastOnce();
SettableFuture futureException = SettableFuture.create();
final URL url = new URL("http://foo/druid/v2/");
SettableFuture<InputStream> futureResult = SettableFuture.create();
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(futureResult).times(1);
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(futureException).times(1);
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(SettableFuture.create()).atLeastOnce();
Capture<Request> capturedRequest = EasyMock.newCapture();
EasyMock.expect(
httpClient.go(
EasyMock.capture(capturedRequest),
EasyMock.<HttpResponseHandler>anyObject()
)
)
.andReturn(futureResult)
.times(1);
SettableFuture futureException = SettableFuture.create();
EasyMock.expect(
httpClient.go(
EasyMock.capture(capturedRequest),
EasyMock.<HttpResponseHandler>anyObject()
)
)
.andReturn(futureException)
.times(1);
EasyMock.expect(
httpClient.go(
EasyMock.capture(capturedRequest),
EasyMock.<HttpResponseHandler>anyObject()
)
)
.andReturn(SettableFuture.create())
.atLeastOnce();
EasyMock.replay(httpClient);
final ServerSelector serverSelector = new ServerSelector(
@ -115,8 +139,11 @@ public class DirectDruidClientTest
serverSelector.addServer(queryableDruidServer2);
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
HashMap<String,List> context = new HashMap<String, List>();
HashMap<String, List> context = Maps.newHashMap();
Sequence s1 = client1.run(query, context);
Assert.assertTrue(capturedRequest.hasCaptured());
Assert.assertEquals(url, capturedRequest.getValue().getUrl());
Assert.assertEquals(HttpMethod.POST, capturedRequest.getValue().getMethod());
Assert.assertEquals(1, client1.getNumOpenConnections());
// simulate read timeout
@ -153,18 +180,28 @@ public class DirectDruidClientTest
public void testCancel() throws Exception
{
HttpClient httpClient = EasyMock.createStrictMock(HttpClient.class);
EasyMock.expect(httpClient.post(EasyMock.<URL>anyObject())).andReturn(
new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com"))
).once();
Capture<Request> capturedRequest = EasyMock.newCapture();
ListenableFuture<Object> cancelledFuture = Futures.immediateCancelledFuture();
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(cancelledFuture).once();
EasyMock.expect(httpClient.delete(EasyMock.<URL>anyObject()))
.andReturn(new RequestBuilder(httpClient, HttpMethod.DELETE, new URL("http://foo.com/delete")))
.once();
SettableFuture<Object> cancellationFuture = SettableFuture.create();
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(cancellationFuture).once();
EasyMock.expect(
httpClient.go(
EasyMock.capture(capturedRequest),
EasyMock.<HttpResponseHandler>anyObject()
)
)
.andReturn(cancelledFuture)
.once();
EasyMock.expect(
httpClient.go(
EasyMock.capture(capturedRequest),
EasyMock.<HttpResponseHandler>anyObject()
)
)
.andReturn(cancellationFuture)
.once();
EasyMock.replay(httpClient);
@ -198,16 +235,18 @@ public class DirectDruidClientTest
serverSelector.addServer(queryableDruidServer1);
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
HashMap<String,List> context = new HashMap<String, List>();
HashMap<String, List> context = Maps.newHashMap();
cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled")));
Sequence results = client1.run(query, context);
Assert.assertEquals(HttpMethod.DELETE, capturedRequest.getValue().getMethod());
Assert.assertEquals(0, client1.getNumOpenConnections());
QueryInterruptedException exception = null;
try {
Sequences.toList(results, Lists.newArrayList());
} catch(QueryInterruptedException e) {
}
catch (QueryInterruptedException e) {
exception = e;
}
Assert.assertNotNull(exception);

View File

@ -29,6 +29,7 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
@ -47,6 +48,7 @@ import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
@ -147,8 +149,10 @@ public class JettyTest
long startTime2 = 0;
try {
ListenableFuture<StatusResponseHolder> go =
client.get(new URL("http://localhost:" + port + "/slow/hello"))
.go(new StatusResponseHandler(Charset.defaultCharset()));
client.go(
new Request(HttpMethod.GET, new URL("http://localhost:" + port + "/slow/hello")),
new StatusResponseHandler(Charset.defaultCharset())
);
startTime2 = System.currentTimeMillis();
go.get();
}
@ -201,8 +205,10 @@ public class JettyTest
public void testChunkNotFinalized() throws Exception
{
ListenableFuture<InputStream> go =
client.get(new URL("http://localhost:" + port + "/exception/exception"))
.go(new InputStreamResponseHandler());
client.go(
new Request(HttpMethod.GET, new URL("http://localhost:" + port + "/exception/exception")),
new InputStreamResponseHandler()
);
try {
StringWriter writer = new StringWriter();
IOUtils.copy(go.get(), writer, "utf-8");
@ -225,13 +231,10 @@ public class JettyTest
public void run()
{
try {
ListenableFuture<InputStream> go = client.get(
new URL(
"http://localhost:" + port + "/exception/exception"
)
)
.go(new InputStreamResponseHandler());
ListenableFuture<InputStream> go = client.go(
new Request(HttpMethod.GET, new URL("http://localhost:" + port + "/exception/exception")),
new InputStreamResponseHandler()
);
StringWriter writer = new StringWriter();
IOUtils.copy(go.get(), writer, "utf-8");
}