diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 275e7670661..e78068086f2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -34,7 +34,9 @@ import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
 import org.apache.druid.client.indexing.NoopOverlordClient;
 import org.apache.druid.data.input.impl.CSVParseSpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.NewSpatialDimensionSchema;
 import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
@@ -67,6 +69,8 @@ import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.IndexSpec;
@@ -205,16 +209,33 @@ public class CompactionTaskRunTest extends IngestionTestBase
       List<Interval> intervals
   ) throws JsonProcessingException
   {
-    ObjectMapper mapper = new DefaultObjectMapper();
-    // Expected compaction state to exist after compaction as we store compaction state by default
     Map<String, Object> expectedLongSumMetric = new HashMap<>();
     expectedLongSumMetric.put("type", "longSum");
     expectedLongSumMetric.put("name", "val");
     expectedLongSumMetric.put("fieldName", "val");
+    return getDefaultCompactionState(
+        segmentGranularity,
+        queryGranularity,
+        intervals,
+        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
+        expectedLongSumMetric
+    );
+  }
+
+  public static CompactionState getDefaultCompactionState(
+      Granularity segmentGranularity,
+      Granularity queryGranularity,
+      List<Interval> intervals,
+      DimensionsSpec expectedDims,
+      Map<String, Object> expectedMetric
+  ) throws JsonProcessingException
+  {
+    ObjectMapper mapper = new DefaultObjectMapper();
+    // Expected compaction state to exist after compaction as we store compaction state by default
     return new CompactionState(
         new DynamicPartitionsSpec(5000000, Long.MAX_VALUE),
-        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
-        ImmutableList.of(expectedLongSumMetric),
+        expectedDims,
+        ImmutableList.of(expectedMetric),
         null,
         IndexSpec.DEFAULT.asMap(mapper),
         mapper.readValue(
@@ -1572,6 +1593,135 @@ public class CompactionTaskRunTest extends IngestionTestBase
     Assert.assertEquals(TaskState.FAILED, compactionResult.lhs.getStatusCode());
   }
 
+  @Test
+  public void testRunWithSpatialDimensions() throws Exception
+  {
+    final List<String> spatialrows = ImmutableList.of(
+        "2014-01-01T00:00:10Z,a,10,100,1\n",
+        "2014-01-01T00:00:10Z,b,20,110,2\n",
+        "2014-01-01T00:00:10Z,c,30,120,3\n",
+        "2014-01-01T01:00:20Z,a,10,100,1\n",
+        "2014-01-01T01:00:20Z,b,20,110,2\n",
+        "2014-01-01T01:00:20Z,c,30,120,3\n"
+    );
+    final ParseSpec spatialSpec = new CSVParseSpec(
+        new TimestampSpec("ts", "auto", null),
+        DimensionsSpec.builder()
+                      .setDimensions(Arrays.asList(
+                          new StringDimensionSchema("ts"),
+                          new StringDimensionSchema("dim"),
+                          new NewSpatialDimensionSchema("spatial", Arrays.asList("x", "y"))
+                      ))
+                      .build(),
+        "|",
+        Arrays.asList("ts", "dim", "x", "y", "val"),
+        false,
+        0
+    );
+    runIndexTask(null, null, spatialSpec, spatialrows, false);
+
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        segmentCacheManagerFactory,
+        RETRY_POLICY_FACTORY
+    );
+
+    final CompactionTask compactionTask = builder
+        .interval(Intervals.of("2014-01-01/2014-01-02"))
+        .build();
+
+    final Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask);
+
+    Assert.assertTrue(resultPair.lhs.isSuccess());
+
+    final List<DataSegment> segments = resultPair.rhs;
+    Assert.assertEquals(2, segments.size());
+
+    for (int i = 0; i < 2; i++) {
+      Assert.assertEquals(
+          Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
+          segments.get(i).getInterval()
+      );
+      Map<String, Object> expectedLongSumMetric = new HashMap<>();
+      expectedLongSumMetric.put("name", "val");
+      expectedLongSumMetric.put("type", "longSum");
+      expectedLongSumMetric.put("fieldName", "val");
+      Assert.assertEquals(
+          getDefaultCompactionState(
+              Granularities.HOUR,
+              Granularities.MINUTE,
+              ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1)),
+              DimensionsSpec.builder()
+                            .setDimensions(Arrays.asList(
+                                new StringDimensionSchema("ts"),
+                                new StringDimensionSchema("dim"),
+                                new NewSpatialDimensionSchema("spatial", Collections.singletonList("spatial"))
+                            ))
+                            .build(),
+              expectedLongSumMetric
+          ),
+          segments.get(i).getLastCompactionState()
+      );
+      if (lockGranularity == LockGranularity.SEGMENT) {
+        Assert.assertEquals(
+            new NumberedOverwriteShardSpec(32768, 0, 2, (short) 1, (short) 1),
+            segments.get(i).getShardSpec()
+        );
+      } else {
+        Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
+      }
+    }
+
+    final File cacheDir = temporaryFolder.newFolder();
+    final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(cacheDir);
+
+    List<String> rowsFromSegment = new ArrayList<>();
+    for (DataSegment segment : segments) {
+      final File segmentFile = segmentCacheManager.getSegmentFiles(segment);
+
+      final WindowedStorageAdapter adapter = new WindowedStorageAdapter(
+          new QueryableIndexStorageAdapter(testUtils.getTestIndexIO().loadIndex(segmentFile)),
+          segment.getInterval()
+      );
+      final Sequence<Cursor> cursorSequence = adapter.getAdapter().makeCursors(
+          null,
+          segment.getInterval(),
+          VirtualColumns.EMPTY,
+          Granularities.ALL,
+          false,
+          null
+      );
+
+      cursorSequence.accumulate(rowsFromSegment, (accumulated, cursor) -> {
+        cursor.reset();
+        final ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+        Assert.assertTrue(factory.getColumnCapabilities("spatial").hasSpatialIndexes());
+        while (!cursor.isDone()) {
+          final ColumnValueSelector selector1 = factory.makeColumnValueSelector("ts");
+          final DimensionSelector selector2 = factory.makeDimensionSelector(new DefaultDimensionSpec("dim", "dim"));
+          final DimensionSelector selector3 = factory.makeDimensionSelector(new DefaultDimensionSpec("spatial", "spatial"));
+          final DimensionSelector selector4 = factory.makeDimensionSelector(new DefaultDimensionSpec("val", "val"));
+
+
+          rowsFromSegment.add(
+              StringUtils.format(
+                  "%s,%s,%s,%s\n",
+                  selector1.getObject(),
+                  selector2.getObject(),
+                  selector3.getObject(),
+                  selector4.getObject()
+              )
+          );
+
+          cursor.advance();
+        }
+
+        return accumulated;
+      });
+    }
+    Assert.assertEquals(spatialrows, rowsFromSegment);
+  }
+
   private Pair<TaskStatus, List<DataSegment>> runIndexTask() throws Exception
   {
     return runIndexTask(null, null, false);
@@ -1620,6 +1770,46 @@ public class CompactionTaskRunTest extends IngestionTestBase
     return runTask(indexTask, readyLatchToCountDown, latchToAwaitBeforeRun);
   }
 
+  private Pair<TaskStatus, List<DataSegment>> runIndexTask(
+      @Nullable CountDownLatch readyLatchToCountDown,
+      @Nullable CountDownLatch latchToAwaitBeforeRun,
+      ParseSpec parseSpec,
+      List<String> rows,
+      boolean appendToExisting
+  ) throws Exception
+  {
+    File tmpDir = temporaryFolder.newFolder();
+    File tmpFile = File.createTempFile("druid", "index", tmpDir);
+
+    try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+      for (String testRow : rows) {
+        writer.write(testRow);
+      }
+    }
+
+    IndexTask indexTask = new IndexTask(
+        null,
+        null,
+        IndexTaskTest.createIngestionSpec(
+            getObjectMapper(),
+            tmpDir,
+            parseSpec,
+            null,
+            new UniformGranularitySpec(
+                Granularities.HOUR,
+                Granularities.MINUTE,
+                null
+            ),
+            IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, false, true),
+            appendToExisting,
+            false
+        ),
+        null
+    );
+
+    return runTask(indexTask, readyLatchToCountDown, latchToAwaitBeforeRun);
+  }
+
   private Pair<TaskStatus, List<DataSegment>> runTask(Task task) throws Exception
   {
     return runTask(task, null, null);
diff --git a/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java b/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
index b3ff6639403..49366023231 100644
--- a/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
+++ b/processing/src/main/java/org/apache/druid/segment/StringDimensionHandler.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment;
 
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
+import org.apache.druid.data.input.impl.NewSpatialDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.io.Closer;
@@ -31,6 +32,7 @@ import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
 import org.apache.druid.segment.selector.settable.SettableDimensionValueSelector;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
+import java.util.Collections;
 import java.util.Comparator;
 
 public class StringDimensionHandler implements DimensionHandler<Integer, int[], String>
@@ -124,6 +126,9 @@ public class StringDimensionHandler implements DimensionHandler<Integer, int[], String>
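
For reviewers who want to see the schema shapes outside the test harness, here is a minimal standalone sketch; the class name SpatialSchemaShapes and its main method are illustrative scaffolding, not part of this patch. It builds the two DimensionsSpec variants the new test compares: the ingest-time spec, where the "spatial" dimension is assembled from the raw x and y input fields, and the post-compaction spec the test expects from getLastCompactionState(), where the column already stores joined coordinates and so references only itself. It uses only Druid classes that already appear in the patch, plus plain Jackson serialization.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.NewSpatialDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.jackson.DefaultObjectMapper;

import java.util.Arrays;
import java.util.Collections;

public class SpatialSchemaShapes
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new DefaultObjectMapper();

    // Ingest-time shape: "spatial" is assembled from the raw x and y fields,
    // matching the CSVParseSpec used by testRunWithSpatialDimensions().
    final DimensionsSpec ingestSpec = DimensionsSpec.builder()
        .setDimensions(Arrays.asList(
            new StringDimensionSchema("ts"),
            new StringDimensionSchema("dim"),
            new NewSpatialDimensionSchema("spatial", Arrays.asList("x", "y"))
        ))
        .build();

    // Post-compaction shape: the segment column already holds the joined
    // coordinates, so the schema points at itself rather than x and y.
    final DimensionsSpec compactedSpec = DimensionsSpec.builder()
        .setDimensions(Arrays.asList(
            new StringDimensionSchema("ts"),
            new StringDimensionSchema("dim"),
            new NewSpatialDimensionSchema("spatial", Collections.singletonList("spatial"))
        ))
        .build();

    // Printing the JSON makes the difference between the two easy to eyeball.
    System.out.println(mapper.writeValueAsString(ingestSpec));
    System.out.println(mapper.writeValueAsString(compactedSpec));
  }
}

The test pins the behavior down from both directions: the compaction state must carry the self-referencing spatial schema, and hasSpatialIndexes() must hold on the compacted segment's column capabilities, so a regression in either place fails loudly.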