Allow Hadoop dataSource inputSpec to be specified multiple times. (#5717)

* Allow Hadoop dataSource inputSpec to be specified multiple times.

* Fix test
This commit is contained in:
Gian Merlino 2018-05-03 13:51:57 -07:00 committed by Fangjin Yang
parent df01998213
commit 739e347320
8 changed files with 387 additions and 204 deletions

View File

@ -76,7 +76,7 @@ For example
#### `multi` #### `multi`
This is a composing inputSpec to combine other inputSpecs. This inputSpec is used for delta ingestion. Please note that you can have only one `dataSource` as child of `multi` inputSpec. This is a composing inputSpec to combine other inputSpecs. This inputSpec is used for delta ingestion. You can also use a `multi` inputSpec to combine data from multiple dataSources. However, each particular dataSource can only be specified one time.
Note that "useNewAggs" must be set to its default value, false, to support delta ingestion. Note that "useNewAggs" must be set to its default value, false, to support delta ingestion.
|Field|Type|Description|Required| |Field|Type|Description|Required|

View File

@ -21,14 +21,14 @@ package io.druid.indexer.hadoop;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.indexer.HadoopDruidIndexerConfig; import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.JobHelper; import io.druid.indexer.JobHelper;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.logger.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -56,82 +56,74 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
{ {
private static final Logger logger = new Logger(DatasourceInputFormat.class); private static final Logger logger = new Logger(DatasourceInputFormat.class);
public static final String CONF_INPUT_SEGMENTS = "druid.segments"; private static final String CONF_DATASOURCES = "druid.datasource.input.datasources";
public static final String CONF_DRUID_SCHEMA = "druid.datasource.schema"; private static final String CONF_SCHEMA = "druid.datasource.input.schema";
public static final String CONF_TRANSFORM_SPEC = "druid.datasource.transformSpec"; private static final String CONF_SEGMENTS = "druid.datasource.input.segments";
public static final String CONF_MAX_SPLIT_SIZE = "druid.datasource.split.max.size"; private static final String CONF_MAX_SPLIT_SIZE = "druid.datasource.input.split.max.size";
@Override @Override
public List<InputSplit> getSplits(JobContext context) throws IOException public List<InputSplit> getSplits(JobContext context) throws IOException
{ {
JobConf conf = new JobConf(context.getConfiguration()); JobConf conf = new JobConf(context.getConfiguration());
String segmentsStr = Preconditions.checkNotNull( List<String> dataSources = getDataSources(conf);
conf.get(CONF_INPUT_SEGMENTS), List<InputSplit> splits = new ArrayList<>();
"No segments found to read"
); for (String dataSource : dataSources) {
List<WindowedDataSegment> segments = HadoopDruidIndexerConfig.JSON_MAPPER.readValue( List<WindowedDataSegment> segments = getSegments(conf, dataSource);
segmentsStr, if (segments == null || segments.size() == 0) {
new TypeReference<List<WindowedDataSegment>>() throw new ISE("No segments found to read for dataSource[%s]", dataSource);
{ }
// Note: Each segment is logged separately to avoid creating a huge String if we are loading lots of segments.
for (int i = 0; i < segments.size(); i++) {
final WindowedDataSegment segment = segments.get(i);
logger.info(
"Segment %,d/%,d for dataSource[%s] has identifier[%s], interval[%s]",
i,
segments.size(),
dataSource,
segment.getSegment().getIdentifier(),
segment.getInterval()
);
}
long maxSize = getMaxSplitSize(conf, dataSource);
if (maxSize < 0) {
long totalSize = 0;
for (WindowedDataSegment segment : segments) {
totalSize += segment.getSegment().getSize();
} }
); int mapTask = conf.getNumMapTasks();
if (segments == null || segments.size() == 0) { if (mapTask > 0) {
throw new ISE("No segments found to read"); maxSize = totalSize / mapTask;
} }
}
// Note: log is splitted into two lines so that a new String is not generated to print it. if (maxSize > 0) {
// segmentsStr could be quite large when re-indexing multiple months of data. //combining is to happen, let us sort the segments list by size so that they
logger.info("Segment to read are..."); //are combined appropriately
logger.info(segmentsStr); segments.sort(Comparator.comparingLong(s -> s.getSegment().getSize()));
}
long maxSize = conf.getLong(CONF_MAX_SPLIT_SIZE, 0); List<WindowedDataSegment> list = new ArrayList<>();
if (maxSize < 0) { long size = 0;
long totalSize = 0;
org.apache.hadoop.mapred.InputFormat fio = supplier.get();
for (WindowedDataSegment segment : segments) { for (WindowedDataSegment segment : segments) {
totalSize += segment.getSegment().getSize(); if (size + segment.getSegment().getSize() > maxSize && size > 0) {
splits.add(toDataSourceSplit(list, fio, conf));
list = new ArrayList<>();
size = 0;
}
list.add(segment);
size += segment.getSegment().getSize();
} }
int mapTask = conf.getNumMapTasks();
if (mapTask > 0) {
maxSize = totalSize / mapTask;
}
}
if (maxSize > 0) { if (list.size() > 0) {
//combining is to happen, let us sort the segments list by size so that they
//are combined appropriately
Collections.sort(
segments,
new Comparator<WindowedDataSegment>()
{
@Override
public int compare(WindowedDataSegment s1, WindowedDataSegment s2)
{
return Long.compare(s1.getSegment().getSize(), s2.getSegment().getSize());
}
}
);
}
List<InputSplit> splits = Lists.newArrayList();
List<WindowedDataSegment> list = new ArrayList<>();
long size = 0;
org.apache.hadoop.mapred.InputFormat fio = supplier.get();
for (WindowedDataSegment segment : segments) {
if (size + segment.getSegment().getSize() > maxSize && size > 0) {
splits.add(toDataSourceSplit(list, fio, conf)); splits.add(toDataSourceSplit(list, fio, conf));
list = Lists.newArrayList();
size = 0;
} }
list.add(segment);
size += segment.getSegment().getSize();
}
if (list.size() > 0) {
splits.add(toDataSourceSplit(list, fio, conf));
} }
logger.info("Number of splits [%d]", splits.size()); logger.info("Number of splits [%d]", splits.size());
@ -167,7 +159,7 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
protected FileStatus[] listStatus(JobConf job) throws IOException protected FileStatus[] listStatus(JobConf job) throws IOException
{ {
// to avoid globbing which needs input path should be hadoop-compatible (':' is not acceptable in path, etc.) // to avoid globbing which needs input path should be hadoop-compatible (':' is not acceptable in path, etc.)
List<FileStatus> statusList = Lists.newArrayList(); List<FileStatus> statusList = new ArrayList<>();
for (Path path : FileInputFormat.getInputPaths(job)) { for (Path path : FileInputFormat.getInputPaths(job)) {
// load spec in segment points specifically zip file itself // load spec in segment points specifically zip file itself
statusList.add(path.getFileSystem(job).getFileStatus(path)); statusList.add(path.getFileSystem(job).getFileStatus(path));
@ -250,4 +242,90 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
.map(Map.Entry::getKey) .map(Map.Entry::getKey)
.toArray(String[]::new); .toArray(String[]::new);
} }
/**
 * Returns the dataSource names stored in the configuration under {@code CONF_DATASOURCES}.
 *
 * @param conf configuration to read from
 * @return the names deserialized with {@code HadoopDruidIndexerConfig.JSON_MAPPER}, or an empty
 *         list when the property is not set
 * @throws IOException if the mapper fails to read the stored value
 */
public static List<String> getDataSources(final Configuration conf) throws IOException
{
  final String serializedNames = conf.get(CONF_DATASOURCES);

  if (serializedNames != null) {
    final TypeReference<List<String>> listOfNames = new TypeReference<List<String>>() {};
    return HadoopDruidIndexerConfig.JSON_MAPPER.readValue(serializedNames, listOfNames);
  }

  // Nothing has been registered yet.
  return Collections.emptyList();
}
/**
 * Loads the ingestion spec stored for {@code dataSource} and verifies that it lists at least one
 * dimension and at least one metric.
 *
 * @param conf       configuration holding the serialized spec under the key
 *                   {@code CONF_SCHEMA + "." + dataSource}
 * @param dataSource name of the dataSource whose spec is requested
 * @return the deserialized spec
 * @throws IOException          if the mapper fails to read the stored value
 * @throws NullPointerException if no spec is stored for {@code dataSource}
 * @throws ISE                  if the spec has null/empty dimensions or null/empty metrics
 */
public static DatasourceIngestionSpec getIngestionSpec(final Configuration conf, final String dataSource)
    throws IOException
{
  final String schemaKey = StringUtils.format("%s.%s", CONF_SCHEMA, dataSource);
  final String serializedSpec = conf.get(schemaKey);
  if (serializedSpec == null) {
    throw new NullPointerException(StringUtils.format("null spec for dataSource[%s]", dataSource));
  }

  final DatasourceIngestionSpec ingestionSpec =
      HadoopDruidIndexerConfig.JSON_MAPPER.readValue(serializedSpec, DatasourceIngestionSpec.class);

  // Dimensions are checked before metrics, so a spec missing both reports the dimensions problem.
  final boolean lacksDimensions =
      ingestionSpec.getDimensions() == null || ingestionSpec.getDimensions().size() == 0;
  if (lacksDimensions) {
    throw new ISE("load schema does not have dimensions");
  }

  final boolean lacksMetrics =
      ingestionSpec.getMetrics() == null || ingestionSpec.getMetrics().size() == 0;
  if (lacksMetrics) {
    throw new ISE("load schema does not have metrics");
  }

  return ingestionSpec;
}
/**
 * Reads the segments registered for {@code dataSource}.
 *
 * @param conf       configuration holding the serialized segment list under the key
 *                   {@code CONF_SEGMENTS + "." + dataSource}
 * @param dataSource name of the dataSource whose segments are requested
 * @return the deserialized segments, or an empty list when nothing is stored for
 *         {@code dataSource}
 * @throws IOException if the mapper fails to read the stored value
 */
public static List<WindowedDataSegment> getSegments(final Configuration conf, final String dataSource)
    throws IOException
{
  final String serializedSegments = conf.get(StringUtils.format("%s.%s", CONF_SEGMENTS, dataSource));

  if (serializedSegments == null) {
    // The property is absent. Report "no segments" instead of handing null to the mapper, so that
    // callers which test for a null/empty result can raise an error naming the dataSource.
    return Collections.emptyList();
  }

  return HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
      serializedSegments,
      new TypeReference<List<WindowedDataSegment>>() {}
  );
}
/**
 * Returns the maximum split size stored for {@code dataSource}.
 *
 * @param conf       configuration holding the value under the key
 *                   {@code CONF_MAX_SPLIT_SIZE + "." + dataSource}
 * @param dataSource name of the dataSource whose split size is requested
 * @return the stored value, or {@code 0} when the property is not set
 */
public static long getMaxSplitSize(final Configuration conf, final String dataSource)
{
  final String splitSizeKey = StringUtils.format("%s.%s", CONF_MAX_SPLIT_SIZE, dataSource);
  return conf.getLong(splitSizeKey, 0L);
}
/**
 * Registers a dataSource in the configuration: appends its name to the list kept under
 * {@code CONF_DATASOURCES} and stores its spec, segments and maximum split size under keys
 * suffixed with the dataSource name.
 *
 * @param conf         configuration to update
 * @param spec         ingestion spec; its {@code getDataSource()} value is used as the name
 * @param segments     segments to store for this dataSource
 * @param maxSplitSize maximum split size to store for this dataSource
 * @throws IOException if the current name list cannot be read or a value cannot be serialized
 * @throws ISE         if a dataSource with the same name is already registered
 */
public static void addDataSource(
    final Configuration conf,
    final DatasourceIngestionSpec spec,
    final List<WindowedDataSegment> segments,
    final long maxSplitSize
) throws IOException
{
  final List<String> alreadyRegistered = getDataSources(conf);
  final String name = spec.getDataSource();

  if (alreadyRegistered.contains(name)) {
    throw new ISE("Oops, cannot load the same dataSource twice!");
  }

  // Work on a copy: the list returned by getDataSources may be an immutable empty list.
  final List<String> updatedNames = new ArrayList<>(alreadyRegistered);
  updatedNames.add(name);
  conf.set(CONF_DATASOURCES, HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(updatedNames));

  final String schemaKey = StringUtils.format("%s.%s", CONF_SCHEMA, name);
  conf.set(schemaKey, HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(spec));

  final String segmentsKey = StringUtils.format("%s.%s", CONF_SEGMENTS, name);
  conf.set(segmentsKey, HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(segments));

  final String splitSizeKey = StringUtils.format("%s.%s", CONF_MAX_SPLIT_SIZE, name);
  conf.set(splitSizeKey, String.valueOf(maxSplitSize));
}
} }

View File

@ -19,10 +19,9 @@
package io.druid.indexer.hadoop; package io.druid.indexer.hadoop;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import com.google.common.io.Files; import com.google.common.io.Files;
@ -30,14 +29,12 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.Row; import io.druid.data.input.Row;
import io.druid.indexer.HadoopDruidIndexerConfig; import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.JobHelper; import io.druid.indexer.JobHelper;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.logger.Logger;
import io.druid.segment.QueryableIndex; import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexStorageAdapter; import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose; import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.segment.realtime.firehose.WindowedStorageAdapter; import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.InputSplit;
@ -63,11 +60,18 @@ public class DatasourceRecordReader extends RecordReader<NullWritable, InputRow>
private int numRows; private int numRows;
@Override @Override
public void initialize(InputSplit split, final TaskAttemptContext context) public void initialize(InputSplit split, final TaskAttemptContext context) throws IOException
{ {
spec = readAndVerifyDatasourceIngestionSpec(context.getConfiguration(), HadoopDruidIndexerConfig.JSON_MAPPER);
List<WindowedDataSegment> segments = ((DatasourceInputSplit) split).getSegments(); List<WindowedDataSegment> segments = ((DatasourceInputSplit) split).getSegments();
String dataSource = Iterators.getOnlyElement(
segments.stream()
.map(s -> s.getSegment().getDataSource())
.distinct()
.iterator()
);
spec = DatasourceInputFormat.getIngestionSpec(context.getConfiguration(), dataSource);
logger.info("load schema [%s]", spec);
List<WindowedStorageAdapter> adapters = Lists.transform( List<WindowedStorageAdapter> adapters = Lists.transform(
segments, segments,
@ -160,27 +164,4 @@ public class DatasourceRecordReader extends RecordReader<NullWritable, InputRow>
FileUtils.deleteDirectory(dir); FileUtils.deleteDirectory(dir);
} }
} }
/**
 * Reads the ingestion spec stored under {@code DatasourceInputFormat.CONF_DRUID_SCHEMA} and
 * verifies that it lists at least one dimension and at least one metric.
 *
 * @param config     configuration holding the serialized spec
 * @param jsonMapper mapper used to deserialize the spec
 * @return the deserialized spec
 * @throws NullPointerException if the schema property is not set
 * @throws ISE                  if the spec has null/empty dimensions or null/empty metrics
 * @throws RuntimeException     wrapping the IOException if the spec cannot be deserialized
 */
private DatasourceIngestionSpec readAndVerifyDatasourceIngestionSpec(Configuration config, ObjectMapper jsonMapper)
{
  final String serializedSchema = Preconditions.checkNotNull(
      config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA),
      "null schema"
  );
  logger.info("load schema [%s]", serializedSchema);

  // Only the deserialization step can raise IOException, so only it sits inside the try.
  final DatasourceIngestionSpec parsed;
  try {
    parsed = jsonMapper.readValue(serializedSchema, DatasourceIngestionSpec.class);
  }
  catch (IOException ex) {
    throw new RuntimeException("couldn't load segment load spec", ex);
  }

  final boolean lacksDimensions = parsed.getDimensions() == null || parsed.getDimensions().size() == 0;
  if (lacksDimensions) {
    throw new ISE("load schema does not have dimensions");
  }

  final boolean lacksMetrics = parsed.getMetrics() == null || parsed.getMetrics().size() == 0;
  if (lacksMetrics) {
    throw new ISE("load schema does not have metrics");
  }

  return parsed;
}
} }

