diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java index 1458714668d..ad30588297a 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java @@ -78,10 +78,10 @@ public class IndexGeneratorJobTest { @Parameterized.Parameters(name = "partitionType={0}, interval={1}, shardInfoForEachSegment={2}, data={3}, " + - "inputFormatName={4}") + "inputFormatName={4}, buildV9Directly={5}") public static Collection constructFeed() { - return Arrays.asList( + final List baseConstructors = Arrays.asList( new Object[][]{ { false, @@ -273,22 +273,39 @@ public class IndexGeneratorJobTest } } ); + + // Run each baseConstructor with/without buildV9Directly. + final List constructors = Lists.newArrayList(); + for (Object[] baseConstructor : baseConstructors) { + final Object[] c1 = new Object[baseConstructor.length + 1]; + final Object[] c2 = new Object[baseConstructor.length + 1]; + System.arraycopy(baseConstructor, 0, c1, 0, baseConstructor.length); + System.arraycopy(baseConstructor, 0, c2, 0, baseConstructor.length); + c1[c1.length - 1] = true; + c2[c2.length - 1] = false; + constructors.add(c1); + constructors.add(c2); + } + + return constructors; } @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + private final boolean useCombiner; + private final String partitionType; + private final Interval interval; + private final Object[][][] shardInfoForEachSegment; + private final List data; + private final String inputFormatName; + private final InputRowParser inputRowParser; + private final boolean buildV9Directly; + private ObjectMapper mapper; private HadoopDruidIndexerConfig config; private File dataFile; private File tmpDir; - private Interval interval; - private String partitionType; - private Object[][][] shardInfoForEachSegment; - private List data; - private boolean useCombiner; - private String inputFormatName; - private InputRowParser inputRowParser; public IndexGeneratorJobTest( boolean useCombiner, @@ -297,7 +314,8 @@ public class IndexGeneratorJobTest Object[][][] shardInfoForEachSegment, List data, String inputFormatName, - InputRowParser inputRowParser + InputRowParser inputRowParser, + boolean buildV9Directly ) throws IOException { this.useCombiner = useCombiner; @@ -307,6 +325,7 @@ public class IndexGeneratorJobTest this.data = data; this.inputFormatName = inputFormatName; this.inputRowParser = inputRowParser; + this.buildV9Directly = buildV9Directly; } private void writeDataToLocalSequenceFile(File outputFile, List data) throws IOException @@ -396,7 +415,7 @@ public class IndexGeneratorJobTest false, useCombiner, null, - null + buildV9Directly ) ) ); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java index 07305c54ff5..8c1d7787617 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java @@ -37,6 +37,7 @@ import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; +import io.druid.segment.IndexMergerV9; import io.druid.segment.QueryableIndex; import io.druid.segment.SegmentUtils; import io.druid.segment.incremental.IndexSizeExceededException; @@ -68,6 +69,7 @@ public class YeOldePlumberSchool implements PlumberSchool private final DataSegmentPusher dataSegmentPusher; private final File tmpSegmentDir; private final IndexMerger indexMerger; + private final IndexMergerV9 indexMergerV9; private final IndexIO indexIO; private static final Logger log = new Logger(YeOldePlumberSchool.class); @@ -79,6 +81,7 @@ public class YeOldePlumberSchool implements PlumberSchool @JacksonInject("segmentPusher") DataSegmentPusher dataSegmentPusher, @JacksonInject("tmpSegmentDir") File tmpSegmentDir, @JacksonInject IndexMerger indexMerger, + @JacksonInject IndexMergerV9 indexMergerV9, @JacksonInject IndexIO indexIO ) { @@ -87,6 +90,7 @@ public class YeOldePlumberSchool implements PlumberSchool this.dataSegmentPusher = dataSegmentPusher; this.tmpSegmentDir = tmpSegmentDir; this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); + this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); } @@ -106,6 +110,9 @@ public class YeOldePlumberSchool implements PlumberSchool // Set of spilled segments. Will be merged at the end. final Set spilled = Sets.newHashSet(); + // IndexMerger implementation. + final IndexMerger theIndexMerger = config.getBuildV9Directly() ? indexMergerV9 : indexMerger; + return new Plumber() { @Override @@ -174,7 +181,7 @@ public class YeOldePlumberSchool implements PlumberSchool } fileToUpload = new File(tmpSegmentDir, "merged"); - indexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload, config.getIndexSpec()); + theIndexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload, config.getIndexSpec()); } // Map merged segment so we can extract dimensions @@ -219,7 +226,7 @@ public class YeOldePlumberSchool implements PlumberSchool log.info("Spilling index[%d] with rows[%d] to: %s", indexToPersist.getCount(), rowsToPersist, dirToPersist); try { - indexMerger.persist( + theIndexMerger.persist( indexToPersist.getIndex(), dirToPersist, null, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 40854146d16..a326c31f9c0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -124,7 +124,12 @@ public class IndexTask extends AbstractFixedIntervalTask ); } - static RealtimeTuningConfig convertTuningConfig(ShardSpec shardSpec, int rowFlushBoundary, IndexSpec indexSpec) + static RealtimeTuningConfig convertTuningConfig( + ShardSpec shardSpec, + int rowFlushBoundary, + IndexSpec indexSpec, + boolean buildV9Directly + ) { return new RealtimeTuningConfig( rowFlushBoundary, @@ -136,7 +141,7 @@ public class IndexTask extends AbstractFixedIntervalTask null, shardSpec, indexSpec, - null + buildV9Directly ); } @@ -355,19 +360,22 @@ public class IndexTask extends AbstractFixedIntervalTask final FireDepartmentMetrics metrics = new FireDepartmentMetrics(); final Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser()); final Supplier committerSupplier = Committers.supplierFromFirehose(firehose); - final IndexMerger indexMerger = ingestionSchema.getTuningConfig().getBuildV9Directly() - ? toolbox.getIndexMergerV9() - : toolbox.getIndexMerger(); final Plumber plumber = new YeOldePlumberSchool( interval, version, wrappedDataSegmentPusher, tmpDir, - indexMerger, + toolbox.getIndexMerger(), + toolbox.getIndexMergerV9(), toolbox.getIndexIO() ).findPlumber( schema, - convertTuningConfig(shardSpec, myRowFlushBoundary, ingestionSchema.getTuningConfig().getIndexSpec()), + convertTuningConfig( + shardSpec, + myRowFlushBoundary, + ingestionSchema.getTuningConfig().getIndexSpec(), + ingestionSchema.tuningConfig.getBuildV9Directly() + ), metrics ); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 0822c55bbc8..215881c2c18 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -47,7 +47,6 @@ import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QueryToolChest; -import io.druid.segment.IndexMerger; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.indexing.RealtimeTuningConfig; @@ -287,9 +286,6 @@ public class RealtimeIndexTask extends AbstractTask ); this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate(); - IndexMerger indexMerger = spec.getTuningConfig().getBuildV9Directly() - ? toolbox.getIndexMergerV9() - : toolbox.getIndexMerger(); // NOTE: This pusher selects path based purely on global configuration and the DataSegment, which means // NOTE: that redundant realtime tasks will upload to the same location. This can cause index.zip and // NOTE: descriptor.json to mismatch, or it can cause historical nodes to load different instances of the @@ -302,7 +298,8 @@ public class RealtimeIndexTask extends AbstractTask segmentPublisher, toolbox.getSegmentHandoffNotifierFactory(), toolbox.getQueryExecutorService(), - indexMerger, + toolbox.getIndexMerger(), + toolbox.getIndexMergerV9(), toolbox.getIndexIO(), toolbox.getCache(), toolbox.getCacheConfig(), diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index e7f3ee80365..0098ec8e643 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -34,7 +34,6 @@ import io.druid.indexing.common.TestUtils; import io.druid.indexing.common.actions.LockListAction; import io.druid.indexing.common.actions.TaskAction; import io.druid.indexing.common.actions.TaskActionClient; -import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.segment.IndexIO; @@ -342,7 +341,8 @@ public class IndexTaskTest RealtimeTuningConfig realtimeTuningConfig = IndexTask.convertTuningConfig( spec, config.getRowFlushBoundary(), - config.getIndexSpec() + config.getIndexSpec(), + config.getBuildV9Directly() ); Assert.assertEquals(realtimeTuningConfig.getMaxRowsInMemory(), config.getRowFlushBoundary()); Assert.assertEquals(realtimeTuningConfig.getShardSpec(), spec); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 454b97e3285..9a9d9881966 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -109,15 +109,20 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.File; +import java.io.IOException; import java.nio.file.Files; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.Executor; +@RunWith(Parameterized.class) public class RealtimeIndexTaskTest { private static final Logger log = new Logger(RealtimeIndexTaskTest.class); @@ -143,10 +148,25 @@ public class RealtimeIndexTaskTest @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); + private final boolean buildV9Directly; + private DateTime now; private ListeningExecutorService taskExec; private Map> handOffCallbacks; - private SegmentHandoffNotifierFactory handoffNotifierFactory; + + @Parameterized.Parameters(name = "buildV9Directly = {0}") + public static Collection constructorFeeder() throws IOException + { + return ImmutableList.of( + new Object[]{true}, + new Object[]{false} + ); + } + + public RealtimeIndexTaskTest(boolean buildV9Directly) + { + this.buildV9Directly = buildV9Directly; + } @Before public void setUp() @@ -572,7 +592,7 @@ public class RealtimeIndexTaskTest null, null, null, - null + buildV9Directly ); return new RealtimeIndexTask( taskId, @@ -650,7 +670,7 @@ public class RealtimeIndexTaskTest ) ); handOffCallbacks = Maps.newConcurrentMap(); - handoffNotifierFactory = new SegmentHandoffNotifierFactory() + final SegmentHandoffNotifierFactory handoffNotifierFactory = new SegmentHandoffNotifierFactory() { @Override public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java index f185f61c6f1..9c5bd269bd4 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java @@ -31,6 +31,7 @@ import io.druid.guice.annotations.Processing; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; +import io.druid.segment.IndexMergerV9; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.realtime.FireDepartmentMetrics; @@ -54,6 +55,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool private final DataSegmentAnnouncer segmentAnnouncer; private final ExecutorService queryExecutorService; private final IndexMerger indexMerger; + private final IndexMergerV9 indexMergerV9; private final IndexIO indexIO; private final Cache cache; private final CacheConfig cacheConfig; @@ -67,6 +69,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool @JacksonInject DataSegmentAnnouncer segmentAnnouncer, @JacksonInject @Processing ExecutorService queryExecutorService, @JacksonInject IndexMerger indexMerger, + @JacksonInject IndexMergerV9 indexMergerV9, @JacksonInject IndexIO indexIO, @JacksonInject Cache cache, @JacksonInject CacheConfig cacheConfig, @@ -82,6 +85,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool null, queryExecutorService, indexMerger, + indexMergerV9, indexIO, cache, cacheConfig, @@ -94,6 +98,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool this.segmentAnnouncer = segmentAnnouncer; this.queryExecutorService = queryExecutorService; this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); + this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); this.cache = cache; this.cacheConfig = cacheConfig; @@ -118,7 +123,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool conglomerate, segmentAnnouncer, queryExecutorService, - indexMerger, + config.getBuildV9Directly() ? indexMergerV9 : indexMerger, indexIO, cache, cacheConfig, diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java index fe2a94e5280..89839538062 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -30,6 +30,7 @@ import io.druid.guice.annotations.Processing; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; +import io.druid.segment.IndexMergerV9; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.loading.DataSegmentPusher; @@ -51,6 +52,7 @@ public class RealtimePlumberSchool implements PlumberSchool private final SegmentHandoffNotifierFactory handoffNotifierFactory; private final ExecutorService queryExecutorService; private final IndexMerger indexMerger; + private final IndexMergerV9 indexMergerV9; private final IndexIO indexIO; private final Cache cache; private final CacheConfig cacheConfig; @@ -66,6 +68,7 @@ public class RealtimePlumberSchool implements PlumberSchool @JacksonInject SegmentHandoffNotifierFactory handoffNotifierFactory, @JacksonInject @Processing ExecutorService executorService, @JacksonInject IndexMerger indexMerger, + @JacksonInject IndexMergerV9 indexMergerV9, @JacksonInject IndexIO indexIO, @JacksonInject Cache cache, @JacksonInject CacheConfig cacheConfig, @@ -80,6 +83,7 @@ public class RealtimePlumberSchool implements PlumberSchool this.handoffNotifierFactory = handoffNotifierFactory; this.queryExecutorService = executorService; this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); + this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); this.cache = cache; @@ -107,7 +111,7 @@ public class RealtimePlumberSchool implements PlumberSchool dataSegmentPusher, segmentPublisher, handoffNotifierFactory.createSegmentHandoffNotifier(schema.getDataSource()), - indexMerger, + config.getBuildV9Directly() ? indexMergerV9 : indexMerger, indexIO, cache, cacheConfig, diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java index b76a795074a..c38cd02bf02 100644 --- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java +++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java @@ -107,6 +107,7 @@ public class FireDepartmentTest null, null, TestHelper.getTestIndexMerger(), + TestHelper.getTestIndexMergerV9(), TestHelper.getTestIndexIO(), MapCache.create(0), NO_CACHE_CONFIG, diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 747b68d5992..e59eaf85383 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -68,7 +68,6 @@ import org.junit.runners.Parameterized; import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; @@ -82,6 +81,7 @@ import java.util.concurrent.atomic.AtomicBoolean; public class RealtimePlumberSchoolTest { private final RejectionPolicyFactory rejectionPolicy; + private final boolean buildV9Directly; private RealtimePlumber plumber; private RealtimePlumberSchool realtimePlumberSchool; private DataSegmentAnnouncer announcer; @@ -95,24 +95,28 @@ public class RealtimePlumberSchoolTest private DataSchema schema2; private FireDepartmentMetrics metrics; - public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicy) + public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicy, boolean buildV9Directly) { this.rejectionPolicy = rejectionPolicy; + this.buildV9Directly = buildV9Directly; } - @Parameterized.Parameters + @Parameterized.Parameters(name = "rejectionPolicy = {0}, buildV9Directly = {1}") public static Collection constructorFeeder() throws IOException { - return Arrays.asList( - new Object[][]{ - { - new NoopRejectionPolicyFactory() - }, - { - new MessageTimeRejectionPolicyFactory() - } - } - ); + final RejectionPolicyFactory[] rejectionPolicies = new RejectionPolicyFactory[]{ + new NoopRejectionPolicyFactory(), + new MessageTimeRejectionPolicyFactory() + }; + final boolean[] buildV9Directlies = new boolean[]{true, false}; + + final List constructors = Lists.newArrayList(); + for (RejectionPolicyFactory rejectionPolicy : rejectionPolicies) { + for (boolean buildV9Directly : buildV9Directlies) { + constructors.add(new Object[]{rejectionPolicy, buildV9Directly}); + } + } + return constructors; } @Before @@ -163,7 +167,9 @@ public class RealtimePlumberSchoolTest dataSegmentPusher = EasyMock.createNiceMock(DataSegmentPusher.class); handoffNotifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class); handoffNotifier = EasyMock.createNiceMock(SegmentHandoffNotifier.class); - EasyMock.expect(handoffNotifierFactory.createSegmentHandoffNotifier(EasyMock.anyString())).andReturn(handoffNotifier).anyTimes(); + EasyMock.expect(handoffNotifierFactory.createSegmentHandoffNotifier(EasyMock.anyString())) + .andReturn(handoffNotifier) + .anyTimes(); EasyMock.expect( handoffNotifier.registerSegmentHandoffCallback( EasyMock.anyObject(), @@ -186,7 +192,7 @@ public class RealtimePlumberSchoolTest null, null, null, - null + buildV9Directly ); realtimePlumberSchool = new RealtimePlumberSchool( @@ -198,6 +204,7 @@ public class RealtimePlumberSchoolTest handoffNotifierFactory, MoreExecutors.sameThreadExecutor(), TestHelper.getTestIndexMerger(), + TestHelper.getTestIndexMergerV9(), TestHelper.getTestIndexIO(), MapCache.create(0), FireDepartmentTest.NO_CACHE_CONFIG, @@ -211,7 +218,7 @@ public class RealtimePlumberSchoolTest @After public void tearDown() throws Exception { - EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher,handoffNotifierFactory, handoffNotifier, emitter); + EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher, handoffNotifierFactory, handoffNotifier, emitter); FileUtils.deleteDirectory( new File( tuningConfig.getBasePersistDirectory(),