From 92ea82da6d30264bb18c6bcf5d94e0a564d86748 Mon Sep 17 00:00:00 2001
From: Charles Allen
Date: Thu, 11 Dec 2014 16:18:12 -0800
Subject: [PATCH 1/6] Fix the twitter firehose

* It was missing some json annotations

---
 .../TwitterSpritzerFirehoseFactory.java            | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/examples/src/main/java/io/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java b/examples/src/main/java/io/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java
index e211bf8ba5c..6e338a27939 100644
--- a/examples/src/main/java/io/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java
+++ b/examples/src/main/java/io/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java
@@ -210,22 +210,22 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowParser>
       private final Map<String, Object> theMap = new HashMap<String, Object>(2);  // DIY json parsing
       // private final ObjectMapper omapper = new ObjectMapper();
 
       private boolean maxTimeReached()
       {
-        if (maxRunMinutes <= 0) {
+        if (getMaxRunMinutes() <= 0) {
           return false;
         } else {
-          return (System.currentTimeMillis() - startMsec) / 60000L >= maxRunMinutes;
+          return (System.currentTimeMillis() - startMsec) / 60000L >= getMaxRunMinutes();
         }
       }
 
       private boolean maxCountReached()
       {
-        return maxEventCount >= 0 && rowCount >= maxEventCount;
+        return getMaxEventCount() >= 0 && rowCount >= getMaxEventCount();
       }
 
       @Override
@@ -311,4 +311,15 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory<InputRowParser>
     };
   }
 
+  @JsonProperty
+  public int getMaxEventCount()
+  {
+    return maxEventCount;
+  }
+
+  @JsonProperty
+  public int getMaxRunMinutes()
+  {
+    return maxRunMinutes;
+  }
 }
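For context on why the missing annotations broke things: Druid round-trips firehose factories through JSON task specs with Jackson. A minimal sketch of the failure mode, using a hypothetical `SpritzerConfig` stand-in rather than the real factory class — without the `@JsonProperty` getter, Jackson would serialize the object without `maxRunMinutes`, so a spec written out and read back would silently lose the setting.

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical stand-in for a firehose factory config. Remove the
// @JsonProperty getter and the field is dropped on serialization.
public class SpritzerConfig
{
  private final int maxRunMinutes;

  @JsonCreator
  public SpritzerConfig(@JsonProperty("maxRunMinutes") int maxRunMinutes)
  {
    this.maxRunMinutes = maxRunMinutes;
  }

  @JsonProperty
  public int getMaxRunMinutes()
  {
    return maxRunMinutes;
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(new SpritzerConfig(5));
    System.out.println(json); // {"maxRunMinutes":5}
    SpritzerConfig back = mapper.readValue(json, SpritzerConfig.class);
    System.out.println(back.getMaxRunMinutes()); // 5 — the value survives the round trip
  }
}
```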
Date: Fri, 12 Dec 2014 12:59:49 -0800
Subject: [PATCH 2/6] add loadstatus endpoint for serverView status

---
 .../io/druid/client/BrokerServerView.java          |  8 +++
 .../coordination/BaseZkCoordinator.java            |  2 +-
 .../io/druid/server/http/BrokerResource.java       | 50 +++++++++++++++++++
 .../src/main/java/io/druid/cli/CliBroker.java      |  3 ++
 4 files changed, 62 insertions(+), 1 deletion(-)
 create mode 100644 server/src/main/java/io/druid/server/http/BrokerResource.java

diff --git a/server/src/main/java/io/druid/client/BrokerServerView.java b/server/src/main/java/io/druid/client/BrokerServerView.java
index 543951df1c3..ca2d4d6cb99 100644
--- a/server/src/main/java/io/druid/client/BrokerServerView.java
+++ b/server/src/main/java/io/druid/client/BrokerServerView.java
@@ -66,6 +66,8 @@ public class BrokerServerView implements TimelineServerView
   private final ServerInventoryView baseView;
   private final TierSelectorStrategy tierSelectorStrategy;
 
+  private volatile boolean initialized = false;
+
   @Inject
   public BrokerServerView(
       QueryToolChestWarehouse warehouse,
@@ -109,6 +111,7 @@ public class BrokerServerView implements TimelineServerView
           @Override
           public CallbackAction segmentViewInitialized()
           {
+            initialized = true;
             return ServerView.CallbackAction.CONTINUE;
           }
         }
@@ -128,6 +131,11 @@ public class BrokerServerView implements TimelineServerView
     );
   }
 
+  public boolean isInitialized()
+  {
+    return initialized;
+  }
+
   public void clear()
   {
     synchronized (lock) {
diff --git a/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java
index 43776480829..97fa1253ec5 100644
--- a/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java
+++ b/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java
@@ -54,7 +54,7 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
   private final CuratorFramework curator;
 
   private volatile PathChildrenCache loadQueueCache;
-  private volatile boolean started;
+  private volatile boolean started = false;
   private final ListeningExecutorService loadingExec;
 
   public BaseZkCoordinator(
diff --git a/server/src/main/java/io/druid/server/http/BrokerResource.java b/server/src/main/java/io/druid/server/http/BrokerResource.java
new file mode 100644
index 00000000000..d843ca50e63
--- /dev/null
+++ b/server/src/main/java/io/druid/server/http/BrokerResource.java
@@ -0,0 +1,50 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2014 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package io.druid.server.http;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import io.druid.client.BrokerServerView;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+@Path("/druid/broker/v1")
+public class BrokerResource
+{
+  private final BrokerServerView brokerServerView;
+
+  @Inject
+  public BrokerResource(BrokerServerView brokerServerView)
+  {
+    this.brokerServerView = brokerServerView;
+  }
+
+  @GET
+  @Path("/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getLoadStatus()
+  {
+    return Response.ok(ImmutableMap.of("inventoryInitialized", brokerServerView.isInitialized())).build();
+  }
+}
diff --git a/services/src/main/java/io/druid/cli/CliBroker.java b/services/src/main/java/io/druid/cli/CliBroker.java
index 9050bc827aa..e5ac925da02 100644
--- a/services/src/main/java/io/druid/cli/CliBroker.java
+++ b/services/src/main/java/io/druid/cli/CliBroker.java
@@ -48,6 +48,7 @@ import io.druid.server.ClientInfoResource;
 import io.druid.server.ClientQuerySegmentWalker;
 import io.druid.server.QueryResource;
 import io.druid.server.coordination.broker.DruidBroker;
+import io.druid.server.http.BrokerResource;
 import io.druid.server.initialization.JettyServerInitializer;
 import io.druid.server.metrics.MetricsModule;
 import io.druid.server.router.TieredBrokerConfig;
@@ -87,6 +88,7 @@ public class CliBroker extends ServerRunnable
         binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class);
 
         binder.bind(CachingClusteredClient.class).in(LazySingleton.class);
+        binder.bind(BrokerServerView.class).in(LazySingleton.class);
         binder.bind(TimelineServerView.class).to(BrokerServerView.class).in(LazySingleton.class);
 
         binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
@@ -101,6 +103,7 @@ public class CliBroker extends ServerRunnable
         binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
 
         Jerseys.addResource(binder, QueryResource.class);
+        Jerseys.addResource(binder, BrokerResource.class);
         Jerseys.addResource(binder, ClientInfoResource.class);
         LifecycleModule.register(binder, QueryResource.class);
         LifecycleModule.register(binder, DruidBroker.class);
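A hedged sketch of how the new endpoint might be consumed — for example, a deploy script that holds traffic until the broker's inventory view is warm. The host, port, and the crude substring check are illustrative assumptions, not part of the patch; a real client would use a proper HTTP client and JSON parser.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Polls /druid/broker/v1/loadstatus until the broker reports its
// inventory view as initialized.
public class BrokerReadyCheck
{
  public static void main(String[] args) throws Exception
  {
    // Host and port are illustrative; use your broker's address.
    URL url = new URL("http://localhost:8082/druid/broker/v1/loadstatus");
    while (true) {
      try {
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
          String body = reader.readLine(); // e.g. {"inventoryInitialized":true}
          if (body != null && body.contains("\"inventoryInitialized\":true")) {
            System.out.println("broker inventory initialized");
            return;
          }
        }
      }
      catch (IOException e) {
        // broker not reachable yet; fall through and retry
      }
      Thread.sleep(1000L);
    }
  }
}
```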
From ca8300a461d814d10197d177795c0586616390b5 Mon Sep 17 00:00:00 2001
From: Charles Allen
Date: Wed, 19 Nov 2014 17:12:04 -0800
Subject: [PATCH 3/6] Replace AppendableByteArrayInputStream in DirectDruidClient

* Replace with SequenceInputStream fueled by an enumeration of
  ChannelBufferInputStream which directly wrap the response context
  ChannelBuffer
* Added zero-length byte array when DirectDruidClient is done. This
  prevents an odd race condition on `done`.

---
 .../io/druid/client/DirectDruidClient.java         | 237 +++++++++++++-----
 1 file changed, 174 insertions(+), 63 deletions(-)

diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java
index 32a84a05da3..3e562457248 100644
--- a/server/src/main/java/io/druid/client/DirectDruidClient.java
+++ b/server/src/main/java/io/druid/client/DirectDruidClient.java
@@ -1,6 +1,6 @@
 /*
  * Druid - a distributed column store.
- * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public License
@@ -31,6 +31,7 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
 import com.google.common.base.Charsets;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
+import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -43,9 +44,8 @@ import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
 import com.metamx.common.logger.Logger;
 import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.io.AppendableByteArrayInputStream;
 import com.metamx.http.client.response.ClientResponse;
-import com.metamx.http.client.response.InputStreamResponseHandler;
+import com.metamx.http.client.response.HttpResponseHandler;
 import com.metamx.http.client.response.StatusResponseHandler;
 import com.metamx.http.client.response.StatusResponseHolder;
 import io.druid.query.BySegmentResultValueClass;
@@ -57,6 +57,8 @@ import io.druid.query.QueryToolChestWarehouse;
 import io.druid.query.QueryWatcher;
 import io.druid.query.Result;
 import io.druid.query.aggregation.MetricManipulatorFns;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
 import org.jboss.netty.handler.codec.http.HttpChunk;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpResponse;
@@ -65,13 +67,19 @@ import javax.ws.rs.core.MediaType;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.SequenceInputStream;
 import java.net.URL;
+import java.util.Enumeration;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  */
@@ -143,68 +151,168 @@ public class DirectDruidClient<T> implements QueryRunner<T>
 
     try {
       log.debug("Querying url[%s]", url);
+
+      final HttpResponseHandler<InputStream, InputStream> responseHandler = new HttpResponseHandler<InputStream, InputStream>()
+      {
+        private long startTime;
+        private final AtomicLong byteCount = new AtomicLong(0);
+        private final BlockingQueue<InputStream> queue = new LinkedBlockingQueue<>();
+        private final AtomicBoolean done = new AtomicBoolean(false);
+
+        @Override
+        public ClientResponse<InputStream> handleResponse(HttpResponse response)
+        {
+          log.debug("Initial response from url[%s]", url);
+          startTime = System.currentTimeMillis();
+          try {
+            final String responseContext = response.headers().get("X-Druid-Response-Context");
+            // context may be null in case of error or query timeout
+            if (responseContext != null) {
+              context.putAll(
+                  objectMapper.<Map<String, Object>>readValue(
+                      responseContext, new TypeReference<Map<String, Object>>()
+                      {
+                      }
+                  )
+              );
+            }
+            queue.put(new ChannelBufferInputStream(response.getContent()));
+          }
+          catch (final IOException e) {
+            log.error(e, "Error parsing response context from url [%s]", url);
+            return ClientResponse.finished(
+                new InputStream()
+                {
+                  @Override
+                  public int read() throws IOException
+                  {
+                    throw e;
+                  }
+                }
+            );
+          }
+          catch (InterruptedException e) {
+            log.error(e, "Queue appending interrupted");
+            Thread.currentThread().interrupt();
+            throw Throwables.propagate(e);
+          }
+          byteCount.addAndGet(response.getContent().readableBytes());
+          return ClientResponse.finished(
+              new SequenceInputStream(
+                  new Enumeration<InputStream>()
+                  {
+                    @Override
+                    public boolean hasMoreElements()
+                    {
+                      // `done` stays false until the last stream has been put in the queue.
+                      // Until then the enumeration should be spouting good InputStreams.
+                      synchronized (done) {
+                        return !done.get() || !queue.isEmpty();
+                      }
+                    }
+
+                    @Override
+                    public InputStream nextElement()
+                    {
+                      synchronized (done) {
+                        try {
+                          // Ensures more elements are expected via `done`
+                          return queue.take();
+                        }
+                        catch (InterruptedException e) {
+                          Thread.currentThread().interrupt();
+                          throw Throwables.propagate(e);
+                        }
+                      }
+                    }
+                  }
+              )
+          );
+        }
+
+        @Override
+        public ClientResponse<InputStream> handleChunk(
+            ClientResponse<InputStream> clientResponse, HttpChunk chunk
+        )
+        {
+          final ChannelBuffer channelBuffer = chunk.getContent();
+          final int bytes = channelBuffer.readableBytes();
+          if (bytes > 0) {
+            try {
+              queue.put(new ChannelBufferInputStream(channelBuffer));
+            }
+            catch (InterruptedException e) {
+              log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url);
+              Thread.currentThread().interrupt();
+              throw Throwables.propagate(e);
+            }
+            byteCount.addAndGet(bytes);
+          }
+          return clientResponse;
+        }
+
+        @Override
+        public ClientResponse<InputStream> done(ClientResponse<InputStream> clientResponse)
+        {
+          long stopTime = System.currentTimeMillis();
+          log.debug(
+              "Completed request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].",
+              url,
+              byteCount.get(),
+              stopTime - startTime,
+              byteCount.get() / (0.0001 * (stopTime - startTime))
+          );
+          synchronized (done) {
+            try {
+              // An empty byte array is put at the end to give SequenceInputStream.close() something to close out
+              // after done is set to true, regardless of the rest of the stream's state.
+              queue.put(ByteSource.empty().openStream());
+            }
+            catch (InterruptedException e) {
+              log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url);
+              Thread.currentThread().interrupt();
+              throw Throwables.propagate(e);
+            }
+            catch (IOException e) {
+              // This should never happen
+              throw Throwables.propagate(e);
+            }
+            finally {
+              done.set(true);
+            }
+          }
+          return ClientResponse.finished(clientResponse.getObj());
+        }
+
+        @Override
+        public void exceptionCaught(final ClientResponse<InputStream> clientResponse, final Throwable e)
+        {
+          // Don't wait for lock in case the lock had something to do with the error
+          synchronized (done) {
+            done.set(true);
+            // Make a best effort to put an error-throwing stream into the queue in case something is waiting on the take()
+            // If nothing is waiting on take(), this will be closed out anyways.
+            queue.offer(
+                new InputStream()
+                {
+                  @Override
+                  public int read() throws IOException
+                  {
+                    throw new IOException(e);
+                  }
+                }
+            );
+          }
+        }
+      };
       future = httpClient
           .post(new URL(url))
           .setContent(objectMapper.writeValueAsBytes(query))
-          .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON)
-          .go(
-              new InputStreamResponseHandler()
-              {
-                long startTime;
-                long byteCount = 0;
-
-                @Override
-                public ClientResponse<AppendableByteArrayInputStream> handleResponse(HttpResponse response)
-                {
-                  log.debug("Initial response from url[%s]", url);
-                  startTime = System.currentTimeMillis();
-                  byteCount += response.getContent().readableBytes();
-
-                  try {
-                    final String responseContext = response.headers().get("X-Druid-Response-Context");
-                    // context may be null in case of error or query timeout
-                    if (responseContext != null) {
-                      context.putAll(
-                          objectMapper.<Map<String, Object>>readValue(
-                              responseContext, new TypeReference<Map<String, Object>>()
-                              {
-                              }
-                          )
-                      );
-                    }
-                  }
-                  catch (IOException e) {
-                    log.error(e, "Unable to parse response context from url[%s]", url);
-                  }
-
-                  return super.handleResponse(response);
-                }
-
-                @Override
-                public ClientResponse<AppendableByteArrayInputStream> handleChunk(
-                    ClientResponse<AppendableByteArrayInputStream> clientResponse, HttpChunk chunk
-                )
-                {
-                  final int bytes = chunk.getContent().readableBytes();
-                  byteCount += bytes;
-                  return super.handleChunk(clientResponse, chunk);
-                }
-
-                @Override
-                public ClientResponse<AppendableByteArrayInputStream> done(ClientResponse<AppendableByteArrayInputStream> clientResponse)
-                {
-                  long stopTime = System.currentTimeMillis();
-                  log.debug(
-                      "Completed request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].",
-                      url,
-                      byteCount,
-                      stopTime - startTime,
-                      byteCount / (0.0001 * (stopTime - startTime))
-                  );
-                  return super.done(clientResponse);
-                }
-              }
-          );
+          .setHeader(
+              HttpHeaders.Names.CONTENT_TYPE,
+              isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON
+          )
+          .go(responseHandler);
 
       queryWatcher.registerQuery(query, future);
 
@@ -228,7 +336,10 @@ public class DirectDruidClient<T> implements QueryRunner<T>
         StatusResponseHolder res = httpClient
             .delete(new URL(cancelUrl))
            .setContent(objectMapper.writeValueAsBytes(query))
-            .setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON)
+            .setHeader(
+                HttpHeaders.Names.CONTENT_TYPE,
+                isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON
+            )
             .go(new StatusResponseHandler(Charsets.UTF_8))
             .get();
         if (res.getStatus().getCode() >= 500) {
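The heart of the patch above is turning push-based HTTP chunks into a pull-based `InputStream`: a `SequenceInputStream` whose `Enumeration` blocks on a queue of per-chunk streams, with a zero-length sentinel enqueued at completion so a reader blocked in `take()` always wakes up. A self-contained sketch of that pattern, using only the standard library rather than Druid's or Netty's actual classes:

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// A producer pushes chunks into a queue; a SequenceInputStream pulls them
// out through a blocking Enumeration. The empty sentinel stream mirrors the
// patch's fix for the race on `done`.
public class ChunkedStreamDemo
{
  public static void main(String[] args) throws Exception
  {
    final BlockingQueue<InputStream> queue = new LinkedBlockingQueue<>();
    final AtomicBoolean done = new AtomicBoolean(false);

    // Producer thread standing in for the HTTP chunk callbacks.
    new Thread(() -> {
      try {
        queue.put(new ByteArrayInputStream("hello ".getBytes(StandardCharsets.UTF_8)));
        queue.put(new ByteArrayInputStream("world".getBytes(StandardCharsets.UTF_8)));
        synchronized (done) {
          // Sentinel goes in before done flips, so a blocked take() always returns.
          queue.put(new ByteArrayInputStream(new byte[0]));
          done.set(true);
        }
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }).start();

    InputStream merged = new SequenceInputStream(
        new Enumeration<InputStream>()
        {
          @Override
          public boolean hasMoreElements()
          {
            synchronized (done) {
              return !done.get() || !queue.isEmpty();
            }
          }

          @Override
          public InputStream nextElement()
          {
            try {
              return queue.take(); // blocks until the producer delivers a chunk
            }
            catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              throw new RuntimeException(e);
            }
          }
        }
    );

    int c;
    while ((c = merged.read()) != -1) {
      System.out.print((char) c);
    }
    System.out.println(); // prints "hello world"
  }
}
```

Note the producer is started before the `SequenceInputStream` is read, since the enumeration blocks waiting for the first chunk — in the patch itself, `handleResponse` enqueues the first `ChannelBufferInputStream` before handing the merged stream back.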
From 092dfe0309964d1025415588906f2bf14f2d477a Mon Sep 17 00:00:00 2001
From: Xavier Léauté
Date: Fri, 12 Dec 2014 17:05:40 -0800
Subject: [PATCH 4/6] fix IndexTaskTest tmp dir

- Create local firehose files in a clean temp directory to avoid
  firehose reading other random temp files that start with 'druid'

---
 .../java/io/druid/indexing/common/task/IndexTaskTest.java | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
index 5a486e1cedb..8e40afcc291 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
@@ -20,6 +20,7 @@
 package io.druid.indexing.common.task;
 
 import com.google.common.collect.Lists;
+import com.google.common.io.Files;
 import com.metamx.common.Granularity;
 import io.druid.data.input.impl.CSVParseSpec;
 import io.druid.data.input.impl.DimensionsSpec;
@@ -57,7 +58,10 @@ public class IndexTaskTest
   @Test
   public void testDeterminePartitions() throws Exception
   {
-    File tmpFile = File.createTempFile("druid", "index");
+    File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+
+    File tmpFile = File.createTempFile("druid", "index", tmpDir);
     tmpFile.deleteOnExit();
 
     PrintWriter writer = new PrintWriter(tmpFile);
@@ -97,7 +101,7 @@ public class IndexTaskTest
             ),
             new IndexTask.IndexIOConfig(
                 new LocalFirehoseFactory(
-                    tmpFile.getParentFile(),
+                    tmpDir,
                     "druid*",
                     null
                 )

From 020f8653f0d426e083bd8484db5b07d3dabc5bf3 Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Sat, 13 Dec 2014 08:01:38 -0800
Subject: [PATCH 5/6] Fix documented default value for druid.coordinator.merge.on.

---
 docs/content/Coordinator-Config.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/content/Coordinator-Config.md b/docs/content/Coordinator-Config.md
index d1bc8fb49b0..40a117608a6 100644
--- a/docs/content/Coordinator-Config.md
+++ b/docs/content/Coordinator-Config.md
@@ -15,7 +15,7 @@ The coordinator module uses several of the default modules in [Configuration](Co
 |`druid.coordinator.period`|The run period for the coordinator. The coordinator operates by maintaining the current state of the world in memory and periodically looking at the set of segments available and segments being served to make decisions about whether any changes need to be made to the data topology. This property sets the delay between each of these runs.|PT60S|
 |`druid.coordinator.period.indexingPeriod`|How often to send indexing tasks to the indexing service. Only applies if merge or conversion is turned on.|PT1800S (30 mins)|
 |`druid.coordinator.startDelay`|The operation of the Coordinator works on the assumption that it has an up-to-date view of the state of the world when it runs; the current ZK interaction code, however, is written in a way that doesn't allow the Coordinator to know for a fact that it's done loading the current state of the world. This delay is a hack to give it enough time to believe that it has all the data.|PT300S|
-|`druid.coordinator.merge.on`|Boolean flag for whether or not the coordinator should try and merge small segments into a more optimal segment size.|PT300S|
+|`druid.coordinator.merge.on`|Boolean flag for whether or not the coordinator should try and merge small segments into a more optimal segment size.|false|
 |`druid.coordinator.conversion.on`|Boolean flag for converting old segment indexing versions to the latest segment indexing version.|false|
 |`druid.coordinator.load.timeout`|The timeout duration for when the coordinator assigns a segment to a historical node.|15 minutes|
 |`druid.manager.segment.pollDuration`|The duration between polls the Coordinator does for updates to the set of active segments. Generally defines the amount of lag time it can take for the coordinator to notice new segments.|PT1M|
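The final patch below removes a try-with-resources block around the task log stream. The reason it matters: with try-with-resources, the `InputStream` is closed as soon as the resource method returns, before the JAX-RS container has written the entity to the client. A hedged illustration of the two patterns with a hypothetical resource (the stand-in `ByteArrayInputStream` has a harmless `close()`; with a real file- or network-backed source such as Druid's task logs, reading after close fails):

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;

// Hypothetical resource contrasting the broken and fixed streaming patterns.
@Path("/logs")
public class LogResource
{
  private InputStream openLog()
  {
    // Stand-in for a real log source; imagine a file or S3 stream here.
    return new ByteArrayInputStream("task log line\n".getBytes(StandardCharsets.UTF_8));
  }

  @GET
  @Path("/broken")
  public Response broken()
  {
    try (InputStream in = openLog()) {
      // The stream is closed the moment this method returns -- the container
      // later tries to read a closed stream while writing the response body.
      return Response.ok(in).build();
    }
    catch (Exception e) {
      return Response.serverError().build();
    }
  }

  @GET
  @Path("/fixed")
  public Response fixed()
  {
    // Hand the open stream to the container; it writes the entity and
    // closes the stream itself after the response body has been sent.
    return Response.ok(openLog()).build();
  }
}
```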
From bd91a404919d0e2a9c025db8e69a7f7e02f49732 Mon Sep 17 00:00:00 2001
From: Xavier Léauté
Date: Sat, 13 Dec 2014 15:22:55 -0800
Subject: [PATCH 6/6] fix task log streaming

---
 .../io/druid/indexing/overlord/http/OverlordResource.java | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
index 050a5620856..76523504f53 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
@@ -412,9 +412,7 @@ public class OverlordResource
     try {
       final Optional<ByteSource> stream = taskLogStreamer.streamTaskLog(taskid, offset);
       if (stream.isPresent()) {
-        try (InputStream istream = stream.get().openStream()) {
-          return Response.ok(istream).build();
-        }
+        return Response.ok(stream.get().openStream()).build();
       } else {
         return Response.status(Response.Status.NOT_FOUND)
                        .entity(