allow compaction to work with spatial dimensions (#15321)

This commit is contained in:
Clint Wylie 2023-11-03 11:27:50 -07:00 committed by GitHub
parent 0cc8839a60
commit 5d39b94149
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 218 additions and 4 deletions

View File

@ -34,7 +34,9 @@ import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.client.indexing.NoopOverlordClient; import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.data.input.impl.CSVParseSpec; import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.NewSpatialDimensionSchema;
import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
@ -67,6 +69,8 @@ import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.Cursor; import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.IndexSpec;
@ -205,16 +209,33 @@ public class CompactionTaskRunTest extends IngestionTestBase
List<Interval> intervals List<Interval> intervals
) throws JsonProcessingException ) throws JsonProcessingException
{ {
ObjectMapper mapper = new DefaultObjectMapper();
// Expected compaction state to exist after compaction as we store compaction state by default
Map<String, String> expectedLongSumMetric = new HashMap<>(); Map<String, String> expectedLongSumMetric = new HashMap<>();
expectedLongSumMetric.put("type", "longSum"); expectedLongSumMetric.put("type", "longSum");
expectedLongSumMetric.put("name", "val"); expectedLongSumMetric.put("name", "val");
expectedLongSumMetric.put("fieldName", "val"); expectedLongSumMetric.put("fieldName", "val");
return getDefaultCompactionState(
segmentGranularity,
queryGranularity,
intervals,
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
expectedLongSumMetric
);
}
public static CompactionState getDefaultCompactionState(
Granularity segmentGranularity,
Granularity queryGranularity,
List<Interval> intervals,
DimensionsSpec expectedDims,
Map<String, String> expectedMetric
) throws JsonProcessingException
{
ObjectMapper mapper = new DefaultObjectMapper();
// Expected compaction state to exist after compaction as we store compaction state by default
return new CompactionState( return new CompactionState(
new DynamicPartitionsSpec(5000000, Long.MAX_VALUE), new DynamicPartitionsSpec(5000000, Long.MAX_VALUE),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))), expectedDims,
ImmutableList.of(expectedLongSumMetric), ImmutableList.of(expectedMetric),
null, null,
IndexSpec.DEFAULT.asMap(mapper), IndexSpec.DEFAULT.asMap(mapper),
mapper.readValue( mapper.readValue(
@ -1572,6 +1593,135 @@ public class CompactionTaskRunTest extends IngestionTestBase
Assert.assertEquals(TaskState.FAILED, compactionResult.lhs.getStatusCode()); Assert.assertEquals(TaskState.FAILED, compactionResult.lhs.getStatusCode());
} }
@Test
public void testRunWithSpatialDimensions() throws Exception
{
final List<String> spatialrows = ImmutableList.of(
"2014-01-01T00:00:10Z,a,10,100,1\n",
"2014-01-01T00:00:10Z,b,20,110,2\n",
"2014-01-01T00:00:10Z,c,30,120,3\n",
"2014-01-01T01:00:20Z,a,10,100,1\n",
"2014-01-01T01:00:20Z,b,20,110,2\n",
"2014-01-01T01:00:20Z,c,30,120,3\n"
);
final ParseSpec spatialSpec = new CSVParseSpec(
new TimestampSpec("ts", "auto", null),
DimensionsSpec.builder()
.setDimensions(Arrays.asList(
new StringDimensionSchema("ts"),
new StringDimensionSchema("dim"),
new NewSpatialDimensionSchema("spatial", Arrays.asList("x", "y"))
))
.build(),
"|",
Arrays.asList("ts", "dim", "x", "y", "val"),
false,
0
);
runIndexTask(null, null, spatialSpec, spatialrows, false);
final Builder builder = new Builder(
DATA_SOURCE,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY
);
final CompactionTask compactionTask = builder
.interval(Intervals.of("2014-01-01/2014-01-02"))
.build();
final Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask);
Assert.assertTrue(resultPair.lhs.isSuccess());
final List<DataSegment> segments = resultPair.rhs;
Assert.assertEquals(2, segments.size());
for (int i = 0; i < 2; i++) {
Assert.assertEquals(
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
segments.get(i).getInterval()
);
Map<String, String> expectedLongSumMetric = new HashMap<>();
expectedLongSumMetric.put("name", "val");
expectedLongSumMetric.put("type", "longSum");
expectedLongSumMetric.put("fieldName", "val");
Assert.assertEquals(
getDefaultCompactionState(
Granularities.HOUR,
Granularities.MINUTE,
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1)),
DimensionsSpec.builder()
.setDimensions(Arrays.asList(
new StringDimensionSchema("ts"),
new StringDimensionSchema("dim"),
new NewSpatialDimensionSchema("spatial", Collections.singletonList("spatial"))
))
.build(),
expectedLongSumMetric
),
segments.get(i).getLastCompactionState()
);
if (lockGranularity == LockGranularity.SEGMENT) {
Assert.assertEquals(
new NumberedOverwriteShardSpec(32768, 0, 2, (short) 1, (short) 1),
segments.get(i).getShardSpec()
);
} else {
Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
}
}
final File cacheDir = temporaryFolder.newFolder();
final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(cacheDir);
List<String> rowsFromSegment = new ArrayList<>();
for (DataSegment segment : segments) {
final File segmentFile = segmentCacheManager.getSegmentFiles(segment);
final WindowedStorageAdapter adapter = new WindowedStorageAdapter(
new QueryableIndexStorageAdapter(testUtils.getTestIndexIO().loadIndex(segmentFile)),
segment.getInterval()
);
final Sequence<Cursor> cursorSequence = adapter.getAdapter().makeCursors(
null,
segment.getInterval(),
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
cursorSequence.accumulate(rowsFromSegment, (accumulated, cursor) -> {
cursor.reset();
final ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
Assert.assertTrue(factory.getColumnCapabilities("spatial").hasSpatialIndexes());
while (!cursor.isDone()) {
final ColumnValueSelector<?> selector1 = factory.makeColumnValueSelector("ts");
final DimensionSelector selector2 = factory.makeDimensionSelector(new DefaultDimensionSpec("dim", "dim"));
final DimensionSelector selector3 = factory.makeDimensionSelector(new DefaultDimensionSpec("spatial", "spatial"));
final DimensionSelector selector4 = factory.makeDimensionSelector(new DefaultDimensionSpec("val", "val"));
rowsFromSegment.add(
StringUtils.format(
"%s,%s,%s,%s\n",
selector1.getObject(),
selector2.getObject(),
selector3.getObject(),
selector4.getObject()
)
);
cursor.advance();
}
return accumulated;
});
}
Assert.assertEquals(spatialrows, rowsFromSegment);
}
private Pair<TaskStatus, List<DataSegment>> runIndexTask() throws Exception private Pair<TaskStatus, List<DataSegment>> runIndexTask() throws Exception
{ {
return runIndexTask(null, null, false); return runIndexTask(null, null, false);
@ -1620,6 +1770,46 @@ public class CompactionTaskRunTest extends IngestionTestBase
return runTask(indexTask, readyLatchToCountDown, latchToAwaitBeforeRun); return runTask(indexTask, readyLatchToCountDown, latchToAwaitBeforeRun);
} }
private Pair<TaskStatus, List<DataSegment>> runIndexTask(
@Nullable CountDownLatch readyLatchToCountDown,
@Nullable CountDownLatch latchToAwaitBeforeRun,
ParseSpec parseSpec,
List<String> rows,
boolean appendToExisting
) throws Exception
{
File tmpDir = temporaryFolder.newFolder();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
for (String testRow : rows) {
writer.write(testRow);
}
}
IndexTask indexTask = new IndexTask(
null,
null,
IndexTaskTest.createIngestionSpec(
getObjectMapper(),
tmpDir,
parseSpec,
null,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
null
),
IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, false, true),
appendToExisting,
false
),
null
);
return runTask(indexTask, readyLatchToCountDown, latchToAwaitBeforeRun);
}
private Pair<TaskStatus, List<DataSegment>> runTask(Task task) throws Exception private Pair<TaskStatus, List<DataSegment>> runTask(Task task) throws Exception
{ {
return runTask(task, null, null); return runTask(task, null, null);

View File

@ -21,6 +21,7 @@ package org.apache.druid.segment;
import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling; import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
import org.apache.druid.data.input.impl.NewSpatialDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
@ -31,6 +32,7 @@ import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import org.apache.druid.segment.selector.settable.SettableDimensionValueSelector; import org.apache.druid.segment.selector.settable.SettableDimensionValueSelector;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
public class StringDimensionHandler implements DimensionHandler<Integer, int[], String> public class StringDimensionHandler implements DimensionHandler<Integer, int[], String>
@ -124,6 +126,9 @@ public class StringDimensionHandler implements DimensionHandler<Integer, int[],
@Override @Override
public DimensionSchema getDimensionSchema(ColumnCapabilities capabilities) public DimensionSchema getDimensionSchema(ColumnCapabilities capabilities)
{ {
if (hasSpatialIndexes) {
return new NewSpatialDimensionSchema(dimensionName, Collections.singletonList(dimensionName));
}
return new StringDimensionSchema(dimensionName, multiValueHandling, hasBitmapIndexes); return new StringDimensionSchema(dimensionName, multiValueHandling, hasBitmapIndexes);
} }

View File

@ -24,6 +24,7 @@ import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DoubleDimensionSchema; import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema; import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.NewSpatialDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnCapabilities;
@ -115,6 +116,24 @@ public class DimensionHandlerUtilsTest extends InitializedNullHandlingTest
Assert.assertTrue(stringHandler.getDimensionSchema(stringCapabilities) instanceof StringDimensionSchema); Assert.assertTrue(stringHandler.getDimensionSchema(stringCapabilities) instanceof StringDimensionSchema);
} }
@Test
public void testGetHandlerFromStringCapabilitiesSpatialIndexes()
{
ColumnCapabilities stringCapabilities = ColumnCapabilitiesImpl.createSimpleSingleValueStringColumnCapabilities()
.setHasBitmapIndexes(true)
.setDictionaryEncoded(true)
.setDictionaryValuesUnique(true)
.setDictionaryValuesUnique(true)
.setHasSpatialIndexes(true);
DimensionHandler spatialHandler = DimensionHandlerUtils.getHandlerFromCapabilities(
DIM_NAME,
stringCapabilities,
DimensionSchema.MultiValueHandling.SORTED_SET
);
Assert.assertTrue(spatialHandler instanceof StringDimensionHandler);
Assert.assertTrue(spatialHandler.getDimensionSchema(stringCapabilities) instanceof NewSpatialDimensionSchema);
}
@Test @Test
public void testGetHandlerFromFloatCapabilities() public void testGetHandlerFromFloatCapabilities()
{ {