Add coordinator API for unused segments (#14846)

This fixes an issue caused by inconsistent metadata between the worker and controller in MSQ. A controller can receive one set of segments, which are then marked as unused by, say, a compaction job. The worker would then be unable to fetch the segment information from MetadataResource, which only returns used segments.
Adarsh Sanjeev 2023-08-23 14:51:25 +05:30 committed by GitHub
parent 989ed8d0c2
commit dfb5a98888
25 changed files with 173 additions and 42 deletions

View File

@ -153,6 +153,11 @@ Returns a list of all segments for a datasource with the full segment metadata a
Returns full segment metadata for a specific segment as stored in the metadata store, if the segment is used. If the
segment is unused, or is unknown, a 404 response is returned.
`GET /druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments/{segmentId}?includeUnused=true`
Returns full segment metadata for a specific segment as stored in the metadata store. If the segment is unknown, a 404
response is returned.
`GET /druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments`
Returns a list of all segments for a datasource that overlap any of the given intervals, as stored in the metadata store. The request body is an array of ISO 8601 interval strings, for example `["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]`.
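For illustration, here is a minimal sketch of calling the new endpoint through the Java `CoordinatorClient` method this commit adds; the datasource and segment id are hypothetical, and the client instance is assumed to be injected as usual:

```java
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.timeline.DataSegment;

public final class FetchSegmentExample
{
  /**
   * Fetches segment metadata even if the segment has been marked unused.
   * Passing includeUnused = false instead reproduces the old used-only behavior.
   */
  static DataSegment fetchIncludingUnused(CoordinatorClient client, String dataSource, String segmentId)
  {
    final ListenableFuture<DataSegment> future = client.fetchSegment(dataSource, segmentId, true);
    return FutureUtils.getUnchecked(future, true);
  }
}
```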

View File

@ -616,6 +616,12 @@ public class ControllerImpl implements Controller
);
}
}
taskContextOverridesBuilder.put(
MultiStageQueryContext.CTX_IS_REINDEX,
MSQControllerTask.isReplaceInputDataSourceTask(task)
);
this.workerTaskLauncher = new MSQWorkerTaskLauncher(
id(),
task.getDataSource(),

View File

@ -72,7 +72,8 @@ public class TaskDataSegmentProvider implements DataSegmentProvider
@Override
public Supplier<ResourceHolder<Segment>> fetchSegment(
final SegmentId segmentId,
final ChannelCounters channelCounters
final ChannelCounters channelCounters,
final boolean isReindex
)
{
// Returns Supplier<ResourceHolder> instead of ResourceHolder, so the Coordinator calls and segment downloads happen
@ -84,7 +85,7 @@ public class TaskDataSegmentProvider implements DataSegmentProvider
holder = holders.computeIfAbsent(
segmentId,
k -> new SegmentHolder(
() -> fetchSegmentInternal(segmentId, channelCounters),
() -> fetchSegmentInternal(segmentId, channelCounters, isReindex),
() -> holders.remove(segmentId)
)
).get();
@ -95,20 +96,22 @@ public class TaskDataSegmentProvider implements DataSegmentProvider
}
/**
* Helper used by {@link #fetchSegment(SegmentId, ChannelCounters)}. Does the actual fetching of a segment, once it
* Helper used by {@link #fetchSegment(SegmentId, ChannelCounters, boolean)}. Does the actual fetching of a segment, once it
* is determined that we definitely need to go out and get one.
*/
private ReferenceCountingResourceHolder<Segment> fetchSegmentInternal(
final SegmentId segmentId,
final ChannelCounters channelCounters
final ChannelCounters channelCounters,
final boolean isReindex
)
{
final DataSegment dataSegment;
try {
dataSegment = FutureUtils.get(
coordinatorClient.fetchUsedSegment(
coordinatorClient.fetchSegment(
segmentId.getDataSource(),
segmentId.toString()
segmentId.toString(),
!isReindex
),
true
);
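The negated flag passed here is the crux of the change; restated on its own (a sketch, not part of the patch):

```java
// A reindex must not see unused segments (the old data may already have been
// replaced), so unused segments are only requested for non-reindex reads:
final boolean includeUnused = !isReindex;
```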

View File

@ -1100,7 +1100,12 @@ public class WorkerImpl implements Worker
.put(ExternalInputSlice.class, new ExternalInputSliceReader(frameContext.tempDir()))
.put(InlineInputSlice.class, new InlineInputSliceReader(frameContext.segmentWrangler()))
.put(LookupInputSlice.class, new LookupInputSliceReader(frameContext.segmentWrangler()))
.put(SegmentsInputSlice.class, new SegmentsInputSliceReader(frameContext.dataSegmentProvider()))
.put(SegmentsInputSlice.class,
new SegmentsInputSliceReader(
frameContext.dataSegmentProvider(),
MultiStageQueryContext.isReindex(QueryContext.of(task().getContext()))
)
)
.build()
);
}

View File

@ -267,6 +267,20 @@ public class MSQControllerTask extends AbstractTask implements ClientTaskQuery
return querySpec.getDestination() instanceof DataSourceMSQDestination;
}
/**
* Returns true if the task reads from the same table as the destination. In this case, we would prefer to fail
* rather than read any unused segments, to ensure that old data is not read.
*/
public static boolean isReplaceInputDataSourceTask(MSQControllerTask task)
{
return task.getQuerySpec()
.getQuery()
.getDataSource()
.getTableNames()
.stream()
.anyMatch(datasource -> task.getDataSource().equals(datasource));
}
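A self-contained sketch of the same check, with hypothetical datasource names, may make the reindex condition easier to see:

```java
import java.util.Set;

public final class ReindexCheckSketch
{
  /** Mirrors isReplaceInputDataSourceTask: does the query read from its own destination table? */
  static boolean readsOwnDestination(String destinationDataSource, Set<String> inputTableNames)
  {
    return inputTableNames.stream().anyMatch(destinationDataSource::equals);
  }

  public static void main(String[] args)
  {
    // REPLACE INTO "wikipedia" ... SELECT ... FROM "wikipedia": a reindex.
    System.out.println(readsOwnDestination("wikipedia", Set.of("wikipedia")));        // true
    // Reading from a different table into "wikipedia_rollup": not a reindex.
    System.out.println(readsOwnDestination("wikipedia_rollup", Set.of("wikipedia"))); // false
  }
}
```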
public static boolean writeResultsToDurableStorage(final MSQSpec querySpec)
{
return querySpec.getDestination() instanceof DurableStorageMSQDestination;

View File

@ -40,10 +40,12 @@ import java.util.function.Consumer;
public class SegmentsInputSliceReader implements InputSliceReader
{
private final DataSegmentProvider dataSegmentProvider;
private final boolean isReindex;
public SegmentsInputSliceReader(final DataSegmentProvider dataSegmentProvider)
public SegmentsInputSliceReader(final DataSegmentProvider dataSegmentProvider, final boolean isReindex)
{
this.dataSegmentProvider = dataSegmentProvider;
this.isReindex = isReindex;
}
@Override
@ -91,7 +93,7 @@ public class SegmentsInputSliceReader implements InputSliceReader
);
return new SegmentWithDescriptor(
dataSegmentProvider.fetchSegment(segmentId, channelCounters),
dataSegmentProvider.fetchSegment(segmentId, channelCounters, isReindex),
descriptor
);
}

View File

@ -37,6 +37,7 @@ public interface DataSegmentProvider
*/
Supplier<ResourceHolder<Segment>> fetchSegment(
SegmentId segmentId,
ChannelCounters channelCounters
ChannelCounters channelCounters,
boolean isReindex
);
}

View File

@ -37,6 +37,7 @@ import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
@ -59,6 +60,7 @@ public class MSQTaskSqlEngine implements SqlEngine
ImmutableSet.<String>builder()
.addAll(NativeSqlEngine.SYSTEM_CONTEXT_PARAMETERS)
.add(QueryKitUtils.CTX_TIME_COLUMN_NAME)
.add(MultiStageQueryContext.CTX_IS_REINDEX)
.build();
public static final List<String> TASK_STRUCT_FIELD_NAMES = ImmutableList.of("TASK");

View File

@ -110,6 +110,8 @@ public class MultiStageQueryContext
// OnheapIncrementalIndex. For example: overheads related to creating bitmaps during persist.
static final int DEFAULT_ROWS_IN_MEMORY = 100000;
public static final String CTX_IS_REINDEX = "isReindex";
/**
* Controls sort order within segments. Normally, this is the same as the overall order of the query (from the
* CLUSTERED BY clause) but it can be overridden.
@ -146,6 +148,14 @@ public class MultiStageQueryContext
);
}
public static boolean isReindex(final QueryContext queryContext)
{
return queryContext.getBoolean(
CTX_IS_REINDEX,
true
);
}
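Note the default of true: when the context key is absent, the worker errs on the safe side and fetches only used segments. A quick illustration (the context values are hypothetical):

```java
import java.util.Map;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;

public final class IsReindexDefaultExample
{
  public static void main(String[] args)
  {
    // Absent key: defaults to true, so the task is treated as a reindex and
    // unused segments are excluded from coordinator lookups.
    System.out.println(MultiStageQueryContext.isReindex(QueryContext.of(Map.of())));                   // true
    // Explicit override in the query context.
    System.out.println(MultiStageQueryContext.isReindex(QueryContext.of(Map.of("isReindex", false)))); // false
  }
}
```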
public static long getMaxInputBytesPerWorker(final QueryContext queryContext)
{
return queryContext.getLong(

View File

@ -179,7 +179,7 @@ public class TaskDataSegmentProviderTest
final int expectedSegmentNumber = i % NUM_SEGMENTS;
final DataSegment segment = segments.get(expectedSegmentNumber);
final ListenableFuture<Supplier<ResourceHolder<Segment>>> f =
exec.submit(() -> provider.fetchSegment(segment.getId(), new ChannelCounters()));
exec.submit(() -> provider.fetchSegment(segment.getId(), new ChannelCounters(), false));
testFutures.add(
FutureUtils.transform(
@ -231,7 +231,7 @@ public class TaskDataSegmentProviderTest
private class TestCoordinatorClientImpl extends NoopCoordinatorClient
{
@Override
public ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String segmentId)
public ListenableFuture<DataSegment> fetchSegment(String dataSource, String segmentId, boolean includeUnused)
{
for (final DataSegment segment : segments) {
if (segment.getDataSource().equals(dataSource) && segment.getId().toString().equals(segmentId)) {

View File

@ -161,7 +161,7 @@ public class CalciteMSQTestsHelper
));
binder.bind(DataSegmentAnnouncer.class).toInstance(new NoopDataSegmentAnnouncer());
binder.bind(DataSegmentProvider.class)
.toInstance((dataSegment, channelCounters) -> getSupplierForSegment(dataSegment));
.toInstance((segmentId, channelCounters, isReindex) -> getSupplierForSegment(segmentId));
GroupByQueryConfig groupByQueryConfig = new GroupByQueryConfig();
binder.bind(GroupByStrategySelector.class)

View File

@ -400,7 +400,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
binder.bind(QueryProcessingPool.class)
.toInstance(new ForwardingQueryProcessingPool(Execs.singleThreaded("Test-runner-processing-pool")));
binder.bind(DataSegmentProvider.class)
.toInstance((dataSegment, channelCounters) -> getSupplierForSegment(dataSegment));
.toInstance((dataSegment, channelCounters, isReindex) -> getSupplierForSegment(dataSegment));
binder.bind(IndexIO.class).toInstance(indexIO);
binder.bind(SpecificSegmentsQuerySegmentWalker.class).toInstance(qf.walker());

View File

@ -581,7 +581,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
);
for (WindowedSegmentId windowedSegmentId : Preconditions.checkNotNull(segmentIds, "segmentIds")) {
final DataSegment segment = FutureUtils.getUnchecked(
coordinatorClient.fetchUsedSegment(dataSource, windowedSegmentId.getSegmentId()),
coordinatorClient.fetchSegment(dataSource, windowedSegmentId.getSegmentId(), false),
true
);
for (Interval interval : windowedSegmentId.getIntervals()) {

View File

@ -1039,7 +1039,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
}
@Override
public ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String segmentId)
public ListenableFuture<DataSegment> fetchSegment(String dataSource, String segmentId, boolean includeUnused)
{
ImmutableDruidDataSource druidDataSource;
try {

View File

@ -229,7 +229,7 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
}
@Override
public DataSegment retrieveUsedSegmentForId(final String id)
public DataSegment retrieveSegmentForId(final String id, boolean includeUnused)
{
return null;
}

View File

@ -35,9 +35,10 @@ public interface CoordinatorClient
ListenableFuture<Boolean> isHandoffComplete(String dataSource, SegmentDescriptor descriptor);
/**
* Fetches segment metadata for the given dataSource and segmentId.
* Fetches segment metadata for the given dataSource and segmentId. If includeUnused is set to false, the segment is
* not returned if it is marked as unused.
*/
ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String segmentId);
ListenableFuture<DataSegment> fetchSegment(String dataSource, String segmentId, boolean includeUnused);
/**
* Fetches segment metadata for the given dataSource and intervals.
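Call sites migrate mechanically, as at the `DruidInputSource` change above; a sketch of the equivalence:

```java
// includeUnused = false preserves the exact semantics of the removed
// fetchUsedSegment(dataSource, segmentId) method.
ListenableFuture<DataSegment> legacyBehavior(CoordinatorClient client, String dataSource, String segmentId)
{
  return client.fetchSegment(dataSource, segmentId, false);
}
```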

View File

@ -71,12 +71,13 @@ public class CoordinatorClientImpl implements CoordinatorClient
}
@Override
public ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String segmentId)
public ListenableFuture<DataSegment> fetchSegment(String dataSource, String segmentId, boolean includeUnused)
{
final String path = StringUtils.format(
"/druid/coordinator/v1/metadata/datasources/%s/segments/%s",
"/druid/coordinator/v1/metadata/datasources/%s/segments/%s?includeUnused=%s",
StringUtils.urlEncode(dataSource),
StringUtils.urlEncode(segmentId)
StringUtils.urlEncode(segmentId),
includeUnused ? "true" : "false"
);
return FutureUtils.transform(
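With hypothetical values, the format string above produces a path like the following; `urlEncode` guards against reserved characters in segment ids:

```java
import org.apache.druid.java.util.common.StringUtils;

public final class SegmentPathExample
{
  public static void main(String[] args)
  {
    final String path = StringUtils.format(
        "/druid/coordinator/v1/metadata/datasources/%s/segments/%s?includeUnused=%s",
        StringUtils.urlEncode("wiki"),
        StringUtils.urlEncode("wiki_2012-01-01_2012-01-02_v1"),
        "true"
    );
    // Prints: /druid/coordinator/v1/metadata/datasources/wiki/segments/wiki_2012-01-01_2012-01-02_v1?includeUnused=true
    System.out.println(path);
  }
}
```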

View File

@ -352,12 +352,16 @@ public interface IndexerMetadataStorageCoordinator
void deleteSegments(Set<DataSegment> segments);
/**
* Retrieve the segment for a given id from the metadata store. Return null if no such used segment exists
* Retrieve the segment for a given id from the metadata store. Return null if no such segment exists
* <br>
* If includeUnused is set, this also returns unused segments. Unused segments could be deleted by a kill task at any
* time, which might lead to unexpected behaviour. This option exists mainly to provide a consistent view of the
* metadata, for example in calls from the MSQ controller and workers, and would generally not be required otherwise.
*
* @param id The segment id
*
* @return DataSegment used segment corresponding to given id
* @return DataSegment corresponding to given id
*/
DataSegment retrieveUsedSegmentForId(String id);
DataSegment retrieveSegmentForId(String id, boolean includeUnused);
}

View File

@ -1810,12 +1810,18 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
@Override
public DataSegment retrieveUsedSegmentForId(final String id)
public DataSegment retrieveSegmentForId(final String id, boolean includeUnused)
{
return connector.retryTransaction(
(handle, status) ->
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveUsedSegmentForId(id),
(handle, status) -> {
if (includeUnused) {
return SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveSegmentForId(id);
} else {
return SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveUsedSegmentForId(id);
}
},
3,
SQLMetadataConnector.DEFAULT_MAX_TRIES
);

View File

@ -238,6 +238,29 @@ public class SqlSegmentsMetadataQuery
return null;
}
/**
* Retrieve the segment for a given id if it exists in the metadata store, whether used or unused; return null otherwise
*/
public DataSegment retrieveSegmentForId(String id)
{
final String query = "SELECT payload FROM %s WHERE id = :id";
final Query<Map<String, Object>> sql = handle
.createQuery(StringUtils.format(query, dbTables.getSegmentsTable()))
.bind("id", id);
final ResultIterator<DataSegment> resultIterator =
sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class))
.iterator();
if (resultIterator.hasNext()) {
return resultIterator.next();
}
return null;
}
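For contrast, the pre-existing `retrieveUsedSegmentForId` presumably filters on the used column (an assumption from its name; its SQL is not shown in this diff), whereas the new query deliberately omits any such predicate:

```java
// From this patch: matches the segment regardless of its used flag.
String anyState = "SELECT payload FROM %s WHERE id = :id";
// Assumed shape of the used-only variant (not shown in this diff):
String usedOnly = "SELECT payload FROM %s WHERE used = true AND id = :id";
```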
private CloseableIterator<DataSegment> retrieveSegments(
final String dataSource,
final Collection<Interval> intervals,

View File

@ -279,9 +279,10 @@ public class MetadataResource
@Path("/datasources/{dataSourceName}/segments/{segmentId}")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(DatasourceResourceFilter.class)
public Response getUsedSegment(
public Response getSegment(
@PathParam("dataSourceName") String dataSourceName,
@PathParam("segmentId") String segmentId
@PathParam("segmentId") String segmentId,
@QueryParam("includeUnused") @Nullable Boolean includeUnused
)
{
ImmutableDruidDataSource dataSource = segmentsMetadataManager.getImmutableDataSourceWithUsedSegments(dataSourceName);
@ -296,7 +297,7 @@ public class MetadataResource
}
}
// fallback to db
DataSegment segment = metadataStorageCoordinator.retrieveUsedSegmentForId(segmentId);
DataSegment segment = metadataStorageCoordinator.retrieveSegmentForId(segmentId, Boolean.TRUE.equals(includeUnused));
if (segment != null) {
return Response.status(Response.Status.OK).entity(segment).build();
}
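Since the `@QueryParam` arrives as a nullable `Boolean`, three states reach the handler; `Boolean.TRUE.equals` collapses them so that an absent parameter keeps the old used-only behavior:

```java
// null  -> false (param absent: used-only, backward compatible)
// FALSE -> false
// TRUE  -> true  (also consult unused segments)
final boolean effectiveIncludeUnused = Boolean.TRUE.equals(includeUnused);
```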

View File

@ -105,7 +105,7 @@ public class CoordinatorClientImplTest
.build();
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/metadata/datasources/xyz/segments/def"),
new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/metadata/datasources/xyz/segments/def?includeUnused=false"),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(segment)
@ -113,7 +113,32 @@ public class CoordinatorClientImplTest
Assert.assertEquals(
segment,
coordinatorClient.fetchUsedSegment("xyz", "def").get()
coordinatorClient.fetchSegment("xyz", "def", false).get()
);
}
@Test
public void test_fetchSegment() throws Exception
{
final DataSegment segment =
DataSegment.builder()
.dataSource("xyz")
.interval(Intervals.of("2000/3000"))
.version("1")
.shardSpec(new NumberedShardSpec(0, 1))
.size(1)
.build();
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/metadata/datasources/xyz/segments/def?includeUnused=true"),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(segment)
);
Assert.assertEquals(
segment,
coordinatorClient.fetchSegment("xyz", "def", true).get()
);
}

View File

@ -36,7 +36,7 @@ public class NoopCoordinatorClient implements CoordinatorClient
}
@Override
public ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String segmentId)
public ListenableFuture<DataSegment> fetchSegment(String dataSource, String segmentId, boolean includeUnused)
{
throw new UnsupportedOperationException();
}

View File

@ -709,6 +709,21 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals(2, metadataUpdateCounter.get());
}
@Test
public void testRetrieveUsedSegmentForId()
{
insertUsedSegments(ImmutableSet.of(defaultSegment));
Assert.assertEquals(defaultSegment, coordinator.retrieveSegmentForId(defaultSegment.getId().toString(), false));
}
@Test
public void testRetrieveSegmentForId()
{
insertUsedSegments(ImmutableSet.of(defaultSegment));
markAllSegmentsUnused(ImmutableSet.of(defaultSegment));
Assert.assertEquals(defaultSegment, coordinator.retrieveSegmentForId(defaultSegment.getId().toString(), true));
}
@Test
public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOException
{

View File

@ -92,10 +92,13 @@ public class MetadataResourceTest
IndexerMetadataStorageCoordinator storageCoordinator = Mockito.mock(IndexerMetadataStorageCoordinator.class);
Mockito.doReturn(segments[4])
.when(storageCoordinator)
.retrieveUsedSegmentForId(segments[4].getId().toString());
.retrieveSegmentForId(segments[4].getId().toString(), false);
Mockito.doReturn(null)
.when(storageCoordinator)
.retrieveUsedSegmentForId(segments[5].getId().toString());
.retrieveSegmentForId(segments[5].getId().toString(), false);
Mockito.doReturn(segments[5])
.when(storageCoordinator)
.retrieveSegmentForId(segments[5].getId().toString(), true);
metadataResource = new MetadataResource(
segmentsMetadataManager,
@ -120,23 +123,27 @@ public class MetadataResourceTest
}
@Test
public void testGetUsedSegment()
public void testGetSegment()
{
// Available in snapshot
Assert.assertEquals(
segments[0],
metadataResource.getUsedSegment(segments[0].getDataSource(), segments[0].getId().toString()).getEntity()
metadataResource.getSegment(segments[0].getDataSource(), segments[0].getId().toString(), null).getEntity()
);
// Unavailable in snapshot, but available in metadata
Assert.assertEquals(
segments[4],
metadataResource.getUsedSegment(segments[4].getDataSource(), segments[4].getId().toString()).getEntity()
metadataResource.getSegment(segments[4].getDataSource(), segments[4].getId().toString(), null).getEntity()
);
// Unavailable in both snapshot and metadata
// Unavailable and unused
Assert.assertNull(
metadataResource.getUsedSegment(segments[5].getDataSource(), segments[5].getId().toString()).getEntity()
metadataResource.getSegment(segments[5].getDataSource(), segments[5].getId().toString(), null).getEntity()
);
Assert.assertEquals(
segments[5],
metadataResource.getSegment(segments[5].getDataSource(), segments[5].getId().toString(), true).getEntity()
);
}