View File

@ -190,11 +190,8 @@ public class DatasourcePathSpec implements PathSpec
config.getSchema().getDataSchema().getTransformSpec() config.getSchema().getDataSchema().getTransformSpec()
); );
job.getConfiguration().set(DatasourceInputFormat.CONF_DRUID_SCHEMA, mapper.writeValueAsString(updatedIngestionSpec)); DatasourceInputFormat.addDataSource(job.getConfiguration(), updatedIngestionSpec, segments, maxSplitSize);
job.getConfiguration().set(DatasourceInputFormat.CONF_INPUT_SEGMENTS, mapper.writeValueAsString(segments));
job.getConfiguration().set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, String.valueOf(maxSplitSize));
MultipleInputs.addInputPath(job, new Path("/dummy/tobe/ignored"), DatasourceInputFormat.class); MultipleInputs.addInputPath(job, new Path("/dummy/tobe/ignored"), DatasourceInputFormat.class);
return job; return job;
} }

View File

@ -119,7 +119,7 @@ public class BatchDeltaIngestionTest
"ingestionSpec", "ingestionSpec",
ImmutableMap.of( ImmutableMap.of(
"dataSource", "dataSource",
"xyz", "testds",
"interval", "interval",
INTERVAL_FULL INTERVAL_FULL
), ),
@ -180,7 +180,7 @@ public class BatchDeltaIngestionTest
"ingestionSpec", "ingestionSpec",
ImmutableMap.of( ImmutableMap.of(
"dataSource", "dataSource",
"xyz", "testds",
"interval", "interval",
INTERVAL_FULL INTERVAL_FULL
), ),
@ -239,7 +239,7 @@ public class BatchDeltaIngestionTest
"ingestionSpec", "ingestionSpec",
ImmutableMap.of( ImmutableMap.of(
"dataSource", "dataSource",
"xyz", "testds",
"interval", "interval",
INTERVAL_FULL INTERVAL_FULL
), ),
@ -314,7 +314,7 @@ public class BatchDeltaIngestionTest
"ingestionSpec", "ingestionSpec",
ImmutableMap.of( ImmutableMap.of(
"dataSource", "dataSource",
"xyz", "testds",
"interval", "interval",
INTERVAL_FULL INTERVAL_FULL
), ),

