diff --git a/docs/content/Firehose.md b/docs/content/Firehose.md index 065ba2472c1..f2c5d9cfa93 100644 --- a/docs/content/Firehose.md +++ b/docs/content/Firehose.md @@ -40,6 +40,36 @@ See [Examples](Examples.html). This firehose creates a stream of random numbers. This firehose ingests events from a define rabbit-mq queue. +#### LocalFirehose + +This Firehose can be used to read data from files on local disk. +It can be used for POCs to ingest data on disk. +A sample local firehose spec is shown below: + +```json +{ + "type" : "local", + "filter" : "*.csv", + "parser" : { + "timestampSpec": { + "column": "mytimestamp", + "format": "yyyy-MM-dd HH:mm:ss" + }, + "data": { + "format": "csv", + "columns": [...], + "dimensions": [...] + } + } +} +``` + +|property|description|required?| +|--------|-----------|---------| +|type|This should be "local".|yes| +|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter.html) for more information.|yes| +|parser|Specifies how to parse the files. Its embedded "data" spec is similar to what is used for batch ingestion.|yes| + #### IngestSegmentFirehose This Firehose can be used to read the data from existing druid segments. @@ -63,11 +93,6 @@ A sample ingest firehose spec is shown below - |metrics|The list of metrics to select. If left empty, no metrics are returned. 
If left null or not defined, all metrics are selected.|no| |filter| See [Filters](Filters.html)|yes| - - - - - Parsing Data ------------ diff --git a/indexing-service/src/main/java/io/druid/guice/IndexingServiceTaskLogsModule.java b/indexing-service/src/main/java/io/druid/guice/IndexingServiceTaskLogsModule.java index 33452d0dfd9..69c5f4b99ba 100644 --- a/indexing-service/src/main/java/io/druid/guice/IndexingServiceTaskLogsModule.java +++ b/indexing-service/src/main/java/io/druid/guice/IndexingServiceTaskLogsModule.java @@ -37,13 +37,13 @@ public class IndexingServiceTaskLogsModule implements Module public void configure(Binder binder) { PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(FileTaskLogs.class)); + JsonConfigProvider.bind(binder, "druid.indexer.logs", FileTaskLogsConfig.class); final MapBinder taskLogBinder = Binders.taskLogsBinder(binder); taskLogBinder.addBinding("noop").to(NoopTaskLogs.class).in(LazySingleton.class); taskLogBinder.addBinding("file").to(FileTaskLogs.class).in(LazySingleton.class); binder.bind(NoopTaskLogs.class).in(LazySingleton.class); binder.bind(FileTaskLogs.class).in(LazySingleton.class); - JsonConfigProvider.bind(binder, "druid.indexer.logs", FileTaskLogsConfig.class); binder.bind(TaskLogPusher.class).to(TaskLogs.class); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java index d1add268faa..cc96ad9ae23 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java @@ -30,6 +30,10 @@ public class FileTaskLogsConfig @NotNull private File directory = new File("log"); + public FileTaskLogsConfig() + { + } + public FileTaskLogsConfig(File directory) { this.directory = directory; diff --git 
a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java index 859b73348ab..dca9f440bb3 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java @@ -24,17 +24,19 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.metamx.common.ISE; +import com.metamx.emitter.EmittingLogger; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.impl.FileIteratingFirehose; import io.druid.data.input.impl.StringInputRowParser; import org.apache.commons.io.FileUtils; import org.apache.commons.io.LineIterator; +import org.apache.commons.io.filefilter.TrueFileFilter; +import org.apache.commons.io.filefilter.WildcardFileFilter; import java.io.File; -import java.io.FilenameFilter; import java.io.IOException; -import java.util.Arrays; +import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; @@ -42,6 +44,8 @@ import java.util.LinkedList; */ public class LocalFirehoseFactory implements FirehoseFactory { + private static final EmittingLogger log = new EmittingLogger(LocalFirehoseFactory.class); + private final File baseDir; private final String filter; private final StringInputRowParser parser; @@ -79,26 +83,22 @@ public class LocalFirehoseFactory implements FirehoseFactory foundFiles = FileUtils.listFiles( + baseDir.getAbsoluteFile(), + new WildcardFileFilter(filter), + TrueFileFilter.INSTANCE ); - if (foundFiles == null || foundFiles.length == 0) { + if (foundFiles == null || foundFiles.isEmpty()) { throw new ISE("Found no files to ingest! 
Check your schema."); } - final LinkedList files = Lists.newLinkedList( - Arrays.asList(foundFiles) + final LinkedList files = Lists.newLinkedList( + foundFiles ); - return new FileIteratingFirehose( new Iterator() {