diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java index 4a4e23ae7ba..69a244ea84b 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java @@ -42,6 +42,7 @@ public class HadoopTuningConfig implements TuningConfig private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000; private static final int DEFAULT_BUFFER_SIZE = 128 * 1024 * 1024; private static final float DEFAULT_AGG_BUFFER_RATIO = 0.5f; + private static final boolean DEFAULT_USE_COMBINER = false; public static HadoopTuningConfig makeDefaultTuningConfig() { @@ -61,7 +62,8 @@ public class HadoopTuningConfig implements TuningConfig false, false, DEFAULT_BUFFER_SIZE, - DEFAULT_AGG_BUFFER_RATIO + DEFAULT_AGG_BUFFER_RATIO, + DEFAULT_USE_COMBINER ); } @@ -81,6 +83,7 @@ public class HadoopTuningConfig implements TuningConfig private final boolean ingestOffheap; private final int bufferSize; private final float aggregationBufferRatio; + private final boolean useCombiner; @JsonCreator public HadoopTuningConfig( @@ -99,7 +102,8 @@ public class HadoopTuningConfig implements TuningConfig final @JsonProperty("persistInHeap") boolean persistInHeap, final @JsonProperty("ingestOffheap") boolean ingestOffheap, final @JsonProperty("bufferSize") Integer bufferSize, - final @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio + final @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio, + final @JsonProperty("useCombiner") Boolean useCombiner ) { this.workingPath = workingPath; @@ -120,6 +124,7 @@ public class HadoopTuningConfig implements TuningConfig this.ingestOffheap = ingestOffheap; this.bufferSize = bufferSize == null ? DEFAULT_BUFFER_SIZE : bufferSize; this.aggregationBufferRatio = aggregationBufferRatio == null ? DEFAULT_AGG_BUFFER_RATIO : aggregationBufferRatio; + this.useCombiner = useCombiner == null ? DEFAULT_USE_COMBINER : useCombiner.booleanValue(); } @JsonProperty @@ -216,6 +221,12 @@ public class HadoopTuningConfig implements TuningConfig return aggregationBufferRatio; } + @JsonProperty + public boolean getUseCombiner() + { + return useCombiner; + } + public HadoopTuningConfig withWorkingPath(String path) { return new HadoopTuningConfig( @@ -234,7 +245,8 @@ public class HadoopTuningConfig implements TuningConfig persistInHeap, ingestOffheap, bufferSize, - aggregationBufferRatio + aggregationBufferRatio, + useCombiner ); } @@ -256,7 +268,8 @@ public class HadoopTuningConfig implements TuningConfig persistInHeap, ingestOffheap, bufferSize, - aggregationBufferRatio + aggregationBufferRatio, + useCombiner ); } @@ -278,7 +291,8 @@ public class HadoopTuningConfig implements TuningConfig persistInHeap, ingestOffheap, bufferSize, - aggregationBufferRatio + aggregationBufferRatio, + useCombiner ); } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index a8b7ed14567..56230251828 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -31,9 +31,9 @@ import com.google.common.primitives.Longs; import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; -import com.metamx.common.parsers.ParseException; import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; +import io.druid.data.input.Row; import io.druid.data.input.Rows; import io.druid.offheap.OffheapBufferPool; import io.druid.query.aggregation.AggregatorFactory; @@ -64,11 +64,13 @@ import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.joda.time.DateTime; import org.joda.time.Interval; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Iterator; import java.util.List; import java.util.Set; @@ -149,6 +151,11 @@ public class IndexGeneratorJob implements Jobby throw new RuntimeException("No buckets?? seems there is no data to index."); } + if(config.getSchema().getTuningConfig().getUseCombiner()) { + job.setCombinerClass(IndexGeneratorCombiner.class); + job.setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class); + } + job.setNumReduceTasks(numReducers); job.setPartitionerClass(IndexGeneratorPartitioner.class); @@ -193,6 +200,36 @@ public class IndexGeneratorJob implements Jobby } } + private static IncrementalIndex makeIncrementalIndex( + Bucket theBucket, + AggregatorFactory[] aggs, + HadoopDruidIndexerConfig config, + boolean isOffHeap, + StupidPool bufferPool + ) + { + final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig(); + final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder() + .withMinTimestamp(theBucket.time.getMillis()) + .withDimensionsSpec(config.getSchema().getDataSchema().getParser()) + .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity()) + .withMetrics(aggs) + .build(); + if (isOffHeap) { + return new OffheapIncrementalIndex( + indexSchema, + bufferPool, + true, + tuningConfig.getBufferSize() + ); + } else { + return new OnheapIncrementalIndex( + indexSchema, + tuningConfig.getRowFlushBoundary() + ); + } + } + public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper { private static final HashFunction hashFunction = Hashing.murmur3_128(); @@ -245,6 +282,128 @@ public class IndexGeneratorJob implements Jobby } } + public static class IndexGeneratorCombiner extends Reducer + { + private HadoopDruidIndexerConfig config; + private AggregatorFactory[] aggregators; + private AggregatorFactory[] combiningAggs; + + @Override + protected void setup(Context context) + throws IOException, InterruptedException + { + config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); + + aggregators = config.getSchema().getDataSchema().getAggregators(); + combiningAggs = new AggregatorFactory[aggregators.length]; + for (int i = 0; i < aggregators.length; ++i) { + combiningAggs[i] = aggregators[i].getCombiningFactory(); + } + } + + @Override + protected void reduce( + final BytesWritable key, Iterable values, final Context context + ) throws IOException, InterruptedException + { + + Iterator iter = values.iterator(); + BytesWritable first = iter.next(); + + if(iter.hasNext()) { + SortableBytes keyBytes = SortableBytes.fromBytesWritable(key); + Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs; + IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, false, null); + index.add(InputRowSerde.fromBytes(first.getBytes(), aggregators)); + + while(iter.hasNext()) { + context.progress(); + InputRow value = InputRowSerde.fromBytes(iter.next().getBytes(), aggregators); + + if(!index.canAppendRow()) { + log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason()); + flushIndexToContextAndClose(key, index, context); + index = makeIncrementalIndex(bucket, combiningAggs, config, false, null); + } + + index.add(value); + } + + flushIndexToContextAndClose(key, index, context); + } else { + context.write(key, first); + } + } + + private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex index, Context context) throws IOException, InterruptedException + { + Iterator rows = index.iterator(); + while(rows.hasNext()) { + context.progress(); + Row row = rows.next(); + InputRow inputRow = getInputRowFromRow(row, index.getDimensions()); + context.write( + key, + new BytesWritable(InputRowSerde.toBytes(inputRow, combiningAggs)) + ); + } + index.close(); + } + + private InputRow getInputRowFromRow(final Row row, final List dimensions) { + return new InputRow() + { + @Override + public List getDimensions() + { + return dimensions; + } + + @Override + public long getTimestampFromEpoch() + { + return row.getTimestampFromEpoch(); + } + + @Override + public DateTime getTimestamp() + { + return row.getTimestamp(); + } + + @Override + public List getDimension(String dimension) + { + return row.getDimension(dimension); + } + + @Override + public Object getRaw(String dimension) + { + return row.getRaw(dimension); + } + + @Override + public float getFloatMetric(String metric) + { + return row.getFloatMetric(metric); + } + + @Override + public long getLongMetric(String metric) + { + return row.getLongMetric(metric); + } + + @Override + public int compareTo(Row o) + { + return row.compareTo(o); + } + }; + } + } + public static class IndexGeneratorPartitioner extends Partitioner implements Configurable { private Configuration config; @@ -350,7 +509,13 @@ public class IndexGeneratorJob implements Jobby * config.getSchema().getTuningConfig().getAggregationBufferRatio()); final StupidPool bufferPool = new OffheapBufferPool(aggregationBufferSize); - IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, bufferPool); + IncrementalIndex index = makeIncrementalIndex( + bucket, + combiningAggs, + config, + config.getSchema().getTuningConfig().isIngestOffheap(), + bufferPool + ); try { File baseFlushFile = File.createTempFile("base", "flush"); baseFlushFile.delete(); @@ -392,7 +557,13 @@ public class IndexGeneratorJob implements Jobby // close this index and make a new one, reusing same buffer index.close(); - index = makeIncrementalIndex(bucket, combiningAggs, bufferPool); + index = makeIncrementalIndex( + bucket, + combiningAggs, + config, + config.getSchema().getTuningConfig().isIngestOffheap(), + bufferPool + ); startTime = System.currentTimeMillis(); ++indexCount; } @@ -475,31 +646,6 @@ public class IndexGeneratorJob implements Jobby index.close(); } } - - private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs, StupidPool bufferPool) - { - final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig(); - final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder() - .withMinTimestamp(theBucket.time.getMillis()) - .withDimensionsSpec(config.getSchema().getDataSchema().getParser()) - .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity()) - .withMetrics(aggs) - .build(); - if (tuningConfig.isIngestOffheap()) { - - return new OffheapIncrementalIndex( - indexSchema, - bufferPool, - true, - tuningConfig.getBufferSize() - ); - } else { - return new OnheapIncrementalIndex( - indexSchema, - tuningConfig.getRowFlushBoundary() - ); - } - } } public static class IndexGeneratorOutputFormat extends TextOutputFormat diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java index 9e95553db47..4fd7a371675 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java @@ -153,7 +153,8 @@ public class DetermineHashedPartitionsJobTest false, false, null, - null + null, + false ) ); this.indexerConfig = new HadoopDruidIndexerConfig(ingestionSpec); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java index 256bbee8f14..574acaa44b5 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java @@ -262,7 +262,8 @@ public class DeterminePartitionsJobTest false, false, null, - null + null, + false ) ) ); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java index b3aca9d251b..00b7b36577b 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -197,7 +197,8 @@ public class HadoopDruidIndexerConfigTest false, false, null, - null + null, + false ) ); HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(spec); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java new file mode 100644 index 00000000000..ced64faa398 --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java @@ -0,0 +1,179 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.metamx.common.Granularity; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import io.druid.data.input.impl.CSVParseSpec; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.StringInputRowParser; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.granularity.QueryGranularity; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapreduce.Reducer; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +/** + */ +public class IndexGeneratorCombinerTest +{ + private AggregatorFactory[] aggregators; + private IndexGeneratorJob.IndexGeneratorCombiner combiner; + + @Before + public void setUp() throws Exception + { + HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfig( + new HadoopIngestionSpec( + new DataSchema( + "website", + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec("timestamp", "yyyyMMddHH", null), + new DimensionsSpec(ImmutableList.of("host"), null, null), + null, + ImmutableList.of("timestamp", "host", "visited") + ) + ), + new AggregatorFactory[]{ + new LongSumAggregatorFactory("visited_sum", "visited"), + new HyperUniquesAggregatorFactory("unique_hosts", "host") + }, + new UniformGranularitySpec( + Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2010/2011")) + ) + ), + new HadoopIOConfig( + ImmutableMap.of( + "paths", + "/tmp/dummy", + "type", + "static" + ), + null, + "/tmp/dummy" + ), + HadoopTuningConfig.makeDefaultTuningConfig().withWorkingPath("/tmp/work").withVersion("ver") + ) + ); + Configuration hadoopConfig = new Configuration(); + hadoopConfig.set( + HadoopDruidIndexerConfig.CONFIG_PROPERTY, + HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(config) + ); + + Reducer.Context context = EasyMock.createMock(Reducer.Context.class); + EasyMock.expect(context.getConfiguration()).andReturn(hadoopConfig); + EasyMock.replay(context); + + aggregators = config.getSchema().getDataSchema().getAggregators(); + + combiner = new IndexGeneratorJob.IndexGeneratorCombiner(); + combiner.setup(context); + } + + @Test + public void testSingleRowNoMergePassThrough() throws Exception + { + Reducer.Context context = EasyMock.createMock(Reducer.Context.class); + Capture captureKey = Capture.newInstance(); + Capture captureVal = Capture.newInstance(); + context.write(EasyMock.capture(captureKey), EasyMock.capture(captureVal)); + EasyMock.replay(context); + + BytesWritable key = new BytesWritable("dummy_key".getBytes()); + BytesWritable val = new BytesWritable("dummy_row".getBytes()); + + combiner.reduce(key, Lists.newArrayList(val), context); + + Assert.assertTrue(captureKey.getValue() == key); + Assert.assertTrue(captureVal.getValue() == val); + } + + @Test + public void testMultipleRowsMerged() throws Exception + { + long timestamp = System.currentTimeMillis(); + + Bucket bucket = new Bucket(0, new DateTime(timestamp), 0); + SortableBytes keySortableBytes = new SortableBytes( + bucket.toGroupKey(), + new byte[0] + ); + BytesWritable key = keySortableBytes.toBytesWritable(); + + InputRow row1 = new MapBasedInputRow( + timestamp, + ImmutableList.of(), + ImmutableMap.of( + "host", "host1", + "visited", 10 + ) + ); + InputRow row2 = new MapBasedInputRow( + timestamp, + ImmutableList.of(), + ImmutableMap.of( + "host", "host2", + "visited", 5 + ) + ); + List rows = Lists.newArrayList( + new BytesWritable(InputRowSerde.toBytes(row1, aggregators)), + new BytesWritable(InputRowSerde.toBytes(row2, aggregators)) + ); + + Reducer.Context context = EasyMock.createNiceMock(Reducer.Context.class); + Capture captureKey = Capture.newInstance(); + Capture captureVal = Capture.newInstance(); + context.write(EasyMock.capture(captureKey), EasyMock.capture(captureVal)); + EasyMock.replay(context); + + combiner.reduce( + key, + rows, + context + ); + + Assert.assertTrue(captureKey.getValue() == key); + + InputRow capturedRow = InputRowSerde.fromBytes(captureVal.getValue().getBytes(), aggregators); + Assert.assertEquals(15, capturedRow.getLongMetric("visited_sum")); + Assert.assertEquals(2.0, (Double)HyperUniquesAggregatorFactory.estimateCardinality(capturedRow.getRaw("unique_hosts")), 0.001); + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java index bfaac4a465b..958b64041f1 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java @@ -35,6 +35,7 @@ import io.druid.granularity.QueryGranularity; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.timeline.DataSegment; @@ -70,6 +71,7 @@ public class IndexGeneratorJobTest return Arrays.asList( new Object[][]{ { + false, "single", "2014-10-22T00:00:00Z/P2D", new String[][][]{ @@ -112,6 +114,7 @@ public class IndexGeneratorJobTest ) }, { + false, "hashed", "2014-10-22T00:00:00Z/P1D", new Integer[][][]{ @@ -139,7 +142,41 @@ public class IndexGeneratorJobTest "2014102213,n.example.com,234", "2014102214,o.example.com,325", "2014102215,p.example.com,3533", - "2014102216,q.example.com,587" + "2014102216,q.example.com,500", + "2014102216,q.example.com,87" + ) + }, + { + true, + "hashed", + "2014-10-22T00:00:00Z/P1D", + new Integer[][][]{ + { + { 0, 4 }, + { 1, 4 }, + { 2, 4 }, + { 3, 4 } + } + }, + ImmutableList.of( + "2014102200,a.example.com,100", + "2014102201,b.exmaple.com,50", + "2014102202,c.example.com,200", + "2014102203,d.example.com,250", + "2014102204,e.example.com,123", + "2014102205,f.example.com,567", + "2014102206,g.example.com,11", + "2014102207,h.example.com,251", + "2014102208,i.example.com,963", + "2014102209,j.example.com,333", + "2014102210,k.example.com,253", + "2014102211,l.example.com,321", + "2014102212,m.example.com,3125", + "2014102213,n.example.com,234", + "2014102214,o.example.com,325", + "2014102215,p.example.com,3533", + "2014102216,q.example.com,500", + "2014102216,q.example.com,87" ) } } @@ -156,14 +193,17 @@ public class IndexGeneratorJobTest private String partitionType; private Object[][][] shardInfoForEachSegment; private List data; + private boolean useCombiner; public IndexGeneratorJobTest( + boolean useCombiner, String partitionType, String interval, Object[][][] shardInfoForEachSegment, List data ) throws IOException { + this.useCombiner = useCombiner; this.partitionType = partitionType; this.shardInfoForEachSegment = shardInfoForEachSegment; this.interval = new Interval(interval); @@ -196,7 +236,10 @@ public class IndexGeneratorJobTest ImmutableList.of("timestamp", "host", "visited_num") ) ), - new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")}, + new AggregatorFactory[]{ + new LongSumAggregatorFactory("visited_num", "visited_num"), + new HyperUniquesAggregatorFactory("unique_hosts", "host") + }, new UniformGranularitySpec( Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval) ) @@ -227,7 +270,8 @@ public class IndexGeneratorJobTest false, false, null, - null + null, + useCombiner ) ) ); @@ -325,6 +369,7 @@ public class IndexGeneratorJobTest Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path")); Assert.assertEquals("host", dataSegment.getDimensions().get(0)); Assert.assertEquals("visited_num", dataSegment.getMetrics().get(0)); + Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1)); Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion()); if (partitionType.equals("hashed")) { Integer[] hashShardInfo = (Integer[]) shardInfo[partitionNum]; diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java index 8886c6e8a3c..3334a70ca89 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java @@ -111,7 +111,8 @@ public class JobHelperTest false, false, null, - null + null, + false ) ) ); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java index 91e56ce6f9f..74fc2b69ea2 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java @@ -268,7 +268,8 @@ public class HadoopConverterJobTest false, false, null, - null + null, + false ) ) );