diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java index 5fde00f9a4f..6b1e2d02b0a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java @@ -24,6 +24,7 @@ import com.google.common.base.Throwables; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; import io.druid.client.selector.Server; @@ -32,6 +33,7 @@ import io.druid.indexing.common.RetryPolicy; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.task.Task; import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.Duration; import javax.ws.rs.core.MediaType; @@ -92,10 +94,11 @@ public class RemoteTaskActionClient implements TaskActionClient log.info("Submitting action for task[%s] to overlord[%s]: %s", task.getId(), serviceUri, taskAction); try { - response = httpClient.post(serviceUri.toURL()) - .setContent(MediaType.APPLICATION_JSON, dataToSend) - .go(new StatusResponseHandler(Charsets.UTF_8)) - .get(); + response = httpClient.go( + new Request(HttpMethod.POST, serviceUri.toURL()) + .setContent(MediaType.APPLICATION_JSON, dataToSend), + new StatusResponseHandler(Charsets.UTF_8) + ).get(); } catch (Exception e) { Throwables.propagateIfInstanceOf(e.getCause(), IOException.class); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 58a42651e08..08a71d2808c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -40,6 +40,7 @@ import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; import com.metamx.http.client.response.InputStreamResponseHandler; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; @@ -61,6 +62,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.DateTime; @@ -352,9 +354,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer try { final URL url = makeWorkerURL(zkWorker.getWorker(), String.format("/task/%s/shutdown", taskId)); - final StatusResponseHolder response = httpClient.post(url) - .go(RESPONSE_HANDLER) - .get(); + final StatusResponseHolder response = httpClient.go( + new Request(HttpMethod.POST, url), + RESPONSE_HANDLER + ).get(); log.info( "Sent shutdown message to worker: %s, status %s, response: %s", @@ -391,9 +394,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer public InputStream openStream() throws IOException { try { - return httpClient.get(url) - .go(new InputStreamResponseHandler()) - .get(); + return httpClient.go( + new Request(HttpMethod.GET, url), + new InputStreamResponseHandler() + ).get(); } catch (InterruptedException e) { throw Throwables.propagate(e); diff --git a/integration-tests/src/main/java/io/druid/testing/clients/ClientInfoResourceTestClient.java b/integration-tests/src/main/java/io/druid/testing/clients/ClientInfoResourceTestClient.java index c29dd324034..ec36662e059 100644 --- a/integration-tests/src/main/java/io/druid/testing/clients/ClientInfoResourceTestClient.java +++ b/integration-tests/src/main/java/io/druid/testing/clients/ClientInfoResourceTestClient.java @@ -24,10 +24,12 @@ import com.google.common.base.Throwables; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; import io.druid.guice.annotations.Global; import io.druid.testing.IntegrationTestingConfig; +import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import java.net.URL; @@ -61,11 +63,17 @@ public class ClientInfoResourceTestClient ); } - public List getDimensions(String dataSource, String interval){ + public List getDimensions(String dataSource, String interval) + { try { - StatusResponseHolder response = httpClient.get(new URL(String.format("%s/%s/dimensions?interval=%s", getBrokerURL(), dataSource, interval))) - .go(responseHandler) - .get(); + StatusResponseHolder response = httpClient.go( + new Request( + HttpMethod.GET, + new URL(String.format("%s/%s/dimensions?interval=%s", getBrokerURL(), dataSource, interval)) + ), + responseHandler + ).get(); + if (!response.getStatus().equals(HttpResponseStatus.OK)) { throw new ISE( "Error while querying[%s] status[%s] content[%s]", diff --git a/integration-tests/src/main/java/io/druid/testing/clients/CoordinatorResourceTestClient.java b/integration-tests/src/main/java/io/druid/testing/clients/CoordinatorResourceTestClient.java index afb0fe77f9b..e34a3fdf5bc 100644 --- a/integration-tests/src/main/java/io/druid/testing/clients/CoordinatorResourceTestClient.java +++ b/integration-tests/src/main/java/io/druid/testing/clients/CoordinatorResourceTestClient.java @@ -25,7 +25,7 @@ import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import com.metamx.http.client.HttpClient; -import com.metamx.http.client.RequestBuilder; +import com.metamx.http.client.Request; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; import io.druid.guice.annotations.Global; @@ -120,12 +120,10 @@ public class CoordinatorResourceTestClient private StatusResponseHolder makeRequest(HttpMethod method, String url) { try { - StatusResponseHolder response = new RequestBuilder( - this.httpClient, - method, new URL(url) - ) - .go(responseHandler) - .get(); + StatusResponseHolder response = httpClient.go( + new Request(method, new URL(url)), + responseHandler + ).get(); if (!response.getStatus().equals(HttpResponseStatus.OK)) { throw new ISE( "Error while making request to url[%s] status[%s] content[%s]", diff --git a/integration-tests/src/main/java/io/druid/testing/clients/EventReceiverFirehoseTestClient.java b/integration-tests/src/main/java/io/druid/testing/clients/EventReceiverFirehoseTestClient.java index 853534420c8..77e27e2254c 100644 --- a/integration-tests/src/main/java/io/druid/testing/clients/EventReceiverFirehoseTestClient.java +++ b/integration-tests/src/main/java/io/druid/testing/clients/EventReceiverFirehoseTestClient.java @@ -23,8 +23,10 @@ import com.google.api.client.util.Charsets; import com.google.common.base.Throwables; import com.metamx.common.ISE; import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; +import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import javax.ws.rs.core.MediaType; @@ -71,13 +73,16 @@ public class EventReceiverFirehoseTestClient public int postEvents(Collection> events) { try { - StatusResponseHolder response = httpClient.post(new URL(getURL())) - .setContent( - MediaType.APPLICATION_JSON, - this.jsonMapper.writeValueAsBytes(events) - ) - .go(responseHandler) - .get(); + StatusResponseHolder response = httpClient.go( + new Request( + HttpMethod.POST, new URL(getURL()) + ).setContent( + MediaType.APPLICATION_JSON, + this.jsonMapper.writeValueAsBytes(events) + ), + responseHandler + ).get(); + if (!response.getStatus().equals(HttpResponseStatus.OK)) { throw new ISE( "Error while posting events to url[%s] status[%s] content[%s]", diff --git a/integration-tests/src/main/java/io/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/io/druid/testing/clients/OverlordResourceTestClient.java index 0103447b890..69c15c095a5 100644 --- a/integration-tests/src/main/java/io/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/io/druid/testing/clients/OverlordResourceTestClient.java @@ -25,6 +25,7 @@ import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; import io.druid.guice.annotations.Global; @@ -32,6 +33,7 @@ import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.task.Task; import io.druid.testing.IntegrationTestingConfig; import io.druid.testing.utils.RetryUtil; +import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import java.net.URL; @@ -81,13 +83,14 @@ public class OverlordResourceTestClient public String submitTask(String task) { try { - StatusResponseHolder response = httpClient.post(new URL(getIndexerURL() + "task")) - .setContent( - "application/json", - task.getBytes() - ) - .go(responseHandler) - .get(); + StatusResponseHolder response = httpClient.go( + new Request(HttpMethod.POST, new URL(getIndexerURL() + "task")) + .setContent( + "application/json", + task.getBytes() + ), + responseHandler + ).get(); if (!response.getStatus().equals(HttpResponseStatus.OK)) { throw new ISE( "Error while submitting task to indexer response [%s %s]", @@ -194,9 +197,7 @@ public class OverlordResourceTestClient { try { StatusResponseHolder response = this.httpClient - .get(new URL(url)) - .go(responseHandler) - .get(); + .go(new Request(HttpMethod.GET, new URL(url)), responseHandler).get(); if (!response.getStatus().equals(HttpResponseStatus.OK)) { throw new ISE("Error while making request to indexer [%s %s]", response.getStatus(), response.getContent()); } diff --git a/integration-tests/src/main/java/io/druid/testing/clients/QueryResourceTestClient.java b/integration-tests/src/main/java/io/druid/testing/clients/QueryResourceTestClient.java index d9ca00d721d..43c6ed7b938 100644 --- a/integration-tests/src/main/java/io/druid/testing/clients/QueryResourceTestClient.java +++ b/integration-tests/src/main/java/io/druid/testing/clients/QueryResourceTestClient.java @@ -25,11 +25,13 @@ import com.google.common.base.Throwables; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; import io.druid.guice.annotations.Global; import io.druid.query.Query; import io.druid.testing.IntegrationTestingConfig; +import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import java.net.URL; @@ -67,13 +69,14 @@ public class QueryResourceTestClient public List> query(Query query) { try { - StatusResponseHolder response = httpClient.post(new URL(getBrokerURL())) - .setContent( - "application/json", - jsonMapper.writeValueAsBytes(query) - ) - .go(responseHandler) - .get(); + StatusResponseHolder response = httpClient.go( + new Request(HttpMethod.POST, new URL(getBrokerURL())).setContent( + "application/json", + jsonMapper.writeValueAsBytes(query) + ), responseHandler + + ).get(); + if (!response.getStatus().equals(HttpResponseStatus.OK)) { throw new ISE( "Error while querying[%s] status[%s] content[%s]", diff --git a/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java b/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java index 2de502f6587..f06afecdd20 100644 --- a/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java +++ b/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java @@ -24,12 +24,14 @@ import com.google.inject.Key; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; import io.druid.guice.annotations.Global; import io.druid.testing.IntegrationTestingConfig; import io.druid.testing.guice.DruidTestModuleFactory; import io.druid.testing.utils.RetryUtil; +import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.testng.internal.IConfiguration; import org.testng.internal.annotations.IAnnotationFinder; @@ -119,16 +121,19 @@ public class DruidTestRunnerFactory implements ITestRunnerFactory public Boolean call() throws Exception { try { - StatusResponseHolder response = client.get( - new URL( - String.format( - "http://%s/status", - host + StatusResponseHolder response = client.go( + new Request( + HttpMethod.GET, + new URL( + String.format( + "http://%s/status", + host + ) ) - ) - ) - .go(handler) - .get(); + ), + handler + ).get(); + System.out.println(response.getStatus() + response.getContent()); if (response.getStatus().equals(HttpResponseStatus.OK)) { return true; diff --git a/pom.xml b/pom.xml index 0b741b62774..4a930fb9f5a 100644 --- a/pom.xml +++ b/pom.xml @@ -40,7 +40,7 @@ 0.26.14 2.7.0 9.2.5.v20141112 - 0.3.2 + 0.3.3 2.4.4 2.1 @@ -79,12 +79,12 @@ com.metamx emitter - 0.2.13 + 0.3.0 com.metamx http-client - 0.9.12 + 1.0.0 com.metamx @@ -99,7 +99,7 @@ com.metamx server-metrics - 0.0.9 + 0.0.10 commons-codec diff --git a/processing/src/test/java/io/druid/segment/data/BenchmarkIndexibleWrites.java b/processing/src/test/java/io/druid/segment/data/BenchmarkIndexibleWrites.java index 9297cfbc589..a53d3be1d2b 100644 --- a/processing/src/test/java/io/druid/segment/data/BenchmarkIndexibleWrites.java +++ b/processing/src/test/java/io/druid/segment/data/BenchmarkIndexibleWrites.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -171,7 +172,7 @@ public class BenchmarkIndexibleWrites extends AbstractBenchmark private final Integer totalIndexSize = 1<<20; @BenchmarkOptions(warmupRounds = 100, benchmarkRounds = 100, clock = Clock.REAL_TIME, callgc = true) - @Test + @Ignore @Test /** * CALLEN - 2015-01-15 - OSX - Java 1.7.0_71-b14 BenchmarkIndexibleWrites.testConcurrentWrites[0]: [measured 100 out of 200 rounds, threads: 1 (sequential)] @@ -230,7 +231,7 @@ public class BenchmarkIndexibleWrites extends AbstractBenchmark */ @BenchmarkOptions(warmupRounds = 100, benchmarkRounds = 100, clock = Clock.REAL_TIME, callgc = true) - @Test + @Ignore @Test public void testConcurrentReads() throws ExecutionException, InterruptedException { final ListeningExecutorService executorService = MoreExecutors.listeningDecorator( diff --git a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java index 760297c89e7..ed835c4cb29 100644 --- a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java +++ b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java @@ -53,6 +53,7 @@ import io.druid.segment.IncrementalIndexSegment; import io.druid.segment.Segment; import org.joda.time.Interval; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -191,7 +192,6 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark } } - @Parameterized.Parameters public static Collection getParameters() { @@ -221,7 +221,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark return new MapBasedInputRow(timestamp, dimensionList, builder.build()); } - @Test + @Ignore @Test @BenchmarkOptions(callgc = true, clock = Clock.REAL_TIME, warmupRounds = 10, benchmarkRounds = 20) public void testConcurrentAddRead() throws InterruptedException, ExecutionException, NoSuchMethodException, IllegalAccessException, diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 191d895f6ab..911cddcbf8a 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -42,6 +42,7 @@ import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.common.logger.Logger; import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; import com.metamx.http.client.response.ClientResponse; import com.metamx.http.client.response.HttpResponseHandler; import com.metamx.http.client.response.StatusResponseHandler; @@ -59,6 +60,7 @@ import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBufferInputStream; import org.jboss.netty.handler.codec.http.HttpChunk; import org.jboss.netty.handler.codec.http.HttpHeaders; +import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponse; import javax.ws.rs.core.MediaType; @@ -300,14 +302,17 @@ public class DirectDruidClient implements QueryRunner } } }; - future = httpClient - .post(new URL(url)) - .setContent(objectMapper.writeValueAsBytes(query)) - .setHeader( - HttpHeaders.Names.CONTENT_TYPE, - isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON - ) - .go(responseHandler); + future = httpClient.go( + new Request( + HttpMethod.POST, + new URL(url) + ).setContent(objectMapper.writeValueAsBytes(query)) + .setHeader( + HttpHeaders.Names.CONTENT_TYPE, + isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON + ), + responseHandler + ); queryWatcher.registerQuery(query, future); @@ -328,15 +333,19 @@ public class DirectDruidClient implements QueryRunner if (future.isCancelled()) { // forward the cancellation to underlying queriable node try { - StatusResponseHolder res = httpClient - .delete(new URL(cancelUrl)) - .setContent(objectMapper.writeValueAsBytes(query)) - .setHeader( - HttpHeaders.Names.CONTENT_TYPE, - isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON - ) - .go(new StatusResponseHandler(Charsets.UTF_8)) - .get(); + StatusResponseHolder res = httpClient.go( + new Request( + HttpMethod.DELETE, + new URL(cancelUrl) + ).setContent(objectMapper.writeValueAsBytes(query)) + .setHeader( + HttpHeaders.Names.CONTENT_TYPE, + isSmile + ? SmileMediaTypes.APPLICATION_JACKSON_SMILE + : MediaType.APPLICATION_JSON + ), + new StatusResponseHandler(Charsets.UTF_8) + ).get(); if (res.getStatus().getCode() >= 500) { throw new RE( "Error cancelling query[%s]: queriable node returned status[%d] [%s].", diff --git a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java index 16324e8bc53..844bb4821cc 100644 --- a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java @@ -23,11 +23,13 @@ import com.google.inject.Inject; import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; import com.metamx.http.client.response.InputStreamResponseHandler; import io.druid.client.selector.Server; import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.guice.annotations.Global; import io.druid.timeline.DataSegment; +import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.Interval; import javax.ws.rs.core.MediaType; @@ -92,10 +94,13 @@ public class IndexingServiceClient private InputStream runQuery(Object queryObject) { try { - return client.post(new URL(String.format("%s/task", baseUrl()))) - .setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(queryObject)) - .go(RESPONSE_HANDLER) - .get(); + return client.go( + new Request( + HttpMethod.POST, + new URL(String.format("%s/task", baseUrl())) + ).setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(queryObject)), + RESPONSE_HANDLER + ).get(); } catch (Exception e) { throw Throwables.propagate(e); diff --git a/server/src/main/java/io/druid/server/bridge/BridgeQuerySegmentWalker.java b/server/src/main/java/io/druid/server/bridge/BridgeQuerySegmentWalker.java index e6261d3a362..48c8c4adfda 100644 --- a/server/src/main/java/io/druid/server/bridge/BridgeQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/server/bridge/BridgeQuerySegmentWalker.java @@ -25,6 +25,7 @@ import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.common.logger.Logger; import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; import io.druid.client.selector.Server; @@ -34,6 +35,7 @@ import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QuerySegmentWalker; import io.druid.query.SegmentDescriptor; +import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.Interval; import javax.ws.rs.core.MediaType; @@ -99,18 +101,21 @@ public class BridgeQuerySegmentWalker implements QuerySegmentWalker brokerSelector.pick().getHost() ); - StatusResponseHolder response = httpClient.post(new URL(url)) - .setContent( - MediaType.APPLICATION_JSON, - jsonMapper.writeValueAsBytes(query) - ) - .go(responseHandler) - .get(); + StatusResponseHolder response = httpClient.go( + new Request( + HttpMethod.POST, + new URL(url) + ).setContent( + MediaType.APPLICATION_JSON, + jsonMapper.writeValueAsBytes(query) + ), + responseHandler + ).get(); List results = jsonMapper.readValue( response.getContent(), new TypeReference>() - { - } + { + } ); return Sequences.simple(results); diff --git a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java index 3997a19c4f6..c97d5fe48fd 100644 --- a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java +++ b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java @@ -28,6 +28,7 @@ import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; import com.metamx.http.client.response.FullResponseHandler; import com.metamx.http.client.response.FullResponseHolder; import io.druid.client.selector.Server; @@ -37,6 +38,7 @@ import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Json; import io.druid.server.coordinator.rules.Rule; +import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.Duration; @@ -145,23 +147,31 @@ public class CoordinatorRuleManager return; } - FullResponseHolder response = httpClient.get(new URL(url)) - .go(responseHandler) - .get(); + FullResponseHolder response = httpClient.go( + new Request( + HttpMethod.GET, + new URL(url) + ), + responseHandler + ).get(); if (response.getStatus().equals(HttpResponseStatus.FOUND)) { url = response.getResponse().getHeader("Location"); log.info("Redirecting rule request to [%s]", url); - response = httpClient.get(new URL(url)) - .go(responseHandler) - .get(); + response = httpClient.go( + new Request( + HttpMethod.GET, + new URL(url) + ), + responseHandler + ).get(); } ConcurrentHashMap> newRules = new ConcurrentHashMap<>( (Map>) jsonMapper.readValue( response.getContent(), new TypeReference>>() - { - } + { + } ) ); diff --git a/server/src/test/java/io/druid/client/DirectDruidClientTest.java b/server/src/test/java/io/druid/client/DirectDruidClientTest.java index be32e0efc9d..918b49dcd28 100644 --- a/server/src/test/java/io/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/io/druid/client/DirectDruidClientTest.java @@ -26,7 +26,7 @@ import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.http.client.HttpClient; import com.metamx.http.client.Request; -import com.metamx.http.client.RequestBuilder; +import com.metamx.http.client.response.HttpResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; import io.druid.client.selector.ConnectionCountServerSelectorStrategy; import io.druid.client.selector.HighestPriorityTierSelectorStrategy; @@ -41,6 +41,7 @@ import io.druid.query.Result; import io.druid.query.timeboundary.TimeBoundaryQuery; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; +import org.easymock.Capture; import org.easymock.EasyMock; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; @@ -62,15 +63,38 @@ public class DirectDruidClientTest public void testRun() throws Exception { HttpClient httpClient = EasyMock.createMock(HttpClient.class); - RequestBuilder requestBuilder = new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com")); - EasyMock.expect(httpClient.post(EasyMock.anyObject())).andReturn(requestBuilder).atLeastOnce(); - - SettableFuture futureException = SettableFuture.create(); + final URL url = new URL("http://foo/druid/v2/"); SettableFuture futureResult = SettableFuture.create(); - EasyMock.expect(httpClient.go(EasyMock.anyObject())).andReturn(futureResult).times(1); - EasyMock.expect(httpClient.go(EasyMock.anyObject())).andReturn(futureException).times(1); - EasyMock.expect(httpClient.go(EasyMock.anyObject())).andReturn(SettableFuture.create()).atLeastOnce(); + Capture capturedRequest = EasyMock.newCapture(); + EasyMock.expect( + httpClient.go( + EasyMock.capture(capturedRequest), + EasyMock.anyObject() + ) + ) + .andReturn(futureResult) + .times(1); + + SettableFuture futureException = SettableFuture.create(); + EasyMock.expect( + httpClient.go( + EasyMock.capture(capturedRequest), + EasyMock.anyObject() + ) + ) + .andReturn(futureException) + .times(1); + + EasyMock.expect( + httpClient.go( + EasyMock.capture(capturedRequest), + EasyMock.anyObject() + ) + ) + .andReturn(SettableFuture.create()) + .atLeastOnce(); + EasyMock.replay(httpClient); final ServerSelector serverSelector = new ServerSelector( @@ -115,8 +139,11 @@ public class DirectDruidClientTest serverSelector.addServer(queryableDruidServer2); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - HashMap context = new HashMap(); + HashMap context = Maps.newHashMap(); Sequence s1 = client1.run(query, context); + Assert.assertTrue(capturedRequest.hasCaptured()); + Assert.assertEquals(url, capturedRequest.getValue().getUrl()); + Assert.assertEquals(HttpMethod.POST, capturedRequest.getValue().getMethod()); Assert.assertEquals(1, client1.getNumOpenConnections()); // simulate read timeout @@ -153,18 +180,28 @@ public class DirectDruidClientTest public void testCancel() throws Exception { HttpClient httpClient = EasyMock.createStrictMock(HttpClient.class); - EasyMock.expect(httpClient.post(EasyMock.anyObject())).andReturn( - new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com")) - ).once(); + Capture capturedRequest = EasyMock.newCapture(); ListenableFuture cancelledFuture = Futures.immediateCancelledFuture(); - EasyMock.expect(httpClient.go(EasyMock.anyObject())).andReturn(cancelledFuture).once(); - - EasyMock.expect(httpClient.delete(EasyMock.anyObject())) - .andReturn(new RequestBuilder(httpClient, HttpMethod.DELETE, new URL("http://foo.com/delete"))) - .once(); SettableFuture cancellationFuture = SettableFuture.create(); - EasyMock.expect(httpClient.go(EasyMock.anyObject())).andReturn(cancellationFuture).once(); + + EasyMock.expect( + httpClient.go( + EasyMock.capture(capturedRequest), + EasyMock.anyObject() + ) + ) + .andReturn(cancelledFuture) + .once(); + + EasyMock.expect( + httpClient.go( + EasyMock.capture(capturedRequest), + EasyMock.anyObject() + ) + ) + .andReturn(cancellationFuture) + .once(); EasyMock.replay(httpClient); @@ -198,16 +235,18 @@ public class DirectDruidClientTest serverSelector.addServer(queryableDruidServer1); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - HashMap context = new HashMap(); + HashMap context = Maps.newHashMap(); cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled"))); Sequence results = client1.run(query, context); + Assert.assertEquals(HttpMethod.DELETE, capturedRequest.getValue().getMethod()); Assert.assertEquals(0, client1.getNumOpenConnections()); QueryInterruptedException exception = null; try { Sequences.toList(results, Lists.newArrayList()); - } catch(QueryInterruptedException e) { + } + catch (QueryInterruptedException e) { exception = e; } Assert.assertNotNull(exception); diff --git a/server/src/test/java/io/druid/server/initialization/JettyTest.java b/server/src/test/java/io/druid/server/initialization/JettyTest.java index 34843e07148..a214941af94 100644 --- a/server/src/test/java/io/druid/server/initialization/JettyTest.java +++ b/server/src/test/java/io/druid/server/initialization/JettyTest.java @@ -29,6 +29,7 @@ import com.metamx.common.lifecycle.Lifecycle; import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClientConfig; import com.metamx.http.client.HttpClientInit; +import com.metamx.http.client.Request; import com.metamx.http.client.response.InputStreamResponseHandler; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; @@ -47,6 +48,7 @@ import org.eclipse.jetty.server.handler.HandlerList; import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; +import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.Duration; import org.junit.After; import org.junit.Assert; @@ -147,8 +149,10 @@ public class JettyTest long startTime2 = 0; try { ListenableFuture go = - client.get(new URL("http://localhost:" + port + "/slow/hello")) - .go(new StatusResponseHandler(Charset.defaultCharset())); + client.go( + new Request(HttpMethod.GET, new URL("http://localhost:" + port + "/slow/hello")), + new StatusResponseHandler(Charset.defaultCharset()) + ); startTime2 = System.currentTimeMillis(); go.get(); } @@ -201,8 +205,10 @@ public class JettyTest public void testChunkNotFinalized() throws Exception { ListenableFuture go = - client.get(new URL("http://localhost:" + port + "/exception/exception")) - .go(new InputStreamResponseHandler()); + client.go( + new Request(HttpMethod.GET, new URL("http://localhost:" + port + "/exception/exception")), + new InputStreamResponseHandler() + ); try { StringWriter writer = new StringWriter(); IOUtils.copy(go.get(), writer, "utf-8"); @@ -225,13 +231,10 @@ public class JettyTest public void run() { try { - ListenableFuture go = client.get( - new URL( - "http://localhost:" + port + "/exception/exception" - ) - - ) - .go(new InputStreamResponseHandler()); + ListenableFuture go = client.go( + new Request(HttpMethod.GET, new URL("http://localhost:" + port + "/exception/exception")), + new InputStreamResponseHandler() + ); StringWriter writer = new StringWriter(); IOUtils.copy(go.get(), writer, "utf-8"); }