add ability for client code to provide an InputStream of input data in addition to a File

This is needed when the input data file does not reside in the same jar as the test code;
in that case getResourceAsStream() can still be used to read the data.
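
For example, a test could load data shipped inside another module's jar straight off the classpath (a minimal sketch; `helper`, the test class, the resource name, and the JSON strings are illustrative, not part of this commit):

    // Read the input rows from the classpath rather than from a File on disk.
    InputStream input = MyAggregationTest.class.getClassLoader()
        .getResourceAsStream("sample.data.tsv");

    Sequence<Row> results = helper.createIndexAndRunQueryOnSegment(
        input,
        parserJson,            // StringInputRowParser spec as JSON
        aggregators,           // list of AggregatorFactory specs as JSON
        0L,                    // minTimestamp
        QueryGranularity.NONE, // gran
        5000,                  // maxRowCount
        groupByQueryJson
    );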
Himanshu Gupta 2015-08-20 00:54:58 -05:00
parent 1ecec1da5a
commit c57c07f28a
1 changed file with 63 additions and 15 deletions
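
Note that the new InputStream overloads take ownership of the stream: createIndex() closes it in a finally block via Guava's Closeables.close(stream, true), which swallows any IOException thrown on close. A caller therefore does not need its own try-with-resources around the call (sketch; names are illustrative):

    helper.createIndex(
        new FileInputStream(dataFile), // closed by the helper, even on failure
        parserJson, aggregators, outDir, 0L, QueryGranularity.NONE, 5000
    );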

@@ -29,6 +29,7 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.CharSource;
+import com.google.common.io.Closeables;
 import com.google.common.io.Files;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.metamx.common.guava.CloseQuietly;
@@ -62,10 +63,13 @@ import io.druid.segment.Segment;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.LineIterator;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.Iterator;
@@ -145,6 +149,25 @@ public class AggregationTestHelper
     }
   }
 
+  public Sequence<Row> createIndexAndRunQueryOnSegment(
+      InputStream inputDataStream,
+      String parserJson,
+      String aggregators,
+      long minTimestamp,
+      QueryGranularity gran,
+      int maxRowCount,
+      String groupByQueryJson
+  ) throws Exception
+  {
+    File segmentDir = Files.createTempDir();
+    try {
+      createIndex(inputDataStream, parserJson, aggregators, segmentDir, minTimestamp, gran, maxRowCount);
+      return runQueryOnSegments(Lists.newArrayList(segmentDir), groupByQueryJson);
+    } finally {
+      FileUtils.deleteDirectory(segmentDir);
+    }
+  }
+
   public void createIndex(
       File inputDataFile,
       String parserJson,
@@ -155,10 +178,31 @@ public class AggregationTestHelper
       int maxRowCount
   ) throws Exception
   {
+    createIndex(
+        new FileInputStream(inputDataFile),
+        parserJson,
+        aggregators,
+        outDir,
+        minTimestamp,
+        gran,
+        maxRowCount
+    );
+  }
+
+  public void createIndex(
+      InputStream inputDataStream,
+      String parserJson,
+      String aggregators,
+      File outDir,
+      long minTimestamp,
+      QueryGranularity gran,
+      int maxRowCount
+  ) throws Exception
+  {
     try {
       StringInputRowParser parser = mapper.readValue(parserJson, StringInputRowParser.class);
-      LineIterator iter = FileUtils.lineIterator(inputDataFile, "UTF-8");
+      LineIterator iter = IOUtils.lineIterator(inputDataStream, "UTF-8");
       List<AggregatorFactory> aggregatorSpecs = mapper.readValue(
           aggregators,
           new TypeReference<List<AggregatorFactory>>()
@@ -177,6 +221,10 @@ public class AggregationTestHelper
           maxRowCount
       );
     }
+    finally {
+      Closeables.close(inputDataStream, true);
+    }
   }
 
   public void createIndex(
       Iterator rows,