Add last_compaction_state to sys.segments table (#10413)

* Add is_compacted to sys.segments table

* change is_compacted to last_compaction_state

* fix tests

* fix tests

* address comments
Maytas Monsereenusorn 2020-09-23 15:29:36 -07:00 committed by GitHub
parent 19c4b16640
commit 72f1b55f56
12 changed files with 101 additions and 27 deletions

View File

@ -100,7 +100,7 @@ public class DataSegment implements Comparable<DataSegment>, Overshadowable<Data
/**
 * Stores some configurations of the compaction task which created this segment.
 * This field is filled in the metadata store only when "storeCompactionState" is set true in the context of the
- * compaction task which is false by default.
+ * task. True by default see {@link org.apache.druid.indexing.common.task.Tasks#DEFAULT_STORE_COMPACTION_STATE}.
 * Also, this field can be pruned in many Druid modules when this class is loaded from the metadata store.
 * See {@link PruneLastCompactionState} for details.
 */

View File

@ -33,6 +33,7 @@ import java.lang.annotation.Target;
 *
 * - In auto compaction of the coordinator, "lastCompactionState" is used to determine whether the given
 *   segment needs further compaction or not.
 * - In Metadata store information API of the coordinator, "lastCompactionState" is part of the sys.segments table
 * - In parallel indexing, "lastCompactionState" should be serialized and deserialized properly when
 *   the sub tasks report the pushed segments to the supervisor task.
 */

View File

@ -1086,6 +1086,7 @@ Segments table provides details on all Druid segments, whether they are publishe
|shardSpec|STRING|The toString of specific `ShardSpec`|
|dimensions|STRING|The dimensions of the segment|
|metrics|STRING|The metrics of the segment|
|last_compaction_state|STRING|The configurations of the compaction task which created this segment. May be null if segment was not created by compaction task.|

For example to retrieve all segments for datasource "wikipedia", use the query:
@ -1107,6 +1108,18 @@ GROUP BY 1
ORDER BY 2 DESC
```

If you want to retrieve segments that were compacted (ANY compaction):

```sql
SELECT * FROM sys.segments WHERE last_compaction_state is not null
```

or if you want to retrieve segments that were compacted only by a particular compaction spec (such as that of the auto compaction):

```sql
SELECT * FROM sys.segments WHERE last_compaction_state = 'CompactionState{partitionsSpec=DynamicPartitionsSpec{maxRowsPerSegment=5000000, maxTotalRows=9223372036854775807}, indexSpec={bitmap={type=roaring, compressRunOnSerialization=true}, dimensionCompression=lz4, metricCompression=lz4, longEncoding=longs, segmentLoader=null}}'
```

*Caveat:* Note that a segment can be served by more than one stream ingestion task or Historical process, in which case it has multiple replicas. These replicas are weakly consistent with each other while served by multiple ingestion tasks, until the segment is eventually served by a Historical, at which point it becomes immutable. The Broker prefers to query a segment from a Historical over an ingestion task. But if a segment has multiple realtime replicas, e.g. Kafka index tasks, and one task is slower than the others, then the sys.segments query results can vary for the duration of the tasks, because only one of the ingestion tasks is queried by the Broker and it is not guaranteed that the same task gets picked every time. The `num_rows` column of the segments table can have inconsistent values during this period. There is an open [issue](https://github.com/apache/druid/issues/5915) about this inconsistency with stream ingestion tasks.
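As a further illustration of how the new column combines with the existing sys.segments columns (a sketch, not part of this change set; the datasource name is only an example), a query like the following would list published segments of one datasource that no compaction task has produced yet:

```sql
-- Published segments of a hypothetical "wikipedia" datasource with no recorded compaction state.
-- last_compaction_state is NULL for segments that were not created by a compaction task.
SELECT segment_id, datasource, size
FROM sys.segments
WHERE datasource = 'wikipedia'
  AND is_published = 1
  AND last_compaction_state IS NULL
```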
#### SERVERS table

View File

@ -126,6 +126,8 @@ public class CompactionTask extends AbstractBatchIndexTask
private static final String TYPE = "compact";
private static final boolean STORE_COMPACTION_STATE = true;

static {
Verify.verify(TYPE.equals(CompactSegments.COMPACTION_TASK_TYPE));
}
@ -422,6 +424,7 @@ public class CompactionTask extends AbstractBatchIndexTask
{
final Map<String, Object> newContext = new HashMap<>(getContext());
newContext.put(CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, getId());
newContext.putIfAbsent(CompactSegments.STORE_COMPACTION_STATE_KEY, STORE_COMPACTION_STATE);
// Set the priority of the compaction task.
newContext.put(Tasks.PRIORITY_KEY, getPriority());
return newContext;

View File

@ -51,7 +51,7 @@ public class Tasks
public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";
/**
- * This context is used in auto compaction. When it is set in the context, the segments created by the task
+ * This context is used in compaction. When it is set in the context, the segments created by the task
 * will fill 'lastCompactionState' in its metadata. This will be used to track what segments are compacted or not.
 * See {@link org.apache.druid.timeline.DataSegment} and {@link
 * org.apache.druid.server.coordinator.duty.NewestSegmentFirstIterator} for more details.

View File

@ -140,7 +140,6 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
- .context(ImmutableMap.of(Tasks.STORE_COMPACTION_STATE_KEY, true))
.build();
final Set<DataSegment> compactedSegments = runTask(compactionTask);
@ -153,6 +152,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class,
segment.getShardSpec().getClass()
);
// Expect compaction state to exist, as compaction state is stored by default
Assert.assertEquals(expectedState, segment.getLastCompactionState());
}
}
@ -174,7 +174,6 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.tuningConfig(newTuningConfig(new HashedPartitionsSpec(null, 3, null), 2, true))
- .context(ImmutableMap.of(Tasks.STORE_COMPACTION_STATE_KEY, true))
.build();
final Set<DataSegment> compactedSegments = runTask(compactionTask);
@ -183,6 +182,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
);
for (DataSegment segment : compactedSegments) {
// Expect compaction state to exist, as compaction state is stored by default
Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
Assert.assertEquals(expectedState, segment.getLastCompactionState());
}
@ -205,7 +205,6 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.tuningConfig(newTuningConfig(new SingleDimensionPartitionsSpec(7, null, "dim", false), 2, true))
- .context(ImmutableMap.of(Tasks.STORE_COMPACTION_STATE_KEY, true))
.build();
final Set<DataSegment> compactedSegments = runTask(compactionTask);
@ -214,11 +213,40 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
);
for (DataSegment segment : compactedSegments) {
// Expect compaction state to exist, as compaction state is stored by default
Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
Assert.assertEquals(expectedState, segment.getLastCompactionState());
}
}
@Test
public void testRunCompactionStateNotStoreIfContextSetToFalse()
{
runIndexTask(null, true);
final Builder builder = new Builder(
DATA_SOURCE,
getSegmentLoaderFactory(),
RETRY_POLICY_FACTORY
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.context(ImmutableMap.of(Tasks.STORE_COMPACTION_STATE_KEY, false))
.build();
final Set<DataSegment> compactedSegments = runTask(compactionTask);
for (DataSegment segment : compactedSegments) {
Assert.assertSame(
lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class,
segment.getShardSpec().getClass()
);
// Expect compaction state to not exist, since storeCompactionState is explicitly set to false in the context
Assert.assertEquals(null, segment.getLastCompactionState());
}
}
@Test
public void testCompactHashAndDynamicPartitionedSegments()
{

View File

@ -130,6 +130,8 @@ public class CompactionTaskRunTest extends IngestionTestBase
false,
0
);
// Expect compaction state to exist after compaction, as compaction state is stored by default
private static CompactionState DEFAULT_COMPACTION_STATE;

private static final List<String> TEST_ROWS = ImmutableList.of(
@ -761,6 +763,9 @@ public class CompactionTaskRunTest extends IngestionTestBase
null
);
// This is a regular index so we need to explicitly add this context to store the CompactionState
indexTask.addToContext(Tasks.STORE_COMPACTION_STATE_KEY, true);
final Pair<TaskStatus, List<DataSegment>> resultPair = runTask(indexTask);
Assert.assertTrue(resultPair.lhs.isSuccess());
@ -853,7 +858,6 @@ public class CompactionTaskRunTest extends IngestionTestBase
final TaskToolbox box = createTaskToolbox(objectMapper, task);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
- task.addToContext(Tasks.STORE_COMPACTION_STATE_KEY, true);
if (task.isReady(box.getTaskActionClient())) {
if (readyLatchToCountDown != null) {
readyLatchToCountDown.countDown();

View File

@ -15,6 +15,7 @@
"is_overshadowed": 0, "is_overshadowed": 0,
"shardSpec": "NoneShardSpec", "shardSpec": "NoneShardSpec",
"dimensions": "[anonymous, area_code, city, continent_code, country_name, dma_code, geo, language, namespace, network, newpage, page, postal_code, region_lookup, robot, unpatrolled, user]", "dimensions": "[anonymous, area_code, city, continent_code, country_name, dma_code, geo, language, namespace, network, newpage, page, postal_code, region_lookup, robot, unpatrolled, user]",
"metrics": "[added, count, deleted, delta, delta_hist, unique_users, variation]" "metrics": "[added, count, deleted, delta, delta_hist, unique_users, variation]",
"last_compaction_state": null
} }
] ]

View File

@ -67,7 +67,6 @@ import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.QueryCountStatsProvider;
import org.apache.druid.server.router.TieredBrokerConfig;
import org.apache.druid.sql.guice.SqlModule;
- import org.apache.druid.timeline.PruneLastCompactionState;
import org.apache.druid.timeline.PruneLoadSpec;
import org.eclipse.jetty.server.Server;
@ -102,7 +101,6 @@ public class CliBroker extends ServerRunnable
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8082);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8282);
binder.bindConstant().annotatedWith(PruneLoadSpec.class).to(true);
- binder.bindConstant().annotatedWith(PruneLastCompactionState.class).to(true);
binder.bind(ResponseContextConfig.class).toInstance(ResponseContextConfig.newConfig(false));
binder.bind(CachingClusteredClient.class).in(LazySingleton.class);

View File

@ -145,6 +145,7 @@ public class SystemSchema extends AbstractSchema
.add("shardSpec", ValueType.STRING) .add("shardSpec", ValueType.STRING)
.add("dimensions", ValueType.STRING) .add("dimensions", ValueType.STRING)
.add("metrics", ValueType.STRING) .add("metrics", ValueType.STRING)
.add("last_compaction_state", ValueType.STRING)
.build(); .build();
static final RowSignature SERVERS_SIGNATURE = RowSignature static final RowSignature SERVERS_SIGNATURE = RowSignature
@ -311,7 +312,8 @@ public class SystemSchema extends AbstractSchema
val.isOvershadowed() ? IS_OVERSHADOWED_TRUE : IS_OVERSHADOWED_FALSE,
segment.getShardSpec(),
segment.getDimensions(),
- segment.getMetrics()
+ segment.getMetrics(),
segment.getLastCompactionState()
};
});
@ -343,7 +345,8 @@ public class SystemSchema extends AbstractSchema
IS_OVERSHADOWED_FALSE, // there is an assumption here that unpublished segments are never overshadowed
val.getValue().getSegment().getShardSpec(),
val.getValue().getSegment().getDimensions(),
- val.getValue().getSegment().getMetrics()
+ val.getValue().getSegment().getMetrics(),
null // unpublished segments from realtime tasks will not be compacted yet
};
});
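To see how the two code paths above surface in query results (published segments report the stored compaction state, while segments known only from realtime tasks report null), a rough query such as the following could count compacted versus uncompacted segments per datasource. This is only a sketch and not part of this change set:

```sql
-- Segments served only by realtime tasks always fall into the uncompacted bucket,
-- because the unpublished-segment path above emits NULL for last_compaction_state.
SELECT
  datasource,
  SUM(CASE WHEN last_compaction_state IS NOT NULL THEN 1 ELSE 0 END) AS compacted_segments,
  SUM(CASE WHEN last_compaction_state IS NULL THEN 1 ELSE 0 END) AS uncompacted_segments
FROM sys.segments
GROUP BY 1
ORDER BY 2 DESC
```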

View File

@ -47,6 +47,7 @@ import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
@ -89,6 +90,7 @@ import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.sql.calcite.util.TestServerInventoryView;
import org.apache.druid.sql.calcite.view.NoopViewManager;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
@ -271,8 +273,12 @@ public class SystemSchemaTest extends CalciteTestBase
);
}
private final CompactionState expectedCompactionState = new CompactionState(
new DynamicPartitionsSpec(null, null),
Collections.singletonMap("test", "map")
);
- private final DataSegment publishedSegment1 = new DataSegment(
+ private final DataSegment publishedCompactedSegment1 = new DataSegment(
"wikipedia1",
Intervals.of("2007/2008"),
"version1",
@ -280,10 +286,11 @@ public class SystemSchemaTest extends CalciteTestBase
ImmutableList.of("dim1", "dim2"), ImmutableList.of("dim1", "dim2"),
ImmutableList.of("met1", "met2"), ImmutableList.of("met1", "met2"),
null, null,
expectedCompactionState,
1, 1,
53000L 53000L
); );
private final DataSegment publishedSegment2 = new DataSegment( private final DataSegment publishedCompactedSegment2 = new DataSegment(
"wikipedia2", "wikipedia2",
Intervals.of("2008/2009"), Intervals.of("2008/2009"),
"version2", "version2",
@ -291,10 +298,11 @@ public class SystemSchemaTest extends CalciteTestBase
ImmutableList.of("dim1", "dim2"), ImmutableList.of("dim1", "dim2"),
ImmutableList.of("met1", "met2"), ImmutableList.of("met1", "met2"),
null, null,
expectedCompactionState,
1, 1,
83000L 83000L
); );
private final DataSegment publishedSegment3 = new DataSegment( private final DataSegment publishedUncompactedSegment3 = new DataSegment(
"wikipedia3", "wikipedia3",
Intervals.of("2009/2010"), Intervals.of("2009/2010"),
"version3", "version3",
@ -302,6 +310,7 @@ public class SystemSchemaTest extends CalciteTestBase
ImmutableList.of("dim1", "dim2"), ImmutableList.of("dim1", "dim2"),
ImmutableList.of("met1", "met2"), ImmutableList.of("met1", "met2"),
null, null,
null,
1, 1,
47000L 47000L
); );
@ -495,7 +504,7 @@ public class SystemSchemaTest extends CalciteTestBase
final RelDataType rowType = segmentsTable.getRowType(new JavaTypeFactoryImpl());
final List<RelDataTypeField> fields = rowType.getFieldList();
- Assert.assertEquals(16, fields.size());
+ Assert.assertEquals(17, fields.size());
final SystemSchema.TasksTable tasksTable = (SystemSchema.TasksTable) schema.getTableMap().get("tasks");
final RelDataType sysRowType = tasksTable.getRowType(new JavaTypeFactoryImpl());
@ -518,9 +527,9 @@ public class SystemSchemaTest extends CalciteTestBase
{
final SegmentsTable segmentsTable = new SegmentsTable(druidSchema, metadataView, authMapper);
final Set<SegmentWithOvershadowedStatus> publishedSegments = new HashSet<>(Arrays.asList(
- new SegmentWithOvershadowedStatus(publishedSegment1, true),
- new SegmentWithOvershadowedStatus(publishedSegment2, false),
- new SegmentWithOvershadowedStatus(publishedSegment3, false),
+ new SegmentWithOvershadowedStatus(publishedCompactedSegment1, true),
+ new SegmentWithOvershadowedStatus(publishedCompactedSegment2, false),
+ new SegmentWithOvershadowedStatus(publishedUncompactedSegment3, false),
new SegmentWithOvershadowedStatus(segment1, true),
new SegmentWithOvershadowedStatus(segment2, false)
));
@ -576,7 +585,8 @@ public class SystemSchemaTest extends CalciteTestBase
1L, //is_published
1L, //is_available
0L, //is_realtime
- 1L //is_overshadowed
+ 1L, //is_overshadowed
null //is_compacted
);
verifyRow(
@ -589,7 +599,8 @@ public class SystemSchemaTest extends CalciteTestBase
1L, //is_published
1L, //is_available
0L, //is_realtime
- 0L //is_overshadowed
+ 0L, //is_overshadowed
null //is_compacted
);
//segment test3 is unpublished and has a NumberedShardSpec with partitionNum = 2
@ -603,7 +614,8 @@ public class SystemSchemaTest extends CalciteTestBase
0L, //is_published
1L, //is_available
0L, //is_realtime
- 0L //is_overshadowed
+ 0L, //is_overshadowed
null //is_compacted
);
verifyRow(
@ -616,7 +628,8 @@ public class SystemSchemaTest extends CalciteTestBase
0L, //is_published
1L, //is_available
1L, //is_realtime
- 0L //is_overshadowed
+ 0L, //is_overshadowed
null //is_compacted
);
verifyRow(
@ -629,10 +642,12 @@ public class SystemSchemaTest extends CalciteTestBase
0L, //is_published
1L, //is_available
1L, //is_realtime
- 0L //is_overshadowed
+ 0L, //is_overshadowed
null //is_compacted
);
// wikipedia segments are published and unavailable, num_replicas is 0
// wikipedia segments 1 and 2 are compacted while segment 3 is not compacted
verifyRow(
rows.get(5),
"wikipedia1_2007-01-01T00:00:00.000Z_2008-01-01T00:00:00.000Z_version1",
@ -643,7 +658,8 @@ public class SystemSchemaTest extends CalciteTestBase
1L, //is_published
0L, //is_available
0L, //is_realtime
- 1L //is_overshadowed
+ 1L, //is_overshadowed
expectedCompactionState //is_compacted
);
verifyRow(
@ -656,7 +672,8 @@ public class SystemSchemaTest extends CalciteTestBase
1L, //is_published
0L, //is_available
0L, //is_realtime
- 0L //is_overshadowed
+ 0L, //is_overshadowed
expectedCompactionState //is_compacted
);
verifyRow(
@ -669,7 +686,8 @@ public class SystemSchemaTest extends CalciteTestBase
1L, //is_published
0L, //is_available
0L, //is_realtime
- 0L //is_overshadowed
+ 0L, //is_overshadowed
null //is_compacted
);
// Verify value types.
@ -686,7 +704,8 @@ public class SystemSchemaTest extends CalciteTestBase
long isPublished,
long isAvailable,
long isRealtime,
- long isOvershadowed
+ long isOvershadowed,
CompactionState compactionState
)
{
Assert.assertEquals(segmentId, row[0].toString());
@ -703,6 +722,7 @@ public class SystemSchemaTest extends CalciteTestBase
Assert.assertEquals(isAvailable, row[10]);
Assert.assertEquals(isRealtime, row[11]);
Assert.assertEquals(isOvershadowed, row[12]);
Assert.assertEquals(compactionState, row[16]);
}

@Test
@ -1271,6 +1291,8 @@ public class SystemSchemaTest extends CalciteTestBase
expectedClass = SegmentId.class;
} else if (signature.getColumnName(i).equals("shardSpec")) {
expectedClass = ShardSpec.class;
} else if (signature.getColumnName(i).equals("last_compaction_state")) {
expectedClass = CompactionState.class;
} else if (signature.getColumnName(i).equals("dimensions") || signature.getColumnName(i).equals("metrics")) {
expectedClass = List.class;
} else {

View File

@ -1498,6 +1498,7 @@ is_overshadowed
is_published
is_realtime
java.sql.Types
last_compaction_state
max_size
num_replicas
num_rows