From dfb5a988882848b855bc1a65358cf4f2ff0f2064 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Wed, 23 Aug 2023 14:51:25 +0530 Subject: [PATCH] Add coordinator API for unused segments (#14846) There is a current issue due to inconsistent metadata between worker and controller in MSQ. A controller can receive one set of segments, which are then marked as unused by, say, a compaction job. The worker would then be unable to get the segment information from MetadataResource, which only returns used segments. --- docs/api-reference/legacy-metadata-api.md | 5 ++++ .../apache/druid/msq/exec/ControllerImpl.java | 6 ++++ .../msq/exec/TaskDataSegmentProvider.java | 15 ++++++---- .../org/apache/druid/msq/exec/WorkerImpl.java | 7 ++++- .../druid/msq/indexing/MSQControllerTask.java | 14 +++++++++ .../input/table/SegmentsInputSliceReader.java | 6 ++-- .../msq/querykit/DataSegmentProvider.java | 3 +- .../druid/msq/sql/MSQTaskSqlEngine.java | 2 ++ .../msq/util/MultiStageQueryContext.java | 10 +++++++ .../msq/exec/TaskDataSegmentProviderTest.java | 4 +-- .../druid/msq/test/CalciteMSQTestsHelper.java | 2 +- .../apache/druid/msq/test/MSQTestBase.java | 2 +- .../indexing/input/DruidInputSource.java | 2 +- ...stractParallelIndexSupervisorTaskTest.java | 2 +- ...TestIndexerMetadataStorageCoordinator.java | 2 +- .../client/coordinator/CoordinatorClient.java | 5 ++-- .../coordinator/CoordinatorClientImpl.java | 7 +++-- .../IndexerMetadataStorageCoordinator.java | 10 +++++-- .../IndexerSQLMetadataStorageCoordinator.java | 14 ++++++--- .../metadata/SqlSegmentsMetadataQuery.java | 23 +++++++++++++++ .../druid/server/http/MetadataResource.java | 7 +++-- .../CoordinatorClientImplTest.java | 29 +++++++++++++++++-- .../coordinator/NoopCoordinatorClient.java | 2 +- ...exerSQLMetadataStorageCoordinatorTest.java | 15 ++++++++++ .../server/http/MetadataResourceTest.java | 21 +++++++++----- 25 files changed, 173 insertions(+), 42 deletions(-) diff --git a/docs/api-reference/legacy-metadata-api.md 
b/docs/api-reference/legacy-metadata-api.md index 11b99b6c84e..fe031c7348a 100644 --- a/docs/api-reference/legacy-metadata-api.md +++ b/docs/api-reference/legacy-metadata-api.md @@ -153,6 +153,11 @@ Returns a list of all segments for a datasource with the full segment metadata a Returns full segment metadata for a specific segment as stored in the metadata store, if the segment is used. If the segment is unused, or is unknown, a 404 response is returned. +`GET /druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments/{segmentId}?includeUnused=true` + +Returns full segment metadata for a specific segment as stored in the metadata store. If the segment is unknown, a 404 response +is returned. + `GET /druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments` Returns a list of all segments, overlapping with any of given intervals, for a datasource as stored in the metadata store. Request body is array of string IS0 8601 intervals like `[interval1, interval2,...]`—for example, `["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]`. 
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index c9fdc6cf9d9..19733dea0ac 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -616,6 +616,12 @@ public class ControllerImpl implements Controller ); } } + + taskContextOverridesBuilder.put( + MultiStageQueryContext.CTX_IS_REINDEX, + MSQControllerTask.isReplaceInputDataSourceTask(task) + ); + this.workerTaskLauncher = new MSQWorkerTaskLauncher( id(), task.getDataSource(), diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java index c4a9d1ae401..c327ec340ae 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java @@ -72,7 +72,8 @@ public class TaskDataSegmentProvider implements DataSegmentProvider @Override public Supplier> fetchSegment( final SegmentId segmentId, - final ChannelCounters channelCounters + final ChannelCounters channelCounters, + final boolean isReindex ) { // Returns Supplier instead of ResourceHolder, so the Coordinator calls and segment downloads happen @@ -84,7 +85,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider holder = holders.computeIfAbsent( segmentId, k -> new SegmentHolder( - () -> fetchSegmentInternal(segmentId, channelCounters), + () -> fetchSegmentInternal(segmentId, channelCounters, isReindex), () -> holders.remove(segmentId) ) ).get(); @@ -95,20 +96,22 @@ public class TaskDataSegmentProvider implements DataSegmentProvider } 
/** - * Helper used by {@link #fetchSegment(SegmentId, ChannelCounters)}. Does the actual fetching of a segment, once it + * Helper used by {@link #fetchSegment(SegmentId, ChannelCounters, boolean)}. Does the actual fetching of a segment, once it * is determined that we definitely need to go out and get one. */ private ReferenceCountingResourceHolder fetchSegmentInternal( final SegmentId segmentId, - final ChannelCounters channelCounters + final ChannelCounters channelCounters, + final boolean isReindex ) { final DataSegment dataSegment; try { dataSegment = FutureUtils.get( - coordinatorClient.fetchUsedSegment( + coordinatorClient.fetchSegment( segmentId.getDataSource(), - segmentId.toString() + segmentId.toString(), + !isReindex ), true ); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index 090cc976093..95f656015ac 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -1100,7 +1100,12 @@ public class WorkerImpl implements Worker .put(ExternalInputSlice.class, new ExternalInputSliceReader(frameContext.tempDir())) .put(InlineInputSlice.class, new InlineInputSliceReader(frameContext.segmentWrangler())) .put(LookupInputSlice.class, new LookupInputSliceReader(frameContext.segmentWrangler())) - .put(SegmentsInputSlice.class, new SegmentsInputSliceReader(frameContext.dataSegmentProvider())) + .put(SegmentsInputSlice.class, + new SegmentsInputSliceReader( + frameContext.dataSegmentProvider(), + MultiStageQueryContext.isReindex(QueryContext.of(task().getContext())) + ) + ) .build() ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index c39b7637dd6..5a7c0abbfda 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -267,6 +267,20 @@ public class MSQControllerTask extends AbstractTask implements ClientTaskQuery return querySpec.getDestination() instanceof DataSourceMSQDestination; } + /** + * Returns true if the task reads from the same table as the destination. In this case, we would prefer to fail + * instead of reading any unused segments to ensure that old data is not read. + */ + public static boolean isReplaceInputDataSourceTask(MSQControllerTask task) + { + return task.getQuerySpec() + .getQuery() + .getDataSource() + .getTableNames() + .stream() + .anyMatch(datasouce -> task.getDataSource().equals(datasouce)); + } + public static boolean writeResultsToDurableStorage(final MSQSpec querySpec) { return querySpec.getDestination() instanceof DurableStorageMSQDestination; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java index f7efd287aed..5334c4cb2ab 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java @@ -40,10 +40,12 @@ import java.util.function.Consumer; public class SegmentsInputSliceReader implements InputSliceReader { private final DataSegmentProvider dataSegmentProvider; + private final boolean isReindex; - public SegmentsInputSliceReader(final DataSegmentProvider dataSegmentProvider) + public SegmentsInputSliceReader(final DataSegmentProvider 
dataSegmentProvider, final boolean isReindex) { this.dataSegmentProvider = dataSegmentProvider; + this.isReindex = isReindex; } @Override @@ -91,7 +93,7 @@ public class SegmentsInputSliceReader implements InputSliceReader ); return new SegmentWithDescriptor( - dataSegmentProvider.fetchSegment(segmentId, channelCounters), + dataSegmentProvider.fetchSegment(segmentId, channelCounters, isReindex), descriptor ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java index 5f5f6099e5f..0e931c7f8ef 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java @@ -37,6 +37,7 @@ public interface DataSegmentProvider */ Supplier> fetchSegment( SegmentId segmentId, - ChannelCounters channelCounters + ChannelCounters channelCounters, + boolean isReindex ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index 37b8692cb4d..ee7a590e1f0 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -37,6 +37,7 @@ import org.apache.druid.error.InvalidSqlInput; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.msq.querykit.QueryKitUtils; +import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.segment.column.ColumnHolder; import 
org.apache.druid.sql.calcite.parser.DruidSqlInsert; @@ -59,6 +60,7 @@ public class MSQTaskSqlEngine implements SqlEngine ImmutableSet.builder() .addAll(NativeSqlEngine.SYSTEM_CONTEXT_PARAMETERS) .add(QueryKitUtils.CTX_TIME_COLUMN_NAME) + .add(MultiStageQueryContext.CTX_IS_REINDEX) .build(); public static final List TASK_STRUCT_FIELD_NAMES = ImmutableList.of("TASK"); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index 7bdded98c1e..265f5eae0fe 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -110,6 +110,8 @@ public class MultiStageQueryContext // OnheapIncrementalIndex. For example: overheads related to creating bitmaps during persist. static final int DEFAULT_ROWS_IN_MEMORY = 100000; + public static final String CTX_IS_REINDEX = "isReindex"; + /** * Controls sort order within segments. Normally, this is the same as the overall order of the query (from the * CLUSTERED BY clause) but it can be overridden. 
@@ -146,6 +148,14 @@ public class MultiStageQueryContext ); } + public static boolean isReindex(final QueryContext queryContext) + { + return queryContext.getBoolean( + CTX_IS_REINDEX, + true + ); + } + public static long getMaxInputBytesPerWorker(final QueryContext queryContext) { return queryContext.getLong( diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java index b6dd751e7dd..eafb443bafc 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java @@ -179,7 +179,7 @@ public class TaskDataSegmentProviderTest final int expectedSegmentNumber = i % NUM_SEGMENTS; final DataSegment segment = segments.get(expectedSegmentNumber); final ListenableFuture>> f = - exec.submit(() -> provider.fetchSegment(segment.getId(), new ChannelCounters())); + exec.submit(() -> provider.fetchSegment(segment.getId(), new ChannelCounters(), false)); testFutures.add( FutureUtils.transform( @@ -231,7 +231,7 @@ public class TaskDataSegmentProviderTest private class TestCoordinatorClientImpl extends NoopCoordinatorClient { @Override - public ListenableFuture fetchUsedSegment(String dataSource, String segmentId) + public ListenableFuture fetchSegment(String dataSource, String segmentId, boolean includeUnused) { for (final DataSegment segment : segments) { if (segment.getDataSource().equals(dataSource) && segment.getId().toString().equals(segmentId)) { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java index 39fa01b3da4..c2616c0514e 100644 --- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java @@ -161,7 +161,7 @@ public class CalciteMSQTestsHelper )); binder.bind(DataSegmentAnnouncer.class).toInstance(new NoopDataSegmentAnnouncer()); binder.bind(DataSegmentProvider.class) - .toInstance((dataSegment, channelCounters) -> getSupplierForSegment(dataSegment)); + .toInstance((segmentId, channelCounters, isReindex) -> getSupplierForSegment(segmentId)); GroupByQueryConfig groupByQueryConfig = new GroupByQueryConfig(); binder.bind(GroupByStrategySelector.class) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 4e00fd657ac..8be3b346e1d 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -400,7 +400,7 @@ public class MSQTestBase extends BaseCalciteQueryTest binder.bind(QueryProcessingPool.class) .toInstance(new ForwardingQueryProcessingPool(Execs.singleThreaded("Test-runner-processing-pool"))); binder.bind(DataSegmentProvider.class) - .toInstance((dataSegment, channelCounters) -> getSupplierForSegment(dataSegment)); + .toInstance((dataSegment, channelCounters, isReindex) -> getSupplierForSegment(dataSegment)); binder.bind(IndexIO.class).toInstance(indexIO); binder.bind(SpecificSegmentsQuerySegmentWalker.class).toInstance(qf.walker()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index 03d58cb96d9..bf8b4bfb1d5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -581,7 +581,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI ); for (WindowedSegmentId windowedSegmentId : Preconditions.checkNotNull(segmentIds, "segmentIds")) { final DataSegment segment = FutureUtils.getUnchecked( - coordinatorClient.fetchUsedSegment(dataSource, windowedSegmentId.getSegmentId()), + coordinatorClient.fetchSegment(dataSource, windowedSegmentId.getSegmentId(), false), true ); for (Interval interval : windowedSegmentId.getIntervals()) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 24cc1096091..480a37c5723 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -1039,7 +1039,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase } @Override - public ListenableFuture fetchUsedSegment(String dataSource, String segmentId) + public ListenableFuture fetchSegment(String dataSource, String segmentId, boolean includeUnused) { ImmutableDruidDataSource druidDataSource; try { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 41a688fc9fa..1db186e5a48 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ 
-229,7 +229,7 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto } @Override - public DataSegment retrieveUsedSegmentForId(final String id) + public DataSegment retrieveSegmentForId(final String id, boolean includeUnused) { return null; } diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java index c497dcb6894..08110f61f05 100644 --- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java +++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java @@ -35,9 +35,10 @@ public interface CoordinatorClient ListenableFuture isHandoffComplete(String dataSource, SegmentDescriptor descriptor); /** - * Fetches segment metadata for the given dataSource and segmentId. + * Fetches segment metadata for the given dataSource and segmentId. If includeUnused is set to false, the segment is + * not returned if it is marked as unused. */ - ListenableFuture fetchUsedSegment(String dataSource, String segmentId); + ListenableFuture fetchSegment(String dataSource, String segmentId, boolean includeUnused); /** * Fetches segment metadata for the given dataSource and intervals. 
diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java index e023d9827bd..e93cbe830b3 100644 --- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java +++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java @@ -71,12 +71,13 @@ public class CoordinatorClientImpl implements CoordinatorClient } @Override - public ListenableFuture fetchUsedSegment(String dataSource, String segmentId) + public ListenableFuture fetchSegment(String dataSource, String segmentId, boolean includeUnused) { final String path = StringUtils.format( - "/druid/coordinator/v1/metadata/datasources/%s/segments/%s", + "/druid/coordinator/v1/metadata/datasources/%s/segments/%s?includeUnused=%s", StringUtils.urlEncode(dataSource), - StringUtils.urlEncode(segmentId) + StringUtils.urlEncode(segmentId), + includeUnused ? "true" : "false" ); return FutureUtils.transform( diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index a954c1f2d67..fb65b810672 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -352,12 +352,16 @@ public interface IndexerMetadataStorageCoordinator void deleteSegments(Set segments); /** - * Retrieve the segment for a given id from the metadata store. Return null if no such used segment exists + * Retrieve the segment for a given id from the metadata store. Return null if no such segment exists + *
+ * If includeUnused is set, this also returns unused segments. Unused segments could be deleted by a kill task at any + * time and might lead to unexpected behaviour. This option exists mainly to provide a consistent view of the metadata, + * for example, in calls from MSQ controller and worker and would generally not be required. * * @param id The segment id * - * @return DataSegment corresponding to given id + * @return DataSegment corresponding to given id, which may be an unused segment if includeUnused is set */ - DataSegment retrieveUsedSegmentForId(String id); + DataSegment retrieveSegmentForId(String id, boolean includeUnused); } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 25a96a30765..ee1cff6476e 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1810,12 +1810,18 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor } @Override - public DataSegment retrieveUsedSegmentForId(final String id) + public DataSegment retrieveSegmentForId(final String id, boolean includeUnused) { return connector.retryTransaction( - (handle, status) -> - SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) - .retrieveUsedSegmentForId(id), + (handle, status) -> { + if (includeUnused) { + return SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) + .retrieveSegmentForId(id); + } else { + return SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) + .retrieveUsedSegmentForId(id); + } + }, 3, SQLMetadataConnector.DEFAULT_MAX_TRIES ); diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index 
01b110516ff..20b176c5091 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -238,6 +238,29 @@ public class SqlSegmentsMetadataQuery return null; } + /** + * Retrieve the segment for a given id if it exists in the metadata store and null otherwise + */ + public DataSegment retrieveSegmentForId(String id) + { + + final String query = "SELECT payload FROM %s WHERE id = :id"; + + final Query> sql = handle + .createQuery(StringUtils.format(query, dbTables.getSegmentsTable())) + .bind("id", id); + + final ResultIterator resultIterator = + sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class)) + .iterator(); + + if (resultIterator.hasNext()) { + return resultIterator.next(); + } + + return null; + } + private CloseableIterator retrieveSegments( final String dataSource, final Collection intervals, diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index 4f9631c1cfd..b81c6891ed3 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -279,9 +279,10 @@ public class MetadataResource @Path("/datasources/{dataSourceName}/segments/{segmentId}") @Produces(MediaType.APPLICATION_JSON) @ResourceFilters(DatasourceResourceFilter.class) - public Response getUsedSegment( + public Response getSegment( @PathParam("dataSourceName") String dataSourceName, - @PathParam("segmentId") String segmentId + @PathParam("segmentId") String segmentId, + @QueryParam("includeUnused") @Nullable Boolean includeUnused ) { ImmutableDruidDataSource dataSource = segmentsMetadataManager.getImmutableDataSourceWithUsedSegments(dataSourceName); @@ -296,7 +297,7 @@ public class MetadataResource } } // fallback to db - DataSegment 
segment = metadataStorageCoordinator.retrieveUsedSegmentForId(segmentId); + DataSegment segment = metadataStorageCoordinator.retrieveSegmentForId(segmentId, Boolean.TRUE.equals(includeUnused)); if (segment != null) { return Response.status(Response.Status.OK).entity(segment).build(); } diff --git a/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java b/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java index b7419485b08..f48e21327a0 100644 --- a/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java +++ b/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java @@ -105,7 +105,7 @@ public class CoordinatorClientImplTest .build(); serviceClient.expectAndRespond( - new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/metadata/datasources/xyz/segments/def"), + new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/metadata/datasources/xyz/segments/def?includeUnused=false"), HttpResponseStatus.OK, ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), jsonMapper.writeValueAsBytes(segment) @@ -113,7 +113,32 @@ public class CoordinatorClientImplTest Assert.assertEquals( segment, - coordinatorClient.fetchUsedSegment("xyz", "def").get() + coordinatorClient.fetchSegment("xyz", "def", false).get() + ); + } + + @Test + public void test_fetchSegment() throws Exception + { + final DataSegment segment = + DataSegment.builder() + .dataSource("xyz") + .interval(Intervals.of("2000/3000")) + .version("1") + .shardSpec(new NumberedShardSpec(0, 1)) + .size(1) + .build(); + + serviceClient.expectAndRespond( + new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/metadata/datasources/xyz/segments/def?includeUnused=true"), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + jsonMapper.writeValueAsBytes(segment) + ); + + Assert.assertEquals( + segment, + 
coordinatorClient.fetchSegment("xyz", "def", true).get() ); } diff --git a/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java b/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java index bcaab5c255f..76e6346d380 100644 --- a/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java +++ b/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java @@ -36,7 +36,7 @@ public class NoopCoordinatorClient implements CoordinatorClient } @Override - public ListenableFuture fetchUsedSegment(String dataSource, String segmentId) + public ListenableFuture fetchSegment(String dataSource, String segmentId, boolean includeUnused) { throw new UnsupportedOperationException(); } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 68654c88550..389a003cef3 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -709,6 +709,21 @@ public class IndexerSQLMetadataStorageCoordinatorTest Assert.assertEquals(2, metadataUpdateCounter.get()); } + @Test + public void testRetrieveUsedSegmentForId() + { + insertUsedSegments(ImmutableSet.of(defaultSegment)); + Assert.assertEquals(defaultSegment, coordinator.retrieveSegmentForId(defaultSegment.getId().toString(), false)); + } + + @Test + public void testRetrieveSegmentForId() + { + insertUsedSegments(ImmutableSet.of(defaultSegment)); + markAllSegmentsUnused(ImmutableSet.of(defaultSegment)); + Assert.assertEquals(defaultSegment, coordinator.retrieveSegmentForId(defaultSegment.getId().toString(), true)); + } + @Test public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOException { diff --git 
a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java index d5ab6aaaa1a..ddedfbb53be 100644 --- a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java @@ -92,10 +92,13 @@ public class MetadataResourceTest IndexerMetadataStorageCoordinator storageCoordinator = Mockito.mock(IndexerMetadataStorageCoordinator.class); Mockito.doReturn(segments[4]) .when(storageCoordinator) - .retrieveUsedSegmentForId(segments[4].getId().toString()); + .retrieveSegmentForId(segments[4].getId().toString(), false); Mockito.doReturn(null) .when(storageCoordinator) - .retrieveUsedSegmentForId(segments[5].getId().toString()); + .retrieveSegmentForId(segments[5].getId().toString(), false); + Mockito.doReturn(segments[5]) + .when(storageCoordinator) + .retrieveSegmentForId(segments[5].getId().toString(), true); metadataResource = new MetadataResource( segmentsMetadataManager, @@ -120,23 +123,27 @@ public class MetadataResourceTest } @Test - public void testGetUsedSegment() + public void testGetSegment() { // Available in snapshot Assert.assertEquals( segments[0], - metadataResource.getUsedSegment(segments[0].getDataSource(), segments[0].getId().toString()).getEntity() + metadataResource.getSegment(segments[0].getDataSource(), segments[0].getId().toString(), null).getEntity() ); // Unavailable in snapshot, but available in metadata Assert.assertEquals( segments[4], - metadataResource.getUsedSegment(segments[4].getDataSource(), segments[4].getId().toString()).getEntity() + metadataResource.getSegment(segments[4].getDataSource(), segments[4].getId().toString(), null).getEntity() ); - // Unavailable in both snapshot and metadata + // Unavailable and unused Assert.assertNull( - metadataResource.getUsedSegment(segments[5].getDataSource(), segments[5].getId().toString()).getEntity() + 
metadataResource.getSegment(segments[5].getDataSource(), segments[5].getId().toString(), null).getEntity() + ); + Assert.assertEquals( + segments[5], + metadataResource.getSegment(segments[5].getDataSource(), segments[5].getId().toString(), true).getEntity() ); }