diff --git a/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java b/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java
index 837a48261c3..be801637031 100644
--- a/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java
+++ b/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java
@@ -101,6 +101,10 @@ public class DimensionsSpec
     }
   }
 
+  public DimensionsSpec(List dimensions)
+  {
+    this(dimensions, null, null);
+  }
+
   @JsonProperty
   public List getDimensions()
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
index f81999f15f2..9f1fb567130 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
@@ -28,7 +28,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import io.druid.data.input.impl.DimensionSchema;
 import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
@@ -54,7 +53,7 @@ import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.JodaUtils;
 import io.druid.java.util.common.Pair;
 import io.druid.java.util.common.RE;
-import io.druid.java.util.common.granularity.NoneGranularity;
+import io.druid.java.util.common.granularity.Granularities;
 import io.druid.java.util.common.guava.Comparators;
 import io.druid.java.util.common.jackson.JacksonUtils;
 import io.druid.java.util.common.logger.Logger;
@@ -96,17 +95,19 @@ public class CompactionTask extends AbstractTask
 {
   private static final Logger log = new Logger(CompactionTask.class);
   private static final String TYPE = "compact";
+  private static final boolean DEFAULT_KEEP_SEGMENT_GRANULARITY = true;
 
   private final Interval interval;
   private final List segments;
   private final DimensionsSpec dimensionsSpec;
+  private final boolean keepSegmentGranularity;
   private final IndexTuningConfig tuningConfig;
   private final ObjectMapper jsonMapper;
   @JsonIgnore
   private final SegmentProvider segmentProvider;
 
   @JsonIgnore
-  private IndexTask indexTaskSpec;
+  private List indexTaskSpecs;
 
   @JsonIgnore
   private final AuthorizerMapper authorizerMapper;
@@ -125,6 +126,7 @@ public class CompactionTask extends AbstractTask
       @Nullable @JsonProperty("interval") final Interval interval,
       @Nullable @JsonProperty("segments") final List segments,
       @Nullable @JsonProperty("dimensions") final DimensionsSpec dimensionsSpec,
+      @Nullable @JsonProperty("keepSegmentGranularity") final Boolean keepSegmentGranularity,
       @Nullable @JsonProperty("tuningConfig") final IndexTuningConfig tuningConfig,
       @Nullable @JsonProperty("context") final Map context,
       @JacksonInject ObjectMapper jsonMapper,
@@ -144,6 +146,9 @@ public class CompactionTask extends AbstractTask
     this.interval = interval;
     this.segments = segments;
     this.dimensionsSpec = dimensionsSpec;
+    this.keepSegmentGranularity = keepSegmentGranularity == null
+                                  ? DEFAULT_KEEP_SEGMENT_GRANULARITY
+                                  : keepSegmentGranularity;
     this.tuningConfig = tuningConfig;
     this.jsonMapper = jsonMapper;
     this.segmentProvider = segments == null ? new SegmentProvider(dataSource, interval) : new SegmentProvider(segments);
@@ -170,6 +175,12 @@ public class CompactionTask extends AbstractTask
     return dimensionsSpec;
   }
 
