From 6d2747adcfdd38941b7d5e8b0cfddc8e8229e6a7 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 30 Sep 2014 14:50:48 -0700 Subject: [PATCH 1/6] make loading local files easier --- .../firehose/LocalFirehoseFactory.java | 25 ++++++++----------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java index 859b73348ab..6be8f08ec0d 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java @@ -30,11 +30,12 @@ import io.druid.data.input.impl.FileIteratingFirehose; import io.druid.data.input.impl.StringInputRowParser; import org.apache.commons.io.FileUtils; import org.apache.commons.io.LineIterator; +import org.apache.commons.io.filefilter.RegexFileFilter; +import org.apache.commons.io.filefilter.TrueFileFilter; import java.io.File; -import java.io.FilenameFilter; import java.io.IOException; -import java.util.Arrays; +import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; @@ -79,26 +80,20 @@ public class LocalFirehoseFactory implements FirehoseFactory foundFiles = FileUtils.listFiles( + baseDir, + new RegexFileFilter(filter), + TrueFileFilter.INSTANCE ); - if (foundFiles == null || foundFiles.length == 0) { + if (foundFiles == null || foundFiles.isEmpty()) { throw new ISE("Found no files to ingest! Check your schema."); } - final LinkedList files = Lists.newLinkedList( - Arrays.asList(foundFiles) + final LinkedList files = Lists.newLinkedList( + foundFiles ); - return new FileIteratingFirehose( new Iterator() { From 5b166134fbe40f1253481a9dda37c012d0e6fc65 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 30 Sep 2014 14:59:02 -0700 Subject: [PATCH 2/6] add docs --- docs/content/Firehose.md | 35 ++++++++++++++++--- .../firehose/LocalFirehoseFactory.java | 4 +-- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/docs/content/Firehose.md b/docs/content/Firehose.md index 065ba2472c1..422e268fb90 100644 --- a/docs/content/Firehose.md +++ b/docs/content/Firehose.md @@ -40,6 +40,36 @@ See [Examples](Examples.html). This firehose creates a stream of random numbers. This firehose ingests events from a define rabbit-mq queue. +#### LocalFirehose + +This Firehose can be used to read the data from files on local disk. +It can be used for POCs to ingest data on disk. +A sample local firehose spec is shown below: + +```json +{ + "type" : "local", + "filter" : "*.csv", + "parser" : { + "timestampSpec": { + "column": "mytimestamp", + "format": "yyyy-MM-dd HH:mm:ss" + }, + "data": { + "format": "tsv", + "columns": [...], + "dimensions": [...] + } + } +} +``` + +|property|description|required?| +|--------|-----------|---------| +|type|This should be local|yes| +|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter.html) for more information.|yes| +|data|A data spec similar to what is used for batch ingestion.|yes| + #### IngestSegmentFirehose This Firehose can be used to read the data from existing druid segments. @@ -63,11 +93,6 @@ A sample ingest firehose spec is shown below - |metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no| |filter| See [Filters](Filters.html)|yes| - - - - - Parsing Data ------------ diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java index 6be8f08ec0d..22848b4cb2d 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java @@ -30,8 +30,8 @@ import io.druid.data.input.impl.FileIteratingFirehose; import io.druid.data.input.impl.StringInputRowParser; import org.apache.commons.io.FileUtils; import org.apache.commons.io.LineIterator; -import org.apache.commons.io.filefilter.RegexFileFilter; import org.apache.commons.io.filefilter.TrueFileFilter; +import org.apache.commons.io.filefilter.WildcardFileFilter; import java.io.File; import java.io.IOException; @@ -82,7 +82,7 @@ public class LocalFirehoseFactory implements FirehoseFactory foundFiles = FileUtils.listFiles( baseDir, - new RegexFileFilter(filter), + new WildcardFileFilter(filter), TrueFileFilter.INSTANCE ); From d9453f29f12cdf41b042ba28eabde759d518e9d5 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 30 Sep 2014 15:02:06 -0700 Subject: [PATCH 3/6] fix docs --- docs/content/Firehose.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/content/Firehose.md b/docs/content/Firehose.md index 422e268fb90..f2c5d9cfa93 100644 --- a/docs/content/Firehose.md +++ b/docs/content/Firehose.md @@ -56,7 +56,7 @@ A sample local firehose spec is shown below: "format": "yyyy-MM-dd HH:mm:ss" }, "data": { - "format": "tsv", + "format": "csv", "columns": [...], "dimensions": [...] } @@ -66,7 +66,7 @@ A sample local firehose spec is shown below: |property|description|required?| |--------|-----------|---------| -|type|This should be local|yes| +|type|This should be "local".|yes| |filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter.html) for more information.|yes| |data|A data spec similar to what is used for batch ingestion.|yes| From a0782d4c5454c55852d087b87f8af4b270cd5083 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 30 Sep 2014 16:56:44 -0700 Subject: [PATCH 4/6] fix compile --- .../io/druid/indexing/common/config/FileTaskLogsConfig.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java index d1add268faa..8228bda465a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java @@ -19,6 +19,7 @@ package io.druid.indexing.common.config; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import javax.validation.constraints.NotNull; @@ -30,6 +31,7 @@ public class FileTaskLogsConfig @NotNull private File directory = new File("log"); + @JsonCreator public FileTaskLogsConfig(File directory) { this.directory = directory; From c3bea245a7a0962c644b1605ecdea1205ce92982 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 30 Sep 2014 17:20:52 -0700 Subject: [PATCH 5/6] fix up some bugs --- .../java/io/druid/guice/IndexingServiceTaskLogsModule.java | 2 +- .../io/druid/indexing/common/config/FileTaskLogsConfig.java | 6 ++++-- .../segment/realtime/firehose/LocalFirehoseFactory.java | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/guice/IndexingServiceTaskLogsModule.java b/indexing-service/src/main/java/io/druid/guice/IndexingServiceTaskLogsModule.java index 33452d0dfd9..69c5f4b99ba 100644 --- a/indexing-service/src/main/java/io/druid/guice/IndexingServiceTaskLogsModule.java +++ b/indexing-service/src/main/java/io/druid/guice/IndexingServiceTaskLogsModule.java @@ -37,13 +37,13 @@ public class IndexingServiceTaskLogsModule implements Module public void configure(Binder binder) { PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(FileTaskLogs.class)); + JsonConfigProvider.bind(binder, "druid.indexer.logs", FileTaskLogsConfig.class); final MapBinder taskLogBinder = Binders.taskLogsBinder(binder); taskLogBinder.addBinding("noop").to(NoopTaskLogs.class).in(LazySingleton.class); taskLogBinder.addBinding("file").to(FileTaskLogs.class).in(LazySingleton.class); binder.bind(NoopTaskLogs.class).in(LazySingleton.class); binder.bind(FileTaskLogs.class).in(LazySingleton.class); - JsonConfigProvider.bind(binder, "druid.indexer.logs", FileTaskLogsConfig.class); binder.bind(TaskLogPusher.class).to(TaskLogs.class); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java index 8228bda465a..cc96ad9ae23 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/config/FileTaskLogsConfig.java @@ -19,7 +19,6 @@ package io.druid.indexing.common.config; -import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import javax.validation.constraints.NotNull; @@ -31,7 +30,10 @@ public class FileTaskLogsConfig @NotNull private File directory = new File("log"); - @JsonCreator + public FileTaskLogsConfig() + { + } + public FileTaskLogsConfig(File directory) { this.directory = directory; diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java index 22848b4cb2d..4246e145fde 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java @@ -81,7 +81,7 @@ public class LocalFirehoseFactory implements FirehoseFactory foundFiles = FileUtils.listFiles( - baseDir, + baseDir.getAbsoluteFile(), new WildcardFileFilter(filter), TrueFileFilter.INSTANCE ); From 4fc0382d8afc6da502e689fb3a183dab9686356f Mon Sep 17 00:00:00 2001 From: fjy Date: Wed, 1 Oct 2014 09:13:59 -0700 Subject: [PATCH 6/6] add more logging --- .../segment/realtime/firehose/LocalFirehoseFactory.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java index 4246e145fde..dca9f440bb3 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactory.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.metamx.common.ISE; +import com.metamx.emitter.EmittingLogger; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.impl.FileIteratingFirehose; @@ -43,6 +44,8 @@ import java.util.LinkedList; */ public class LocalFirehoseFactory implements FirehoseFactory { + private static final EmittingLogger log = new EmittingLogger(LocalFirehoseFactory.class); + private final File baseDir; private final String filter; private final StringInputRowParser parser; @@ -80,6 +83,8 @@ public class LocalFirehoseFactory implements FirehoseFactory foundFiles = FileUtils.listFiles( baseDir.getAbsoluteFile(), new WildcardFileFilter(filter),