Merge pull request #774 from metamx/better-ingest

Make loading local files easier
This commit is contained in:
fjy 2014-10-01 10:14:23 -06:00
commit fc3f885948
4 changed files with 50 additions and 21 deletions

View File

@ -40,6 +40,36 @@ See [Examples](Examples.html). This firehose creates a stream of random numbers.
This firehose ingests events from a defined rabbit-mq queue. This firehose ingests events from a defined rabbit-mq queue.
#### LocalFirehose
This Firehose can be used to read the data from files on local disk.
It can be used for POCs to ingest data on disk.
A sample local firehose spec is shown below:
```json
{
"type" : "local",
"filter" : "*.csv",
"parser" : {
"timestampSpec": {
"column": "mytimestamp",
"format": "yyyy-MM-dd HH:mm:ss"
},
"data": {
"format": "csv",
"columns": [...],
"dimensions": [...]
}
}
}
```
|property|description|required?|
|--------|-----------|---------|
|type|This should be "local".|yes|
|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter.html) for more information.|yes|
|parser|Specifies how the data is parsed; contains a data spec similar to what is used for batch ingestion.|yes|
#### IngestSegmentFirehose #### IngestSegmentFirehose
This Firehose can be used to read the data from existing druid segments. This Firehose can be used to read the data from existing druid segments.
@ -63,11 +93,6 @@ A sample ingest firehose spec is shown below -
|metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no| |metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no|
|filter| See [Filters](Filters.html)|yes| |filter| See [Filters](Filters.html)|yes|
Parsing Data Parsing Data
------------ ------------

View File

@ -37,13 +37,13 @@ public class IndexingServiceTaskLogsModule implements Module
public void configure(Binder binder) public void configure(Binder binder)
{ {
PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(FileTaskLogs.class)); PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(FileTaskLogs.class));
JsonConfigProvider.bind(binder, "druid.indexer.logs", FileTaskLogsConfig.class);
final MapBinder<String, TaskLogs> taskLogBinder = Binders.taskLogsBinder(binder); final MapBinder<String, TaskLogs> taskLogBinder = Binders.taskLogsBinder(binder);
taskLogBinder.addBinding("noop").to(NoopTaskLogs.class).in(LazySingleton.class); taskLogBinder.addBinding("noop").to(NoopTaskLogs.class).in(LazySingleton.class);
taskLogBinder.addBinding("file").to(FileTaskLogs.class).in(LazySingleton.class); taskLogBinder.addBinding("file").to(FileTaskLogs.class).in(LazySingleton.class);
binder.bind(NoopTaskLogs.class).in(LazySingleton.class); binder.bind(NoopTaskLogs.class).in(LazySingleton.class);
binder.bind(FileTaskLogs.class).in(LazySingleton.class); binder.bind(FileTaskLogs.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.indexer.logs", FileTaskLogsConfig.class);
binder.bind(TaskLogPusher.class).to(TaskLogs.class); binder.bind(TaskLogPusher.class).to(TaskLogs.class);
} }

View File

@ -30,6 +30,10 @@ public class FileTaskLogsConfig
@NotNull @NotNull
private File directory = new File("log"); private File directory = new File("log");
public FileTaskLogsConfig()
{
}
public FileTaskLogsConfig(File directory) public FileTaskLogsConfig(File directory)
{ {
this.directory = directory; this.directory = directory;

View File

@ -24,17 +24,19 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.repackaged.com.google.common.base.Throwables; import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose; import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory; import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.impl.FileIteratingFirehose; import io.druid.data.input.impl.FileIteratingFirehose;
import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.StringInputRowParser;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator; import org.apache.commons.io.LineIterator;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import java.io.File; import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
@ -42,6 +44,8 @@ import java.util.LinkedList;
*/ */
public class LocalFirehoseFactory implements FirehoseFactory<StringInputRowParser> public class LocalFirehoseFactory implements FirehoseFactory<StringInputRowParser>
{ {
private static final EmittingLogger log = new EmittingLogger(LocalFirehoseFactory.class);
private final File baseDir; private final File baseDir;
private final String filter; private final String filter;
private final StringInputRowParser parser; private final StringInputRowParser parser;
@ -79,26 +83,22 @@ public class LocalFirehoseFactory implements FirehoseFactory<StringInputRowParse
@Override @Override
public Firehose connect(StringInputRowParser firehoseParser) throws IOException public Firehose connect(StringInputRowParser firehoseParser) throws IOException
{ {
File[] foundFiles = baseDir.listFiles( log.info("Searching for all [%s] in [%s]", filter, baseDir.getAbsoluteFile());
new FilenameFilter()
{ Collection<File> foundFiles = FileUtils.listFiles(
@Override baseDir.getAbsoluteFile(),
public boolean accept(File file, String name) new WildcardFileFilter(filter),
{ TrueFileFilter.INSTANCE
return name.contains(filter);
}
}
); );
if (foundFiles == null || foundFiles.length == 0) { if (foundFiles == null || foundFiles.isEmpty()) {
throw new ISE("Found no files to ingest! Check your schema."); throw new ISE("Found no files to ingest! Check your schema.");
} }
final LinkedList<File> files = Lists.<File>newLinkedList( final LinkedList<File> files = Lists.newLinkedList(
Arrays.asList(foundFiles) foundFiles
); );
return new FileIteratingFirehose( return new FileIteratingFirehose(
new Iterator<LineIterator>() new Iterator<LineIterator>()
{ {