Add support 'keepSegmentGranularity' for compactionTask (#6095)

* Add keepSegmentGranularity for compactionTask

* fix build

* createIoConfig method

* fix build

* fix build

* address comments

* fix build
Jihoon Son 2018-08-09 13:51:20 -07:00 committed by Gian Merlino
parent c028d18d74
commit d6a02de5b5
12 changed files with 387 additions and 152 deletions
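In short, CompactionTask gains a keepSegmentGranularity option (true by default when the property is omitted). When it is true, the task generates one IndexIngestionSpec per original segment interval and runs one index task per interval, so the existing segment granularity is preserved; when it is false, a single spec covering the whole compaction interval is generated, as before. The following is a minimal standalone sketch of that branching; the class, method, and interval-string representation are invented for illustration and are not part of the commit:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    class KeepSegmentGranularitySketch
    {
      // Illustration only: the real code builds Druid IndexIngestionSpec objects; here each
      // "spec" is represented by the interval string a generated index task would cover.
      static List<String> specIntervals(
          List<String> segmentIntervals,
          String compactionInterval,
          boolean keepSegmentGranularity
      )
      {
        if (segmentIntervals.isEmpty()) {
          return Collections.emptyList();                       // no segments -> nothing to compact
        }
        return keepSegmentGranularity
               ? segmentIntervals                                // one index task per original segment interval
               : Collections.singletonList(compactionInterval); // a single index task over the whole interval
      }

      public static void main(String[] args)
      {
        // Mirrors the unit test fixture below: five monthly segments inside 2017-01-01/2017-06-01.
        List<String> months = Arrays.asList(
            "2017-01-01/2017-02-01",
            "2017-02-01/2017-03-01",
            "2017-03-01/2017-04-01",
            "2017-04-01/2017-05-01",
            "2017-05-01/2017-06-01"
        );
        System.out.println(specIntervals(months, "2017-01-01/2017-06-01", true));  // all five monthly intervals
        System.out.println(specIntervals(months, "2017-01-01/2017-06-01", false)); // [2017-01-01/2017-06-01]
      }
    }

The coordinator's automatic compaction (see DruidCoordinatorSegmentCompactor below) passes false for now, since keeping segment granularity would break its segment-selection logic.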

View File

@@ -101,6 +101,10 @@ public class DimensionsSpec
}
}
public DimensionsSpec(List<DimensionSchema> dimensions)
{
this(dimensions, null, null);
}
@JsonProperty
public List<DimensionSchema> getDimensions()
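The hunk above adds a single-argument convenience constructor that delegates to the existing three-argument form, passing null for the other two arguments; the updated test further down uses it. A short usage sketch, borrowing the StringDimensionSchema type and the "string_dim_0" column name that appear later in this diff (the variable names are illustrative):

    // Previously callers passed the unused arguments explicitly:
    DimensionsSpec verbose = new DimensionsSpec(
        Collections.singletonList(new StringDimensionSchema("string_dim_0")), null, null);
    // With the new constructor the trailing nulls can be dropped:
    DimensionsSpec concise = new DimensionsSpec(
        Collections.singletonList(new StringDimensionSchema("string_dim_0")));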

View File

@@ -28,7 +28,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
@@ -54,7 +53,7 @@ import io.druid.java.util.common.ISE;
import io.druid.java.util.common.JodaUtils;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.granularity.NoneGranularity;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Comparators;
import io.druid.java.util.common.jackson.JacksonUtils;
import io.druid.java.util.common.logger.Logger;
@@ -96,17 +95,19 @@ public class CompactionTask extends AbstractTask
{
private static final Logger log = new Logger(CompactionTask.class);
private static final String TYPE = "compact";
private static final boolean DEFAULT_KEEP_SEGMENT_GRANULARITY = true;
private final Interval interval;
private final List<DataSegment> segments;
private final DimensionsSpec dimensionsSpec;
private final boolean keepSegmentGranularity;
private final IndexTuningConfig tuningConfig;
private final ObjectMapper jsonMapper;
@JsonIgnore
private final SegmentProvider segmentProvider;
@JsonIgnore
private IndexTask indexTaskSpec;
private List<IndexTask> indexTaskSpecs;
@JsonIgnore
private final AuthorizerMapper authorizerMapper;
@@ -125,6 +126,7 @@ public class CompactionTask extends AbstractTask
@Nullable @JsonProperty("interval") final Interval interval,
@Nullable @JsonProperty("segments") final List<DataSegment> segments,
@Nullable @JsonProperty("dimensions") final DimensionsSpec dimensionsSpec,
@Nullable @JsonProperty("keepSegmentGranularity") final Boolean keepSegmentGranularity,
@Nullable @JsonProperty("tuningConfig") final IndexTuningConfig tuningConfig,
@Nullable @JsonProperty("context") final Map<String, Object> context,
@JacksonInject ObjectMapper jsonMapper,
@@ -144,6 +146,9 @@ public class CompactionTask extends AbstractTask
this.interval = interval;
this.segments = segments;
this.dimensionsSpec = dimensionsSpec;
this.keepSegmentGranularity = keepSegmentGranularity == null
? DEFAULT_KEEP_SEGMENT_GRANULARITY
: keepSegmentGranularity;
this.tuningConfig = tuningConfig;
this.jsonMapper = jsonMapper;
this.segmentProvider = segments == null ? new SegmentProvider(dataSource, interval) : new SegmentProvider(segments);
@@ -170,6 +175,12 @@ public class CompactionTask extends AbstractTask
return dimensionsSpec;
}
@JsonProperty
public boolean isKeepSegmentGranularity()
{
return keepSegmentGranularity;
}
@JsonProperty
public IndexTuningConfig getTuningConfig()
{
@@ -205,52 +216,65 @@ public class CompactionTask extends AbstractTask
@Override
public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
if (indexTaskSpec == null) {
final IndexIngestionSpec ingestionSpec = createIngestionSchema(
if (indexTaskSpecs == null) {
indexTaskSpecs = createIngestionSchema(
toolbox,
segmentProvider,
dimensionsSpec,
keepSegmentGranularity,
tuningConfig,
jsonMapper
);
if (ingestionSpec != null) {
indexTaskSpec = new IndexTask(
getId(),
getGroupId(),
getTaskResource(),
getDataSource(),
ingestionSpec,
getContext(),
authorizerMapper,
chatHandlerProvider,
rowIngestionMetersFactory
);
}
).stream()
.map(spec -> new IndexTask(
getId(),
getGroupId(),
getTaskResource(),
getDataSource(),
spec,
getContext(),
authorizerMapper,
chatHandlerProvider,
rowIngestionMetersFactory
))
.collect(Collectors.toList());
}
if (indexTaskSpec == null) {
if (indexTaskSpecs.isEmpty()) {
log.warn("Interval[%s] has no segments, nothing to do.", interval);
return TaskStatus.failure(getId());
} else {
final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(indexTaskSpec);
log.info("Generated compaction task details: " + json);
log.info("Generated [%d] compaction task specs", indexTaskSpecs.size());
return indexTaskSpec.run(toolbox);
for (IndexTask eachSpec : indexTaskSpecs) {
final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
log.info("Running indexSpec: " + json);
try {
final TaskStatus eachResult = eachSpec.run(toolbox);
if (!eachResult.isSuccess()) {
log.warn("Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json);
}
}
catch (Exception e) {
log.warn(e, "Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json);
}
}
return TaskStatus.success(getId());
}
}
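Because removed and added lines are interleaved in this rendering, the new control flow of run() is easy to misread; the following is a distilled sketch with Druid's IndexTask and TaskStatus replaced by plain stand-ins (illustration only, not code from the commit):

    import java.util.List;
    import java.util.concurrent.Callable;

    class SequentialCompactionSketch
    {
      // Each Callable stands in for one generated IndexTask; true means the sub-task succeeded.
      static boolean runAll(List<Callable<Boolean>> indexTaskSpecs)
      {
        if (indexTaskSpecs.isEmpty()) {
          return false;                  // "Interval[...] has no segments, nothing to do." -> failure
        }
        for (Callable<Boolean> spec : indexTaskSpecs) {
          try {
            if (!spec.call()) {
              // the real task logs a warning and tries the next indexSpec
            }
          }
          catch (Exception e) {
            // likewise logged; the loop continues with the next spec
          }
        }
        return true;                     // the compaction task reports success once the loop finishes
      }
    }

Note that, as written in the diff, a failed or throwing sub-task only produces a warning; the overall compaction task still returns TaskStatus.success after the loop completes.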
/**
* Generate {@link IndexIngestionSpec} from input segments.
*
* @return null if input segments don't exist. Otherwise, a generated ingestionSpec.
* @return an empty list if input segments don't exist. Otherwise, a generated ingestionSpec.
*/
@Nullable
@VisibleForTesting
static IndexIngestionSpec createIngestionSchema(
static List<IndexIngestionSpec> createIngestionSchema(
TaskToolbox toolbox,
SegmentProvider segmentProvider,
DimensionsSpec dimensionsSpec,
boolean keepSegmentGranularity,
IndexTuningConfig tuningConfig,
ObjectMapper jsonMapper
) throws IOException, SegmentLoadingException
@@ -263,33 +287,68 @@ public class CompactionTask extends AbstractTask
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = pair.rhs;
if (timelineSegments.size() == 0) {
return null;
return Collections.emptyList();
}
final DataSchema dataSchema = createDataSchema(
segmentProvider.dataSource,
segmentProvider.interval,
dimensionsSpec,
toolbox.getIndexIO(),
jsonMapper,
timelineSegments,
segmentFileMap
);
return new IndexIngestionSpec(
dataSchema,
new IndexIOConfig(
new IngestSegmentFirehoseFactory(
segmentProvider.dataSource,
segmentProvider.interval,
null, // no filter
// set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose
dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
toolbox.getIndexIO()
),
false
if (keepSegmentGranularity) {
// if keepSegmentGranularity = true, create indexIngestionSpec per segment interval, so that we can run an index
// task per segment interval.
final List<IndexIngestionSpec> specs = new ArrayList<>(timelineSegments.size());
for (TimelineObjectHolder<String, DataSegment> holder : timelineSegments) {
final DataSchema dataSchema = createDataSchema(
segmentProvider.dataSource,
holder.getInterval(),
Collections.singletonList(holder),
dimensionsSpec,
toolbox.getIndexIO(),
jsonMapper,
segmentFileMap
);
specs.add(
new IndexIngestionSpec(
dataSchema,
createIoConfig(toolbox, dataSchema, holder.getInterval()),
tuningConfig
)
);
}
return specs;
} else {
final DataSchema dataSchema = createDataSchema(
segmentProvider.dataSource,
segmentProvider.interval,
timelineSegments,
dimensionsSpec,
toolbox.getIndexIO(),
jsonMapper,
segmentFileMap
);
return Collections.singletonList(
new IndexIngestionSpec(
dataSchema,
createIoConfig(toolbox, dataSchema, segmentProvider.interval),
tuningConfig
)
);
}
}
private static IndexIOConfig createIoConfig(TaskToolbox toolbox, DataSchema dataSchema, Interval interval)
{
return new IndexIOConfig(
new IngestSegmentFirehoseFactory(
dataSchema.getDataSource(),
interval,
null, // no filter
// set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose
dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
toolbox.getIndexIO()
),
tuningConfig
false
);
}
@@ -308,18 +367,18 @@ public class CompactionTask extends AbstractTask
private static DataSchema createDataSchema(
String dataSource,
Interval interval,
Interval totalInterval,
List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolder,
DimensionsSpec dimensionsSpec,
IndexIO indexIO,
ObjectMapper jsonMapper,
List<TimelineObjectHolder<String, DataSegment>> timelineSegments,
Map<DataSegment, File> segmentFileMap
)
throws IOException
{
// find metadata for interval
final List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
timelineSegments,
timelineObjectHolder,
segmentFileMap,
indexIO
);
@@ -348,10 +407,11 @@ public class CompactionTask extends AbstractTask
final Boolean isRollup = pair.lhs.getMetadata().isRollup();
return isRollup != null && isRollup;
});
final GranularitySpec granularitySpec = new ArbitraryGranularitySpec(
new NoneGranularity(),
Granularities.NONE,
rollup,
ImmutableList.of(interval)
Collections.singletonList(totalInterval)
);
// find unique dimensions
@@ -444,15 +504,15 @@ public class CompactionTask extends AbstractTask
}
private static List<Pair<QueryableIndex, DataSegment>> loadSegments(
List<TimelineObjectHolder<String, DataSegment>> timelineSegments,
List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolders,
Map<DataSegment, File> segmentFileMap,
IndexIO indexIO
) throws IOException
{
final List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
for (TimelineObjectHolder<String, DataSegment> timelineSegment : timelineSegments) {
final PartitionHolder<DataSegment> partitionHolder = timelineSegment.getObject();
for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder : timelineObjectHolders) {
final PartitionHolder<DataSegment> partitionHolder = timelineObjectHolder.getObject();
for (PartitionChunk<DataSegment> chunk : partitionHolder) {
final DataSegment segment = chunk.getObject();
final QueryableIndex queryableIndex = indexIO.loadIndex(

View File

@@ -98,11 +98,16 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -110,13 +115,22 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@RunWith(Parameterized.class)
public class CompactionTaskTest
{
private static final String DATA_SOURCE = "dataSource";
private static final String TIMESTAMP_COLUMN = "timestamp";
private static final String MIXED_TYPE_COLUMN = "string_to_double";
private static final Interval COMPACTION_INTERVAL = Intervals.of("2017-01-01/2017-06-01");
private static final List<Interval> SEGMENT_INTERVALS = ImmutableList.of(
Intervals.of("2017-01-01/2017-02-01"),
Intervals.of("2017-02-01/2017-03-01"),
Intervals.of("2017-03-01/2017-04-01"),
Intervals.of("2017-04-01/2017-05-01"),
Intervals.of("2017-05-01/2017-06-01")
);
private static final Map<Interval, DimensionSchema> MIXED_TYPE_COLUMN_MAP = ImmutableMap.of(
Intervals.of("2017-01-01/2017-02-01"),
new StringDimensionSchema(MIXED_TYPE_COLUMN),
@@ -138,6 +152,8 @@ public class CompactionTaskTest
private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
private static Map<DataSegment, File> segmentMap;
private final boolean keepSegmentGranularity;
private TaskToolbox toolbox;
@BeforeClass
@@ -288,6 +304,20 @@ public class CompactionTaskTest
);
}
@Parameters(name = "keepSegmentGranularity={0}")
public static Collection<Object[]> parameters()
{
return ImmutableList.of(
new Object[] {false},
new Object[] {true}
);
}
public CompactionTaskTest(boolean keepSegmentGranularity)
{
this.keepSegmentGranularity = keepSegmentGranularity;
}
@Test
public void testSerdeWithInterval() throws IOException
{
@@ -298,6 +328,7 @@ public class CompactionTaskTest
COMPACTION_INTERVAL,
null,
null,
null,
createTuningConfig(),
ImmutableMap.of("testKey", "testContext"),
objectMapper,
@@ -327,6 +358,7 @@ public class CompactionTaskTest
null,
SEGMENTS,
null,
null,
createTuningConfig(),
ImmutableMap.of("testKey", "testContext"),
objectMapper,
@@ -346,18 +378,51 @@ public class CompactionTaskTest
}
@Test
public void testCreateIngestionSchema() throws IOException, SegmentLoadingException
public void testCreateIngestionSchemaWithKeepSegmentGranularity() throws IOException, SegmentLoadingException
{
final IndexIngestionSpec ingestionSchema = CompactionTask.createIngestionSchema(
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
null,
keepSegmentGranularity,
TUNING_CONFIG,
objectMapper
);
final DimensionsSpec expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
keepSegmentGranularity
);
assertIngestionSchema(ingestionSchema, expectedDimensionsSpec);
if (keepSegmentGranularity) {
Assert.assertEquals(5, ingestionSpecs.size());
assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS);
} else {
Assert.assertEquals(1, ingestionSpecs.size());
assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, Collections.singletonList(COMPACTION_INTERVAL));
}
}
@Test
public void testCreateIngestionSchemaWithIgnoreSegmentGranularity() throws IOException, SegmentLoadingException
{
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
null,
keepSegmentGranularity,
TUNING_CONFIG,
objectMapper
);
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
keepSegmentGranularity
);
if (keepSegmentGranularity) {
Assert.assertEquals(5, ingestionSpecs.size());
assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS);
} else {
Assert.assertEquals(1, ingestionSpecs.size());
assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, Collections.singletonList(COMPACTION_INTERVAL));
}
}
@Test
@@ -387,35 +452,59 @@ public class CompactionTaskTest
new DoubleDimensionSchema("double_dim_3"),
new DoubleDimensionSchema("double_dim_4"),
new StringDimensionSchema(MIXED_TYPE_COLUMN)
),
null,
null
)
);
final IndexIngestionSpec ingestionSchema = CompactionTask.createIngestionSchema(
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
customSpec,
keepSegmentGranularity,
TUNING_CONFIG,
objectMapper
);
assertIngestionSchema(ingestionSchema, customSpec);
if (keepSegmentGranularity) {
Assert.assertEquals(5, ingestionSpecs.size());
final List<DimensionsSpec> dimensionsSpecs = new ArrayList<>(5);
IntStream.range(0, 5).forEach(i -> dimensionsSpecs.add(customSpec));
assertIngestionSchema(
ingestionSpecs,
dimensionsSpecs,
SEGMENT_INTERVALS
);
} else {
Assert.assertEquals(1, ingestionSpecs.size());
assertIngestionSchema(
ingestionSpecs,
Collections.singletonList(customSpec),
Collections.singletonList(COMPACTION_INTERVAL)
);
}
}
@Test
public void testCreateIngestionSchemaWithCustomSegments() throws IOException, SegmentLoadingException
{
final IndexIngestionSpec ingestionSchema = CompactionTask.createIngestionSchema(
final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(SEGMENTS),
null,
keepSegmentGranularity,
TUNING_CONFIG,
objectMapper
);
final DimensionsSpec expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
keepSegmentGranularity
);
assertIngestionSchema(ingestionSchema, expectedDimensionsSpec);
if (keepSegmentGranularity) {
Assert.assertEquals(5, ingestionSpecs.size());
assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS);
} else {
Assert.assertEquals(1, ingestionSpecs.size());
assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, Collections.singletonList(COMPACTION_INTERVAL));
}
}
@Test
@@ -430,6 +519,7 @@ public class CompactionTaskTest
toolbox,
new SegmentProvider(segments),
null,
keepSegmentGranularity,
TUNING_CONFIG,
objectMapper
);
@@ -448,13 +538,14 @@ public class CompactionTaskTest
toolbox,
new SegmentProvider(segments),
null,
keepSegmentGranularity,
TUNING_CONFIG,
objectMapper
);
}
@Test
public void testEmptyInterval() throws Exception
public void testEmptyInterval()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage(CoreMatchers.containsString("must specify a nonempty interval"));
@@ -468,6 +559,7 @@ public class CompactionTaskTest
null,
null,
null,
null,
objectMapper,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
@@ -475,85 +567,118 @@ public class CompactionTaskTest
);
}
private static DimensionsSpec getExpectedDimensionsSpecForAutoGeneration()
private static List<DimensionsSpec> getExpectedDimensionsSpecForAutoGeneration(boolean keepSegmentGranularity)
{
return new DimensionsSpec(
Lists.newArrayList(
new LongDimensionSchema("timestamp"),
new StringDimensionSchema("string_dim_4"),
new LongDimensionSchema("long_dim_4"),
new FloatDimensionSchema("float_dim_4"),
new DoubleDimensionSchema("double_dim_4"),
new StringDimensionSchema("string_dim_0"),
new LongDimensionSchema("long_dim_0"),
new FloatDimensionSchema("float_dim_0"),
new DoubleDimensionSchema("double_dim_0"),
new StringDimensionSchema("string_dim_1"),
new LongDimensionSchema("long_dim_1"),
new FloatDimensionSchema("float_dim_1"),
new DoubleDimensionSchema("double_dim_1"),
new StringDimensionSchema("string_dim_2"),
new LongDimensionSchema("long_dim_2"),
new FloatDimensionSchema("float_dim_2"),
new DoubleDimensionSchema("double_dim_2"),
new StringDimensionSchema("string_dim_3"),
new LongDimensionSchema("long_dim_3"),
new FloatDimensionSchema("float_dim_3"),
new DoubleDimensionSchema("double_dim_3"),
new DoubleDimensionSchema("string_to_double")
),
null,
null
if (keepSegmentGranularity) {
return ImmutableList.of(
new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
);
} else {
return Collections.singletonList(
new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
);
}
}
private static List<DimensionSchema> getDimensionSchema(DimensionSchema mixedTypeColumn)
{
return Lists.newArrayList(
new LongDimensionSchema("timestamp"),
new StringDimensionSchema("string_dim_4"),
new LongDimensionSchema("long_dim_4"),
new FloatDimensionSchema("float_dim_4"),
new DoubleDimensionSchema("double_dim_4"),
new StringDimensionSchema("string_dim_0"),
new LongDimensionSchema("long_dim_0"),
new FloatDimensionSchema("float_dim_0"),
new DoubleDimensionSchema("double_dim_0"),
new StringDimensionSchema("string_dim_1"),
new LongDimensionSchema("long_dim_1"),
new FloatDimensionSchema("float_dim_1"),
new DoubleDimensionSchema("double_dim_1"),
new StringDimensionSchema("string_dim_2"),
new LongDimensionSchema("long_dim_2"),
new FloatDimensionSchema("float_dim_2"),
new DoubleDimensionSchema("double_dim_2"),
new StringDimensionSchema("string_dim_3"),
new LongDimensionSchema("long_dim_3"),
new FloatDimensionSchema("float_dim_3"),
new DoubleDimensionSchema("double_dim_3"),
mixedTypeColumn
);
}
private static void assertIngestionSchema(
IndexIngestionSpec ingestionSchema,
DimensionsSpec expectedDimensionsSpec
List<IndexIngestionSpec> ingestionSchemas,
List<DimensionsSpec> expectedDimensionsSpecs,
List<Interval> expectedSegmentIntervals
)
{
// assert dataSchema
final DataSchema dataSchema = ingestionSchema.getDataSchema();
Assert.assertEquals(DATA_SOURCE, dataSchema.getDataSource());
final InputRowParser parser = objectMapper.convertValue(dataSchema.getParser(), InputRowParser.class);
Assert.assertTrue(parser instanceof TransformingInputRowParser);
Assert.assertTrue(((TransformingInputRowParser) parser).getParser() instanceof NoopInputRowParser);
Assert.assertTrue(parser.getParseSpec() instanceof TimeAndDimsParseSpec);
Assert.assertEquals(
new HashSet<>(expectedDimensionsSpec.getDimensions()),
new HashSet<>(parser.getParseSpec().getDimensionsSpec().getDimensions())
);
final Set<AggregatorFactory> expectedAggregators = AGGREGATORS.values()
.stream()
.map(AggregatorFactory::getCombiningFactory)
.collect(Collectors.toSet());
Assert.assertEquals(expectedAggregators, new HashSet<>(Arrays.asList(dataSchema.getAggregators())));
Assert.assertEquals(
new ArbitraryGranularitySpec(Granularities.NONE, false, ImmutableList.of(COMPACTION_INTERVAL)),
dataSchema.getGranularitySpec()
Preconditions.checkArgument(
ingestionSchemas.size() == expectedDimensionsSpecs.size(),
"ingesionSchemas.size()[%s] should be same with expectedDimensionsSpecs.size()[%s]",
ingestionSchemas.size(),
expectedDimensionsSpecs.size()
);
// assert ioConfig
final IndexIOConfig ioConfig = ingestionSchema.getIOConfig();
Assert.assertFalse(ioConfig.isAppendToExisting());
final FirehoseFactory firehoseFactory = ioConfig.getFirehoseFactory();
Assert.assertTrue(firehoseFactory instanceof IngestSegmentFirehoseFactory);
final IngestSegmentFirehoseFactory ingestSegmentFirehoseFactory = (IngestSegmentFirehoseFactory) firehoseFactory;
Assert.assertEquals(DATA_SOURCE, ingestSegmentFirehoseFactory.getDataSource());
Assert.assertEquals(COMPACTION_INTERVAL, ingestSegmentFirehoseFactory.getInterval());
Assert.assertNull(ingestSegmentFirehoseFactory.getDimensionsFilter());
for (int i = 0; i < ingestionSchemas.size(); i++) {
final IndexIngestionSpec ingestionSchema = ingestionSchemas.get(i);
final DimensionsSpec expectedDimensionsSpec = expectedDimensionsSpecs.get(i);
// check the order of dimensions
Assert.assertEquals(expectedDimensionsSpec.getDimensionNames(), ingestSegmentFirehoseFactory.getDimensions());
// check the order of metrics
Assert.assertEquals(
Lists.newArrayList("agg_4", "agg_3", "agg_2", "agg_1", "agg_0"),
ingestSegmentFirehoseFactory.getMetrics()
);
// assert dataSchema
final DataSchema dataSchema = ingestionSchema.getDataSchema();
Assert.assertEquals(DATA_SOURCE, dataSchema.getDataSource());
// assert tuningConfig
Assert.assertEquals(createTuningConfig(), ingestionSchema.getTuningConfig());
final InputRowParser parser = objectMapper.convertValue(dataSchema.getParser(), InputRowParser.class);
Assert.assertTrue(parser instanceof TransformingInputRowParser);
Assert.assertTrue(((TransformingInputRowParser) parser).getParser() instanceof NoopInputRowParser);
Assert.assertTrue(parser.getParseSpec() instanceof TimeAndDimsParseSpec);
Assert.assertEquals(
new HashSet<>(expectedDimensionsSpec.getDimensions()),
new HashSet<>(parser.getParseSpec().getDimensionsSpec().getDimensions())
);
final Set<AggregatorFactory> expectedAggregators = AGGREGATORS.values()
.stream()
.map(AggregatorFactory::getCombiningFactory)
.collect(Collectors.toSet());
Assert.assertEquals(expectedAggregators, new HashSet<>(Arrays.asList(dataSchema.getAggregators())));
Assert.assertEquals(
new ArbitraryGranularitySpec(
Granularities.NONE,
false,
Collections.singletonList(expectedSegmentIntervals.get(i))
),
dataSchema.getGranularitySpec()
);
// assert ioConfig
final IndexIOConfig ioConfig = ingestionSchema.getIOConfig();
Assert.assertFalse(ioConfig.isAppendToExisting());
final FirehoseFactory firehoseFactory = ioConfig.getFirehoseFactory();
Assert.assertTrue(firehoseFactory instanceof IngestSegmentFirehoseFactory);
final IngestSegmentFirehoseFactory ingestSegmentFirehoseFactory = (IngestSegmentFirehoseFactory) firehoseFactory;
Assert.assertEquals(DATA_SOURCE, ingestSegmentFirehoseFactory.getDataSource());
Assert.assertEquals(expectedSegmentIntervals.get(i), ingestSegmentFirehoseFactory.getInterval());
Assert.assertNull(ingestSegmentFirehoseFactory.getDimensionsFilter());
// check the order of dimensions
Assert.assertEquals(
new HashSet<>(expectedDimensionsSpec.getDimensionNames()),
new HashSet<>(ingestSegmentFirehoseFactory.getDimensions())
);
// check the order of metrics
Assert.assertEquals(
Lists.newArrayList("agg_4", "agg_3", "agg_2", "agg_1", "agg_0"),
ingestSegmentFirehoseFactory.getMetrics()
);
// assert tuningConfig
Assert.assertEquals(createTuningConfig(), ingestionSchema.getTuningConfig());
}
}
private static class TestTaskToolbox extends TaskToolbox

View File

@@ -36,24 +36,50 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
private static String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static String INDEX_DATASOURCE = "wikipedia_index_test";
private static String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json";
private static String COMPACTED_INTERVAL = "2013-08-31T00:00:00.000Z/2013-09-02T00:00:00.000Z";
@Test
public void testCompaction() throws Exception
public void testCompactionWithoutKeepSegmentGranularity() throws Exception
{
loadData();
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
if (intervalsBeforeCompaction.contains(COMPACTED_INTERVAL)) {
throw new ISE("Containing a segment for the compacted interval[%s] before compaction", COMPACTED_INTERVAL);
final String compactedInterval = "2013-08-31T00:00:00.000Z/2013-09-02T00:00:00.000Z";
if (intervalsBeforeCompaction.contains(compactedInterval)) {
throw new ISE("Containing a segment for the compacted interval[%s] before compaction", compactedInterval);
}
try {
queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
compactData();
compactData(false);
queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
final List<String> intervalsAfterCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
if (!intervalsAfterCompaction.contains(COMPACTED_INTERVAL)) {
throw new ISE("Compacted segment for interval[%s] does not exist", COMPACTED_INTERVAL);
if (!intervalsAfterCompaction.contains(compactedInterval)) {
throw new ISE("Compacted segment for interval[%s] does not exist", compactedInterval);
}
}
finally {
unloadAndKillData(INDEX_DATASOURCE);
}
}
@Test
public void testCompactionWithKeepSegmentGranularity() throws Exception
{
loadData();
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
try {
queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
compactData(true);
queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
final List<String> intervalsAfterCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
intervalsBeforeCompaction.sort(null);
intervalsAfterCompaction.sort(null);
if (!intervalsBeforeCompaction.equals(intervalsAfterCompaction)) {
throw new ISE(
"Intervals before compaction[%s] should be same with those after compaction[%s]",
intervalsBeforeCompaction,
intervalsAfterCompaction
);
}
}
finally {
@@ -73,9 +99,11 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
);
}
private void compactData() throws Exception
private void compactData(boolean keepSegmentGranularity) throws Exception
{
final String taskID = indexer.submitTask(getTaskAsString(COMPACTION_TASK));
final String template = getTaskAsString(COMPACTION_TASK);
final String taskSpec = template.replace("${KEEP_SEGMENT_GRANULARITY}", Boolean.toString(keepSegmentGranularity));
final String taskID = indexer.submitTask(taskSpec);
LOG.info("TaskID for compaction task %s", taskID);
indexer.waitUntilTaskCompletes(taskID);

View File

@@ -1,5 +1,6 @@
{
"type" : "compact",
"dataSource" : "wikipedia_index_test",
"interval" : "2013-08-31/2013-09-02"
"interval" : "2013-08-31/2013-09-02",
"keepSegmentGranularity" : ${KEEP_SEGMENT_GRANULARITY}
}

View File

@@ -63,7 +63,8 @@
}
},
"tuningConfig": {
"type": "index"
"type": "index",
"targetPartitionSize": 3
}
}
}

View File

@@ -30,6 +30,7 @@ public class ClientCompactQuery
{
private final String dataSource;
private final List<DataSegment> segments;
private final boolean keepSegmentGranularity;
private final ClientCompactQueryTuningConfig tuningConfig;
private final Map<String, Object> context;
@@ -37,12 +38,14 @@ public class ClientCompactQuery
public ClientCompactQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("keepSegmentGranularity") boolean keepSegmentGranularity,
@JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig,
@JsonProperty("context") Map<String, Object> context
)
{
this.dataSource = dataSource;
this.segments = segments;
this.keepSegmentGranularity = keepSegmentGranularity;
this.tuningConfig = tuningConfig;
this.context = context;
}
@@ -65,6 +68,12 @@ public class ClientCompactQuery
return segments;
}
@JsonProperty
public boolean isKeepSegmentGranularity()
{
return keepSegmentGranularity;
}
@JsonProperty
public ClientCompactQueryTuningConfig getTuningConfig()
{

View File

@@ -96,6 +96,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
@Override
public String compactSegments(
List<DataSegment> segments,
boolean keepSegmentGranularity,
int compactionTaskPriority,
@Nullable ClientCompactQueryTuningConfig tuningConfig,
@Nullable Map<String, Object> context
@@ -112,7 +113,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
context = context == null ? new HashMap<>() : context;
context.put("priority", compactionTaskPriority);
return runTask(new ClientCompactQuery(dataSource, segments, tuningConfig, context));
return runTask(new ClientCompactQuery(dataSource, segments, keepSegmentGranularity, tuningConfig, context));
}
@Override

View File

@@ -40,6 +40,7 @@ public interface IndexingServiceClient
String compactSegments(
List<DataSegment> segments,
boolean keepSegmentGranularity,
int compactionTaskPriority,
@Nullable ClientCompactQueryTuningConfig tuningConfig,
@Nullable Map<String, Object> context

View File

@@ -150,8 +150,11 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
if (segmentsToCompact.size() > 1) {
final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
// Currently set keepSegmentGranularity to false because it breaks the algorithm of CompactionSegmentIterator to
// find segments to be compacted.
final String taskId = indexingServiceClient.compactSegments(
segmentsToCompact,
false,
config.getTaskPriority(),
config.getTuningConfig(),
config.getTaskContext()

View File

@@ -57,6 +57,7 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
@Override
public String compactSegments(
List<DataSegment> segments,
boolean keepSegmentGranularity,
int compactionTaskPriority,
@Nullable ClientCompactQueryTuningConfig tuningConfig,
@Nullable Map<String, Object> context

View File

@@ -64,6 +64,7 @@ public class DruidCoordinatorSegmentCompactorTest
@Override
public String compactSegments(
List<DataSegment> segments,
boolean keepSegmentGranularity,
int compactionTaskPriority,
ClientCompactQueryTuningConfig tuningConfig,
Map<String, Object> context