Fix NPE in compactionTask (#5613)

* Fix NPE in compactionTask

* more annotations for metadata

* better error message for empty input

* fix build

* revert some null checks

* address comments
Jihoon Son 2018-04-12 21:11:03 -07:00 committed by Gian Merlino
parent 124c89e435
commit f349e03091
8 changed files with 308 additions and 181 deletions
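For context, the NPE this commit addresses can surface in two places in the old createIngestionSchema code shown below: dereferencing a QueryableIndex whose Metadata is null, and auto-unboxing the nullable Boolean returned by Metadata.isRollup(). A minimal hedged illustration of those failure modes (the class, method, and variable names here are illustrative and not part of the diff):

    import io.druid.query.aggregation.AggregatorFactory;
    import io.druid.segment.QueryableIndex;

    final class NullPathIllustration
    {
      private NullPathIllustration() {}

      static void oldNullPaths(QueryableIndex index)
      {
        // QueryableIndex.getMetadata() may return null (now annotated @Nullable in this commit).
        AggregatorFactory[] aggs = index.getMetadata().getAggregators(); // NPE when metadata is null

        // Metadata.isRollup() returns a Boolean; auto-unboxing a null Boolean also throws NPE.
        boolean rollup = index.getMetadata().isRollup();
      }
    }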

CompactionTask.java

@@ -51,6 +51,7 @@ import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.JodaUtils;
 import io.druid.java.util.common.Pair;
+import io.druid.java.util.common.RE;
 import io.druid.java.util.common.granularity.NoneGranularity;
 import io.druid.java.util.common.guava.Comparators;
 import io.druid.java.util.common.jackson.JacksonUtils;
@@ -79,6 +80,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -195,28 +197,37 @@ public class CompactionTask extends AbstractTask
           jsonMapper
       );
-      indexTaskSpec = new IndexTask(
-          getId(),
-          getGroupId(),
-          getTaskResource(),
-          getDataSource(),
-          ingestionSpec,
-          getContext(),
-          authorizerMapper,
-          null
-      );
+      if (ingestionSpec != null) {
+        indexTaskSpec = new IndexTask(
+            getId(),
+            getGroupId(),
+            getTaskResource(),
+            getDataSource(),
+            ingestionSpec,
+            getContext(),
+            authorizerMapper,
+            null
+        );
+      }
     }

-    if (indexTaskSpec.getIngestionSchema() == null) {
-      log.info("Cannot find segments for interval");
+    if (indexTaskSpec == null) {
+      log.warn("Failed to generate compaction spec");
+      return TaskStatus.failure(getId());
+    } else {
+      final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(indexTaskSpec);
+      log.info("Generated compaction task details: " + json);
+      return indexTaskSpec.run(toolbox);
     }
-
-    final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(indexTaskSpec);
-    log.info("Generated compaction task details: " + json);
-
-    return indexTaskSpec.run(toolbox);
   }

+  /**
+   * Generate {@link IndexIngestionSpec} from input segments.
+   * @return null if input segments don't exist. Otherwise, a generated ingestionSpec.
+   */
+  @Nullable
   @VisibleForTesting
   static IndexIngestionSpec createIngestionSchema(
       TaskToolbox toolbox,
@@ -289,12 +300,22 @@ public class CompactionTask extends AbstractTask
       throws IOException
   {
     // find metadata for interval
-    final List<QueryableIndex> queryableIndices = loadSegments(timelineSegments, segmentFileMap, indexIO);
+    final List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
+        timelineSegments,
+        segmentFileMap,
+        indexIO
+    );

     // find merged aggregators
+    for (Pair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
+      final QueryableIndex index = pair.lhs;
+      if (index.getMetadata() == null) {
+        throw new RE("Index metadata doesn't exist for segment[%s]", pair.rhs.getIdentifier());
+      }
+    }
-    final List<AggregatorFactory[]> aggregatorFactories = queryableIndices
+    final List<AggregatorFactory[]> aggregatorFactories = queryableIndexAndSegments
         .stream()
-        .map(index -> index.getMetadata().getAggregators())
+        .map(pair -> pair.lhs.getMetadata().getAggregators()) // We have already done null check on index.getMetadata()
         .collect(Collectors.toList());
     final AggregatorFactory[] mergedAggregators = AggregatorFactory.mergeAggregators(aggregatorFactories);
@@ -304,7 +325,11 @@ public class CompactionTask extends AbstractTask
     // find granularity spec
     // set rollup only if rollup is set for all segments
-    final boolean rollup = queryableIndices.stream().allMatch(index -> index.getMetadata().isRollup());
+    final boolean rollup = queryableIndexAndSegments.stream().allMatch(pair -> {
+      // We have already checked getMetadata() doesn't return null
+      final Boolean isRollup = pair.lhs.getMetadata().isRollup();
+      return isRollup != null && isRollup;
+    });
     final GranularitySpec granularitySpec = new ArbitraryGranularitySpec(
         new NoneGranularity(),
         rollup,
@@ -313,7 +338,7 @@ public class CompactionTask extends AbstractTask
     // find unique dimensions
     final DimensionsSpec finalDimensionsSpec = dimensionsSpec == null ?
-                                               createDimensionsSpec(queryableIndices) :
+                                               createDimensionsSpec(queryableIndexAndSegments) :
                                                dimensionsSpec;
     final InputRowParser parser = new NoopInputRowParser(new TimeAndDimsParseSpec(null, finalDimensionsSpec));
@@ -327,7 +352,7 @@ public class CompactionTask extends AbstractTask
     );
   }

-  private static DimensionsSpec createDimensionsSpec(List<QueryableIndex> queryableIndices)
+  private static DimensionsSpec createDimensionsSpec(List<Pair<QueryableIndex, DataSegment>> queryableIndices)
   {
     final BiMap<String, Integer> uniqueDims = HashBiMap.create();
     final Map<String, DimensionSchema> dimensionSchemaMap = new HashMap<>();
@@ -337,9 +362,24 @@ public class CompactionTask extends AbstractTask
     // Dimensions are extracted from the recent segments to olders because recent segments are likely to be queried more
     // frequently, and thus the performance should be optimized for recent ones rather than old ones.
-    // timelineSegments are sorted in order of interval
+    // timelineSegments are sorted in order of interval, but we do a sanity check here.
+    final Comparator<Interval> intervalComparator = Comparators.intervalsByStartThenEnd();
+    for (int i = 0; i < queryableIndices.size() - 1; i++) {
+      final Interval shouldBeSmaller = queryableIndices.get(i).lhs.getDataInterval();
+      final Interval shouldBeLarger = queryableIndices.get(i + 1).lhs.getDataInterval();
+      Preconditions.checkState(
+          intervalComparator.compare(shouldBeSmaller, shouldBeLarger) <= 0,
+          "QueryableIndexes are not sorted! Interval[%s] of segment[%s] is laster than interval[%s] of segment[%s]",
+          shouldBeSmaller,
+          queryableIndices.get(i).rhs.getIdentifier(),
+          shouldBeLarger,
+          queryableIndices.get(i + 1).rhs.getIdentifier()
+      );
+    }
+
     int index = 0;
-    for (QueryableIndex queryableIndex : Lists.reverse(queryableIndices)) {
+    for (Pair<QueryableIndex, DataSegment> pair : Lists.reverse(queryableIndices)) {
+      final QueryableIndex queryableIndex = pair.lhs;
       final Map<String, DimensionHandler> dimensionHandlerMap = queryableIndex.getDimensionHandlers();
       for (String dimension : queryableIndex.getAvailableDimensions()) {
@@ -385,23 +425,22 @@ public class CompactionTask extends AbstractTask
     return new DimensionsSpec(dimensionSchemas, null, null);
   }

-  private static List<QueryableIndex> loadSegments(
+  private static List<Pair<QueryableIndex, DataSegment>> loadSegments(
       List<TimelineObjectHolder<String, DataSegment>> timelineSegments,
       Map<DataSegment, File> segmentFileMap,
       IndexIO indexIO
   ) throws IOException
   {
-    final List<QueryableIndex> segments = new ArrayList<>();
+    final List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
     for (TimelineObjectHolder<String, DataSegment> timelineSegment : timelineSegments) {
       final PartitionHolder<DataSegment> partitionHolder = timelineSegment.getObject();
       for (PartitionChunk<DataSegment> chunk : partitionHolder) {
         final DataSegment segment = chunk.getObject();
-        segments.add(
-            indexIO.loadIndex(
-                Preconditions.checkNotNull(segmentFileMap.get(segment), "File for segment %s", segment.getIdentifier())
-            )
-        );
+        final QueryableIndex queryableIndex = indexIO.loadIndex(
+            Preconditions.checkNotNull(segmentFileMap.get(segment), "File for segment %s", segment.getIdentifier())
+        );
+        segments.add(Pair.of(queryableIndex, segment));
       }
     }

CompactionTaskTest.java

@@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.inject.Binder;
 import com.google.inject.Module;
@@ -88,6 +89,7 @@ import io.druid.timeline.partition.NumberedShardSpec;
 import org.hamcrest.CoreMatchers;
 import org.joda.time.Interval;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -129,10 +131,12 @@ public class CompactionTaskTest
   private static Map<String, AggregatorFactory> AGGREGATORS;
   private static List<DataSegment> SEGMENTS;
   private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
-  private static TaskToolbox toolbox;
+  private static Map<DataSegment, File> segmentMap;
+
+  private TaskToolbox toolbox;

   @BeforeClass
-  public static void setup()
+  public static void setupClass()
   {
     DIMENSIONS = new HashMap<>();
     AGGREGATORS = new HashMap<>();
@@ -166,7 +170,7 @@ public class CompactionTaskTest
     AGGREGATORS.put("agg_3", new FloatFirstAggregatorFactory("agg_3", "float_dim_3"));
     AGGREGATORS.put("agg_4", new DoubleLastAggregatorFactory("agg_4", "double_dim_4"));

-    final Map<DataSegment, File> segmentMap = new HashMap<>(5);
+    segmentMap = new HashMap<>(5);
     for (int i = 0; i < 5; i++) {
       final Interval segmentInterval = Intervals.of(StringUtils.format("2017-0%d-01/2017-0%d-01", (i + 1), (i + 2)));
       segmentMap.put(
@@ -185,12 +189,6 @@ public class CompactionTaskTest
       );
     }
     SEGMENTS = new ArrayList<>(segmentMap.keySet());
-
-    toolbox = new TestTaskToolbox(
-        new TestTaskActionClient(new ArrayList<>(segmentMap.keySet())),
-        new TestIndexIO(objectMapper, segmentMap),
-        segmentMap
-    );
   }

   private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper)
@@ -272,6 +270,16 @@ public class CompactionTaskTest
   @Rule
   public ExpectedException expectedException = ExpectedException.none();

+  @Before
+  public void setup()
+  {
+    toolbox = new TestTaskToolbox(
+        new TestTaskActionClient(new ArrayList<>(segmentMap.keySet())),
+        new TestIndexIO(objectMapper, segmentMap),
+        segmentMap
+    );
+  }
+
   @Test
   public void testSerdeWithInterval() throws IOException
   {
@@ -415,6 +423,24 @@ public class CompactionTaskTest
     );
   }

+  @Test
+  public void testMissingMetadata() throws IOException, SegmentLoadingException
+  {
+    expectedException.expect(RuntimeException.class);
+    expectedException.expectMessage(CoreMatchers.startsWith("Index metadata doesn't exist for segment"));
+
+    final TestIndexIO indexIO = (TestIndexIO) toolbox.getIndexIO();
+    indexIO.removeMetadata(Iterables.getFirst(indexIO.getQueryableIndexMap().keySet(), null));
+    final List<DataSegment> segments = new ArrayList<>(SEGMENTS);
+
+    CompactionTask.createIngestionSchema(
+        toolbox,
+        new SegmentProvider(segments),
+        null,
+        TUNING_CONFIG,
+        objectMapper
+    );
+  }
+
   private static DimensionsSpec getExpectedDimensionsSpecForAutoGeneration()
   {
     return new DimensionsSpec(
@@ -599,9 +625,13 @@ public class CompactionTaskTest
         }
       }

-      final Metadata metadata = new Metadata();
-      metadata.setAggregators(aggregatorFactories.toArray(new AggregatorFactory[aggregatorFactories.size()]));
-      metadata.setRollup(false);
+      final Metadata metadata = new Metadata(
+          null,
+          aggregatorFactories.toArray(new AggregatorFactory[0]),
+          null,
+          null,
+          null
+      );

       queryableIndexMap.put(
           entry.getValue(),
@@ -622,6 +652,31 @@ public class CompactionTaskTest
     {
       return queryableIndexMap.get(file);
     }
+
+    void removeMetadata(File file)
+    {
+      final SimpleQueryableIndex index = (SimpleQueryableIndex) queryableIndexMap.get(file);
+      if (index != null) {
+        queryableIndexMap.put(
+            file,
+            new SimpleQueryableIndex(
+                index.getDataInterval(),
+                index.getColumnNames(),
+                index.getAvailableDimensions(),
+                index.getBitmapFactoryForDimensions(),
+                index.getColumns(),
+                index.getFileMapper(),
+                null,
+                index.getDimensionHandlers()
+            )
+        );
+      }
+    }
+
+    Map<File, QueryableIndex> getQueryableIndexMap()
+    {
+      return queryableIndexMap;
+    }
   }

   private static Column createColumn(DimensionSchema dimensionSchema)

Metadata.java

@@ -25,11 +25,13 @@ import io.druid.guice.annotations.PublicApi;
 import io.druid.java.util.common.granularity.Granularity;
 import io.druid.query.aggregation.AggregatorFactory;

+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;

 /**
@@ -39,71 +41,66 @@ public class Metadata
 {
   // container is used for arbitrary key-value pairs in segment metadata e.g.
   // kafka firehose uses it to store commit offset
-  @JsonProperty
   private final Map<String, Object> container;
+  @Nullable
+  private final AggregatorFactory[] aggregators;
+  @Nullable
+  private final TimestampSpec timestampSpec;
+  @Nullable
+  private final Granularity queryGranularity;
+  @Nullable
+  private final Boolean rollup;

-  @JsonProperty
-  private AggregatorFactory[] aggregators;
-
-  @JsonProperty
-  private TimestampSpec timestampSpec;
-
-  @JsonProperty
-  private Granularity queryGranularity;
-
-  @JsonProperty
-  private Boolean rollup;
-
-  public Metadata()
+  public Metadata(
+      @JsonProperty("container") @Nullable Map<String, Object> container,
+      @JsonProperty("aggregators") @Nullable AggregatorFactory[] aggregators,
+      @JsonProperty("timestampSpec") @Nullable TimestampSpec timestampSpec,
+      @JsonProperty("queryGranularity") @Nullable Granularity queryGranularity,
+      @JsonProperty("rollup") @Nullable Boolean rollup
+  )
   {
-    container = new ConcurrentHashMap<>();
+    this.container = container == null ? new ConcurrentHashMap<>() : container;
+    this.aggregators = aggregators;
+    this.timestampSpec = timestampSpec;
+    this.queryGranularity = queryGranularity;
+    this.rollup = rollup;
   }

+  @JsonProperty
+  public Map<String, Object> getContainer()
+  {
+    return container;
+  }
+
+  @JsonProperty
+  @Nullable
   public AggregatorFactory[] getAggregators()
   {
     return aggregators;
   }

-  public Metadata setAggregators(AggregatorFactory[] aggregators)
-  {
-    this.aggregators = aggregators;
-    return this;
-  }
-
+  @JsonProperty
+  @Nullable
   public TimestampSpec getTimestampSpec()
   {
     return timestampSpec;
   }

-  public Metadata setTimestampSpec(TimestampSpec timestampSpec)
-  {
-    this.timestampSpec = timestampSpec;
-    return this;
-  }
-
+  @JsonProperty
+  @Nullable
   public Granularity getQueryGranularity()
   {
     return queryGranularity;
   }

-  public Metadata setQueryGranularity(Granularity queryGranularity)
-  {
-    this.queryGranularity = queryGranularity;
-    return this;
-  }
-
+  @JsonProperty
+  @Nullable
   public Boolean isRollup()
   {
     return rollup;
   }

-  public Metadata setRollup(Boolean rollup)
-  {
-    this.rollup = rollup;
-    return this;
-  }
-
-  public Metadata putAll(Map<String, Object> other)
+  public Metadata putAll(@Nullable Map<String, Object> other)
   {
     if (other != null) {
       container.putAll(other);
@@ -116,7 +113,7 @@ public class Metadata
     return container.get(key);
   }

-  public Metadata put(String key, Object value)
+  public Metadata put(String key, @Nullable Object value)
   {
     if (value != null) {
       container.put(key, value);
@@ -127,9 +124,10 @@ public class Metadata
   // arbitrary key-value pairs from the metadata just follow the semantics of last one wins if same
   // key exists in multiple input Metadata containers
   // for others e.g. Aggregators, appropriate merging is done
+  @Nullable
   public static Metadata merge(
-      List<Metadata> toBeMerged,
-      AggregatorFactory[] overrideMergedAggregators
+      @Nullable List<Metadata> toBeMerged,
+      @Nullable AggregatorFactory[] overrideMergedAggregators
   )
   {
     if (toBeMerged == null || toBeMerged.size() == 0) {
@@ -139,7 +137,7 @@ public class Metadata
     boolean foundSomeMetadata = false;
     Map<String, Object> mergedContainer = new HashMap<>();
     List<AggregatorFactory[]> aggregatorsToMerge = overrideMergedAggregators == null
-                                                   ? new ArrayList<AggregatorFactory[]>()
+                                                   ? new ArrayList<>()
                                                    : null;

     List<TimestampSpec> timestampSpecsToMerge = new ArrayList<>();
@@ -179,20 +177,17 @@ public class Metadata
       return null;
     }

-    Metadata result = new Metadata();
-    if (aggregatorsToMerge != null) {
-      result.setAggregators(AggregatorFactory.mergeAggregators(aggregatorsToMerge));
-    } else {
-      result.setAggregators(overrideMergedAggregators);
-    }
+    final AggregatorFactory[] mergedAggregators = aggregatorsToMerge == null ?
+                                                  overrideMergedAggregators :
+                                                  AggregatorFactory.mergeAggregators(aggregatorsToMerge);

-    if (timestampSpecsToMerge != null) {
-      result.setTimestampSpec(TimestampSpec.mergeTimestampSpec(timestampSpecsToMerge));
-    }
+    final TimestampSpec mergedTimestampSpec = timestampSpecsToMerge == null ?
+                                              null :
+                                              TimestampSpec.mergeTimestampSpec(timestampSpecsToMerge);

-    if (gransToMerge != null) {
-      result.setQueryGranularity(Granularity.mergeGranularities(gransToMerge));
-    }
+    final Granularity mergedGranularity = gransToMerge == null ?
+                                          null :
+                                          Granularity.mergeGranularities(gransToMerge);

     Boolean rollup = null;
     if (rollupToMerge != null && !rollupToMerge.isEmpty()) {
@@ -210,10 +205,13 @@ public class Metadata
       }
     }

-    result.setRollup(rollup);
-    result.container.putAll(mergedContainer);
-
-    return result;
+    return new Metadata(
+        mergedContainer,
+        mergedAggregators,
+        mergedTimestampSpec,
+        mergedGranularity,
+        rollup
+    );
   }

   @Override
@@ -225,37 +223,18 @@ public class Metadata
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
-    Metadata metadata = (Metadata) o;
-
-    if (!container.equals(metadata.container)) {
-      return false;
-    }
-    // Probably incorrect - comparing Object[] arrays with Arrays.equals
-    if (!Arrays.equals(aggregators, metadata.aggregators)) {
-      return false;
-    }
-    if (timestampSpec != null ? !timestampSpec.equals(metadata.timestampSpec) : metadata.timestampSpec != null) {
-      return false;
-    }
-    if (rollup != null ? !rollup.equals(metadata.rollup) : metadata.rollup != null) {
-      return false;
-    }
-    return queryGranularity != null
-           ? queryGranularity.equals(metadata.queryGranularity)
-           : metadata.queryGranularity == null;
+    final Metadata metadata = (Metadata) o;
+    return Objects.equals(container, metadata.container) &&
+           Arrays.equals(aggregators, metadata.aggregators) &&
+           Objects.equals(timestampSpec, metadata.timestampSpec) &&
+           Objects.equals(queryGranularity, metadata.queryGranularity) &&
+           Objects.equals(rollup, metadata.rollup);
   }

   @Override
   public int hashCode()
   {
-    int result = container.hashCode();
-    result = 31 * result + Arrays.hashCode(aggregators);
-    result = 31 * result + (timestampSpec != null ? timestampSpec.hashCode() : 0);
-    result = 31 * result + (queryGranularity != null ? queryGranularity.hashCode() : 0);
-    result = 31 * result + (rollup != null ? rollup.hashCode() : 0);
-    return result;
+    return Objects.hash(container, Arrays.hashCode(aggregators), timestampSpec, queryGranularity, rollup);
   }

   @Override
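Metadata is now an immutable value object: the no-arg constructor and setters are gone, and Jackson binds through the @JsonProperty constructor parameters instead of the fields. A hedged sketch of how a call site migrates (package names are assumed from the imports visible in this diff; the class and method names below are illustrative):

    import io.druid.java.util.common.granularity.Granularities;
    import io.druid.query.aggregation.AggregatorFactory;
    import io.druid.query.aggregation.LongSumAggregatorFactory;
    import io.druid.segment.Metadata;

    import java.util.Collections;

    final class MetadataConstructionSketch
    {
      private MetadataConstructionSketch() {}

      static Metadata example()
      {
        // Before this commit: new Metadata().setAggregators(aggs).setQueryGranularity(...).setRollup(...)
        // After: every value goes through the constructor; anything absent is simply null.
        final AggregatorFactory[] aggs = new AggregatorFactory[]{new LongSumAggregatorFactory("out", "in")};
        return new Metadata(
            Collections.singletonMap("k", "v"), // container
            aggs,                               // aggregators
            null,                               // timestampSpec
            Granularities.ALL,                  // queryGranularity
            Boolean.FALSE                       // rollup
        );
      }
    }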

QueryableIndex.java

@@ -23,6 +23,7 @@ import io.druid.collections.bitmap.BitmapFactory;
 import io.druid.segment.data.Indexed;
 import org.joda.time.Interval;

+import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
@@ -40,7 +41,7 @@ public interface QueryableIndex extends ColumnSelector, Closeable
   int getNumRows();
   Indexed<String> getAvailableDimensions();
   BitmapFactory getBitmapFactoryForDimensions();
-  Metadata getMetadata();
+  @Nullable Metadata getMetadata();
   Map<String, DimensionHandler> getDimensionHandlers();

   /**
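Since QueryableIndex.getMetadata() is now declared @Nullable, callers are expected to check for a missing Metadata before using it, as CompactionTask does above. A small hedged sketch of such a guard (the helper class and method names are illustrative, not part of the diff; RE is the exception type imported by CompactionTask in this commit):

    import io.druid.java.util.common.RE;
    import io.druid.segment.Metadata;
    import io.druid.segment.QueryableIndex;

    final class MetadataChecks
    {
      private MetadataChecks() {}

      // Returns the index's Metadata, or fails with the same kind of error CompactionTask raises.
      static Metadata requireMetadata(QueryableIndex index, String segmentId)
      {
        final Metadata metadata = index.getMetadata();
        if (metadata == null) {
          throw new RE("Index metadata doesn't exist for segment[%s]", segmentId);
        }
        return metadata;
      }
    }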

SimpleQueryableIndex.java

@@ -19,6 +19,7 @@
 package io.druid.segment;

+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
@@ -43,6 +44,7 @@ public class SimpleQueryableIndex extends AbstractIndex implements QueryableInde
   private final BitmapFactory bitmapFactory;
   private final Map<String, Column> columns;
   private final SmooshedFileMapper fileMapper;
+  @Nullable
   private final Metadata metadata;
   private final Map<String, DimensionHandler> dimensionHandlers;

@@ -51,11 +53,11 @@ public class SimpleQueryableIndex extends AbstractIndex implements QueryableInde
       BitmapFactory bitmapFactory,
       Map<String, Column> columns,
       SmooshedFileMapper fileMapper,
-      Metadata metadata
+      @Nullable Metadata metadata
   )
   {
     Preconditions.checkNotNull(columns.get(Column.TIME_COLUMN_NAME));
-    this.dataInterval = dataInterval;
+    this.dataInterval = Preconditions.checkNotNull(dataInterval, "dataInterval");
     ImmutableList.Builder<String> columnNamesBuilder = ImmutableList.builder();
     for (String column : columns.keySet()) {
       if (!Column.TIME_COLUMN_NAME.equals(column)) {
@@ -72,6 +74,28 @@ public class SimpleQueryableIndex extends AbstractIndex implements QueryableInde
     initDimensionHandlers();
   }

+  @VisibleForTesting
+  public SimpleQueryableIndex(
+      Interval interval,
+      List<String> columnNames,
+      Indexed<String> availableDimensions,
+      BitmapFactory bitmapFactory,
+      Map<String, Column> columns,
+      SmooshedFileMapper fileMapper,
+      @Nullable Metadata metadata,
+      Map<String, DimensionHandler> dimensionHandlers
+  )
+  {
+    this.dataInterval = interval;
+    this.columnNames = columnNames;
+    this.availableDimensions = availableDimensions;
+    this.bitmapFactory = bitmapFactory;
+    this.columns = columns;
+    this.fileMapper = fileMapper;
+    this.metadata = metadata;
+    this.dimensionHandlers = dimensionHandlers;
+  }
+
   @Override
   public Interval getDataInterval()
   {
@@ -115,6 +139,18 @@ public class SimpleQueryableIndex extends AbstractIndex implements QueryableInde
     return columns.get(columnName);
   }

+  @VisibleForTesting
+  public Map<String, Column> getColumns()
+  {
+    return columns;
+  }
+
+  @VisibleForTesting
+  public SmooshedFileMapper getFileMapper()
+  {
+    return fileMapper;
+  }
+
   @Override
   public void close()
   {

IncrementalIndex.java

@@ -276,11 +276,13 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
     this.reportParseExceptions = reportParseExceptions;
     this.columnCapabilities = Maps.newHashMap();
-    this.metadata = new Metadata()
-        .setAggregators(getCombiningAggregators(metrics))
-        .setTimestampSpec(incrementalIndexSchema.getTimestampSpec())
-        .setQueryGranularity(this.gran)
-        .setRollup(this.rollup);
+    this.metadata = new Metadata(
+        null,
+        getCombiningAggregators(metrics),
+        incrementalIndexSchema.getTimestampSpec(),
+        this.gran,
+        this.rollup
+    );

     this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics, concurrentEventAdd);

IndexMergerTestBase.java

@@ -274,13 +274,13 @@ public class IndexMergerTestBase
     assertDimCompression(index, indexSpec.getDimensionCompression());

     Assert.assertEquals(
-        new Metadata()
-            .setAggregators(
-                IncrementalIndexTest.getDefaultCombiningAggregatorFactories()
-            )
-            .setQueryGranularity(Granularities.NONE)
-            .setRollup(Boolean.TRUE)
-            .putAll(metadataElems),
+        new Metadata(
+            metadataElems,
+            IncrementalIndexTest.getDefaultCombiningAggregatorFactories(),
+            null,
+            Granularities.NONE,
+            Boolean.TRUE
+        ),
         index.getMetadata()
     );
   }

MetadataTest.java

@@ -31,6 +31,7 @@ import org.junit.Assert;
 import org.junit.Test;

 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;

 /**
@@ -42,15 +43,17 @@ public class MetadataTest
   {
     ObjectMapper jsonMapper = TestHelper.makeJsonMapper();

-    Metadata metadata = new Metadata();
-    metadata.put("k", "v");
-
     AggregatorFactory[] aggregators = new AggregatorFactory[] {
         new LongSumAggregatorFactory("out", "in")
     };
-    metadata.setAggregators(aggregators);
-    metadata.setQueryGranularity(Granularities.ALL);
-    metadata.setRollup(Boolean.FALSE);
+
+    Metadata metadata = new Metadata(
+        Collections.singletonMap("k", "v"),
+        aggregators,
+        null,
+        Granularities.ALL,
+        Boolean.FALSE
+    );

     Metadata other = jsonMapper.readValue(
         jsonMapper.writeValueAsString(metadata),
@@ -75,30 +78,39 @@ public class MetadataTest
     AggregatorFactory[] aggs = new AggregatorFactory[] {
         new LongMaxAggregatorFactory("n", "f")
     };
-    Metadata m1 = new Metadata();
-    m1.put("k", "v");
-    m1.setAggregators(aggs);
-    m1.setTimestampSpec(new TimestampSpec("ds", "auto", null));
-    m1.setQueryGranularity(Granularities.ALL);
-    m1.setRollup(Boolean.FALSE);
+    final Metadata m1 = new Metadata(
+        Collections.singletonMap("k", "v"),
+        aggs,
+        new TimestampSpec("ds", "auto", null),
+        Granularities.ALL,
+        Boolean.FALSE
+    );

-    Metadata m2 = new Metadata();
-    m2.put("k", "v");
-    m2.setAggregators(aggs);
-    m2.setTimestampSpec(new TimestampSpec("ds", "auto", null));
-    m2.setQueryGranularity(Granularities.ALL);
-    m2.setRollup(Boolean.FALSE);
+    final Metadata m2 = new Metadata(
+        Collections.singletonMap("k", "v"),
+        aggs,
+        new TimestampSpec("ds", "auto", null),
+        Granularities.ALL,
+        Boolean.FALSE
+    );

-    Metadata merged = new Metadata();
-    merged.put("k", "v");
-    merged.setAggregators(
+    final Metadata m3 = new Metadata(
+        Collections.singletonMap("k", "v"),
+        aggs,
+        new TimestampSpec("ds", "auto", null),
+        Granularities.ALL,
+        Boolean.TRUE
+    );
+
+    final Metadata merged = new Metadata(
+        Collections.singletonMap("k", "v"),
         new AggregatorFactory[]{
             new LongMaxAggregatorFactory("n", "n")
-        }
+        },
+        new TimestampSpec("ds", "auto", null),
+        Granularities.ALL,
+        Boolean.FALSE
     );
-    merged.setTimestampSpec(new TimestampSpec("ds", "auto", null));
-    merged.setRollup(Boolean.FALSE);
-    merged.setQueryGranularity(Granularities.ALL);

     Assert.assertEquals(merged, Metadata.merge(ImmutableList.of(m1, m2), null));

     //merge check with one metadata being null
@@ -107,29 +119,32 @@ public class MetadataTest
     metadataToBeMerged.add(m2);
     metadataToBeMerged.add(null);

-    merged.setAggregators(null);
-    merged.setTimestampSpec(null);
-    merged.setQueryGranularity(null);
-    merged.setRollup(null);
-    Assert.assertEquals(merged, Metadata.merge(metadataToBeMerged, null));
+    final Metadata merged2 = new Metadata(Collections.singletonMap("k", "v"), null, null, null, null);
+
+    Assert.assertEquals(merged2, Metadata.merge(metadataToBeMerged, null));

     //merge check with client explicitly providing merged aggregators
     AggregatorFactory[] explicitAggs = new AggregatorFactory[] {
         new DoubleMaxAggregatorFactory("x", "y")
     };
-    merged.setAggregators(explicitAggs);

+    final Metadata merged3 = new Metadata(Collections.singletonMap("k", "v"), explicitAggs, null, null, null);
     Assert.assertEquals(
-        merged,
+        merged3,
         Metadata.merge(metadataToBeMerged, explicitAggs)
     );

-    merged.setTimestampSpec(new TimestampSpec("ds", "auto", null));
-    merged.setQueryGranularity(Granularities.ALL);
-    m1.setRollup(Boolean.TRUE);
+    final Metadata merged4 = new Metadata(
+        Collections.singletonMap("k", "v"),
+        explicitAggs,
+        new TimestampSpec("ds", "auto", null),
+        Granularities.ALL,
+        null
+    );
     Assert.assertEquals(
-        merged,
-        Metadata.merge(ImmutableList.of(m1, m2), explicitAggs)
+        merged4,
+        Metadata.merge(ImmutableList.of(m3, m2), explicitAggs)
     );
   }
 }