diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 32a84a05da3..3e562457248 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -1,6 +1,6 @@ /* * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License @@ -31,6 +31,7 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.google.common.base.Charsets; import com.google.common.base.Throwables; import com.google.common.collect.Maps; +import com.google.common.io.ByteSource; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -43,9 +44,8 @@ import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.common.logger.Logger; import com.metamx.http.client.HttpClient; -import com.metamx.http.client.io.AppendableByteArrayInputStream; import com.metamx.http.client.response.ClientResponse; -import com.metamx.http.client.response.InputStreamResponseHandler; +import com.metamx.http.client.response.HttpResponseHandler; import com.metamx.http.client.response.StatusResponseHandler; import com.metamx.http.client.response.StatusResponseHolder; import io.druid.query.BySegmentResultValueClass; @@ -57,6 +57,8 @@ import io.druid.query.QueryToolChestWarehouse; import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.query.aggregation.MetricManipulatorFns; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBufferInputStream; import org.jboss.netty.handler.codec.http.HttpChunk; import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpResponse; @@ -65,13 +67,19 @@ import javax.ws.rs.core.MediaType; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.io.SequenceInputStream; import java.net.URL; +import java.util.Enumeration; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** */ @@ -143,68 +151,168 @@ public class DirectDruidClient implements QueryRunner try { log.debug("Querying url[%s]", url); + + final HttpResponseHandler responseHandler = new HttpResponseHandler() + { + private long startTime; + private final AtomicLong byteCount = new AtomicLong(0); + private final BlockingQueue queue = new LinkedBlockingQueue<>(); + private final AtomicBoolean done = new AtomicBoolean(false); + + @Override + public ClientResponse handleResponse(HttpResponse response) + { + log.debug("Initial response from url[%s]", url); + startTime = System.currentTimeMillis(); + try { + final String responseContext = response.headers().get("X-Druid-Response-Context"); + // context may be null in case of error or query timeout + if (responseContext != null) { + context.putAll( + objectMapper.>readValue( + responseContext, new TypeReference>() + { + } + ) + ); + } + queue.put(new ChannelBufferInputStream(response.getContent())); + } + catch (final IOException e) { + log.error(e, "Error parsing response context from url [%s]", url); + return ClientResponse.finished( + new InputStream() + { + @Override + public int read() throws IOException + { + throw e; + } + } + ); + } + catch (InterruptedException e) { + log.error(e, "Queue appending interrupted"); + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + byteCount.addAndGet(response.getContent().readableBytes()); + return ClientResponse.finished( + new SequenceInputStream( + new Enumeration() + { + @Override + public boolean hasMoreElements() + { + // Done is always true until the last stream has be put in the queue. + // Then the stream should be spouting good InputStreams. + synchronized (done) { + return !done.get() || !queue.isEmpty(); + } + } + + @Override + public InputStream nextElement() + { + synchronized (done) { + try { + // Ensures more elements are expected via `done` + return queue.take(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + } + } + } + ) + ); + } + + @Override + public ClientResponse handleChunk( + ClientResponse clientResponse, HttpChunk chunk + ) + { + final ChannelBuffer channelBuffer = chunk.getContent(); + final int bytes = channelBuffer.readableBytes(); + if (bytes > 0) { + try { + queue.put(new ChannelBufferInputStream(channelBuffer)); + } + catch (InterruptedException e) { + log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url); + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + byteCount.addAndGet(bytes); + } + return clientResponse; + } + + @Override + public ClientResponse done(ClientResponse clientResponse) + { + long stopTime = System.currentTimeMillis(); + log.debug( + "Completed request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].", + url, + byteCount.get(), + stopTime - startTime, + byteCount.get() / (0.0001 * (stopTime - startTime)) + ); + synchronized (done) { + try { + // An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out + // after done is set to true, regardless of the rest of the stream's state. + queue.put(ByteSource.empty().openStream()); + } + catch (InterruptedException e) { + log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url); + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + catch (IOException e) { + // This should never happen + throw Throwables.propagate(e); + } + finally { + done.set(true); + } + } + return ClientResponse.finished(clientResponse.getObj()); + } + + @Override + public void exceptionCaught(final ClientResponse clientResponse, final Throwable e) + { + // Don't wait for lock in case the lock had something to do with the error + synchronized (done) { + done.set(true); + // Make a best effort to put a zero length buffer into the queue in case something is waiting on the take() + // If nothing is waiting on take(), this will be closed out anyways. + queue.offer( + new InputStream() + { + @Override + public int read() throws IOException + { + throw new IOException(e); + } + } + ); + } + } + }; future = httpClient .post(new URL(url)) .setContent(objectMapper.writeValueAsBytes(query)) - .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON) - .go( - new InputStreamResponseHandler() - { - long startTime; - long byteCount = 0; - - @Override - public ClientResponse handleResponse(HttpResponse response) - { - log.debug("Initial response from url[%s]", url); - startTime = System.currentTimeMillis(); - byteCount += response.getContent().readableBytes(); - - try { - final String responseContext = response.headers().get("X-Druid-Response-Context"); - // context may be null in case of error or query timeout - if (responseContext != null) { - context.putAll( - objectMapper.>readValue( - responseContext, new TypeReference>() - { - } - ) - ); - } - } - catch (IOException e) { - log.error(e, "Unable to parse response context from url[%s]", url); - } - - return super.handleResponse(response); - } - - @Override - public ClientResponse handleChunk( - ClientResponse clientResponse, HttpChunk chunk - ) - { - final int bytes = chunk.getContent().readableBytes(); - byteCount += bytes; - return super.handleChunk(clientResponse, chunk); - } - - @Override - public ClientResponse done(ClientResponse clientResponse) - { - long stopTime = System.currentTimeMillis(); - log.debug( - "Completed request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].", - url, - byteCount, - stopTime - startTime, - byteCount / (0.0001 * (stopTime - startTime)) - ); - return super.done(clientResponse); - } - } - ); + .setHeader( + HttpHeaders.Names.CONTENT_TYPE, + isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON + ) + .go(responseHandler); queryWatcher.registerQuery(query, future); @@ -228,7 +336,10 @@ public class DirectDruidClient implements QueryRunner StatusResponseHolder res = httpClient .delete(new URL(cancelUrl)) .setContent(objectMapper.writeValueAsBytes(query)) - .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON) + .setHeader( + HttpHeaders.Names.CONTENT_TYPE, + isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON + ) .go(new StatusResponseHandler(Charsets.UTF_8)) .get(); if (res.getStatus().getCode() >= 500) {