From ed9e0cf9f6ea0a0c81388014ba4997a752d17360 Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 30 Sep 2013 16:03:26 -0700 Subject: [PATCH 1/6] add a local firehose for indexing local files --- .../common/index/FileIteratingFirehose.java | 116 ++++++++++++++ .../common/index/LocalFirehoseFactory.java | 102 ++++++++++++ .../common/index/StaticS3FirehoseFactory.java | 147 +++++------------- .../druid/indexing/common/task/IndexTask.java | 7 +- .../io/druid/data/input/TimestampSpec.java | 6 +- .../java/io/druid/cli/CliMiddleManager.java | 41 ++++- .../main/java/io/druid/cli/CliOverlord.java | 4 +- .../src/main/java/io/druid/cli/CliPeon.java | 4 +- .../java/io/druid/cli/CliRealtimeExample.java | 4 +- .../java/io/druid/guice/RealtimeModule.java | 5 +- 10 files changed, 315 insertions(+), 121 deletions(-) create mode 100644 indexing-service/src/main/java/io/druid/indexing/common/index/FileIteratingFirehose.java create mode 100644 indexing-service/src/main/java/io/druid/indexing/common/index/LocalFirehoseFactory.java diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/FileIteratingFirehose.java b/indexing-service/src/main/java/io/druid/indexing/common/index/FileIteratingFirehose.java new file mode 100644 index 00000000000..222009fd797 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/FileIteratingFirehose.java @@ -0,0 +1,116 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexing.common.index; + +import com.google.common.base.Throwables; +import io.druid.data.input.Firehose; +import io.druid.data.input.InputRow; +import io.druid.data.input.StringInputRowParser; +import org.apache.commons.io.LineIterator; + +import java.io.IOException; +import java.util.NoSuchElementException; +import java.util.Queue; + +/** + */ +public abstract class FileIteratingFirehose implements Firehose +{ + private final Queue objectQueue; + private final StringInputRowParser parser; + + private LineIterator lineIterator = null; + + protected FileIteratingFirehose(Queue objectQueue, StringInputRowParser parser) + { + this.objectQueue = objectQueue; + this.parser = parser; + } + + public abstract LineIterator makeLineIterator(T val) throws Exception; + + @Override + public boolean hasMore() + { + try { + nextFile(); + } catch(Exception e) { + throw Throwables.propagate(e); + } + + return lineIterator != null && lineIterator.hasNext(); + } + + @Override + public InputRow nextRow() + { + try { + nextFile(); + } catch(Exception e) { + throw Throwables.propagate(e); + } + + if(lineIterator == null) { + throw new NoSuchElementException(); + } + + return parser.parse(lineIterator.next()); + } + + @Override + public Runnable commit() + { + // Do nothing. + return new Runnable() { public void run() {} }; + } + + @Override + public void close() throws IOException + { + objectQueue.clear(); + if(lineIterator != null) { + lineIterator.close(); + } + } + + // Rolls over our streams and iterators to the next file, if appropriate + private void nextFile() throws Exception + { + + if (lineIterator == null || !lineIterator.hasNext()) { + + // Close old streams, maybe. + if (lineIterator != null) { + lineIterator.close(); + } + + // Open new streams, maybe. + final T nextObj = objectQueue.poll(); + if (nextObj != null) { + + try { + lineIterator = makeLineIterator(nextObj); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/LocalFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/index/LocalFirehoseFactory.java new file mode 100644 index 00000000000..9918073aa6d --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/LocalFirehoseFactory.java @@ -0,0 +1,102 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexing.common.index; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Lists; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.StringInputRowParser; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.LineIterator; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.Arrays; + +/** + */ +public class LocalFirehoseFactory implements FirehoseFactory +{ + private final File baseDir; + private final String filter; + private final StringInputRowParser parser; + + @JsonCreator + public LocalFirehoseFactory( + @JsonProperty("baseDir") File baseDir, + @JsonProperty("filter") String filter, + @JsonProperty("parser") StringInputRowParser parser + ) + { + this.baseDir = baseDir; + this.filter = filter; + this.parser = parser; + } + + @JsonProperty + public File getBaseDir() + { + return baseDir; + } + + @JsonProperty + public String getFilter() + { + return filter; + } + + @JsonProperty + public StringInputRowParser getParser() + { + return parser; + } + + @Override + public Firehose connect() throws IOException + { + return new FileIteratingFirehose( + Lists.newLinkedList( + Arrays.asList( + baseDir.listFiles( + new FilenameFilter() + { + @Override + public boolean accept(File file, String name) + { + return name.contains(filter); + } + } + ) + ) + ), + parser + ) + { + @Override + public LineIterator makeLineIterator(File file) throws Exception + { + return FileUtils.lineIterator(file); + } + }; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/StaticS3FirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/index/StaticS3FirehoseFactory.java index 6c06896b7c3..6c099b9683f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/StaticS3FirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/StaticS3FirehoseFactory.java @@ -23,15 +23,14 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.common.base.Charsets; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.metamx.common.logger.Logger; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; -import io.druid.data.input.InputRow; import io.druid.data.input.StringInputRowParser; import org.apache.commons.io.IOUtils; import org.apache.commons.io.LineIterator; @@ -44,8 +43,6 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.net.URI; import java.util.List; -import java.util.NoSuchElementException; -import java.util.Queue; import java.util.zip.GZIPInputStream; /** @@ -54,15 +51,12 @@ import java.util.zip.GZIPInputStream; @JsonTypeName("s3") public class StaticS3FirehoseFactory implements FirehoseFactory { + private static final Logger log = new Logger(StaticS3FirehoseFactory.class); + private final S3Service s3Client; private final StringInputRowParser parser; private final List uris; - private final long retryCount = 5; - private final long retryMillis = 5000; - - private static final Logger log = new Logger(StaticS3FirehoseFactory.class); - @JsonCreator public StaticS3FirehoseFactory( @JacksonInject("s3Client") S3Service s3Client, @@ -96,112 +90,47 @@ public class StaticS3FirehoseFactory implements FirehoseFactory { Preconditions.checkNotNull(s3Client, "null s3Client"); - return new Firehose() + return new FileIteratingFirehose( + Lists.newLinkedList(uris), + parser + ) { - LineIterator lineIterator = null; - final Queue objectQueue = Lists.newLinkedList(uris); - - // Rolls over our streams and iterators to the next file, if appropriate - private void maybeNextFile() throws Exception - { - - if (lineIterator == null || !lineIterator.hasNext()) { - - // Close old streams, maybe. - if (lineIterator != null) { - lineIterator.close(); - } - - // Open new streams, maybe. - final URI nextURI = objectQueue.poll(); - if (nextURI != null) { - - final String s3Bucket = nextURI.getAuthority(); - final S3Object s3Object = new S3Object( - nextURI.getPath().startsWith("/") - ? nextURI.getPath().substring(1) - : nextURI.getPath() - ); - - log.info("Reading from bucket[%s] object[%s] (%s)", s3Bucket, s3Object.getKey(), nextURI); - - int ntry = 0; - try { - final InputStream innerInputStream = s3Client.getObject(s3Bucket, s3Object.getKey()) - .getDataInputStream(); - - final InputStream outerInputStream = s3Object.getKey().endsWith(".gz") - ? new GZIPInputStream(innerInputStream) - : innerInputStream; - - lineIterator = IOUtils.lineIterator( - new BufferedReader( - new InputStreamReader(outerInputStream, Charsets.UTF_8) - ) - ); - } catch(IOException e) { - log.error( - e, - "Exception reading from bucket[%s] object[%s] (try %d) (sleeping %d millis)", - s3Bucket, - s3Object.getKey(), - ntry, - retryMillis - ); - - ntry ++; - if(ntry <= retryCount) { - Thread.sleep(retryMillis); - } - } - - } - } - - } - @Override - public boolean hasMore() + public LineIterator makeLineIterator(URI nextURI) throws Exception { + final String s3Bucket = nextURI.getAuthority(); + final S3Object s3Object = new S3Object( + nextURI.getPath().startsWith("/") + ? nextURI.getPath().substring(1) + : nextURI.getPath() + ); + + log.info("Reading from bucket[%s] object[%s] (%s)", s3Bucket, s3Object.getKey(), nextURI); + try { - maybeNextFile(); - } catch(Exception e) { + final InputStream innerInputStream = s3Client.getObject(s3Bucket, s3Object.getKey()) + .getDataInputStream(); + + final InputStream outerInputStream = s3Object.getKey().endsWith(".gz") + ? new GZIPInputStream(innerInputStream) + : innerInputStream; + + return IOUtils.lineIterator( + new BufferedReader( + new InputStreamReader(outerInputStream, Charsets.UTF_8) + ) + ); + } + catch (IOException e) { + log.error( + e, + "Exception reading from bucket[%s] object[%s]", + s3Bucket, + s3Object.getKey() + ); + throw Throwables.propagate(e); } - - return lineIterator != null && lineIterator.hasNext(); - } - - @Override - public InputRow nextRow() - { - try { - maybeNextFile(); - } catch(Exception e) { - throw Throwables.propagate(e); - } - - if(lineIterator == null) { - throw new NoSuchElementException(); - } - - return parser.parse(lineIterator.next()); - } - - @Override - public Runnable commit() - { - // Do nothing. - return new Runnable() { public void run() {} }; - } - - @Override - public void close() throws IOException - { - objectQueue.clear(); - if(lineIterator != null) { - lineIterator.close(); - } } }; } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index adc46835285..8056e71dc11 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -43,6 +43,8 @@ import java.util.List; public class IndexTask extends AbstractTask { + private static final Logger log = new Logger(IndexTask.class); + @JsonIgnore private final GranularitySpec granularitySpec; @@ -64,8 +66,6 @@ public class IndexTask extends AbstractTask @JsonIgnore private final int rowFlushBoundary; - private static final Logger log = new Logger(IndexTask.class); - @JsonCreator public IndexTask( @JsonProperty("id") String id, @@ -94,7 +94,7 @@ public class IndexTask extends AbstractTask ? Lists.newArrayList() : spatialDimensions; this.aggregators = aggregators; - this.indexGranularity = indexGranularity; + this.indexGranularity = (indexGranularity == null) ? QueryGranularity.NONE : indexGranularity; this.targetPartitionSize = targetPartitionSize; this.firehoseFactory = firehoseFactory; this.rowFlushBoundary = rowFlushBoundary; @@ -202,5 +202,4 @@ public class IndexTask extends AbstractTask { return rowFlushBoundary; } - } diff --git a/processing/src/main/java/io/druid/data/input/TimestampSpec.java b/processing/src/main/java/io/druid/data/input/TimestampSpec.java index 4052cb63537..ef10d726c4e 100644 --- a/processing/src/main/java/io/druid/data/input/TimestampSpec.java +++ b/processing/src/main/java/io/druid/data/input/TimestampSpec.java @@ -31,6 +31,8 @@ import java.util.Map; */ public class TimestampSpec { + private static final String defaultFormat = "auto"; + private final String timestampColumn; private final String timestampFormat; private final Function timestampConverter; @@ -42,8 +44,8 @@ public class TimestampSpec ) { this.timestampColumn = timestampColumn; - this.timestampFormat = format; - this.timestampConverter = ParserUtils.createTimestampParser(format); + this.timestampFormat = format == null ? defaultFormat : format; + this.timestampConverter = ParserUtils.createTimestampParser(timestampFormat); } @JsonProperty("column") diff --git a/services/src/main/java/io/druid/cli/CliMiddleManager.java b/services/src/main/java/io/druid/cli/CliMiddleManager.java index f2a0659b9e9..6dd11175ef6 100644 --- a/services/src/main/java/io/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/io/druid/cli/CliMiddleManager.java @@ -19,11 +19,16 @@ package io.druid.cli; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; -import com.google.inject.Module; import com.google.inject.Provides; import com.metamx.common.logger.Logger; +import druid.examples.flights.FlightsFirehoseFactory; +import druid.examples.rand.RandomFirehoseFactory; +import druid.examples.twitter.TwitterSpritzerFirehoseFactory; +import druid.examples.web.WebFirehoseFactory; import io.airlift.command.Command; import io.druid.guice.IndexingServiceModuleHelper; import io.druid.guice.Jerseys; @@ -32,6 +37,9 @@ import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Self; +import io.druid.indexing.common.index.EventReceiverFirehoseFactory; +import io.druid.indexing.common.index.LocalFirehoseFactory; +import io.druid.indexing.common.index.StaticS3FirehoseFactory; import io.druid.indexing.coordinator.ForkingTaskRunner; import io.druid.indexing.coordinator.TaskRunner; import io.druid.indexing.worker.Worker; @@ -39,10 +47,17 @@ import io.druid.indexing.worker.WorkerCuratorCoordinator; import io.druid.indexing.worker.WorkerTaskMonitor; import io.druid.indexing.worker.config.WorkerConfig; import io.druid.indexing.worker.http.WorkerResource; +import io.druid.initialization.DruidModule; +import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; +import io.druid.segment.realtime.firehose.IrcFirehoseFactory; +import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; +import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory; +import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import io.druid.server.DruidNode; import io.druid.server.initialization.JettyServerInitializer; import org.eclipse.jetty.server.Server; +import java.util.Arrays; import java.util.List; /** @@ -64,7 +79,7 @@ public class CliMiddleManager extends ServerRunnable protected List getModules() { return ImmutableList.of( - new Module() + new DruidModule() { @Override public void configure(Binder binder) @@ -97,6 +112,28 @@ public class CliMiddleManager extends ServerRunnable config.getVersion() ); } + + @Override + public List getJacksonModules() + { + return Arrays.asList( + new SimpleModule("RealtimeModule") + .registerSubtypes( + new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), + new NamedType(FlightsFirehoseFactory.class, "flights"), + new NamedType(RandomFirehoseFactory.class, "rand"), + new NamedType(WebFirehoseFactory.class, "webstream"), + new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"), + new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"), + new NamedType(ClippedFirehoseFactory.class, "clipped"), + new NamedType(TimedShutoffFirehoseFactory.class, "timed"), + new NamedType(IrcFirehoseFactory.class, "irc"), + new NamedType(StaticS3FirehoseFactory.class, "s3"), + new NamedType(EventReceiverFirehoseFactory.class, "receiver"), + new NamedType(LocalFirehoseFactory.class, "local") + ) + ); + } } ); } diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index c8fe70ab2b8..3ea3c44a284 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -48,6 +48,7 @@ import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.index.EventReceiverFirehoseFactory; +import io.druid.indexing.common.index.LocalFirehoseFactory; import io.druid.indexing.common.index.StaticS3FirehoseFactory; import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; import io.druid.indexing.common.tasklogs.TaskLogStreamer; @@ -231,7 +232,8 @@ public class CliOverlord extends ServerRunnable new NamedType(TimedShutoffFirehoseFactory.class, "timed"), new NamedType(IrcFirehoseFactory.class, "irc"), new NamedType(StaticS3FirehoseFactory.class, "s3"), - new NamedType(EventReceiverFirehoseFactory.class, "receiver") + new NamedType(EventReceiverFirehoseFactory.class, "receiver"), + new NamedType(LocalFirehoseFactory.class, "local") ) ); } diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index 46edf595802..3dcdfc38fe7 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -53,6 +53,7 @@ import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.index.ChatHandlerProvider; import io.druid.indexing.common.index.EventReceiverFirehoseFactory; import io.druid.indexing.common.index.EventReceivingChatHandlerProvider; +import io.druid.indexing.common.index.LocalFirehoseFactory; import io.druid.indexing.common.index.NoopChatHandlerProvider; import io.druid.indexing.common.index.StaticS3FirehoseFactory; import io.druid.indexing.coordinator.TaskRunner; @@ -174,7 +175,8 @@ public class CliPeon extends GuiceRunnable new NamedType(TimedShutoffFirehoseFactory.class, "timed"), new NamedType(IrcFirehoseFactory.class, "irc"), new NamedType(StaticS3FirehoseFactory.class, "s3"), - new NamedType(EventReceiverFirehoseFactory.class, "receiver") + new NamedType(EventReceiverFirehoseFactory.class, "receiver"), + new NamedType(LocalFirehoseFactory.class, "local") ) ); } diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java index e0ad5b471cd..5b73be2bb51 100644 --- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java +++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java @@ -36,6 +36,7 @@ import io.druid.client.ServerView; import io.druid.guice.NoopSegmentPublisherProvider; import io.druid.guice.RealtimeModule; import io.druid.indexing.common.index.EventReceiverFirehoseFactory; +import io.druid.indexing.common.index.LocalFirehoseFactory; import io.druid.indexing.common.index.StaticS3FirehoseFactory; import io.druid.initialization.DruidModule; import io.druid.segment.loading.DataSegmentPusher; @@ -102,7 +103,8 @@ public class CliRealtimeExample extends ServerRunnable new NamedType(TimedShutoffFirehoseFactory.class, "timed"), new NamedType(IrcFirehoseFactory.class, "irc"), new NamedType(StaticS3FirehoseFactory.class, "s3"), - new NamedType(EventReceiverFirehoseFactory.class, "receiver") + new NamedType(EventReceiverFirehoseFactory.class, "receiver"), + new NamedType(LocalFirehoseFactory.class, "local") ) ); } diff --git a/services/src/main/java/io/druid/guice/RealtimeModule.java b/services/src/main/java/io/druid/guice/RealtimeModule.java index f3bfb156547..195439662de 100644 --- a/services/src/main/java/io/druid/guice/RealtimeModule.java +++ b/services/src/main/java/io/druid/guice/RealtimeModule.java @@ -32,6 +32,8 @@ import druid.examples.twitter.TwitterSpritzerFirehoseFactory; import druid.examples.web.WebFirehoseFactory; import io.druid.cli.QueryJettyServerInitializer; import io.druid.indexing.common.index.EventReceiverFirehoseFactory; +import io.druid.indexing.common.index.FileIteratingFirehose; +import io.druid.indexing.common.index.LocalFirehoseFactory; import io.druid.indexing.common.index.StaticS3FirehoseFactory; import io.druid.initialization.DruidModule; import io.druid.query.QuerySegmentWalker; @@ -100,7 +102,8 @@ public class RealtimeModule implements DruidModule new NamedType(TimedShutoffFirehoseFactory.class, "timed"), new NamedType(IrcFirehoseFactory.class, "irc"), new NamedType(StaticS3FirehoseFactory.class, "s3"), - new NamedType(EventReceiverFirehoseFactory.class, "receiver") + new NamedType(EventReceiverFirehoseFactory.class, "receiver"), + new NamedType(LocalFirehoseFactory.class, "local") ) ); } From f55a5199b14ed697726a168ed98ad3c2fd47584a Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 30 Sep 2013 16:29:20 -0700 Subject: [PATCH 2/6] add a firehose module to remove so much copy and pasted code --- .../common/index/FileIteratingFirehose.java | 4 +- .../java/io/druid/cli/CliMiddleManager.java | 41 +---------- .../main/java/io/druid/cli/CliOverlord.java | 40 +--------- .../src/main/java/io/druid/cli/CliPeon.java | 24 +----- .../java/io/druid/cli/CliRealtimeExample.java | 42 +---------- .../java/io/druid/cli/Initialization.java | 4 +- .../java/io/druid/guice/FirehoseModule.java | 73 +++++++++++++++++++ .../java/io/druid/guice/RealtimeModule.java | 48 ++---------- 8 files changed, 90 insertions(+), 186 deletions(-) create mode 100644 services/src/main/java/io/druid/guice/FirehoseModule.java diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/FileIteratingFirehose.java b/indexing-service/src/main/java/io/druid/indexing/common/index/FileIteratingFirehose.java index 222009fd797..b6f1070d4bf 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/FileIteratingFirehose.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/FileIteratingFirehose.java @@ -20,6 +20,7 @@ package io.druid.indexing.common.index; import com.google.common.base.Throwables; +import io.druid.common.guava.Runnables; import io.druid.data.input.Firehose; import io.druid.data.input.InputRow; import io.druid.data.input.StringInputRowParser; @@ -77,8 +78,7 @@ public abstract class FileIteratingFirehose implements Firehose @Override public Runnable commit() { - // Do nothing. - return new Runnable() { public void run() {} }; + return Runnables.getNoopRunnable(); } @Override diff --git a/services/src/main/java/io/druid/cli/CliMiddleManager.java b/services/src/main/java/io/druid/cli/CliMiddleManager.java index 6dd11175ef6..f2a0659b9e9 100644 --- a/services/src/main/java/io/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/io/druid/cli/CliMiddleManager.java @@ -19,16 +19,11 @@ package io.druid.cli; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; +import com.google.inject.Module; import com.google.inject.Provides; import com.metamx.common.logger.Logger; -import druid.examples.flights.FlightsFirehoseFactory; -import druid.examples.rand.RandomFirehoseFactory; -import druid.examples.twitter.TwitterSpritzerFirehoseFactory; -import druid.examples.web.WebFirehoseFactory; import io.airlift.command.Command; import io.druid.guice.IndexingServiceModuleHelper; import io.druid.guice.Jerseys; @@ -37,9 +32,6 @@ import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Self; -import io.druid.indexing.common.index.EventReceiverFirehoseFactory; -import io.druid.indexing.common.index.LocalFirehoseFactory; -import io.druid.indexing.common.index.StaticS3FirehoseFactory; import io.druid.indexing.coordinator.ForkingTaskRunner; import io.druid.indexing.coordinator.TaskRunner; import io.druid.indexing.worker.Worker; @@ -47,17 +39,10 @@ import io.druid.indexing.worker.WorkerCuratorCoordinator; import io.druid.indexing.worker.WorkerTaskMonitor; import io.druid.indexing.worker.config.WorkerConfig; import io.druid.indexing.worker.http.WorkerResource; -import io.druid.initialization.DruidModule; -import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; -import io.druid.segment.realtime.firehose.IrcFirehoseFactory; -import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; -import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory; -import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import io.druid.server.DruidNode; import io.druid.server.initialization.JettyServerInitializer; import org.eclipse.jetty.server.Server; -import java.util.Arrays; import java.util.List; /** @@ -79,7 +64,7 @@ public class CliMiddleManager extends ServerRunnable protected List getModules() { return ImmutableList.of( - new DruidModule() + new Module() { @Override public void configure(Binder binder) @@ -112,28 +97,6 @@ public class CliMiddleManager extends ServerRunnable config.getVersion() ); } - - @Override - public List getJacksonModules() - { - return Arrays.asList( - new SimpleModule("RealtimeModule") - .registerSubtypes( - new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), - new NamedType(FlightsFirehoseFactory.class, "flights"), - new NamedType(RandomFirehoseFactory.class, "rand"), - new NamedType(WebFirehoseFactory.class, "webstream"), - new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"), - new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"), - new NamedType(ClippedFirehoseFactory.class, "clipped"), - new NamedType(TimedShutoffFirehoseFactory.class, "timed"), - new NamedType(IrcFirehoseFactory.class, "irc"), - new NamedType(StaticS3FirehoseFactory.class, "s3"), - new NamedType(EventReceiverFirehoseFactory.class, "receiver"), - new NamedType(LocalFirehoseFactory.class, "local") - ) - ); - } } ); } diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 3ea3c44a284..446283e0dee 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -19,8 +19,6 @@ package io.druid.cli; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Injector; @@ -30,10 +28,6 @@ import com.google.inject.TypeLiteral; import com.google.inject.multibindings.MapBinder; import com.google.inject.servlet.GuiceFilter; import com.metamx.common.logger.Logger; -import druid.examples.flights.FlightsFirehoseFactory; -import druid.examples.rand.RandomFirehoseFactory; -import druid.examples.twitter.TwitterSpritzerFirehoseFactory; -import druid.examples.web.WebFirehoseFactory; import io.airlift.command.Command; import io.druid.guice.IndexingServiceModuleHelper; import io.druid.guice.JacksonConfigProvider; @@ -47,9 +41,6 @@ import io.druid.guice.PolyBind; import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; -import io.druid.indexing.common.index.EventReceiverFirehoseFactory; -import io.druid.indexing.common.index.LocalFirehoseFactory; -import io.druid.indexing.common.index.StaticS3FirehoseFactory; import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; import io.druid.indexing.common.tasklogs.TaskLogStreamer; import io.druid.indexing.common.tasklogs.TaskLogs; @@ -78,12 +69,6 @@ import io.druid.indexing.coordinator.scaling.ResourceManagementStrategy; import io.druid.indexing.coordinator.scaling.SimpleResourceManagementConfig; import io.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy; import io.druid.indexing.coordinator.setup.WorkerSetupData; -import io.druid.initialization.DruidModule; -import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; -import io.druid.segment.realtime.firehose.IrcFirehoseFactory; -import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; -import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory; -import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import io.druid.server.http.RedirectFilter; import io.druid.server.http.RedirectInfo; import io.druid.server.initialization.JettyServerInitializer; @@ -99,7 +84,6 @@ import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.servlets.GzipFilter; import org.eclipse.jetty.util.resource.ResourceCollection; -import java.util.Arrays; import java.util.List; /** @@ -121,7 +105,7 @@ public class CliOverlord extends ServerRunnable protected List getModules() { return ImmutableList.of( - new DruidModule() + new Module() { @Override public void configure(Binder binder) @@ -215,28 +199,6 @@ public class CliOverlord extends ServerRunnable JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class); } - - @Override - public List getJacksonModules() - { - return Arrays.asList( - new SimpleModule("RealtimeModule") - .registerSubtypes( - new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), - new NamedType(FlightsFirehoseFactory.class, "flights"), - new NamedType(RandomFirehoseFactory.class, "rand"), - new NamedType(WebFirehoseFactory.class, "webstream"), - new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"), - new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"), - new NamedType(ClippedFirehoseFactory.class, "clipped"), - new NamedType(TimedShutoffFirehoseFactory.class, "timed"), - new NamedType(IrcFirehoseFactory.class, "irc"), - new NamedType(StaticS3FirehoseFactory.class, "s3"), - new NamedType(EventReceiverFirehoseFactory.class, "receiver"), - new NamedType(LocalFirehoseFactory.class, "local") - ) - ); - } } ); } diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index 3dcdfc38fe7..77e140bbd8f 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -105,7 +105,7 @@ public class CliPeon extends GuiceRunnable protected List getModules() { return ImmutableList.of( - new DruidModule() + new Module() { @Override public void configure(Binder binder) @@ -158,28 +158,6 @@ public class CliPeon extends GuiceRunnable LifecycleModule.register(binder, Server.class); } - - @Override - public List getJacksonModules() - { - return Arrays.asList( - new SimpleModule("RealtimeModule") - .registerSubtypes( - new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), - new NamedType(FlightsFirehoseFactory.class, "flights"), - new NamedType(RandomFirehoseFactory.class, "rand"), - new NamedType(WebFirehoseFactory.class, "webstream"), - new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"), - new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"), - new NamedType(ClippedFirehoseFactory.class, "clipped"), - new NamedType(TimedShutoffFirehoseFactory.class, "timed"), - new NamedType(IrcFirehoseFactory.class, "irc"), - new NamedType(StaticS3FirehoseFactory.class, "s3"), - new NamedType(EventReceiverFirehoseFactory.class, "receiver"), - new NamedType(LocalFirehoseFactory.class, "local") - ) - ); - } } ); } diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java index 5b73be2bb51..07e862b1178 100644 --- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java +++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java @@ -19,39 +19,23 @@ package io.druid.cli; -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; +import com.google.inject.Module; import com.metamx.common.logger.Logger; -import druid.examples.flights.FlightsFirehoseFactory; -import druid.examples.rand.RandomFirehoseFactory; -import druid.examples.twitter.TwitterSpritzerFirehoseFactory; -import druid.examples.web.WebFirehoseFactory; import io.airlift.command.Command; import io.druid.client.DruidServer; import io.druid.client.InventoryView; import io.druid.client.ServerView; import io.druid.guice.NoopSegmentPublisherProvider; import io.druid.guice.RealtimeModule; -import io.druid.indexing.common.index.EventReceiverFirehoseFactory; -import io.druid.indexing.common.index.LocalFirehoseFactory; -import io.druid.indexing.common.index.StaticS3FirehoseFactory; -import io.druid.initialization.DruidModule; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.SegmentPublisher; -import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; -import io.druid.segment.realtime.firehose.IrcFirehoseFactory; -import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; -import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory; -import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; import java.io.File; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.concurrent.Executor; @@ -75,7 +59,7 @@ public class CliRealtimeExample extends ServerRunnable { return ImmutableList.of( new RealtimeModule(), - new DruidModule() + new Module() { @Override public void configure(Binder binder) @@ -86,28 +70,6 @@ public class CliRealtimeExample extends ServerRunnable binder.bind(InventoryView.class).to(NoopInventoryView.class); binder.bind(ServerView.class).to(NoopServerView.class); } - - @Override - public List getJacksonModules() - { - return Arrays.asList( - new SimpleModule("RealtimeExampleModule") - .registerSubtypes( - new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), - new NamedType(FlightsFirehoseFactory.class, "flights"), - new NamedType(RandomFirehoseFactory.class, "rand"), - new NamedType(WebFirehoseFactory.class, "webstream"), - new NamedType(KafkaFirehoseFactory.class, "kafka"), - new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"), - new NamedType(ClippedFirehoseFactory.class, "clipped"), - new NamedType(TimedShutoffFirehoseFactory.class, "timed"), - new NamedType(IrcFirehoseFactory.class, "irc"), - new NamedType(StaticS3FirehoseFactory.class, "s3"), - new NamedType(EventReceiverFirehoseFactory.class, "receiver"), - new NamedType(LocalFirehoseFactory.class, "local") - ) - ); - } } ); } diff --git a/services/src/main/java/io/druid/cli/Initialization.java b/services/src/main/java/io/druid/cli/Initialization.java index 76f502b1563..a3d687bb70c 100644 --- a/services/src/main/java/io/druid/cli/Initialization.java +++ b/services/src/main/java/io/druid/cli/Initialization.java @@ -40,6 +40,7 @@ import io.druid.guice.DataSegmentPusherPullerModule; import io.druid.guice.DbConnectorModule; import io.druid.guice.DruidProcessingModule; import io.druid.guice.DruidSecondaryModule; +import io.druid.guice.FirehoseModule; import io.druid.guice.HttpClientModule; import io.druid.guice.IndexingServiceDiscoveryModule; import io.druid.guice.JacksonConfigManagerModule; @@ -225,7 +226,8 @@ public class new JacksonConfigManagerModule(), new IndexingServiceDiscoveryModule(), new DataSegmentPusherPullerModule(), - new TaskLogsModule() + new TaskLogsModule(), + new FirehoseModule() ); ModuleList actualModules = new ModuleList(baseInjector); diff --git a/services/src/main/java/io/druid/guice/FirehoseModule.java b/services/src/main/java/io/druid/guice/FirehoseModule.java new file mode 100644 index 00000000000..5026d93e752 --- /dev/null +++ b/services/src/main/java/io/druid/guice/FirehoseModule.java @@ -0,0 +1,73 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.guice; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import druid.examples.flights.FlightsFirehoseFactory; +import druid.examples.rand.RandomFirehoseFactory; +import druid.examples.twitter.TwitterSpritzerFirehoseFactory; +import druid.examples.web.WebFirehoseFactory; +import io.druid.indexing.common.index.EventReceiverFirehoseFactory; +import io.druid.indexing.common.index.LocalFirehoseFactory; +import io.druid.indexing.common.index.StaticS3FirehoseFactory; +import io.druid.initialization.DruidModule; +import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; +import io.druid.segment.realtime.firehose.IrcFirehoseFactory; +import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; +import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory; +import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; + +import java.util.Arrays; +import java.util.List; + +/** + */ +public class FirehoseModule implements DruidModule +{ + @Override + public void configure(Binder binder) + { + } + + @Override + public List getJacksonModules() + { + return Arrays.asList( + new SimpleModule("FirehoseModule") + .registerSubtypes( + new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), + new NamedType(FlightsFirehoseFactory.class, "flights"), + new NamedType(RandomFirehoseFactory.class, "rand"), + new NamedType(WebFirehoseFactory.class, "webstream"), + new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"), + new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"), + new NamedType(ClippedFirehoseFactory.class, "clipped"), + new NamedType(TimedShutoffFirehoseFactory.class, "timed"), + new NamedType(IrcFirehoseFactory.class, "irc"), + new NamedType(StaticS3FirehoseFactory.class, "s3"), + new NamedType(EventReceiverFirehoseFactory.class, "receiver"), + new NamedType(LocalFirehoseFactory.class, "local") + ) + ); + } +} diff --git a/services/src/main/java/io/druid/guice/RealtimeModule.java b/services/src/main/java/io/druid/guice/RealtimeModule.java index 195439662de..c4fa8aa6461 100644 --- a/services/src/main/java/io/druid/guice/RealtimeModule.java +++ b/services/src/main/java/io/druid/guice/RealtimeModule.java @@ -19,43 +19,26 @@ package io.druid.guice; -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.inject.Binder; import com.google.inject.Key; +import com.google.inject.Module; import com.google.inject.TypeLiteral; import com.google.inject.multibindings.MapBinder; -import druid.examples.flights.FlightsFirehoseFactory; -import druid.examples.rand.RandomFirehoseFactory; -import druid.examples.twitter.TwitterSpritzerFirehoseFactory; -import druid.examples.web.WebFirehoseFactory; import io.druid.cli.QueryJettyServerInitializer; -import io.druid.indexing.common.index.EventReceiverFirehoseFactory; -import io.druid.indexing.common.index.FileIteratingFirehose; -import io.druid.indexing.common.index.LocalFirehoseFactory; -import io.druid.indexing.common.index.StaticS3FirehoseFactory; -import io.druid.initialization.DruidModule; import io.druid.query.QuerySegmentWalker; import io.druid.segment.realtime.DbSegmentPublisher; import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.NoopSegmentPublisher; import io.druid.segment.realtime.RealtimeManager; import io.druid.segment.realtime.SegmentPublisher; -import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; -import io.druid.segment.realtime.firehose.IrcFirehoseFactory; -import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; -import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory; -import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import io.druid.server.initialization.JettyServerInitializer; import org.eclipse.jetty.server.Server; -import java.util.Arrays; import java.util.List; /** */ -public class RealtimeModule implements DruidModule +public class RealtimeModule implements Module { @Override public void configure(Binder binder) @@ -66,7 +49,10 @@ public class RealtimeModule implements DruidModule Key.get(SegmentPublisher.class), Key.get(NoopSegmentPublisher.class) ); - final MapBinder publisherBinder = PolyBind.optionBinder(binder, Key.get(SegmentPublisher.class)); + final MapBinder publisherBinder = PolyBind.optionBinder( + binder, + Key.get(SegmentPublisher.class) + ); publisherBinder.addBinding("db").to(DbSegmentPublisher.class); binder.bind(DbSegmentPublisher.class).in(LazySingleton.class); @@ -85,26 +71,4 @@ public class RealtimeModule implements DruidModule LifecycleModule.register(binder, Server.class); } - - @Override - public List getJacksonModules() - { - return Arrays.asList( - new SimpleModule("RealtimeModule") - .registerSubtypes( - new NamedType(TwitterSpritzerFirehoseFactory.class, "twitzer"), - new NamedType(FlightsFirehoseFactory.class, "flights"), - new NamedType(RandomFirehoseFactory.class, "rand"), - new NamedType(WebFirehoseFactory.class, "webstream"), - new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"), - new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"), - new NamedType(ClippedFirehoseFactory.class, "clipped"), - new NamedType(TimedShutoffFirehoseFactory.class, "timed"), - new NamedType(IrcFirehoseFactory.class, "irc"), - new NamedType(StaticS3FirehoseFactory.class, "s3"), - new NamedType(EventReceiverFirehoseFactory.class, "receiver"), - new NamedType(LocalFirehoseFactory.class, "local") - ) - ); - } } From 53698a135a07c755f84b637e547c09fcb1f1d5c9 Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 30 Sep 2013 18:00:59 -0700 Subject: [PATCH 3/6] add interface to new firehose as per code review comments --- .../common/index/StaticS3FirehoseFactory.java | 83 ++++++++++--------- .../firehose}/FileIteratingFirehose.java | 29 ++++--- .../firehose/LineIteratorFactory.java | 29 +++++++ .../firehose}/LocalFirehoseFactory.java | 19 +++-- .../src/main/java/io/druid/cli/CliPeon.java | 2 +- .../java/io/druid/guice/FirehoseModule.java | 2 +- 6 files changed, 102 insertions(+), 62 deletions(-) rename {indexing-service/src/main/java/io/druid/indexing/common/index => realtime/src/main/java/io/druid/segment/realtime/firehose}/FileIteratingFirehose.java (80%) create mode 100644 realtime/src/main/java/io/druid/segment/realtime/firehose/LineIteratorFactory.java rename {indexing-service/src/main/java/io/druid/indexing/common/index => realtime/src/main/java/io/druid/segment/realtime/firehose}/LocalFirehoseFactory.java (90%) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/StaticS3FirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/index/StaticS3FirehoseFactory.java index 6c099b9683f..3337c3b2edc 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/StaticS3FirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/StaticS3FirehoseFactory.java @@ -32,6 +32,8 @@ import com.metamx.common.logger.Logger; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.StringInputRowParser; +import io.druid.segment.realtime.firehose.FileIteratingFirehose; +import io.druid.segment.realtime.firehose.LineIteratorFactory; import org.apache.commons.io.IOUtils; import org.apache.commons.io.LineIterator; import org.jets3t.service.S3Service; @@ -91,47 +93,48 @@ public class StaticS3FirehoseFactory implements FirehoseFactory Preconditions.checkNotNull(s3Client, "null s3Client"); return new FileIteratingFirehose( + new LineIteratorFactory() + { + @Override + public LineIterator make(URI nextURI) throws Exception + { + final String s3Bucket = nextURI.getAuthority(); + final S3Object s3Object = new S3Object( + nextURI.getPath().startsWith("/") + ? nextURI.getPath().substring(1) + : nextURI.getPath() + ); + + log.info("Reading from bucket[%s] object[%s] (%s)", s3Bucket, s3Object.getKey(), nextURI); + + try { + final InputStream innerInputStream = s3Client.getObject(s3Bucket, s3Object.getKey()) + .getDataInputStream(); + + final InputStream outerInputStream = s3Object.getKey().endsWith(".gz") + ? new GZIPInputStream(innerInputStream) + : innerInputStream; + + return IOUtils.lineIterator( + new BufferedReader( + new InputStreamReader(outerInputStream, Charsets.UTF_8) + ) + ); + } + catch (IOException e) { + log.error( + e, + "Exception reading from bucket[%s] object[%s]", + s3Bucket, + s3Object.getKey() + ); + + throw Throwables.propagate(e); + } + } + }, Lists.newLinkedList(uris), parser - ) - { - @Override - public LineIterator makeLineIterator(URI nextURI) throws Exception - { - final String s3Bucket = nextURI.getAuthority(); - final S3Object s3Object = new S3Object( - nextURI.getPath().startsWith("/") - ? nextURI.getPath().substring(1) - : nextURI.getPath() - ); - - log.info("Reading from bucket[%s] object[%s] (%s)", s3Bucket, s3Object.getKey(), nextURI); - - try { - final InputStream innerInputStream = s3Client.getObject(s3Bucket, s3Object.getKey()) - .getDataInputStream(); - - final InputStream outerInputStream = s3Object.getKey().endsWith(".gz") - ? new GZIPInputStream(innerInputStream) - : innerInputStream; - - return IOUtils.lineIterator( - new BufferedReader( - new InputStreamReader(outerInputStream, Charsets.UTF_8) - ) - ); - } - catch (IOException e) { - log.error( - e, - "Exception reading from bucket[%s] object[%s]", - s3Bucket, - s3Object.getKey() - ); - - throw Throwables.propagate(e); - } - } - }; + ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/FileIteratingFirehose.java b/realtime/src/main/java/io/druid/segment/realtime/firehose/FileIteratingFirehose.java similarity index 80% rename from indexing-service/src/main/java/io/druid/indexing/common/index/FileIteratingFirehose.java rename to realtime/src/main/java/io/druid/segment/realtime/firehose/FileIteratingFirehose.java index b6f1070d4bf..1b1018d29d4 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/FileIteratingFirehose.java +++ b/realtime/src/main/java/io/druid/segment/realtime/firehose/FileIteratingFirehose.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.indexing.common.index; +package io.druid.segment.realtime.firehose; import com.google.common.base.Throwables; import io.druid.common.guava.Runnables; @@ -32,27 +32,32 @@ import java.util.Queue; /** */ -public abstract class FileIteratingFirehose implements Firehose +public class FileIteratingFirehose implements Firehose { + private final LineIteratorFactory lineIteratorFactory; private final Queue objectQueue; private final StringInputRowParser parser; private LineIterator lineIterator = null; - protected FileIteratingFirehose(Queue objectQueue, StringInputRowParser parser) + public FileIteratingFirehose( + LineIteratorFactory lineIteratorFactory, + Queue objectQueue, + StringInputRowParser parser + ) { + this.lineIteratorFactory = lineIteratorFactory; this.objectQueue = objectQueue; this.parser = parser; } - public abstract LineIterator makeLineIterator(T val) throws Exception; - @Override public boolean hasMore() { try { nextFile(); - } catch(Exception e) { + } + catch (Exception e) { throw Throwables.propagate(e); } @@ -64,11 +69,12 @@ public abstract class FileIteratingFirehose implements Firehose { try { nextFile(); - } catch(Exception e) { + } + catch (Exception e) { throw Throwables.propagate(e); } - if(lineIterator == null) { + if (lineIterator == null) { throw new NoSuchElementException(); } @@ -85,7 +91,7 @@ public abstract class FileIteratingFirehose implements Firehose public void close() throws IOException { objectQueue.clear(); - if(lineIterator != null) { + if (lineIterator != null) { lineIterator.close(); } } @@ -106,8 +112,9 @@ public abstract class FileIteratingFirehose implements Firehose if (nextObj != null) { try { - lineIterator = makeLineIterator(nextObj); - } catch (Exception e) { + lineIterator = lineIteratorFactory.make(nextObj); + } + catch (Exception e) { throw Throwables.propagate(e); } } diff --git a/realtime/src/main/java/io/druid/segment/realtime/firehose/LineIteratorFactory.java b/realtime/src/main/java/io/druid/segment/realtime/firehose/LineIteratorFactory.java new file mode 100644 index 00000000000..eb0c6f2d33b --- /dev/null +++ b/realtime/src/main/java/io/druid/segment/realtime/firehose/LineIteratorFactory.java @@ -0,0 +1,29 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.segment.realtime.firehose; + +import org.apache.commons.io.LineIterator; + +/** + */ +public interface LineIteratorFactory +{ + public LineIterator make(T val) throws Exception; +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/LocalFirehoseFactory.java b/realtime/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java similarity index 90% rename from indexing-service/src/main/java/io/druid/indexing/common/index/LocalFirehoseFactory.java rename to realtime/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java index 9918073aa6d..0e5397cf701 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/LocalFirehoseFactory.java +++ b/realtime/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package io.druid.indexing.common.index; +package io.druid.segment.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -75,6 +75,14 @@ public class LocalFirehoseFactory implements FirehoseFactory public Firehose connect() throws IOException { return new FileIteratingFirehose( + new LineIteratorFactory() + { + @Override + public LineIterator make(File file) throws Exception + { + return FileUtils.lineIterator(file); + } + }, Lists.newLinkedList( Arrays.asList( baseDir.listFiles( @@ -90,13 +98,6 @@ public class LocalFirehoseFactory implements FirehoseFactory ) ), parser - ) - { - @Override - public LineIterator makeLineIterator(File file) throws Exception - { - return FileUtils.lineIterator(file); - } - }; + ); } } diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index 77e140bbd8f..960c0b2fd67 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -53,7 +53,7 @@ import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.index.ChatHandlerProvider; import io.druid.indexing.common.index.EventReceiverFirehoseFactory; import io.druid.indexing.common.index.EventReceivingChatHandlerProvider; -import io.druid.indexing.common.index.LocalFirehoseFactory; +import io.druid.segment.realtime.firehose.LocalFirehoseFactory; import io.druid.indexing.common.index.NoopChatHandlerProvider; import io.druid.indexing.common.index.StaticS3FirehoseFactory; import io.druid.indexing.coordinator.TaskRunner; diff --git a/services/src/main/java/io/druid/guice/FirehoseModule.java b/services/src/main/java/io/druid/guice/FirehoseModule.java index 5026d93e752..8312149f1c1 100644 --- a/services/src/main/java/io/druid/guice/FirehoseModule.java +++ b/services/src/main/java/io/druid/guice/FirehoseModule.java @@ -28,7 +28,7 @@ import druid.examples.rand.RandomFirehoseFactory; import druid.examples.twitter.TwitterSpritzerFirehoseFactory; import druid.examples.web.WebFirehoseFactory; import io.druid.indexing.common.index.EventReceiverFirehoseFactory; -import io.druid.indexing.common.index.LocalFirehoseFactory; +import io.druid.segment.realtime.firehose.LocalFirehoseFactory; import io.druid.indexing.common.index.StaticS3FirehoseFactory; import io.druid.initialization.DruidModule; import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; From d6445cd8f3958a7c4dc453be8ca0c3756460ee45 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 1 Oct 2013 09:42:28 -0700 Subject: [PATCH 4/6] remove imports --- services/src/main/java/io/druid/cli/CliPeon.java | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index 960c0b2fd67..b4b9ce4e661 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -19,8 +19,6 @@ package io.druid.cli; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; @@ -30,10 +28,6 @@ import com.google.inject.Module; import com.google.inject.multibindings.MapBinder; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; -import druid.examples.flights.FlightsFirehoseFactory; -import druid.examples.rand.RandomFirehoseFactory; -import druid.examples.twitter.TwitterSpritzerFirehoseFactory; -import druid.examples.web.WebFirehoseFactory; import io.airlift.command.Arguments; import io.airlift.command.Command; import io.airlift.command.Option; @@ -51,27 +45,18 @@ import io.druid.indexing.common.actions.RemoteTaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.index.ChatHandlerProvider; -import io.druid.indexing.common.index.EventReceiverFirehoseFactory; import io.druid.indexing.common.index.EventReceivingChatHandlerProvider; -import io.druid.segment.realtime.firehose.LocalFirehoseFactory; import io.druid.indexing.common.index.NoopChatHandlerProvider; -import io.druid.indexing.common.index.StaticS3FirehoseFactory; import io.druid.indexing.coordinator.TaskRunner; import io.druid.indexing.coordinator.ThreadPoolTaskRunner; import io.druid.indexing.worker.executor.ChatHandlerResource; import io.druid.indexing.worker.executor.ExecutorLifecycle; import io.druid.indexing.worker.executor.ExecutorLifecycleConfig; -import io.druid.initialization.DruidModule; import io.druid.query.QuerySegmentWalker; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.S3DataSegmentKiller; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.StorageLocationConfig; -import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; -import io.druid.segment.realtime.firehose.IrcFirehoseFactory; -import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; -import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory; -import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import io.druid.server.initialization.JettyServerInitializer; import org.eclipse.jetty.server.Server; From 5d0d71250b68845966d89e89128146e6c234070f Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 1 Oct 2013 11:25:39 -0700 Subject: [PATCH 5/6] fix chat handler resources not correctly registering themselves --- .../common/index/ChatHandlerProvider.java | 2 ++ .../index/EventReceiverFirehoseFactory.java | 32 ++++++++++------- .../EventReceivingChatHandlerProvider.java | 7 +++- .../common/index/NoopChatHandlerProvider.java | 6 ++++ .../druid/indexing/common/task/NoopTask.java | 34 ++++++++++++++++--- .../coordinator/RemoteTaskRunner.java | 2 +- .../worker/executor/ChatHandlerResource.java | 2 ++ .../java/io/druid/cli/CliMiddleManager.java | 4 +++ .../main/java/io/druid/cli/CliOverlord.java | 9 ++++- .../src/main/java/io/druid/cli/CliPeon.java | 6 ++-- 10 files changed, 82 insertions(+), 22 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/ChatHandlerProvider.java b/indexing-service/src/main/java/io/druid/indexing/common/index/ChatHandlerProvider.java index 01af5c3cc62..49e74df0717 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/ChatHandlerProvider.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/ChatHandlerProvider.java @@ -25,6 +25,8 @@ import com.google.common.base.Optional; */ public interface ChatHandlerProvider { + public String getType(); + public void register(final String key, ChatHandler handler); public void unregister(final String key); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceiverFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceiverFirehoseFactory.java index f26e0090953..8e66fd8f357 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceiverFirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceiverFirehoseFactory.java @@ -57,20 +57,20 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class); private static final int DEFAULT_BUFFER_SIZE = 100000; - private final String firehoseId; + private final String serviceName; private final int bufferSize; private final MapInputRowParser parser; - private final Optional chatHandlerProvider; + private final Optional chatHandlerProvider; @JsonCreator public EventReceiverFirehoseFactory( - @JsonProperty("firehoseId") String firehoseId, + @JsonProperty("serviceName") String serviceName, @JsonProperty("bufferSize") Integer bufferSize, @JsonProperty("parser") MapInputRowParser parser, - @JacksonInject("chatHandlerProvider") EventReceivingChatHandlerProvider chatHandlerProvider + @JacksonInject ChatHandlerProvider chatHandlerProvider ) { - this.firehoseId = Preconditions.checkNotNull(firehoseId, "firehoseId"); + this.serviceName = Preconditions.checkNotNull(serviceName, "serviceName"); this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize; this.parser = Preconditions.checkNotNull(parser, "parser"); this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider); @@ -79,21 +79,24 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory @Override public Firehose connect() throws IOException { - log.info("Connecting firehose: %s", firehoseId); + log.info("Connecting firehose: %s", serviceName); final EventReceiverFirehose firehose = new EventReceiverFirehose(); if (chatHandlerProvider.isPresent()) { - chatHandlerProvider.get().register(firehoseId, firehose); + log.info("Found chathandler with type[%s]", chatHandlerProvider.get().getType()); + chatHandlerProvider.get().register(serviceName, firehose); + } else { + log.info("No chathandler detected"); } return firehose; } @JsonProperty - public String getFirehoseId() + public String getServiceName() { - return firehoseId; + return serviceName; } @JsonProperty @@ -111,7 +114,9 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory public class EventReceiverFirehose implements ChatHandler, Firehose { private final BlockingQueue buffer; + private final Object readLock = new Object(); + private volatile InputRow nextRow = null; private volatile boolean closed = false; @@ -125,7 +130,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory @Produces("application/json") public Response addAll(Collection> events) { - log.debug("Adding %,d events to firehose: %s", events.size(), firehoseId); + log.debug("Adding %,d events to firehose: %s", events.size(), serviceName); final List rows = Lists.newArrayList(); for (final Map event : events) { @@ -146,7 +151,8 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory } return Response.ok().entity(ImmutableMap.of("eventCount", events.size())).build(); - } catch (InterruptedException e) { + } + catch (InterruptedException e) { Thread.currentThread().interrupt(); throw Throwables.propagate(e); } @@ -167,7 +173,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory } return nextRow != null; - } + } } @Override @@ -205,7 +211,7 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory closed = true; if (chatHandlerProvider.isPresent()) { - chatHandlerProvider.get().unregister(firehoseId); + chatHandlerProvider.get().unregister(serviceName); } } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceivingChatHandlerProvider.java b/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceivingChatHandlerProvider.java index 8fce0a3935a..93bdd2a53c6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceivingChatHandlerProvider.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/EventReceivingChatHandlerProvider.java @@ -54,6 +54,12 @@ public class EventReceivingChatHandlerProvider implements ChatHandlerProvider this.handlers = Maps.newConcurrentMap(); } + @Override + public String getType() + { + return "eventReceiving"; + } + @Override public void register(final String service, ChatHandler handler) { @@ -76,7 +82,6 @@ public class EventReceivingChatHandlerProvider implements ChatHandlerProvider @Override public void unregister(final String service) { - log.info("Unregistering chat handler[%s]", service); final ChatHandler handler = handlers.get(service); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/NoopChatHandlerProvider.java b/indexing-service/src/main/java/io/druid/indexing/common/index/NoopChatHandlerProvider.java index e75cce88e28..b0b2d9069a7 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/NoopChatHandlerProvider.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/NoopChatHandlerProvider.java @@ -25,6 +25,12 @@ import com.google.common.base.Optional; */ public class NoopChatHandlerProvider implements ChatHandlerProvider { + @Override + public String getType() + { + return "noop"; + } + @Override public void register(String key, ChatHandler handler) { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java index 230f074db72..c6291fdb4c9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java @@ -22,6 +22,7 @@ package io.druid.indexing.common.task; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.metamx.common.logger.Logger; +import io.druid.data.input.FirehoseFactory; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import org.joda.time.DateTime; @@ -33,11 +34,17 @@ import org.joda.time.Period; public class NoopTask extends AbstractTask { private static final Logger log = new Logger(NoopTask.class); + private static int defaultRunTime = 2500; + + private final int runTime; + private final FirehoseFactory firehoseFactory; @JsonCreator public NoopTask( @JsonProperty("id") String id, - @JsonProperty("interval") Interval interval + @JsonProperty("interval") Interval interval, + @JsonProperty("runTime") int runTime, + @JsonProperty("firehose") FirehoseFactory firehoseFactory ) { super( @@ -45,6 +52,10 @@ public class NoopTask extends AbstractTask "none", interval == null ? new Interval(Period.days(1), new DateTime()) : interval ); + + this.runTime = (runTime == 0) ? defaultRunTime : runTime; + + this.firehoseFactory = firehoseFactory; } @Override @@ -53,14 +64,29 @@ public class NoopTask extends AbstractTask return "noop"; } + @JsonProperty("runTime") + public int getRunTime() + { + return runTime; + } + + @JsonProperty("firehose") + public FirehoseFactory getFirehoseFactory() + { + return firehoseFactory; + } + @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { - final int sleepTime = 2500; + if (firehoseFactory != null) { + log.info("Connecting firehose"); + firehoseFactory.connect(); + } log.info("Running noop task[%s]", getId()); - log.info("Sleeping for %,d millis.", sleepTime); - Thread.sleep(sleepTime); + log.info("Sleeping for %,d millis.", runTime); + Thread.sleep(runTime); log.info("Woke up!"); return TaskStatus.success(getId()); } diff --git a/indexing-service/src/main/java/io/druid/indexing/coordinator/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/coordinator/RemoteTaskRunner.java index fb15cd75354..23767a87cb3 100644 --- a/indexing-service/src/main/java/io/druid/indexing/coordinator/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/coordinator/RemoteTaskRunner.java @@ -639,7 +639,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer log.info("Task[%s] just disappeared!", taskId); taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId())); } else { - log.warn("Task[%s] just disappeared but I didn't know about it?!", taskId); + log.info("Task[%s] went bye bye.", taskId); } break; } diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ChatHandlerResource.java b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ChatHandlerResource.java index c52745c5d58..da061eb9bc2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ChatHandlerResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ChatHandlerResource.java @@ -24,6 +24,7 @@ import com.google.inject.Inject; import io.druid.indexing.common.index.ChatHandler; import io.druid.indexing.common.index.ChatHandlerProvider; +import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.core.Response; @@ -39,6 +40,7 @@ public class ChatHandlerResource this.handlers = handlers; } + @POST @Path("/chat/{id}") public Object doTaskChat( @PathParam("id") String handlerId diff --git a/services/src/main/java/io/druid/cli/CliMiddleManager.java b/services/src/main/java/io/druid/cli/CliMiddleManager.java index f2a0659b9e9..dd3103a5b81 100644 --- a/services/src/main/java/io/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/io/druid/cli/CliMiddleManager.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; +import com.google.inject.util.Providers; import com.metamx.common.logger.Logger; import io.airlift.command.Command; import io.druid.guice.IndexingServiceModuleHelper; @@ -32,6 +33,7 @@ import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Self; +import io.druid.indexing.common.index.ChatHandlerProvider; import io.druid.indexing.coordinator.ForkingTaskRunner; import io.druid.indexing.coordinator.TaskRunner; import io.druid.indexing.worker.Worker; @@ -76,6 +78,8 @@ public class CliMiddleManager extends ServerRunnable binder.bind(TaskRunner.class).to(ForkingTaskRunner.class); binder.bind(ForkingTaskRunner.class).in(LazySingleton.class); + binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null)); + binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class); binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class); diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 446283e0dee..a617b53d4c0 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -27,6 +27,7 @@ import com.google.inject.Module; import com.google.inject.TypeLiteral; import com.google.inject.multibindings.MapBinder; import com.google.inject.servlet.GuiceFilter; +import com.google.inject.util.Providers; import com.metamx.common.logger.Logger; import io.airlift.command.Command; import io.druid.guice.IndexingServiceModuleHelper; @@ -41,6 +42,7 @@ import io.druid.guice.PolyBind; import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; +import io.druid.indexing.common.index.ChatHandlerProvider; import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; import io.druid.indexing.common.tasklogs.TaskLogStreamer; import io.druid.indexing.common.tasklogs.TaskLogs; @@ -131,6 +133,8 @@ public class CliOverlord extends ServerRunnable .to(ResourceManagementSchedulerFactoryImpl.class) .in(LazySingleton.class); + binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null)); + configureTaskStorage(binder); configureRunners(binder); configureAutoscale(binder); @@ -162,7 +166,10 @@ public class CliOverlord extends ServerRunnable private void configureRunners(Binder binder) { PolyBind.createChoice( - binder, "druid.indexer.runner.type", Key.get(TaskRunnerFactory.class), Key.get(ForkingTaskRunnerFactory.class) + binder, + "druid.indexer.runner.type", + Key.get(TaskRunnerFactory.class), + Key.get(ForkingTaskRunnerFactory.class) ); final MapBinder biddy = PolyBind.optionBinder(binder, Key.get(TaskRunnerFactory.class)); diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index b4b9ce4e661..aea4bfe881c 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -104,8 +104,10 @@ public class CliPeon extends GuiceRunnable final MapBinder handlerProviderBinder = PolyBind.optionBinder( binder, Key.get(ChatHandlerProvider.class) ); - handlerProviderBinder.addBinding("curator").to(EventReceivingChatHandlerProvider.class); - handlerProviderBinder.addBinding("noop").to(NoopChatHandlerProvider.class); + handlerProviderBinder.addBinding("receiver") + .to(EventReceivingChatHandlerProvider.class).in(LazySingleton.class); + handlerProviderBinder.addBinding("noop") + .to(NoopChatHandlerProvider.class).in(LazySingleton.class); binder.bind(TaskToolboxFactory.class).in(LazySingleton.class); From 30df53671ec28bda91618e3713eb3b215c718f59 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 1 Oct 2013 13:21:20 -0700 Subject: [PATCH 6/6] remove line iterator factory because it is not needed --- .../common/index/StaticS3FirehoseFactory.java | 28 +++++++-- .../firehose/FileIteratingFirehose.java | 59 +++++-------------- .../firehose/LineIteratorFactory.java | 29 --------- .../firehose/LocalFirehoseFactory.java | 57 ++++++++++++------ 4 files changed, 77 insertions(+), 96 deletions(-) delete mode 100644 realtime/src/main/java/io/druid/segment/realtime/firehose/LineIteratorFactory.java diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/StaticS3FirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/index/StaticS3FirehoseFactory.java index 3337c3b2edc..69bdfa8d5ab 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/StaticS3FirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/StaticS3FirehoseFactory.java @@ -33,7 +33,6 @@ import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.StringInputRowParser; import io.druid.segment.realtime.firehose.FileIteratingFirehose; -import io.druid.segment.realtime.firehose.LineIteratorFactory; import org.apache.commons.io.IOUtils; import org.apache.commons.io.LineIterator; import org.jets3t.service.S3Service; @@ -44,6 +43,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.URI; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.zip.GZIPInputStream; @@ -92,12 +93,22 @@ public class StaticS3FirehoseFactory implements FirehoseFactory { Preconditions.checkNotNull(s3Client, "null s3Client"); - return new FileIteratingFirehose( - new LineIteratorFactory() + final LinkedList objectQueue = Lists.newLinkedList(uris); + + return new FileIteratingFirehose( + new Iterator() { @Override - public LineIterator make(URI nextURI) throws Exception + public boolean hasNext() { + return !objectQueue.isEmpty(); + } + + @Override + public LineIterator next() + { + final URI nextURI = objectQueue.poll(); + final String s3Bucket = nextURI.getAuthority(); final S3Object s3Object = new S3Object( nextURI.getPath().startsWith("/") @@ -121,7 +132,7 @@ public class StaticS3FirehoseFactory implements FirehoseFactory ) ); } - catch (IOException e) { + catch (Exception e) { log.error( e, "Exception reading from bucket[%s] object[%s]", @@ -132,8 +143,13 @@ public class StaticS3FirehoseFactory implements FirehoseFactory throw Throwables.propagate(e); } } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } }, - Lists.newLinkedList(uris), parser ); } diff --git a/realtime/src/main/java/io/druid/segment/realtime/firehose/FileIteratingFirehose.java b/realtime/src/main/java/io/druid/segment/realtime/firehose/FileIteratingFirehose.java index 1b1018d29d4..a8a6be14002 100644 --- a/realtime/src/main/java/io/druid/segment/realtime/firehose/FileIteratingFirehose.java +++ b/realtime/src/main/java/io/druid/segment/realtime/firehose/FileIteratingFirehose.java @@ -27,27 +27,25 @@ import io.druid.data.input.StringInputRowParser; import org.apache.commons.io.LineIterator; import java.io.IOException; +import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Queue; /** */ -public class FileIteratingFirehose implements Firehose +public class FileIteratingFirehose implements Firehose { - private final LineIteratorFactory lineIteratorFactory; - private final Queue objectQueue; + private final Iterator lineIterators; private final StringInputRowParser parser; private LineIterator lineIterator = null; public FileIteratingFirehose( - LineIteratorFactory lineIteratorFactory, - Queue objectQueue, + Iterator lineIterators, StringInputRowParser parser ) { - this.lineIteratorFactory = lineIteratorFactory; - this.objectQueue = objectQueue; + this.lineIterators = lineIterators; this.parser = parser; } @@ -55,30 +53,31 @@ public class FileIteratingFirehose implements Firehose public boolean hasMore() { try { - nextFile(); + return lineIterators.hasNext() || (lineIterator != null && lineIterator.hasNext()); } catch (Exception e) { throw Throwables.propagate(e); } - - return lineIterator != null && lineIterator.hasNext(); } @Override public InputRow nextRow() { try { - nextFile(); + if (lineIterator == null || !lineIterator.hasNext()) { + // Close old streams, maybe. + if (lineIterator != null) { + lineIterator.close(); + } + + lineIterator = lineIterators.next(); + } + + return parser.parse(lineIterator.next()); } catch (Exception e) { throw Throwables.propagate(e); } - - if (lineIterator == null) { - throw new NoSuchElementException(); - } - - return parser.parse(lineIterator.next()); } @Override @@ -90,34 +89,8 @@ public class FileIteratingFirehose implements Firehose @Override public void close() throws IOException { - objectQueue.clear(); if (lineIterator != null) { lineIterator.close(); } } - - // Rolls over our streams and iterators to the next file, if appropriate - private void nextFile() throws Exception - { - - if (lineIterator == null || !lineIterator.hasNext()) { - - // Close old streams, maybe. - if (lineIterator != null) { - lineIterator.close(); - } - - // Open new streams, maybe. - final T nextObj = objectQueue.poll(); - if (nextObj != null) { - - try { - lineIterator = lineIteratorFactory.make(nextObj); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - } - } } diff --git a/realtime/src/main/java/io/druid/segment/realtime/firehose/LineIteratorFactory.java b/realtime/src/main/java/io/druid/segment/realtime/firehose/LineIteratorFactory.java deleted file mode 100644 index eb0c6f2d33b..00000000000 --- a/realtime/src/main/java/io/druid/segment/realtime/firehose/LineIteratorFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.segment.realtime.firehose; - -import org.apache.commons.io.LineIterator; - -/** - */ -public interface LineIteratorFactory -{ - public LineIterator make(T val) throws Exception; -} diff --git a/realtime/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java b/realtime/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java index 0e5397cf701..eccf17ff206 100644 --- a/realtime/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java +++ b/realtime/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java @@ -21,6 +21,7 @@ package io.druid.segment.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.common.collect.Lists; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; @@ -32,6 +33,8 @@ import java.io.File; import java.io.FilenameFilter; import java.io.IOException; import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedList; /** */ @@ -74,29 +77,47 @@ public class LocalFirehoseFactory implements FirehoseFactory @Override public Firehose connect() throws IOException { - return new FileIteratingFirehose( - new LineIteratorFactory() + final LinkedList files = Lists.newLinkedList( + Arrays.asList( + baseDir.listFiles( + new FilenameFilter() + { + @Override + public boolean accept(File file, String name) + { + return name.contains(filter); + } + } + ) + ) + ); + + return new FileIteratingFirehose( + new Iterator() { @Override - public LineIterator make(File file) throws Exception + public boolean hasNext() { - return FileUtils.lineIterator(file); + return !files.isEmpty(); + } + + @Override + public LineIterator next() + { + try { + return FileUtils.lineIterator(files.poll()); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); } }, - Lists.newLinkedList( - Arrays.asList( - baseDir.listFiles( - new FilenameFilter() - { - @Override - public boolean accept(File file, String name) - { - return name.contains(filter); - } - } - ) - ) - ), parser ); }