Update http-client to 1.0.0

This commit is contained in:
fjy 2015-02-09 17:14:33 -08:00
parent 72333731c3
commit 708759e1e0
17 changed files with 230 additions and 131 deletions

View File

@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder; import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.client.selector.Server; import io.druid.client.selector.Server;
@ -32,6 +33,7 @@ import io.druid.indexing.common.RetryPolicy;
import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration; import org.joda.time.Duration;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
@ -92,10 +94,11 @@ public class RemoteTaskActionClient implements TaskActionClient
log.info("Submitting action for task[%s] to overlord[%s]: %s", task.getId(), serviceUri, taskAction); log.info("Submitting action for task[%s] to overlord[%s]: %s", task.getId(), serviceUri, taskAction);
try { try {
response = httpClient.post(serviceUri.toURL()) response = httpClient.go(
.setContent(MediaType.APPLICATION_JSON, dataToSend) new Request(HttpMethod.POST, serviceUri.toURL())
.go(new StatusResponseHandler(Charsets.UTF_8)) .setContent(MediaType.APPLICATION_JSON, dataToSend),
.get(); new StatusResponseHandler(Charsets.UTF_8)
).get();
} }
catch (Exception e) { catch (Exception e) {
Throwables.propagateIfInstanceOf(e.getCause(), IOException.class); Throwables.propagateIfInstanceOf(e.getCause(), IOException.class);

View File

@ -40,6 +40,7 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.InputStreamResponseHandler; import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder; import com.metamx.http.client.response.StatusResponseHolder;
@ -61,6 +62,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths; import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -352,9 +354,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
try { try {
final URL url = makeWorkerURL(zkWorker.getWorker(), String.format("/task/%s/shutdown", taskId)); final URL url = makeWorkerURL(zkWorker.getWorker(), String.format("/task/%s/shutdown", taskId));
final StatusResponseHolder response = httpClient.post(url) final StatusResponseHolder response = httpClient.go(
.go(RESPONSE_HANDLER) new Request(HttpMethod.POST, url),
.get(); RESPONSE_HANDLER
).get();
log.info( log.info(
"Sent shutdown message to worker: %s, status %s, response: %s", "Sent shutdown message to worker: %s, status %s, response: %s",
@ -391,9 +394,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
public InputStream openStream() throws IOException public InputStream openStream() throws IOException
{ {
try { try {
return httpClient.get(url) return httpClient.go(
.go(new InputStreamResponseHandler()) new Request(HttpMethod.GET, url),
.get(); new InputStreamResponseHandler()
).get();
} }
catch (InterruptedException e) { catch (InterruptedException e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);

View File

@ -24,10 +24,12 @@ import com.google.common.base.Throwables;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder; import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Global;
import io.druid.testing.IntegrationTestingConfig; import io.druid.testing.IntegrationTestingConfig;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import java.net.URL; import java.net.URL;
@ -61,11 +63,17 @@ public class ClientInfoResourceTestClient
); );
} }
public List<String> getDimensions(String dataSource, String interval){ public List<String> getDimensions(String dataSource, String interval)
{
try { try {
StatusResponseHolder response = httpClient.get(new URL(String.format("%s/%s/dimensions?interval=%s", getBrokerURL(), dataSource, interval))) StatusResponseHolder response = httpClient.go(
.go(responseHandler) new Request(
.get(); HttpMethod.GET,
new URL(String.format("%s/%s/dimensions?interval=%s", getBrokerURL(), dataSource, interval))
),
responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) { if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE( throw new ISE(
"Error while querying[%s] status[%s] content[%s]", "Error while querying[%s] status[%s] content[%s]",

View File

@ -25,7 +25,7 @@ import com.google.inject.Inject;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.RequestBuilder; import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder; import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Global;
@ -120,12 +120,10 @@ public class CoordinatorResourceTestClient
private StatusResponseHolder makeRequest(HttpMethod method, String url) private StatusResponseHolder makeRequest(HttpMethod method, String url)
{ {
try { try {
StatusResponseHolder response = new RequestBuilder( StatusResponseHolder response = httpClient.go(
this.httpClient, new Request(method, new URL(url)),
method, new URL(url) responseHandler
) ).get();
.go(responseHandler)
.get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) { if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE( throw new ISE(
"Error while making request to url[%s] status[%s] content[%s]", "Error while making request to url[%s] status[%s] content[%s]",

View File

@ -23,8 +23,10 @@ import com.google.api.client.util.Charsets;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder; import com.metamx.http.client.response.StatusResponseHolder;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
@ -71,13 +73,16 @@ public class EventReceiverFirehoseTestClient
public int postEvents(Collection<Map<String, Object>> events) public int postEvents(Collection<Map<String, Object>> events)
{ {
try { try {
StatusResponseHolder response = httpClient.post(new URL(getURL())) StatusResponseHolder response = httpClient.go(
.setContent( new Request(
MediaType.APPLICATION_JSON, HttpMethod.POST, new URL(getURL())
this.jsonMapper.writeValueAsBytes(events) ).setContent(
) MediaType.APPLICATION_JSON,
.go(responseHandler) this.jsonMapper.writeValueAsBytes(events)
.get(); ),
responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) { if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE( throw new ISE(
"Error while posting events to url[%s] status[%s] content[%s]", "Error while posting events to url[%s] status[%s] content[%s]",

View File

@ -25,6 +25,7 @@ import com.google.inject.Inject;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder; import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Global;
@ -32,6 +33,7 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import io.druid.testing.IntegrationTestingConfig; import io.druid.testing.IntegrationTestingConfig;
import io.druid.testing.utils.RetryUtil; import io.druid.testing.utils.RetryUtil;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import java.net.URL; import java.net.URL;
@ -81,13 +83,14 @@ public class OverlordResourceTestClient
public String submitTask(String task) public String submitTask(String task)
{ {
try { try {
StatusResponseHolder response = httpClient.post(new URL(getIndexerURL() + "task")) StatusResponseHolder response = httpClient.go(
.setContent( new Request(HttpMethod.POST, new URL(getIndexerURL() + "task"))
"application/json", .setContent(
task.getBytes() "application/json",
) task.getBytes()
.go(responseHandler) ),
.get(); responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) { if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE( throw new ISE(
"Error while submitting task to indexer response [%s %s]", "Error while submitting task to indexer response [%s %s]",
@ -194,9 +197,7 @@ public class OverlordResourceTestClient
{ {
try { try {
StatusResponseHolder response = this.httpClient StatusResponseHolder response = this.httpClient
.get(new URL(url)) .go(new Request(HttpMethod.GET, new URL(url)), responseHandler).get();
.go(responseHandler)
.get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) { if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE("Error while making request to indexer [%s %s]", response.getStatus(), response.getContent()); throw new ISE("Error while making request to indexer [%s %s]", response.getStatus(), response.getContent());
} }

View File

@ -25,11 +25,13 @@ import com.google.common.base.Throwables;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder; import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Global;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.testing.IntegrationTestingConfig; import io.druid.testing.IntegrationTestingConfig;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import java.net.URL; import java.net.URL;
@ -67,13 +69,14 @@ public class QueryResourceTestClient
public List<Map<String, Object>> query(Query query) public List<Map<String, Object>> query(Query query)
{ {
try { try {
StatusResponseHolder response = httpClient.post(new URL(getBrokerURL())) StatusResponseHolder response = httpClient.go(
.setContent( new Request(HttpMethod.POST, new URL(getBrokerURL())).setContent(
"application/json", "application/json",
jsonMapper.writeValueAsBytes(query) jsonMapper.writeValueAsBytes(query)
) ), responseHandler
.go(responseHandler)
.get(); ).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) { if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE( throw new ISE(
"Error while querying[%s] status[%s] content[%s]", "Error while querying[%s] status[%s] content[%s]",

View File

@ -24,12 +24,14 @@ import com.google.inject.Key;
import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder; import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Global;
import io.druid.testing.IntegrationTestingConfig; import io.druid.testing.IntegrationTestingConfig;
import io.druid.testing.guice.DruidTestModuleFactory; import io.druid.testing.guice.DruidTestModuleFactory;
import io.druid.testing.utils.RetryUtil; import io.druid.testing.utils.RetryUtil;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.testng.internal.IConfiguration; import org.testng.internal.IConfiguration;
import org.testng.internal.annotations.IAnnotationFinder; import org.testng.internal.annotations.IAnnotationFinder;
@ -119,16 +121,19 @@ public class DruidTestRunnerFactory implements ITestRunnerFactory
public Boolean call() throws Exception public Boolean call() throws Exception
{ {
try { try {
StatusResponseHolder response = client.get( StatusResponseHolder response = client.go(
new URL( new Request(
String.format( HttpMethod.GET,
"http://%s/status", new URL(
host String.format(
"http://%s/status",
host
)
) )
) ),
) handler
.go(handler) ).get();
.get();
System.out.println(response.getStatus() + response.getContent()); System.out.println(response.getStatus() + response.getContent());
if (response.getStatus().equals(HttpResponseStatus.OK)) { if (response.getStatus().equals(HttpResponseStatus.OK)) {
return true; return true;

View File

@ -40,7 +40,7 @@
<metamx.java-util.version>0.26.14</metamx.java-util.version> <metamx.java-util.version>0.26.14</metamx.java-util.version>
<apache.curator.version>2.7.0</apache.curator.version> <apache.curator.version>2.7.0</apache.curator.version>
<jetty.version>9.2.5.v20141112</jetty.version> <jetty.version>9.2.5.v20141112</jetty.version>
<druid.api.version>0.3.2</druid.api.version> <druid.api.version>0.3.3</druid.api.version>
<jackson.version>2.4.4</jackson.version> <jackson.version>2.4.4</jackson.version>
<log4j.version>2.1</log4j.version> <log4j.version>2.1</log4j.version>
</properties> </properties>
@ -79,12 +79,12 @@
<dependency> <dependency>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>emitter</artifactId> <artifactId>emitter</artifactId>
<version>0.2.13</version> <version>0.3.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>http-client</artifactId> <artifactId>http-client</artifactId>
<version>0.9.12</version> <version>1.0.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
@ -99,7 +99,7 @@
<dependency> <dependency>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>server-metrics</artifactId> <artifactId>server-metrics</artifactId>
<version>0.0.9</version> <version>0.0.10</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-codec</groupId> <groupId>commons-codec</groupId>

View File

@ -28,6 +28,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
@ -171,7 +172,7 @@ public class BenchmarkIndexibleWrites extends AbstractBenchmark
private final Integer totalIndexSize = 1<<20; private final Integer totalIndexSize = 1<<20;
@BenchmarkOptions(warmupRounds = 100, benchmarkRounds = 100, clock = Clock.REAL_TIME, callgc = true) @BenchmarkOptions(warmupRounds = 100, benchmarkRounds = 100, clock = Clock.REAL_TIME, callgc = true)
@Test @Ignore @Test
/** /**
* CALLEN - 2015-01-15 - OSX - Java 1.7.0_71-b14 * CALLEN - 2015-01-15 - OSX - Java 1.7.0_71-b14
BenchmarkIndexibleWrites.testConcurrentWrites[0]: [measured 100 out of 200 rounds, threads: 1 (sequential)] BenchmarkIndexibleWrites.testConcurrentWrites[0]: [measured 100 out of 200 rounds, threads: 1 (sequential)]
@ -230,7 +231,7 @@ public class BenchmarkIndexibleWrites extends AbstractBenchmark
*/ */
@BenchmarkOptions(warmupRounds = 100, benchmarkRounds = 100, clock = Clock.REAL_TIME, callgc = true) @BenchmarkOptions(warmupRounds = 100, benchmarkRounds = 100, clock = Clock.REAL_TIME, callgc = true)
@Test @Ignore @Test
public void testConcurrentReads() throws ExecutionException, InterruptedException public void testConcurrentReads() throws ExecutionException, InterruptedException
{ {
final ListeningExecutorService executorService = MoreExecutors.listeningDecorator( final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(

View File

@ -53,6 +53,7 @@ import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.Segment; import io.druid.segment.Segment;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
@ -191,7 +192,6 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
} }
} }
@Parameterized.Parameters @Parameterized.Parameters
public static Collection<Object[]> getParameters() public static Collection<Object[]> getParameters()
{ {
@ -221,7 +221,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
return new MapBasedInputRow(timestamp, dimensionList, builder.build()); return new MapBasedInputRow(timestamp, dimensionList, builder.build());
} }
@Test @Ignore @Test
@BenchmarkOptions(callgc = true, clock = Clock.REAL_TIME, warmupRounds = 10, benchmarkRounds = 20) @BenchmarkOptions(callgc = true, clock = Clock.REAL_TIME, warmupRounds = 10, benchmarkRounds = 20)
public void testConcurrentAddRead() public void testConcurrentAddRead()
throws InterruptedException, ExecutionException, NoSuchMethodException, IllegalAccessException, throws InterruptedException, ExecutionException, NoSuchMethodException, IllegalAccessException,

View File

@ -42,6 +42,7 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.ClientResponse; import com.metamx.http.client.response.ClientResponse;
import com.metamx.http.client.response.HttpResponseHandler; import com.metamx.http.client.response.HttpResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHandler;
@ -59,6 +60,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream; import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.jboss.netty.handler.codec.http.HttpChunk; import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse; import org.jboss.netty.handler.codec.http.HttpResponse;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
@ -300,14 +302,17 @@ public class DirectDruidClient<T> implements QueryRunner<T>
} }
} }
}; };
future = httpClient future = httpClient.go(
.post(new URL(url)) new Request(
.setContent(objectMapper.writeValueAsBytes(query)) HttpMethod.POST,
.setHeader( new URL(url)
HttpHeaders.Names.CONTENT_TYPE, ).setContent(objectMapper.writeValueAsBytes(query))
isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON .setHeader(
) HttpHeaders.Names.CONTENT_TYPE,
.go(responseHandler); isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON
),
responseHandler
);
queryWatcher.registerQuery(query, future); queryWatcher.registerQuery(query, future);
@ -328,15 +333,19 @@ public class DirectDruidClient<T> implements QueryRunner<T>
if (future.isCancelled()) { if (future.isCancelled()) {
// forward the cancellation to underlying queriable node // forward the cancellation to underlying queriable node
try { try {
StatusResponseHolder res = httpClient StatusResponseHolder res = httpClient.go(
.delete(new URL(cancelUrl)) new Request(
.setContent(objectMapper.writeValueAsBytes(query)) HttpMethod.DELETE,
.setHeader( new URL(cancelUrl)
HttpHeaders.Names.CONTENT_TYPE, ).setContent(objectMapper.writeValueAsBytes(query))
isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON .setHeader(
) HttpHeaders.Names.CONTENT_TYPE,
.go(new StatusResponseHandler(Charsets.UTF_8)) isSmile
.get(); ? SmileMediaTypes.APPLICATION_JACKSON_SMILE
: MediaType.APPLICATION_JSON
),
new StatusResponseHandler(Charsets.UTF_8)
).get();
if (res.getStatus().getCode() >= 500) { if (res.getStatus().getCode() >= 500) {
throw new RE( throw new RE(
"Error cancelling query[%s]: queriable node returned status[%d] [%s].", "Error cancelling query[%s]: queriable node returned status[%d] [%s].",

View File

@ -23,11 +23,13 @@ import com.google.inject.Inject;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.InputStreamResponseHandler; import com.metamx.http.client.response.InputStreamResponseHandler;
import io.druid.client.selector.Server; import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Global;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
@ -92,10 +94,13 @@ public class IndexingServiceClient
private InputStream runQuery(Object queryObject) private InputStream runQuery(Object queryObject)
{ {
try { try {
return client.post(new URL(String.format("%s/task", baseUrl()))) return client.go(
.setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(queryObject)) new Request(
.go(RESPONSE_HANDLER) HttpMethod.POST,
.get(); new URL(String.format("%s/task", baseUrl()))
).setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(queryObject)),
RESPONSE_HANDLER
).get();
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);

View File

@ -25,6 +25,7 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder; import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.client.selector.Server; import io.druid.client.selector.Server;
@ -34,6 +35,7 @@ import io.druid.query.Query;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker; import io.druid.query.QuerySegmentWalker;
import io.druid.query.SegmentDescriptor; import io.druid.query.SegmentDescriptor;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
@ -99,18 +101,21 @@ public class BridgeQuerySegmentWalker implements QuerySegmentWalker
brokerSelector.pick().getHost() brokerSelector.pick().getHost()
); );
StatusResponseHolder response = httpClient.post(new URL(url)) StatusResponseHolder response = httpClient.go(
.setContent( new Request(
MediaType.APPLICATION_JSON, HttpMethod.POST,
jsonMapper.writeValueAsBytes(query) new URL(url)
) ).setContent(
.go(responseHandler) MediaType.APPLICATION_JSON,
.get(); jsonMapper.writeValueAsBytes(query)
),
responseHandler
).get();
List<T> results = jsonMapper.readValue( List<T> results = jsonMapper.readValue(
response.getContent(), new TypeReference<List<T>>() response.getContent(), new TypeReference<List<T>>()
{ {
} }
); );
return Sequences.simple(results); return Sequences.simple(results);

View File

@ -28,6 +28,7 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.FullResponseHandler; import com.metamx.http.client.response.FullResponseHandler;
import com.metamx.http.client.response.FullResponseHolder; import com.metamx.http.client.response.FullResponseHolder;
import io.druid.client.selector.Server; import io.druid.client.selector.Server;
@ -37,6 +38,7 @@ import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Json;
import io.druid.server.coordinator.rules.Rule; import io.druid.server.coordinator.rules.Rule;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.Duration; import org.joda.time.Duration;
@ -145,23 +147,31 @@ public class CoordinatorRuleManager
return; return;
} }
FullResponseHolder response = httpClient.get(new URL(url)) FullResponseHolder response = httpClient.go(
.go(responseHandler) new Request(
.get(); HttpMethod.GET,
new URL(url)
),
responseHandler
).get();
if (response.getStatus().equals(HttpResponseStatus.FOUND)) { if (response.getStatus().equals(HttpResponseStatus.FOUND)) {
url = response.getResponse().getHeader("Location"); url = response.getResponse().getHeader("Location");
log.info("Redirecting rule request to [%s]", url); log.info("Redirecting rule request to [%s]", url);
response = httpClient.get(new URL(url)) response = httpClient.go(
.go(responseHandler) new Request(
.get(); HttpMethod.GET,
new URL(url)
),
responseHandler
).get();
} }
ConcurrentHashMap<String, List<Rule>> newRules = new ConcurrentHashMap<>( ConcurrentHashMap<String, List<Rule>> newRules = new ConcurrentHashMap<>(
(Map<String, List<Rule>>) jsonMapper.readValue( (Map<String, List<Rule>>) jsonMapper.readValue(
response.getContent(), new TypeReference<Map<String, List<Rule>>>() response.getContent(), new TypeReference<Map<String, List<Rule>>>()
{ {
} }
) )
); );

View File

@ -26,7 +26,7 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request; import com.metamx.http.client.Request;
import com.metamx.http.client.RequestBuilder; import com.metamx.http.client.response.HttpResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder; import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.client.selector.ConnectionCountServerSelectorStrategy; import io.druid.client.selector.ConnectionCountServerSelectorStrategy;
import io.druid.client.selector.HighestPriorityTierSelectorStrategy; import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
@ -41,6 +41,7 @@ import io.druid.query.Result;
import io.druid.query.timeboundary.TimeBoundaryQuery; import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.Capture;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpResponseStatus;
@ -62,15 +63,38 @@ public class DirectDruidClientTest
public void testRun() throws Exception public void testRun() throws Exception
{ {
HttpClient httpClient = EasyMock.createMock(HttpClient.class); HttpClient httpClient = EasyMock.createMock(HttpClient.class);
RequestBuilder requestBuilder = new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com")); final URL url = new URL("http://foo/druid/v2/");
EasyMock.expect(httpClient.post(EasyMock.<URL>anyObject())).andReturn(requestBuilder).atLeastOnce();
SettableFuture futureException = SettableFuture.create();
SettableFuture<InputStream> futureResult = SettableFuture.create(); SettableFuture<InputStream> futureResult = SettableFuture.create();
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(futureResult).times(1); Capture<Request> capturedRequest = EasyMock.newCapture();
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(futureException).times(1); EasyMock.expect(
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(SettableFuture.create()).atLeastOnce(); httpClient.go(
EasyMock.capture(capturedRequest),
EasyMock.<HttpResponseHandler>anyObject()
)
)
.andReturn(futureResult)
.times(1);
SettableFuture futureException = SettableFuture.create();
EasyMock.expect(
httpClient.go(
EasyMock.capture(capturedRequest),
EasyMock.<HttpResponseHandler>anyObject()
)
)
.andReturn(futureException)
.times(1);
EasyMock.expect(
httpClient.go(
EasyMock.capture(capturedRequest),
EasyMock.<HttpResponseHandler>anyObject()
)
)
.andReturn(SettableFuture.create())
.atLeastOnce();
EasyMock.replay(httpClient); EasyMock.replay(httpClient);
final ServerSelector serverSelector = new ServerSelector( final ServerSelector serverSelector = new ServerSelector(
@ -115,8 +139,11 @@ public class DirectDruidClientTest
serverSelector.addServer(queryableDruidServer2); serverSelector.addServer(queryableDruidServer2);
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
HashMap<String,List> context = new HashMap<String, List>(); HashMap<String, List> context = Maps.newHashMap();
Sequence s1 = client1.run(query, context); Sequence s1 = client1.run(query, context);
Assert.assertTrue(capturedRequest.hasCaptured());
Assert.assertEquals(url, capturedRequest.getValue().getUrl());
Assert.assertEquals(HttpMethod.POST, capturedRequest.getValue().getMethod());
Assert.assertEquals(1, client1.getNumOpenConnections()); Assert.assertEquals(1, client1.getNumOpenConnections());
// simulate read timeout // simulate read timeout
@ -153,18 +180,28 @@ public class DirectDruidClientTest
public void testCancel() throws Exception public void testCancel() throws Exception
{ {
HttpClient httpClient = EasyMock.createStrictMock(HttpClient.class); HttpClient httpClient = EasyMock.createStrictMock(HttpClient.class);
EasyMock.expect(httpClient.post(EasyMock.<URL>anyObject())).andReturn(
new RequestBuilder(httpClient, HttpMethod.POST, new URL("http://foo.com"))
).once();
Capture<Request> capturedRequest = EasyMock.newCapture();
ListenableFuture<Object> cancelledFuture = Futures.immediateCancelledFuture(); ListenableFuture<Object> cancelledFuture = Futures.immediateCancelledFuture();
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(cancelledFuture).once();
EasyMock.expect(httpClient.delete(EasyMock.<URL>anyObject()))
.andReturn(new RequestBuilder(httpClient, HttpMethod.DELETE, new URL("http://foo.com/delete")))
.once();
SettableFuture<Object> cancellationFuture = SettableFuture.create(); SettableFuture<Object> cancellationFuture = SettableFuture.create();
EasyMock.expect(httpClient.go(EasyMock.<Request>anyObject())).andReturn(cancellationFuture).once();
EasyMock.expect(
httpClient.go(
EasyMock.capture(capturedRequest),
EasyMock.<HttpResponseHandler>anyObject()
)
)
.andReturn(cancelledFuture)
.once();
EasyMock.expect(
httpClient.go(
EasyMock.capture(capturedRequest),
EasyMock.<HttpResponseHandler>anyObject()
)
)
.andReturn(cancellationFuture)
.once();
EasyMock.replay(httpClient); EasyMock.replay(httpClient);
@ -198,16 +235,18 @@ public class DirectDruidClientTest
serverSelector.addServer(queryableDruidServer1); serverSelector.addServer(queryableDruidServer1);
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
HashMap<String,List> context = new HashMap<String, List>(); HashMap<String, List> context = Maps.newHashMap();
cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled"))); cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled")));
Sequence results = client1.run(query, context); Sequence results = client1.run(query, context);
Assert.assertEquals(HttpMethod.DELETE, capturedRequest.getValue().getMethod());
Assert.assertEquals(0, client1.getNumOpenConnections()); Assert.assertEquals(0, client1.getNumOpenConnections());
QueryInterruptedException exception = null; QueryInterruptedException exception = null;
try { try {
Sequences.toList(results, Lists.newArrayList()); Sequences.toList(results, Lists.newArrayList());
} catch(QueryInterruptedException e) { }
catch (QueryInterruptedException e) {
exception = e; exception = e;
} }
Assert.assertNotNull(exception); Assert.assertNotNull(exception);

View File

@ -29,6 +29,7 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig; import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit; import com.metamx.http.client.HttpClientInit;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.InputStreamResponseHandler; import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder; import com.metamx.http.client.response.StatusResponseHolder;
@ -47,6 +48,7 @@ import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.servlet.ServletHolder;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration; import org.joda.time.Duration;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -147,8 +149,10 @@ public class JettyTest
long startTime2 = 0; long startTime2 = 0;
try { try {
ListenableFuture<StatusResponseHolder> go = ListenableFuture<StatusResponseHolder> go =
client.get(new URL("http://localhost:" + port + "/slow/hello")) client.go(
.go(new StatusResponseHandler(Charset.defaultCharset())); new Request(HttpMethod.GET, new URL("http://localhost:" + port + "/slow/hello")),
new StatusResponseHandler(Charset.defaultCharset())
);
startTime2 = System.currentTimeMillis(); startTime2 = System.currentTimeMillis();
go.get(); go.get();
} }
@ -201,8 +205,10 @@ public class JettyTest
public void testChunkNotFinalized() throws Exception public void testChunkNotFinalized() throws Exception
{ {
ListenableFuture<InputStream> go = ListenableFuture<InputStream> go =
client.get(new URL("http://localhost:" + port + "/exception/exception")) client.go(
.go(new InputStreamResponseHandler()); new Request(HttpMethod.GET, new URL("http://localhost:" + port + "/exception/exception")),
new InputStreamResponseHandler()
);
try { try {
StringWriter writer = new StringWriter(); StringWriter writer = new StringWriter();
IOUtils.copy(go.get(), writer, "utf-8"); IOUtils.copy(go.get(), writer, "utf-8");
@ -225,13 +231,10 @@ public class JettyTest
public void run() public void run()
{ {
try { try {
ListenableFuture<InputStream> go = client.get( ListenableFuture<InputStream> go = client.go(
new URL( new Request(HttpMethod.GET, new URL("http://localhost:" + port + "/exception/exception")),
"http://localhost:" + port + "/exception/exception" new InputStreamResponseHandler()
) );
)
.go(new InputStreamResponseHandler());
StringWriter writer = new StringWriter(); StringWriter writer = new StringWriter();
IOUtils.copy(go.get(), writer, "utf-8"); IOUtils.copy(go.get(), writer, "utf-8");
} }