+  @JsonProperty
+  public boolean isKeepSegmentGranularity()
+  {
+    return keepSegmentGranularity;
+  }
+
   @JsonProperty
   public IndexTuningConfig getTuningConfig()
   {
@@ -205,52 +216,65 @@ public class CompactionTask extends AbstractTask
   @Override
   public TaskStatus run(final TaskToolbox toolbox) throws Exception
   {
-    if (indexTaskSpec == null) {
-      final IndexIngestionSpec ingestionSpec = createIngestionSchema(
+    if (indexTaskSpecs == null) {
+      indexTaskSpecs = createIngestionSchema(
           toolbox,
           segmentProvider,
           dimensionsSpec,
+          keepSegmentGranularity,
           tuningConfig,
           jsonMapper
-      );
-
-      if (ingestionSpec != null) {
-        indexTaskSpec = new IndexTask(
-            getId(),
-            getGroupId(),
-            getTaskResource(),
-            getDataSource(),
-            ingestionSpec,
-            getContext(),
-            authorizerMapper,
-            chatHandlerProvider,
-            rowIngestionMetersFactory
-        );
-      }
+      ).stream()
+       .map(spec -> new IndexTask(
+           getId(),
+           getGroupId(),
+           getTaskResource(),
+           getDataSource(),
+           spec,
+           getContext(),
+           authorizerMapper,
+           chatHandlerProvider,
+           rowIngestionMetersFactory
+       ))
+       .collect(Collectors.toList());
     }
 
-    if (indexTaskSpec == null) {
+    if (indexTaskSpecs.isEmpty()) {
       log.warn("Interval[%s] has no segments, nothing to do.", interval);
       return TaskStatus.failure(getId());
     } else {
-      final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(indexTaskSpec);
-      log.info("Generated compaction task details: " + json);
+      log.info("Generated [%d] compaction task specs", indexTaskSpecs.size());
 
-      return indexTaskSpec.run(toolbox);
+      for (IndexTask eachSpec : indexTaskSpecs) {
+        final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
+        log.info("Running indexSpec: " + json);
+
+        try {
+          final TaskStatus eachResult = eachSpec.run(toolbox);
+          if (!eachResult.isSuccess()) {
+            log.warn("Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json);
+          }
+        }
+        catch (Exception e) {
+          log.warn(e, "Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json);
+        }
+      }
+
+      return TaskStatus.success(getId());
     }
   }
 
   /**
    * Generate {@link IndexIngestionSpec} from input segments.
    *
-   * @return null if input segments don't exist. Otherwise, a generated ingestionSpec.
+   * @return an empty list if input segments don't exist. Otherwise, a list of generated ingestionSpecs.
    */
-  @Nullable
   @VisibleForTesting
-  static IndexIngestionSpec createIngestionSchema(
+  static List createIngestionSchema(
       TaskToolbox toolbox,
       SegmentProvider segmentProvider,
       DimensionsSpec dimensionsSpec,
+      boolean keepSegmentGranularity,
       IndexTuningConfig tuningConfig,
       ObjectMapper jsonMapper
   ) throws IOException, SegmentLoadingException
@@ -263,33 +287,68 @@ public class CompactionTask extends AbstractTask
     final List> timelineSegments = pair.rhs;
 
     if (timelineSegments.size() == 0) {
-      return null;
+      return Collections.emptyList();
     }
 
-    final DataSchema dataSchema = createDataSchema(
-        segmentProvider.dataSource,
-        segmentProvider.interval,
-        dimensionsSpec,
-        toolbox.getIndexIO(),
-        jsonMapper,
-        timelineSegments,
-        segmentFileMap
-    );
-    return new IndexIngestionSpec(
-        dataSchema,
-        new IndexIOConfig(
-            new IngestSegmentFirehoseFactory(
-                segmentProvider.dataSource,
-                segmentProvider.interval,
-                null, // no filter
-                // set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose
-                dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
-                Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
-                toolbox.getIndexIO()
-            ),
-            false
+    if (keepSegmentGranularity) {
+      // if keepSegmentGranularity = true, create indexIngestionSpec per segment interval, so that we can run an index
+      // task per segment interval.
+      final List specs = new ArrayList<>(timelineSegments.size());
+      for (TimelineObjectHolder holder : timelineSegments) {
+        final DataSchema dataSchema = createDataSchema(
+            segmentProvider.dataSource,
+            holder.getInterval(),
+            Collections.singletonList(holder),
+            dimensionsSpec,
+            toolbox.getIndexIO(),
+            jsonMapper,
+            segmentFileMap
+        );
+
+        specs.add(
+            new IndexIngestionSpec(
+                dataSchema,
+                createIoConfig(toolbox, dataSchema, holder.getInterval()),
+                tuningConfig
+            )
+        );
+      }
+
+      return specs;
+    } else {
+      final DataSchema dataSchema = createDataSchema(
+          segmentProvider.dataSource,
+          segmentProvider.interval,
+          timelineSegments,
+          dimensionsSpec,
+          toolbox.getIndexIO(),
+          jsonMapper,
+          segmentFileMap
+      );
+
+      return Collections.singletonList(
+          new IndexIngestionSpec(
+              dataSchema,
+              createIoConfig(toolbox, dataSchema, segmentProvider.interval),
+              tuningConfig
+          )
+      );
+    }
+  }
+
+  private static IndexIOConfig createIoConfig(TaskToolbox toolbox, DataSchema dataSchema, Interval interval)
+  {
+    return new IndexIOConfig(
+        new IngestSegmentFirehoseFactory(
+            dataSchema.getDataSource(),
+            interval,
+            null, // no filter
+            // set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose
+            dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
+            Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
+            toolbox.getIndexIO()
         ),
-        tuningConfig
+        false
     );
   }
@@ -308,18 +367,18 @@ public class CompactionTask extends AbstractTask
   private static DataSchema createDataSchema(
       String dataSource,
-      Interval interval,
+      Interval totalInterval,
+      List> timelineObjectHolder,
       DimensionsSpec dimensionsSpec,
       IndexIO indexIO,
       ObjectMapper jsonMapper,
-      List> timelineSegments,
       Map segmentFileMap
   ) throws IOException
   {
     // find metadata for interval
     final List> queryableIndexAndSegments = loadSegments(
-        timelineSegments,
+        timelineObjectHolder,
        segmentFileMap,
         indexIO
     );
@@ -348,10 +407,11 @@ public class CompactionTask extends AbstractTask
       final Boolean isRollup = pair.lhs.getMetadata().isRollup();
       return isRollup != null && isRollup;
     });
+
     final GranularitySpec granularitySpec = new ArbitraryGranularitySpec(
-        new NoneGranularity(),
+        Granularities.NONE,
         rollup,
-        ImmutableList.of(interval)
+        Collections.singletonList(totalInterval)
     );
 
     // find unique dimensions
@@ -444,15 +504,15 @@ public class CompactionTask extends AbstractTask
   }
 
   private static List> loadSegments(
-      List> timelineSegments,
+      List> timelineObjectHolders,
       Map segmentFileMap,
       IndexIO indexIO
   ) throws IOException
   {
     final List> segments = new ArrayList<>();
 
-    for (TimelineObjectHolder timelineSegment : timelineSegments) {
-      final PartitionHolder partitionHolder = timelineSegment.getObject();
+    for (TimelineObjectHolder timelineObjectHolder : timelineObjectHolders) {
+      final PartitionHolder partitionHolder = timelineObjectHolder.getObject();
       for (PartitionChunk chunk : partitionHolder) {
         final DataSegment segment = chunk.getObject();
         final QueryableIndex queryableIndex = indexIO.loadIndex(
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
index 2288393bbe8..6277a9e8f11 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
@@ -98,11 +98,16 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -110,13 +115,22 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
+@RunWith(Parameterized.class)
 public class CompactionTaskTest
 {
   private static final String DATA_SOURCE = "dataSource";
   private static final String TIMESTAMP_COLUMN = "timestamp";
   private static final String MIXED_TYPE_COLUMN = "string_to_double";
   private static final Interval COMPACTION_INTERVAL = Intervals.of("2017-01-01/2017-06-01");
+  private static final List SEGMENT_INTERVALS = ImmutableList.of(
+      Intervals.of("2017-01-01/2017-02-01"),
+      Intervals.of("2017-02-01/2017-03-01"),
+      Intervals.of("2017-03-01/2017-04-01"),
+      Intervals.of("2017-04-01/2017-05-01"),
+      Intervals.of("2017-05-01/2017-06-01")
+  );
   private static final Map MIXED_TYPE_COLUMN_MAP = ImmutableMap.of(
       Intervals.of("2017-01-01/2017-02-01"), new StringDimensionSchema(MIXED_TYPE_COLUMN),
@@ -138,6 +152,8 @@ public class CompactionTaskTest
   private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
   private static Map segmentMap;
 
+  private final boolean keepSegmentGranularity;
+
   private TaskToolbox toolbox;
 
   @BeforeClass
@@ -288,6 +304,20 @@ public class CompactionTaskTest
     );
   }
 
+  @Parameters(name = "keepSegmentGranularity={0}")
+  public static Collection parameters()
+  {
+    return ImmutableList.of(
+        new Object[] {false},
+        new Object[] {true}
+    );
+  }
+
+  public CompactionTaskTest(boolean keepSegmentGranularity)
+  {
+    this.keepSegmentGranularity = keepSegmentGranularity;
+  }
+
   @Test
   public void testSerdeWithInterval() throws IOException
   {
@@ -298,6 +328,7 @@ public class CompactionTaskTest
         COMPACTION_INTERVAL,
         null,
         null,
+        null,
         createTuningConfig(),
         ImmutableMap.of("testKey", "testContext"),
         objectMapper,
@@ -327,6 +358,7 @@ public class CompactionTaskTest
         null,
         SEGMENTS,
         null,
+        null,
         createTuningConfig(),
         ImmutableMap.of("testKey", "testContext"),
         objectMapper,
@@ -346,18 +378,51 @@ public class CompactionTaskTest
   }
 
   @Test
-  public void testCreateIngestionSchema() throws IOException, SegmentLoadingException
+  public void testCreateIngestionSchemaWithKeepSegmentGranularity() throws IOException, SegmentLoadingException
   {
-    final IndexIngestionSpec ingestionSchema = CompactionTask.createIngestionSchema(
+    final List ingestionSpecs = CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
         null,
+        keepSegmentGranularity,
         TUNING_CONFIG,
         objectMapper
     );
-    final DimensionsSpec expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
+    final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
+        keepSegmentGranularity
+    );
 
-    assertIngestionSchema(ingestionSchema, expectedDimensionsSpec);
+    if (keepSegmentGranularity) {
+      Assert.assertEquals(5, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS);
+    } else {
+      Assert.assertEquals(1, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, Collections.singletonList(COMPACTION_INTERVAL));
+    }
+  }
+
+  @Test
+  public void testCreateIngestionSchemaWithIgnoreSegmentGranularity() throws IOException, SegmentLoadingException
+  {
+    final List ingestionSpecs = CompactionTask.createIngestionSchema(
+        toolbox,
+        new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+        null,
+        keepSegmentGranularity,
+        TUNING_CONFIG,
+        objectMapper
+    );
+    final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
+        keepSegmentGranularity
+    );
+
+    if (keepSegmentGranularity) {
+      Assert.assertEquals(5, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS);
+    } else {
+      Assert.assertEquals(1, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, Collections.singletonList(COMPACTION_INTERVAL));
+    }
   }
 
   @Test
@@ -387,35 +452,59 @@ public class CompactionTaskTest
             new DoubleDimensionSchema("double_dim_3"),
             new DoubleDimensionSchema("double_dim_4"),
             new StringDimensionSchema(MIXED_TYPE_COLUMN)
-        ),
-        null,
-        null
+        )
     );
-    final IndexIngestionSpec ingestionSchema = CompactionTask.createIngestionSchema(
+    final List ingestionSpecs = CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
         customSpec,
+        keepSegmentGranularity,
         TUNING_CONFIG,
         objectMapper
     );
 
-    assertIngestionSchema(ingestionSchema, customSpec);
+    if (keepSegmentGranularity) {
+      Assert.assertEquals(5, ingestionSpecs.size());
+      final List dimensionsSpecs = new ArrayList<>(5);
+      IntStream.range(0, 5).forEach(i -> dimensionsSpecs.add(customSpec));
+      assertIngestionSchema(
+          ingestionSpecs,
+          dimensionsSpecs,
+          SEGMENT_INTERVALS
+      );
+    } else {
+      Assert.assertEquals(1, ingestionSpecs.size());
+      assertIngestionSchema(
+          ingestionSpecs,
+          Collections.singletonList(customSpec),
+          Collections.singletonList(COMPACTION_INTERVAL)
+      );
+    }
   }
 
   @Test
   public void testCreateIngestionSchemaWithCustomSegments() throws IOException, SegmentLoadingException
   {
-    final IndexIngestionSpec ingestionSchema = CompactionTask.createIngestionSchema(
+    final List ingestionSpecs = CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(SEGMENTS),
         null,
+        keepSegmentGranularity,
         TUNING_CONFIG,
         objectMapper
     );
-    final DimensionsSpec expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
+    final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
+        keepSegmentGranularity
+    );
 
-    assertIngestionSchema(ingestionSchema, expectedDimensionsSpec);
+    if (keepSegmentGranularity) {
+      Assert.assertEquals(5, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS);
+    } else {
+      Assert.assertEquals(1, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, Collections.singletonList(COMPACTION_INTERVAL));
+    }
   }
 
   @Test
@@ -430,6 +519,7 @@ public class CompactionTaskTest
         toolbox,
         new SegmentProvider(segments),
         null,
+        keepSegmentGranularity,
         TUNING_CONFIG,
         objectMapper
     );
@@ -448,13 +538,14 @@ public class CompactionTaskTest
         toolbox,
         new SegmentProvider(segments),
         null,
+        keepSegmentGranularity,
         TUNING_CONFIG,
         objectMapper
     );
   }
 
   @Test