View File

@ -23,14 +23,16 @@ import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.io.Files; import com.google.common.io.Files;
import io.druid.indexer.JobHelper; import io.druid.indexer.JobHelper;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Intervals; import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.JodaUtils;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -51,8 +53,10 @@ import org.junit.rules.TemporaryFolder;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
/** /**
@ -62,7 +66,8 @@ public class DatasourceInputFormatTest
@Rule @Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder(); public final TemporaryFolder temporaryFolder = new TemporaryFolder();
private List<WindowedDataSegment> segments; private List<WindowedDataSegment> segments1;
private List<WindowedDataSegment> segments2;
private List<LocatedFileStatus> locations; private List<LocatedFileStatus> locations;
private JobConf config; private JobConf config;
private JobContext context; private JobContext context;
@ -70,7 +75,7 @@ public class DatasourceInputFormatTest
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
{ {
segments = ImmutableList.of( segments1 = ImmutableList.of(
WindowedDataSegment.of( WindowedDataSegment.of(
new DataSegment( new DataSegment(
"test1", "test1",
@ -89,7 +94,7 @@ public class DatasourceInputFormatTest
), ),
WindowedDataSegment.of( WindowedDataSegment.of(
new DataSegment( new DataSegment(
"test2", "test1",
Intervals.of("2050/3000"), Intervals.of("2050/3000"),
"ver", "ver",
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
@ -105,7 +110,7 @@ public class DatasourceInputFormatTest
), ),
WindowedDataSegment.of( WindowedDataSegment.of(
new DataSegment( new DataSegment(
"test3", "test1",
Intervals.of("2030/3000"), Intervals.of("2030/3000"),
"ver", "ver",
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
@ -121,9 +126,29 @@ public class DatasourceInputFormatTest
) )
); );
Path path1 = new Path(JobHelper.getURIFromSegment(segments.get(0).getSegment())); segments2 = ImmutableList.of(
Path path2 = new Path(JobHelper.getURIFromSegment(segments.get(1).getSegment())); WindowedDataSegment.of(
Path path3 = new Path(JobHelper.getURIFromSegment(segments.get(2).getSegment())); new DataSegment(
"test2",
Intervals.of("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp/index4.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
NoneShardSpec.instance(),
9,
2
)
)
);
Path path1 = new Path(JobHelper.getURIFromSegment(segments1.get(0).getSegment()));
Path path2 = new Path(JobHelper.getURIFromSegment(segments1.get(1).getSegment()));
Path path3 = new Path(JobHelper.getURIFromSegment(segments1.get(2).getSegment()));
Path path4 = new Path(JobHelper.getURIFromSegment(segments2.get(0).getSegment()));
// dummy locations for test // dummy locations for test
locations = ImmutableList.of( locations = ImmutableList.of(
@ -140,7 +165,7 @@ public class DatasourceInputFormatTest
new BlockLocation(null, new String[]{"s1", "s2"}, 0, 1000), new BlockLocation(null, new String[]{"s1", "s2"}, 0, 1000),
new BlockLocation(null, new String[]{"s1", "s3"}, 1000, 1200), new BlockLocation(null, new String[]{"s1", "s3"}, 1000, 1200),
new BlockLocation(null, new String[]{"s2", "s3"}, 2200, 1100), new BlockLocation(null, new String[]{"s2", "s3"}, 2200, 1100),
new BlockLocation(null, new String[]{"s1", "s2"}, 3300, 700), new BlockLocation(null, new String[]{"s1", "s2"}, 3300, 700)
} }
), ),
new LocatedFileStatus( new LocatedFileStatus(
@ -148,21 +173,56 @@ public class DatasourceInputFormatTest
new BlockLocation[]{ new BlockLocation[]{
new BlockLocation(null, new String[]{"s2", "s3"}, 0, 500) new BlockLocation(null, new String[]{"s2", "s3"}, 0, 500)
} }
),
new LocatedFileStatus(
500, false, 0, 0, 0, 0, null, null, null, null, path4,
new BlockLocation[]{
new BlockLocation(null, new String[]{"s2", "s3"}, 0, 500)
}
) )
); );
config = new JobConf(); config = populateConfiguration(new JobConf(), segments1, 0);
config.set(
DatasourceInputFormat.CONF_INPUT_SEGMENTS,
new DefaultObjectMapper().writeValueAsString(segments)
);
context = EasyMock.createMock(JobContext.class); context = EasyMock.createMock(JobContext.class);
EasyMock.expect(context.getConfiguration()).andReturn(config); EasyMock.expect(context.getConfiguration()).andReturn(config);
EasyMock.replay(context); EasyMock.replay(context);
} }
private Supplier<InputFormat> testFormatter = new Supplier<InputFormat>() { private static <T extends Configuration> T populateConfiguration(
final T conf,
final List<WindowedDataSegment> segments,
final long maxSplitSize
)
throws IOException
{
DatasourceInputFormat.addDataSource(
conf,
new DatasourceIngestionSpec(
Iterators.getOnlyElement(segments.stream().map(s -> s.getSegment().getDataSource()).distinct().iterator()),
null,
Collections.singletonList(
JodaUtils.umbrellaInterval(
segments.stream()
.map(WindowedDataSegment::getInterval)
.collect(Collectors.toList())
)
),
null,
null,
null,
null,
false,
null
),
segments,
maxSplitSize
);
return conf;
}
private Supplier<InputFormat> testFormatter = new Supplier<InputFormat>()
{
@Override @Override
public InputFormat get() public InputFormat get()
{ {
@ -202,25 +262,47 @@ public class DatasourceInputFormatTest
DatasourceInputFormat inputFormat = new DatasourceInputFormat().setSupplier(testFormatter); DatasourceInputFormat inputFormat = new DatasourceInputFormat().setSupplier(testFormatter);
List<InputSplit> splits = inputFormat.getSplits(context); List<InputSplit> splits = inputFormat.getSplits(context);
Assert.assertEquals(segments.size(), splits.size()); Assert.assertEquals(segments1.size(), splits.size());
for (int i = 0; i < segments.size(); i++) { for (int i = 0; i < segments1.size(); i++) {
DatasourceInputSplit split = (DatasourceInputSplit) splits.get(i); DatasourceInputSplit split = (DatasourceInputSplit) splits.get(i);
Assert.assertEquals(segments.get(i), split.getSegments().get(0)); Assert.assertEquals(segments1.get(i), split.getSegments().get(0));
} }
Assert.assertArrayEquals(new String[] {"s1", "s2"}, splits.get(0).getLocations()); Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(0).getLocations());
Assert.assertArrayEquals(new String[] {"s1", "s2"}, splits.get(1).getLocations()); Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(1).getLocations());
Assert.assertArrayEquals(new String[] {"s2", "s3"}, splits.get(2).getLocations()); Assert.assertArrayEquals(new String[]{"s2", "s3"}, splits.get(2).getLocations());
}
@Test
public void testGetSplitsTwoDataSources() throws Exception
{
config.clear();
populateConfiguration(populateConfiguration(config, segments1, 999999), segments2, 999999);
List<InputSplit> splits = new DatasourceInputFormat().setSupplier(testFormatter).getSplits(context);
Assert.assertEquals(2, splits.size());
Assert.assertEquals(
Sets.newHashSet(segments1),
Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments()))
);
Assert.assertEquals(
Sets.newHashSet(segments2),
Sets.newHashSet((((DatasourceInputSplit) splits.get(1)).getSegments()))
);
Assert.assertArrayEquals(new String[]{"s2", "s1", "s3"}, splits.get(0).getLocations());
Assert.assertArrayEquals(new String[]{"s2", "s3"}, splits.get(1).getLocations());
} }
@Test @Test
public void testGetSplitsAllCombined() throws Exception public void testGetSplitsAllCombined() throws Exception
{ {
config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "999999"); config.clear();
populateConfiguration(config, segments1, 999999);
List<InputSplit> splits = new DatasourceInputFormat().setSupplier(testFormatter).getSplits(context); List<InputSplit> splits = new DatasourceInputFormat().setSupplier(testFormatter).getSplits(context);
Assert.assertEquals(1, splits.size()); Assert.assertEquals(1, splits.size());
Assert.assertEquals( Assert.assertEquals(
Sets.newHashSet(segments), Sets.newHashSet(segments1),
Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments())) Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments()))
); );
@ -230,19 +312,20 @@ public class DatasourceInputFormatTest
@Test @Test
public void testGetSplitsCombineInTwo() throws Exception public void testGetSplitsCombineInTwo() throws Exception
{ {
config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "6"); config.clear();
populateConfiguration(config, segments1, 6);
List<InputSplit> splits = new DatasourceInputFormat().setSupplier(testFormatter).getSplits(context); List<InputSplit> splits = new DatasourceInputFormat().setSupplier(testFormatter).getSplits(context);
Assert.assertEquals(2, splits.size()); Assert.assertEquals(2, splits.size());
Assert.assertEquals( Assert.assertEquals(
Sets.newHashSet(segments.get(0), segments.get(2)), Sets.newHashSet(segments1.get(0), segments1.get(2)),
Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments())) Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments()))
); );
Assert.assertArrayEquals(new String[]{"s2", "s1", "s3"}, splits.get(0).getLocations()); Assert.assertArrayEquals(new String[]{"s2", "s1", "s3"}, splits.get(0).getLocations());
Assert.assertEquals( Assert.assertEquals(
Sets.newHashSet(segments.get(1)), Sets.newHashSet(segments1.get(1)),
Sets.newHashSet((((DatasourceInputSplit) splits.get(1)).getSegments())) Sets.newHashSet((((DatasourceInputSplit) splits.get(1)).getSegments()))
); );
Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(1).getLocations()); Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(1).getLocations());
@ -251,26 +334,27 @@ public class DatasourceInputFormatTest
@Test @Test
public void testGetSplitsCombineCalculated() throws Exception public void testGetSplitsCombineCalculated() throws Exception
{ {
config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "-1"); config.clear();
populateConfiguration(config, segments1, -1);
config.setNumMapTasks(3); config.setNumMapTasks(3);
List<InputSplit> splits = new DatasourceInputFormat().setSupplier(testFormatter).getSplits(context); List<InputSplit> splits = new DatasourceInputFormat().setSupplier(testFormatter).getSplits(context);
Assert.assertEquals(3, splits.size()); Assert.assertEquals(3, splits.size());
Assert.assertEquals( Assert.assertEquals(
Sets.newHashSet(segments.get(0)), Sets.newHashSet(segments1.get(0)),
Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments())) Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments()))
); );
Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(0).getLocations()); Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(0).getLocations());
Assert.assertEquals( Assert.assertEquals(
Sets.newHashSet(segments.get(2)), Sets.newHashSet(segments1.get(2)),
Sets.newHashSet((((DatasourceInputSplit) splits.get(1)).getSegments())) Sets.newHashSet((((DatasourceInputSplit) splits.get(1)).getSegments()))
); );
Assert.assertArrayEquals(new String[]{"s2", "s3"}, splits.get(1).getLocations()); Assert.assertArrayEquals(new String[]{"s2", "s3"}, splits.get(1).getLocations());
Assert.assertEquals( Assert.assertEquals(
Sets.newHashSet(segments.get(1)), Sets.newHashSet(segments1.get(1)),
Sets.newHashSet((((DatasourceInputSplit) splits.get(2)).getSegments())) Sets.newHashSet((((DatasourceInputSplit) splits.get(2)).getSegments()))
); );
Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(2).getLocations()); Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(2).getLocations());
@ -302,12 +386,7 @@ public class DatasourceInputFormatTest
) )
); );
final JobConf myConfig = new JobConf(); final JobConf myConfig = populateConfiguration(new JobConf(), mySegments, 0L);
myConfig.set(
DatasourceInputFormat.CONF_INPUT_SEGMENTS,
new DefaultObjectMapper().writeValueAsString(mySegments)
);
final JobContext myContext = EasyMock.createMock(JobContext.class); final JobContext myContext = EasyMock.createMock(JobContext.class);
EasyMock.expect(myContext.getConfiguration()).andReturn(myConfig); EasyMock.expect(myContext.getConfiguration()).andReturn(myConfig);
EasyMock.replay(myContext); EasyMock.replay(myContext);
@ -367,7 +446,7 @@ public class DatasourceInputFormatTest
Assert.assertEquals( Assert.assertEquals(
0, 0,
DatasourceInputFormat.getLocations(segments.subList(0, 1), fio, config).count() DatasourceInputFormat.getLocations(segments1.subList(0, 1), fio, config).count()
); );
} }
@ -383,7 +462,7 @@ public class DatasourceInputFormatTest
); );
EasyMock.expect(fio.getSplits(config, 1)).andReturn( EasyMock.expect(fio.getSplits(config, 1)).andReturn(
new org.apache.hadoop.mapred.InputSplit[] {split} new org.apache.hadoop.mapred.InputSplit[]{split}
); );
EasyMock.expect(split.getLocations()).andThrow(new IOException("testing")); EasyMock.expect(split.getLocations()).andThrow(new IOException("testing"));
@ -391,7 +470,7 @@ public class DatasourceInputFormatTest
Assert.assertEquals( Assert.assertEquals(
0, 0,
DatasourceInputFormat.getLocations(segments.subList(0, 1), fio, config).count() DatasourceInputFormat.getLocations(segments1.subList(0, 1), fio, config).count()
); );
} }
@ -407,25 +486,25 @@ public class DatasourceInputFormatTest
); );
EasyMock.expect(fio.getSplits(config, 1)).andReturn( EasyMock.expect(fio.getSplits(config, 1)).andReturn(
new org.apache.hadoop.mapred.InputSplit[] {split} new org.apache.hadoop.mapred.InputSplit[]{split}
); );
EasyMock.expect(split.getLocations()).andReturn(new String[] {"s1", "s2"}); EasyMock.expect(split.getLocations()).andReturn(new String[]{"s1", "s2"});
EasyMock.expect(fio.getSplits(config, 1)).andReturn( EasyMock.expect(fio.getSplits(config, 1)).andReturn(
new org.apache.hadoop.mapred.InputSplit[] {split} new org.apache.hadoop.mapred.InputSplit[]{split}
); );
EasyMock.expect(split.getLocations()).andReturn(new String[] {"s3"}); EasyMock.expect(split.getLocations()).andReturn(new String[]{"s3"});
EasyMock.expect(fio.getSplits(config, 1)).andReturn( EasyMock.expect(fio.getSplits(config, 1)).andReturn(
new org.apache.hadoop.mapred.InputSplit[] {split} new org.apache.hadoop.mapred.InputSplit[]{split}
); );
EasyMock.expect(split.getLocations()).andReturn(new String[] {"s4", "s2"}); EasyMock.expect(split.getLocations()).andReturn(new String[]{"s4", "s2"});
EasyMock.replay(fio, split); EasyMock.replay(fio, split);
Assert.assertArrayEquals( Assert.assertArrayEquals(
new String[] {"s1", "s2", "s3", "s4", "s2"}, new String[]{"s1", "s2", "s3", "s4", "s2"},
DatasourceInputFormat.getLocations(segments, fio, config).toArray(String[]::new) DatasourceInputFormat.getLocations(segments1, fio, config).toArray(String[]::new)
); );
} }
} }

View File

@ -35,6 +35,7 @@ import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.net.URL; import java.net.URL;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -59,21 +60,21 @@ public class DatasourceRecordReaderTest
InputSplit split = new DatasourceInputSplit(Lists.newArrayList(WindowedDataSegment.of(segment)), null); InputSplit split = new DatasourceInputSplit(Lists.newArrayList(WindowedDataSegment.of(segment)), null);
Configuration config = new Configuration(); Configuration config = new Configuration();
config.set( DatasourceInputFormat.addDataSource(
DatasourceInputFormat.CONF_DRUID_SCHEMA, config,
HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString( new DatasourceIngestionSpec(
new DatasourceIngestionSpec( segment.getDataSource(),
segment.getDataSource(), segment.getInterval(),
segment.getInterval(), null,
null, null,
null, null,
null, segment.getDimensions(),
segment.getDimensions(), segment.getMetrics(),
segment.getMetrics(), false,
false, null
null ),
) Collections.emptyList(),
) 0
); );
TaskAttemptContext context = EasyMock.createNiceMock(TaskAttemptContext.class); TaskAttemptContext context = EasyMock.createNiceMock(TaskAttemptContext.class);

View File

@ -19,7 +19,6 @@
package io.druid.indexer.path; package io.druid.indexer.path;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
@ -60,6 +59,7 @@ import org.easymock.EasyMock;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -67,12 +67,14 @@ import java.util.Map;
*/ */
public class DatasourcePathSpecTest public class DatasourcePathSpecTest
{ {
private DatasourceIngestionSpec ingestionSpec; private DatasourceIngestionSpec ingestionSpec1;
private List<WindowedDataSegment> segments; private DatasourceIngestionSpec ingestionSpec2;
private List<WindowedDataSegment> segments1;
private List<WindowedDataSegment> segments2;
public DatasourcePathSpecTest() public DatasourcePathSpecTest()
{ {
this.ingestionSpec = new DatasourceIngestionSpec( this.ingestionSpec1 = new DatasourceIngestionSpec(
"test", "test",
Intervals.of("2000/3000"), Intervals.of("2000/3000"),
null, null,
@ -84,10 +86,22 @@ public class DatasourcePathSpecTest
null null
); );
segments = ImmutableList.of( this.ingestionSpec2 = new DatasourceIngestionSpec(
"test2",
Intervals.of("2000/3000"),
null,
null,
null,
null,
null,
false,
null
);
segments1 = ImmutableList.of(
WindowedDataSegment.of( WindowedDataSegment.of(
new DataSegment( new DataSegment(
ingestionSpec.getDataSource(), ingestionSpec1.getDataSource(),
Intervals.of("2000/3000"), Intervals.of("2000/3000"),
"ver", "ver",
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
@ -103,7 +117,7 @@ public class DatasourcePathSpecTest
), ),
WindowedDataSegment.of( WindowedDataSegment.of(
new DataSegment( new DataSegment(
ingestionSpec.getDataSource(), ingestionSpec1.getDataSource(),
Intervals.of("2050/3000"), Intervals.of("2050/3000"),
"ver", "ver",
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
@ -118,6 +132,25 @@ public class DatasourcePathSpecTest
) )
) )
); );
segments2 = ImmutableList.of(
WindowedDataSegment.of(
new DataSegment(
ingestionSpec2.getDataSource(),
Intervals.of("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp2/index.zip"
),
ImmutableList.of("product2"),
ImmutableList.of("visited_sum2", "unique_hosts2"),
NoneShardSpec.instance(),
9,
12334
)
)
);
} }
@Test @Test
@ -137,7 +170,9 @@ public class DatasourcePathSpecTest
{ {
binder.bind(UsedSegmentLister.class).toInstance(segmentList); binder.bind(UsedSegmentLister.class).toInstance(segmentList);
JsonConfigProvider.bindInstance( JsonConfigProvider.bindInstance(
binder, Key.get(DruidNode.class, Self.class), new DruidNode("dummy-node", null, null, null, true, false) binder,
Key.get(DruidNode.class, Self.class),
new DruidNode("dummy-node", null, null, null, true, false)
); );
} }
} }
@ -149,7 +184,7 @@ public class DatasourcePathSpecTest
DatasourcePathSpec expected = new DatasourcePathSpec( DatasourcePathSpec expected = new DatasourcePathSpec(
jsonMapper, jsonMapper,
null, null,
ingestionSpec, ingestionSpec1,
Long.valueOf(10), Long.valueOf(10),
false false
); );
@ -159,7 +194,7 @@ public class DatasourcePathSpecTest
expected = new DatasourcePathSpec( expected = new DatasourcePathSpec(
jsonMapper, jsonMapper,
null, null,
ingestionSpec, ingestionSpec1,
null, null,
false false
); );
@ -168,8 +203,8 @@ public class DatasourcePathSpecTest
expected = new DatasourcePathSpec( expected = new DatasourcePathSpec(
jsonMapper, jsonMapper,
segments, segments1,
ingestionSpec, ingestionSpec1,
null, null,
false false
); );
@ -178,8 +213,8 @@ public class DatasourcePathSpecTest
expected = new DatasourcePathSpec( expected = new DatasourcePathSpec(
jsonMapper, jsonMapper,
segments, segments1,
ingestionSpec, ingestionSpec1,
null, null,
true true
); );
@ -194,10 +229,18 @@ public class DatasourcePathSpecTest
ObjectMapper mapper = TestHelper.makeJsonMapper(); ObjectMapper mapper = TestHelper.makeJsonMapper();
DatasourcePathSpec pathSpec = new DatasourcePathSpec( DatasourcePathSpec pathSpec1 = new DatasourcePathSpec(
mapper, mapper,
segments, segments1,
ingestionSpec, ingestionSpec1,
null,
false
);
DatasourcePathSpec pathSpec2 = new DatasourcePathSpec(
mapper,
segments2,
ingestionSpec2,
null, null,
false false
); );
@ -207,25 +250,29 @@ public class DatasourcePathSpecTest
EasyMock.expect(job.getConfiguration()).andReturn(config).anyTimes(); EasyMock.expect(job.getConfiguration()).andReturn(config).anyTimes();
EasyMock.replay(job); EasyMock.replay(job);
pathSpec.addInputPaths(hadoopIndexerConfig, job); pathSpec1.addInputPaths(hadoopIndexerConfig, job);
List<WindowedDataSegment> actualSegments = mapper.readValue( pathSpec2.addInputPaths(hadoopIndexerConfig, job);
config.get(DatasourceInputFormat.CONF_INPUT_SEGMENTS),
new TypeReference<List<WindowedDataSegment>>()
{
}
);
Assert.assertEquals(segments, actualSegments);
DatasourceIngestionSpec actualIngestionSpec = mapper.readValue(
config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA),
DatasourceIngestionSpec.class
);
Assert.assertEquals( Assert.assertEquals(
ingestionSpec ImmutableList.of(ingestionSpec1.getDataSource(), ingestionSpec2.getDataSource()),
DatasourceInputFormat.getDataSources(config)
);
Assert.assertEquals(segments1, DatasourceInputFormat.getSegments(config, ingestionSpec1.getDataSource()));
Assert.assertEquals(segments2, DatasourceInputFormat.getSegments(config, ingestionSpec2.getDataSource()));
Assert.assertEquals(
ingestionSpec1
.withDimensions(ImmutableList.of("product")) .withDimensions(ImmutableList.of("product"))
.withMetrics(ImmutableList.of("visited_sum")), .withMetrics(ImmutableList.of("visited_sum")),
actualIngestionSpec DatasourceInputFormat.getIngestionSpec(config, ingestionSpec1.getDataSource())
);
Assert.assertEquals(
ingestionSpec2
.withDimensions(ImmutableList.of("product2"))
.withMetrics(ImmutableList.of("visited_sum")),
DatasourceInputFormat.getIngestionSpec(config, ingestionSpec2.getDataSource())
); );
} }
@ -239,7 +286,7 @@ public class DatasourcePathSpecTest
DatasourcePathSpec pathSpec = new DatasourcePathSpec( DatasourcePathSpec pathSpec = new DatasourcePathSpec(
mapper, mapper,
null, null,
ingestionSpec, ingestionSpec1,
null, null,
false false
); );
@ -261,22 +308,22 @@ public class DatasourcePathSpecTest
pathSpec = new DatasourcePathSpec( pathSpec = new DatasourcePathSpec(
mapper, mapper,
null, null,
ingestionSpec.withIgnoreWhenNoSegments(true), ingestionSpec1.withIgnoreWhenNoSegments(true),
null, null,
false false
); );
pathSpec.addInputPaths(hadoopIndexerConfig, job); pathSpec.addInputPaths(hadoopIndexerConfig, job);
Assert.assertNull(config.get(DatasourceInputFormat.CONF_INPUT_SEGMENTS)); Assert.assertEquals(Collections.emptyList(), DatasourceInputFormat.getDataSources(config));
Assert.assertNull(config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA));
} }
@SuppressWarnings("unchecked")
private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig() private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig()
{ {
return new HadoopDruidIndexerConfig( return new HadoopDruidIndexerConfig(
new HadoopIngestionSpec( new HadoopIngestionSpec(
new DataSchema( new DataSchema(
ingestionSpec.getDataSource(), ingestionSpec1.getDataSource(),
HadoopDruidIndexerConfig.JSON_MAPPER.convertValue( HadoopDruidIndexerConfig.JSON_MAPPER.convertValue(
new StringInputRowParser( new StringInputRowParser(
new CSVParseSpec( new CSVParseSpec(