Replace AppendableByteArrayInputStream in DirectDruidClient

* Replace with SequenceInputStream fueled by an enumeration of ChannelBufferInputStream which directly wrap the response context ChannelBuffer
* Added zero-length byte array when DirectDruidClient is done. This prevents an odd race condition on `done`.
This commit is contained in:
Charles Allen 2014-11-19 17:12:04 -08:00
parent 016d07fda3
commit ca8300a461
1 changed files with 174 additions and 63 deletions

View File

@ -1,6 +1,6 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
@ -31,6 +31,7 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -43,9 +44,8 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.io.AppendableByteArrayInputStream;
import com.metamx.http.client.response.ClientResponse;
import com.metamx.http.client.response.InputStreamResponseHandler;
import com.metamx.http.client.response.HttpResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.query.BySegmentResultValueClass;
@ -57,6 +57,8 @@ import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.query.aggregation.MetricManipulatorFns;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpResponse;
@ -65,13 +67,19 @@ import javax.ws.rs.core.MediaType;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.net.URL;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
@ -143,68 +151,168 @@ public class DirectDruidClient<T> implements QueryRunner<T>
try {
log.debug("Querying url[%s]", url);
final HttpResponseHandler<InputStream, InputStream> responseHandler = new HttpResponseHandler<InputStream, InputStream>()
{
private long startTime;
private final AtomicLong byteCount = new AtomicLong(0);
private final BlockingQueue<InputStream> queue = new LinkedBlockingQueue<>();
private final AtomicBoolean done = new AtomicBoolean(false);
@Override
public ClientResponse<InputStream> handleResponse(HttpResponse response)
{
log.debug("Initial response from url[%s]", url);
startTime = System.currentTimeMillis();
try {
final String responseContext = response.headers().get("X-Druid-Response-Context");
// context may be null in case of error or query timeout
if (responseContext != null) {
context.putAll(
objectMapper.<Map<String, Object>>readValue(
responseContext, new TypeReference<Map<String, Object>>()
{
}
)
);
}
queue.put(new ChannelBufferInputStream(response.getContent()));
}
catch (final IOException e) {
log.error(e, "Error parsing response context from url [%s]", url);
return ClientResponse.<InputStream>finished(
new InputStream()
{
@Override
public int read() throws IOException
{
throw e;
}
}
);
}
catch (InterruptedException e) {
log.error(e, "Queue appending interrupted");
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
byteCount.addAndGet(response.getContent().readableBytes());
return ClientResponse.<InputStream>finished(
new SequenceInputStream(
new Enumeration<InputStream>()
{
@Override
public boolean hasMoreElements()
{
// Done is always true until the last stream has be put in the queue.
// Then the stream should be spouting good InputStreams.
synchronized (done) {
return !done.get() || !queue.isEmpty();
}
}
@Override
public InputStream nextElement()
{
synchronized (done) {
try {
// Ensures more elements are expected via `done`
return queue.take();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
}
}
}
)
);
}
@Override
public ClientResponse<InputStream> handleChunk(
ClientResponse<InputStream> clientResponse, HttpChunk chunk
)
{
final ChannelBuffer channelBuffer = chunk.getContent();
final int bytes = channelBuffer.readableBytes();
if (bytes > 0) {
try {
queue.put(new ChannelBufferInputStream(channelBuffer));
}
catch (InterruptedException e) {
log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url);
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
byteCount.addAndGet(bytes);
}
return clientResponse;
}
@Override
public ClientResponse<InputStream> done(ClientResponse<InputStream> clientResponse)
{
long stopTime = System.currentTimeMillis();
log.debug(
"Completed request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].",
url,
byteCount.get(),
stopTime - startTime,
byteCount.get() / (0.0001 * (stopTime - startTime))
);
synchronized (done) {
try {
// An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out
// after done is set to true, regardless of the rest of the stream's state.
queue.put(ByteSource.empty().openStream());
}
catch (InterruptedException e) {
log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url);
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
catch (IOException e) {
// This should never happen
throw Throwables.propagate(e);
}
finally {
done.set(true);
}
}
return ClientResponse.<InputStream>finished(clientResponse.getObj());
}
@Override
public void exceptionCaught(final ClientResponse<InputStream> clientResponse, final Throwable e)
{
// Don't wait for lock in case the lock had something to do with the error
synchronized (done) {
done.set(true);
// Make a best effort to put a zero length buffer into the queue in case something is waiting on the take()
// If nothing is waiting on take(), this will be closed out anyways.
queue.offer(
new InputStream()
{
@Override
public int read() throws IOException
{
throw new IOException(e);
}
}
);
}
}
};
future = httpClient
.post(new URL(url))
.setContent(objectMapper.writeValueAsBytes(query))
.setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON)
.go(
new InputStreamResponseHandler()
{
long startTime;
long byteCount = 0;
@Override
public ClientResponse<AppendableByteArrayInputStream> handleResponse(HttpResponse response)
{
log.debug("Initial response from url[%s]", url);
startTime = System.currentTimeMillis();
byteCount += response.getContent().readableBytes();
try {
final String responseContext = response.headers().get("X-Druid-Response-Context");
// context may be null in case of error or query timeout
if (responseContext != null) {
context.putAll(
objectMapper.<Map<String, Object>>readValue(
responseContext, new TypeReference<Map<String, Object>>()
{
}
)
);
}
}
catch (IOException e) {
log.error(e, "Unable to parse response context from url[%s]", url);
}
return super.handleResponse(response);
}
@Override
public ClientResponse<AppendableByteArrayInputStream> handleChunk(
ClientResponse<AppendableByteArrayInputStream> clientResponse, HttpChunk chunk
)
{
final int bytes = chunk.getContent().readableBytes();
byteCount += bytes;
return super.handleChunk(clientResponse, chunk);
}
@Override
public ClientResponse<InputStream> done(ClientResponse<AppendableByteArrayInputStream> clientResponse)
{
long stopTime = System.currentTimeMillis();
log.debug(
"Completed request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].",
url,
byteCount,
stopTime - startTime,
byteCount / (0.0001 * (stopTime - startTime))
);
return super.done(clientResponse);
}
}
);
.setHeader(
HttpHeaders.Names.CONTENT_TYPE,
isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON
)
.go(responseHandler);
queryWatcher.registerQuery(query, future);
@ -228,7 +336,10 @@ public class DirectDruidClient<T> implements QueryRunner<T>
StatusResponseHolder res = httpClient
.delete(new URL(cancelUrl))
.setContent(objectMapper.writeValueAsBytes(query))
.setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON)
.setHeader(
HttpHeaders.Names.CONTENT_TYPE,
isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON
)
.go(new StatusResponseHandler(Charsets.UTF_8))
.get();
if (res.getStatus().getCode() >= 500) {