-  public void testEmptyInterval() throws Exception
+  public void testEmptyInterval()
   {
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage(CoreMatchers.containsString("must specify a nonempty interval"));
@@ -468,6 +559,7 @@ public class CompactionTaskTest
         null,
         null,
         null,
+        null,
         objectMapper,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         new NoopChatHandlerProvider(),
@@ -475,85 +567,118 @@ public class CompactionTaskTest
     );
   }
 
-  private static DimensionsSpec getExpectedDimensionsSpecForAutoGeneration()
+  private static List getExpectedDimensionsSpecForAutoGeneration(boolean keepSegmentGranularity)
   {
-    return new DimensionsSpec(
-        Lists.newArrayList(
-            new LongDimensionSchema("timestamp"),
-            new StringDimensionSchema("string_dim_4"),
-            new LongDimensionSchema("long_dim_4"),
-            new FloatDimensionSchema("float_dim_4"),
-            new DoubleDimensionSchema("double_dim_4"),
-            new StringDimensionSchema("string_dim_0"),
-            new LongDimensionSchema("long_dim_0"),
-            new FloatDimensionSchema("float_dim_0"),
-            new DoubleDimensionSchema("double_dim_0"),
-            new StringDimensionSchema("string_dim_1"),
-            new LongDimensionSchema("long_dim_1"),
-            new FloatDimensionSchema("float_dim_1"),
-            new DoubleDimensionSchema("double_dim_1"),
-            new StringDimensionSchema("string_dim_2"),
-            new LongDimensionSchema("long_dim_2"),
-            new FloatDimensionSchema("float_dim_2"),
-            new DoubleDimensionSchema("double_dim_2"),
-            new StringDimensionSchema("string_dim_3"),
-            new LongDimensionSchema("long_dim_3"),
-            new FloatDimensionSchema("float_dim_3"),
-            new DoubleDimensionSchema("double_dim_3"),
-            new DoubleDimensionSchema("string_to_double")
-        ),
-        null,
-        null
+    if (keepSegmentGranularity) {
+      return ImmutableList.of(
+          new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
+          new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
+          new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
+          new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
+          new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
+      );
+    } else {
+      return Collections.singletonList(
+          new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
+      );
+    }
+  }
+
+  private static List getDimensionSchema(DimensionSchema mixedTypeColumn)
+  {
+    return Lists.newArrayList(
+        new LongDimensionSchema("timestamp"),
+        new StringDimensionSchema("string_dim_4"),
+        new LongDimensionSchema("long_dim_4"),
+        new FloatDimensionSchema("float_dim_4"),
+        new DoubleDimensionSchema("double_dim_4"),
+        new StringDimensionSchema("string_dim_0"),
+        new LongDimensionSchema("long_dim_0"),
+        new FloatDimensionSchema("float_dim_0"),
+        new DoubleDimensionSchema("double_dim_0"),
+        new StringDimensionSchema("string_dim_1"),
+        new LongDimensionSchema("long_dim_1"),
+        new FloatDimensionSchema("float_dim_1"),
+        new DoubleDimensionSchema("double_dim_1"),
+        new StringDimensionSchema("string_dim_2"),
+        new LongDimensionSchema("long_dim_2"),
+        new FloatDimensionSchema("float_dim_2"),
+        new DoubleDimensionSchema("double_dim_2"),
+        new StringDimensionSchema("string_dim_3"),
+        new LongDimensionSchema("long_dim_3"),
+        new FloatDimensionSchema("float_dim_3"),
+        new DoubleDimensionSchema("double_dim_3"),
+        mixedTypeColumn
     );
   }
 
   private static void assertIngestionSchema(
-      IndexIngestionSpec ingestionSchema,
-      DimensionsSpec expectedDimensionsSpec
+      List ingestionSchemas,
+      List expectedDimensionsSpecs,
+      List expectedSegmentIntervals
   )
   {
-    // assert dataSchema
-    final DataSchema dataSchema = ingestionSchema.getDataSchema();
-    Assert.assertEquals(DATA_SOURCE, dataSchema.getDataSource());
-
-    final InputRowParser parser = objectMapper.convertValue(dataSchema.getParser(), InputRowParser.class);
-    Assert.assertTrue(parser instanceof TransformingInputRowParser);
-    Assert.assertTrue(((TransformingInputRowParser) parser).getParser() instanceof NoopInputRowParser);
-    Assert.assertTrue(parser.getParseSpec() instanceof TimeAndDimsParseSpec);
-    Assert.assertEquals(
-        new HashSet<>(expectedDimensionsSpec.getDimensions()),
-        new HashSet<>(parser.getParseSpec().getDimensionsSpec().getDimensions())
-    );
-    final Set expectedAggregators = AGGREGATORS.values()
-                                               .stream()
-                                               .map(AggregatorFactory::getCombiningFactory)
-                                               .collect(Collectors.toSet());
-    Assert.assertEquals(expectedAggregators, new HashSet<>(Arrays.asList(dataSchema.getAggregators())));
-    Assert.assertEquals(
-        new ArbitraryGranularitySpec(Granularities.NONE, false, ImmutableList.of(COMPACTION_INTERVAL)),
-        dataSchema.getGranularitySpec()
+    Preconditions.checkArgument(
+        ingestionSchemas.size() == expectedDimensionsSpecs.size(),
+        "ingestionSchemas.size()[%s] should be the same as expectedDimensionsSpecs.size()[%s]",
+        ingestionSchemas.size(),
+        expectedDimensionsSpecs.size()
     );
 
-    // assert ioConfig
-    final IndexIOConfig ioConfig = ingestionSchema.getIOConfig();
-    Assert.assertFalse(ioConfig.isAppendToExisting());
-    final FirehoseFactory firehoseFactory = ioConfig.getFirehoseFactory();
-    Assert.assertTrue(firehoseFactory instanceof IngestSegmentFirehoseFactory);
-    final IngestSegmentFirehoseFactory ingestSegmentFirehoseFactory = (IngestSegmentFirehoseFactory) firehoseFactory;
-    Assert.assertEquals(DATA_SOURCE, ingestSegmentFirehoseFactory.getDataSource());
-    Assert.assertEquals(COMPACTION_INTERVAL, ingestSegmentFirehoseFactory.getInterval());
-    Assert.assertNull(ingestSegmentFirehoseFactory.getDimensionsFilter());
+    for (int i = 0; i < ingestionSchemas.size(); i++) {
+      final IndexIngestionSpec ingestionSchema = ingestionSchemas.get(i);
+      final DimensionsSpec expectedDimensionsSpec = expectedDimensionsSpecs.get(i);
 
-    // check the order of dimensions
-    Assert.assertEquals(expectedDimensionsSpec.getDimensionNames(), ingestSegmentFirehoseFactory.getDimensions());
-    // check the order of metrics
-    Assert.assertEquals(
-        Lists.newArrayList("agg_4", "agg_3", "agg_2", "agg_1", "agg_0"),
-        ingestSegmentFirehoseFactory.getMetrics()
-    );
+      // assert dataSchema
+      final DataSchema dataSchema = ingestionSchema.getDataSchema();
+      Assert.assertEquals(DATA_SOURCE, dataSchema.getDataSource());
 
-    // assert tuningConfig
-    Assert.assertEquals(createTuningConfig(), ingestionSchema.getTuningConfig());
+      final InputRowParser parser = objectMapper.convertValue(dataSchema.getParser(), InputRowParser.class);
+      Assert.assertTrue(parser instanceof TransformingInputRowParser);
+      Assert.assertTrue(((TransformingInputRowParser) parser).getParser() instanceof NoopInputRowParser);
+      Assert.assertTrue(parser.getParseSpec() instanceof TimeAndDimsParseSpec);
+      Assert.assertEquals(
+          new HashSet<>(expectedDimensionsSpec.getDimensions()),
+          new HashSet<>(parser.getParseSpec().getDimensionsSpec().getDimensions())
+      );
+      final Set expectedAggregators = AGGREGATORS.values()
+                                                 .stream()
+                                                 .map(AggregatorFactory::getCombiningFactory)
+                                                 .collect(Collectors.toSet());
+      Assert.assertEquals(expectedAggregators, new HashSet<>(Arrays.asList(dataSchema.getAggregators())));
+      Assert.assertEquals(
+          new ArbitraryGranularitySpec(
+              Granularities.NONE,
+              false,
+              Collections.singletonList(expectedSegmentIntervals.get(i))
+          ),
+          dataSchema.getGranularitySpec()
+      );
+
+      // assert ioConfig
+      final IndexIOConfig ioConfig = ingestionSchema.getIOConfig();
+      Assert.assertFalse(ioConfig.isAppendToExisting());
+      final FirehoseFactory firehoseFactory = ioConfig.getFirehoseFactory();
+      Assert.assertTrue(firehoseFactory instanceof IngestSegmentFirehoseFactory);
+      final IngestSegmentFirehoseFactory ingestSegmentFirehoseFactory = (IngestSegmentFirehoseFactory) firehoseFactory;
+      Assert.assertEquals(DATA_SOURCE, ingestSegmentFirehoseFactory.getDataSource());
+      Assert.assertEquals(expectedSegmentIntervals.get(i), ingestSegmentFirehoseFactory.getInterval());
+      Assert.assertNull(ingestSegmentFirehoseFactory.getDimensionsFilter());
+
+      // check dimensions (order-insensitive)
+      Assert.assertEquals(
+          new HashSet<>(expectedDimensionsSpec.getDimensionNames()),
+          new HashSet<>(ingestSegmentFirehoseFactory.getDimensions())
+      );
+      // check the order of metrics
+      Assert.assertEquals(
+          Lists.newArrayList("agg_4", "agg_3", "agg_2", "agg_1", "agg_0"),
+          ingestSegmentFirehoseFactory.getMetrics()
+      );
+
+      // assert tuningConfig
+      Assert.assertEquals(createTuningConfig(), ingestionSchema.getTuningConfig());
+    }
   }
 
   private static class TestTaskToolbox extends TaskToolbox
diff --git a/integration-tests/src/test/java/io/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/io/druid/tests/indexer/ITCompactionTaskTest.java
index c1005153998..1a951b2977a 100644
--- a/integration-tests/src/test/java/io/druid/tests/indexer/ITCompactionTaskTest.java
+++ b/integration-tests/src/test/java/io/druid/tests/indexer/ITCompactionTaskTest.java
@@ -36,24 +36,50 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   private static String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
   private static String INDEX_DATASOURCE = "wikipedia_index_test";
   private static String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json";
-  private static String COMPACTED_INTERVAL = "2013-08-31T00:00:00.000Z/2013-09-02T00:00:00.000Z";
 
   @Test
-  public void testCompaction() throws Exception
+  public void testCompactionWithoutKeepSegmentGranularity() throws Exception
   {
     loadData();
     final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
-    if (intervalsBeforeCompaction.contains(COMPACTED_INTERVAL)) {
-      throw new ISE("Containing a segment for the compacted interval[%s] before compaction", COMPACTED_INTERVAL);
+    final String compactedInterval = "2013-08-31T00:00:00.000Z/2013-09-02T00:00:00.000Z";
+    if (intervalsBeforeCompaction.contains(compactedInterval)) {
+      throw new ISE("Found a segment for the compacted interval[%s] before compaction", compactedInterval);
     }
     try {
       queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
-      compactData();
+      compactData(false);
       queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
 
       final List intervalsAfterCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
-      if (!intervalsAfterCompaction.contains(COMPACTED_INTERVAL)) {
-        throw new ISE("Compacted segment for interval[%s] does not exist", COMPACTED_INTERVAL);
+      if (!intervalsAfterCompaction.contains(compactedInterval)) {
+        throw new ISE("Compacted segment for interval[%s] does not exist", compactedInterval);
+      }
+    }
+    finally {
+      unloadAndKillData(INDEX_DATASOURCE);
+    }
+  }
+
+  @Test
+  public void testCompactionWithKeepSegmentGranularity() throws Exception
+  {
+    loadData();
+    final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
+    try {
+      queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
+      compactData(true);
+      queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
+
+      final List intervalsAfterCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
+      intervalsBeforeCompaction.sort(null);
+      intervalsAfterCompaction.sort(null);
+      if (!intervalsBeforeCompaction.equals(intervalsAfterCompaction)) {
+        throw new ISE(
+            "Intervals before compaction[%s] should be the same as those after compaction[%s]",
+            intervalsBeforeCompaction,
+            intervalsAfterCompaction
+        );
       }
     }
     finally {
@@ -73,9 +99,11 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
     );
   }
 
-  private void compactData() throws Exception
+  private void compactData(boolean keepSegmentGranularity) throws Exception
   {
-    final String taskID = indexer.submitTask(getTaskAsString(COMPACTION_TASK));
+    final String template = getTaskAsString(COMPACTION_TASK);
+    final String taskSpec = template.replace("${KEEP_SEGMENT_GRANULARITY}", Boolean.toString(keepSegmentGranularity));
+    final String taskID = indexer.submitTask(taskSpec);
     LOG.info("TaskID for compaction task %s", taskID);
     indexer.waitUntilTaskCompletes(taskID);
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json
index bc7f786646f..3fdad69ff5d 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json
@@ -1,5 +1,6 @@
 {
   "type" : "compact",
   "dataSource" : "wikipedia_index_test",
-  "interval" : "2013-08-31/2013-09-02"
+  "interval" : "2013-08-31/2013-09-02",
+  "keepSegmentGranularity" : ${KEEP_SEGMENT_GRANULARITY}
 }
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json
index af7b98a6a71..8b3eab89fb2 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json
@@ -63,7 +63,8 @@
       }
     },
     "tuningConfig": {
-      "type": "index"
+      "type": "index",
+      "targetPartitionSize": 3
     }
   }
 }
\ No newline at end of file
diff --git a/server/src/main/java/io/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/io/druid/client/indexing/ClientCompactQuery.java
index e1e32922588..db40fecf439 100644
--- a/server/src/main/java/io/druid/client/indexing/ClientCompactQuery.java
+++ b/server/src/main/java/io/druid/client/indexing/ClientCompactQuery.java
@@ -30,6 +30,7 @@ public class ClientCompactQuery
 {
   private final String dataSource;
   private final List segments;
+  private final boolean keepSegmentGranularity;
   private final ClientCompactQueryTuningConfig tuningConfig;
   private final Map context;
 
@@ -37,12 +38,14 @@ public class ClientCompactQuery
   public ClientCompactQuery(
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("segments") List segments,
+      @JsonProperty("keepSegmentGranularity") boolean keepSegmentGranularity,
      @JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig,
       @JsonProperty("context") Map context
   )
   {
     this.dataSource = dataSource;
     this.segments = segments;
+    this.keepSegmentGranularity = keepSegmentGranularity;
     this.tuningConfig = tuningConfig;
     this.context = context;
   }
@@ -65,6 +68,12 @@ public class ClientCompactQuery
     return segments;
   }
 
+  @JsonProperty
+  public boolean isKeepSegmentGranularity()
+  {
+    return keepSegmentGranularity;
+  }
+
   @JsonProperty
   public ClientCompactQueryTuningConfig getTuningConfig()
   {
diff --git a/server/src/main/java/io/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/HttpIndexingServiceClient.java
index b012db60442..c81bb0f5f4f 100644
--- a/server/src/main/java/io/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/io/druid/client/indexing/HttpIndexingServiceClient.java
@@ -96,6 +96,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
   @Override
   public String compactSegments(
       List segments,
+      boolean keepSegmentGranularity,
       int compactionTaskPriority,
       @Nullable ClientCompactQueryTuningConfig tuningConfig,
       @Nullable Map context
@@ -112,7 +113,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
     context = context == null ? new HashMap<>() : context;
     context.put("priority", compactionTaskPriority);
 
-    return runTask(new ClientCompactQuery(dataSource, segments, tuningConfig, context));
+    return runTask(new ClientCompactQuery(dataSource, segments, keepSegmentGranularity, tuningConfig, context));
   }
 
   @Override
diff --git a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java
index 80e47d0d48c..72c9794b366 100644
--- a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java
+++ b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java
@@ -40,6 +40,7 @@ public interface IndexingServiceClient
 
   String compactSegments(
       List segments,
+      boolean keepSegmentGranularity,
       int compactionTaskPriority,
       @Nullable ClientCompactQueryTuningConfig tuningConfig,
       @Nullable Map context
diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
index 6e368ed21d6..4a3e4900d08 100644
--- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
+++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
@@ -150,8 +150,11 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
       if (segmentsToCompact.size() > 1) {
         final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
+        // Currently keepSegmentGranularity is set to false because setting it to true breaks the algorithm of
+        // CompactionSegmentIterator for finding segments to be compacted.
         final String taskId = indexingServiceClient.compactSegments(
             segmentsToCompact,
+            false,
             config.getTaskPriority(),
             config.getTuningConfig(),
             config.getTaskContext()
diff --git a/server/src/test/java/io/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/io/druid/client/indexing/NoopIndexingServiceClient.java
index 030c991cbfe..e7889dc0e2d 100644
--- a/server/src/test/java/io/druid/client/indexing/NoopIndexingServiceClient.java
+++ b/server/src/test/java/io/druid/client/indexing/NoopIndexingServiceClient.java
@@ -57,6 +57,7 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
   @Override
   public String compactSegments(
       List segments,
+      boolean keepSegmentGranularity,
       int compactionTaskPriority,
       @Nullable ClientCompactQueryTuningConfig tuningConfig,
       @Nullable Map context
diff --git a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
index 52ba82ce1ff..62f79b2c8d1 100644
--- a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
@@ -64,6 +64,7 @@ public class DruidCoordinatorSegmentCompactorTest
       @Override
       public String compactSegments(
           List segments,
+          boolean keepSegmentGranularity,
          int compactionTaskPriority,
           ClientCompactQueryTuningConfig tuningConfig,
           Map context