mirror of
https://github.com/apache/druid.git
synced 2025-02-17 07:25:02 +00:00
Fix 404 when segment is used but not in the Coordinator snapshot (#14762)
* Fix 404 when used segment has not been updated in the Coordinator snapshot * Add unit test
This commit is contained in:
parent
786e772d26
commit
e16096735b
@ -229,6 +229,12 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataSegment retrieveUsedSegmentForId(final String id)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
public Set<DataSegment> getPublished()
|
||||
{
|
||||
return ImmutableSet.copyOf(published);
|
||||
|
@ -352,4 +352,14 @@ public interface IndexerMetadataStorageCoordinator
|
||||
void updateSegmentMetadata(Set<DataSegment> segments);
|
||||
|
||||
void deleteSegments(Set<DataSegment> segments);
|
||||
|
||||
/**
|
||||
* Retrieve the segment for a given id from the metadata store. Return null if no such used segment exists
|
||||
*
|
||||
* @param id The segment id
|
||||
*
|
||||
* @return DataSegment corresponding to given id
|
||||
*/
|
||||
DataSegment retrieveUsedSegmentForId(String id);
|
||||
|
||||
}
|
||||
|
@ -1883,6 +1883,18 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataSegment retrieveUsedSegmentForId(final String id)
|
||||
{
|
||||
return connector.retryTransaction(
|
||||
(handle, status) ->
|
||||
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
|
||||
.retrieveUsedSegmentForId(id),
|
||||
3,
|
||||
SQLMetadataConnector.DEFAULT_MAX_TRIES
|
||||
);
|
||||
}
|
||||
|
||||
private static class PendingSegmentsRecord
|
||||
{
|
||||
private final String sequenceName;
|
||||
|
@ -212,6 +212,29 @@ public class SqlSegmentsMetadataQuery
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve the used segment for a given id if it exists in the metadata store and null otherwise
|
||||
*/
|
||||
public DataSegment retrieveUsedSegmentForId(String id)
|
||||
{
|
||||
|
||||
final String query = "SELECT payload FROM %s WHERE used = true AND id = :id";
|
||||
|
||||
final Query<Map<String, Object>> sql = handle
|
||||
.createQuery(StringUtils.format(query, dbTables.getSegmentsTable()))
|
||||
.bind("id", id);
|
||||
|
||||
final ResultIterator<DataSegment> resultIterator =
|
||||
sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class))
|
||||
.iterator();
|
||||
|
||||
if (resultIterator.hasNext()) {
|
||||
return resultIterator.next();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private CloseableIterator<DataSegment> retrieveSegments(
|
||||
final String dataSource,
|
||||
final Collection<Interval> intervals,
|
||||
|
@ -295,6 +295,11 @@ public class MetadataResource
|
||||
return Response.status(Response.Status.OK).entity(segment).build();
|
||||
}
|
||||
}
|
||||
// fallback to db
|
||||
DataSegment segment = metadataStorageCoordinator.retrieveUsedSegmentForId(segmentId);
|
||||
if (segment != null) {
|
||||
return Response.status(Response.Status.OK).entity(segment).build();
|
||||
}
|
||||
return Response.status(Response.Status.NOT_FOUND).build();
|
||||
}
|
||||
}
|
||||
|
@ -53,7 +53,7 @@ public class MetadataResourceTest
|
||||
|
||||
private final DataSegment[] segments =
|
||||
CreateDataSegments.ofDatasource(DATASOURCE1)
|
||||
.forIntervals(2, Granularities.DAY)
|
||||
.forIntervals(3, Granularities.DAY)
|
||||
.withNumPartitions(2)
|
||||
.eachOfSizeInMb(500)
|
||||
.toArray(new DataSegment[0]);
|
||||
@ -77,6 +77,9 @@ public class MetadataResourceTest
|
||||
.when(segmentsMetadataManager).getSnapshotOfDataSourcesWithAllUsedSegments();
|
||||
Mockito.doReturn(ImmutableList.of(druidDataSource1))
|
||||
.when(dataSourcesSnapshot).getDataSourcesWithAllUsedSegments();
|
||||
Mockito.doReturn(druidDataSource1)
|
||||
.when(segmentsMetadataManager)
|
||||
.getImmutableDataSourceWithUsedSegments(DATASOURCE1);
|
||||
|
||||
DruidCoordinator coordinator = Mockito.mock(DruidCoordinator.class);
|
||||
Mockito.doReturn(2).when(coordinator).getReplicationFactor(segments[0].getId());
|
||||
@ -86,9 +89,17 @@ public class MetadataResourceTest
|
||||
Mockito.doReturn(ImmutableSet.of(segments[3]))
|
||||
.when(dataSourcesSnapshot).getOvershadowedSegments();
|
||||
|
||||
IndexerMetadataStorageCoordinator storageCoordinator = Mockito.mock(IndexerMetadataStorageCoordinator.class);
|
||||
Mockito.doReturn(segments[4])
|
||||
.when(storageCoordinator)
|
||||
.retrieveUsedSegmentForId(segments[4].getId().toString());
|
||||
Mockito.doReturn(null)
|
||||
.when(storageCoordinator)
|
||||
.retrieveUsedSegmentForId(segments[5].getId().toString());
|
||||
|
||||
metadataResource = new MetadataResource(
|
||||
segmentsMetadataManager,
|
||||
Mockito.mock(IndexerMetadataStorageCoordinator.class),
|
||||
storageCoordinator,
|
||||
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
|
||||
coordinator
|
||||
);
|
||||
@ -108,6 +119,27 @@ public class MetadataResourceTest
|
||||
Assert.assertEquals(new SegmentStatusInCluster(segments[3], true, 0), resultList.get(3));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetUsedSegment()
|
||||
{
|
||||
// Available in snapshot
|
||||
Assert.assertEquals(
|
||||
segments[0],
|
||||
metadataResource.getUsedSegment(segments[0].getDataSource(), segments[0].getId().toString()).getEntity()
|
||||
);
|
||||
|
||||
// Unavailable in snapshot, but available in metadata
|
||||
Assert.assertEquals(
|
||||
segments[4],
|
||||
metadataResource.getUsedSegment(segments[4].getDataSource(), segments[4].getId().toString()).getEntity()
|
||||
);
|
||||
|
||||
// Unavailable in both snapshot and metadata
|
||||
Assert.assertNull(
|
||||
metadataResource.getUsedSegment(segments[5].getDataSource(), segments[5].getId().toString()).getEntity()
|
||||
);
|
||||
}
|
||||
|
||||
private List<SegmentStatusInCluster> extractSegmentStatusList(Response response)
|
||||
{
|
||||
return Lists.newArrayList(
|
||||
|
Loading…
x
Reference in New Issue
Block a user