mirror of https://github.com/apache/druid.git

Avoid primary key violation in segment tables under certain conditions when appending data to same interval (#11714)

* Fix an issue with duplicate keys under certain conditions when loading late data in streaming. Also fixes a documentation issue with skipSegmentLineageCheck.
* maxId may be null at this point; we need to check for that.
* Remove a hypothetical case (it cannot happen).
* Reverting a compaction simply "kills" the compacted segment, and the previously used, overshadowed segments become visible again.
* Add comments.

Parent: 3525c0b195
Commit: 2355a60419
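The fix, in one sentence: before handing out a new pending segment id for an interval, the coordinator now also consults the ids already sitting in the pending-segments table (restricted to the version of the existing time chunk), so it can never re-issue an id that is still recorded there. Below is a minimal, self-contained sketch of that idea; the SegmentId record and both helper methods are hypothetical stand-ins, not Druid's SegmentIdWithShardSpec or the actual coordinator code path.

import java.util.Set;

public class PendingIdCollisionSketch
{
  // Hypothetical stand-in for Druid's SegmentIdWithShardSpec.
  record SegmentId(String version, int partitionNum) {}

  // An allocator that only looks at visible used segments can re-issue an id
  // that is still present in the pending-segments table; inserting it then
  // violates the table's primary key.
  static SegmentId nextIdFromUsedOnly(Set<SegmentId> visibleUsed, String version)
  {
    int max = visibleUsed.stream().mapToInt(SegmentId::partitionNum).max().orElse(-1);
    return new SegmentId(version, max + 1);
  }

  // The corrected rule also folds the pending ids for the interval (restricted to
  // the chunk's version) into the max computation, so the new id is strictly
  // greater than anything already recorded.
  static SegmentId nextIdFromUsedAndPending(Set<SegmentId> visibleUsed, Set<SegmentId> pendings, String version)
  {
    int max = -1;
    for (SegmentId id : visibleUsed) {
      max = Math.max(max, id.partitionNum());
    }
    for (SegmentId id : pendings) {
      if (id.version().equals(version)) {
        max = Math.max(max, id.partitionNum());
      }
    }
    return new SegmentId(version, max + 1);
  }

  public static void main(String[] args)
  {
    Set<SegmentId> visibleUsed = Set.of(new SegmentId("A", 0), new SegmentId("A", 1));
    Set<SegmentId> pendings = Set.of(new SegmentId("A", 0), new SegmentId("A", 1), new SegmentId("A", 2));

    // partitionNum 2: already pending, inserting it would clash.
    System.out.println(nextIdFromUsedOnly(visibleUsed, "A"));
    // partitionNum 3: safe, because the pending ids were considered too.
    System.out.println(nextIdFromUsedAndPending(visibleUsed, pendings, "A"));
  }
}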
@@ -253,13 +253,13 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return numSegmentsMarkedUnused;
}

private List<SegmentIdWithShardSpec> getPendingSegmentsForIntervalWithHandle(
private Set<SegmentIdWithShardSpec> getPendingSegmentsForIntervalWithHandle(
final Handle handle,
final String dataSource,
final Interval interval
) throws IOException
{
final List<SegmentIdWithShardSpec> identifiers = new ArrayList<>();
final Set<SegmentIdWithShardSpec> identifiers = new HashSet<>();

final ResultIterator<byte[]> dbSegments =
handle.createQuery(
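Changing the return type from List to Set here is not cosmetic: later in createNewSegment the caller folds the max id found in the segments table into this same collection before taking a maximum, and a HashSet keeps that merge free of duplicate candidates. A tiny sketch of the pattern, with plain strings standing in for SegmentIdWithShardSpec (hypothetical values, not real segment ids):

import java.util.HashSet;
import java.util.Set;

public class PendingSetSketch
{
  public static void main(String[] args)
  {
    // Pending ids fetched for the interval (strings stand in for SegmentIdWithShardSpec).
    final Set<String> pendings = new HashSet<>(Set.of("ds_2017_A_0", "ds_2017_A_1"));

    // The max id taken from the segments table may coincide with an id that is
    // still pending; adding it to a Set cannot introduce a duplicate candidate.
    final String maxUsedId = "ds_2017_A_1";
    pendings.add(maxUsedId);

    System.out.println(pendings.size()); // prints 2, not 3
  }
}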
@@ -843,15 +843,30 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
.execute();
}

/**
* This function creates a new segment for the given datasource/interval/etc. A critical
* aspect of the creation is to make sure that the new version & new partition number will make
* sense given the existing segments & pending segments; also very important is to avoid
* clashes with existing pending & used/unused segments.
* @param handle Database handle
* @param dataSource datasource for the new segment
* @param interval interval for the new segment
* @param partialShardSpec Shard spec info minus segment id stuff
* @param existingVersion Version of segments in interval, used to compute the version of the very first segment in
* interval
* @return
* @throws IOException
*/
@Nullable
private SegmentIdWithShardSpec createNewSegment(
final Handle handle,
final String dataSource,
final Interval interval,
final PartialShardSpec partialShardSpec,
final String maxVersion
final String existingVersion
) throws IOException
{
// Get the time chunk and associated data segments for the given interval, if any
final List<TimelineObjectHolder<String, DataSegment>> existingChunks = getTimelineForIntervalsWithHandle(
handle,
dataSource,
@@ -884,66 +899,94 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
// See PartitionIds.
.filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
// Don't use the stream API for performance.
// Note that this will compute the max id of existing, visible, data segments in the time chunk:
if (maxId == null || maxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) {
maxId = SegmentIdWithShardSpec.fromDataSegment(segment);
}
}
}

final List<SegmentIdWithShardSpec> pendings = getPendingSegmentsForIntervalWithHandle(
// Get the version of the existing chunk, we might need it in some of the cases below
// to compute the new identifier's version
@Nullable
final String versionOfExistingChunk;
if (!existingChunks.isEmpty()) {
// remember: only one chunk is possible for a given interval, so get the first & only one
versionOfExistingChunk = existingChunks.get(0).getVersion();
} else {
versionOfExistingChunk = null;
}

// next, we need to enrich the maxId computed before with the information of the pending segments
// it is possible that a pending segment has a higher id, in which case we need that; it will work,
// and it will avoid clashes when inserting the new pending segment later in the caller of this method
final Set<SegmentIdWithShardSpec> pendings = getPendingSegmentsForIntervalWithHandle(
handle,
dataSource,
interval
);

// Make sure we add the maxId we obtained from the segments table:
if (maxId != null) {
pendings.add(maxId);
}

// Now compute the maxId with all the information: pendings + segments:
// The versionOfExistingChunk filter is to ensure that we pick the max id with the version of the existing chunk
// in the case that there may be a pending segment with a higher version but no corresponding used segments
// which may generate a clash with an existing segment once the new id is generated
maxId = pendings.stream()
.filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec))
.filter(id -> versionOfExistingChunk == null ? true : id.getVersion().equals(versionOfExistingChunk))
.max((id1, id2) -> {
final int versionCompare = id1.getVersion().compareTo(id2.getVersion());
if (versionCompare != 0) {
return versionCompare;
} else {
return Integer.compare(id1.getShardSpec().getPartitionNum(), id2.getShardSpec().getPartitionNum());
return Integer.compare(
id1.getShardSpec().getPartitionNum(),
id2.getShardSpec().getPartitionNum()
);
}
})
.orElse(null);

// Find the major version of existing segments
@Nullable final String versionOfExistingChunks;
if (!existingChunks.isEmpty()) {
versionOfExistingChunks = existingChunks.get(0).getVersion();
} else if (!pendings.isEmpty()) {
versionOfExistingChunks = pendings.get(0).getVersion();
// The following code attempts to compute the new version; if this
// new version is not null at the end of the next block then it will be
// used as the new version for the initial or appended segment
final String newSegmentVersion;
if (versionOfExistingChunk != null) {
// segment version overrides, so pick that now that we know it exists
newSegmentVersion = versionOfExistingChunk;
} else if (!pendings.isEmpty() && maxId != null) {
// there are no visible segments in the time chunk, so pick the maxId of the pendings, as computed above
newSegmentVersion = maxId.getVersion();
} else {
versionOfExistingChunks = null;
// no segments, no pendings, so this must be the very first segment created for this interval
newSegmentVersion = null;
}

if (maxId == null) {
// When appending segments, null maxId means that we are allocating the very initial
// segment for this time chunk.
// This code is executed when the Overlord coordinates segment allocation, which is either you append segments
// or you use segment lock. When appending segments, null maxId means that we are allocating the very initial
// segment for this time chunk. Since the core partitions set is not determined for appended segments, we set
// or you use segment lock. Since the core partitions set is not determined for appended segments, we set
// it 0. When you use segment lock, the core partitions set doesn't work with it. We simply set it 0 so that the
// OvershadowableManager handles the atomic segment update.
final int newPartitionId = partialShardSpec.useNonRootGenerationPartitionSpace()
? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
: PartitionIds.ROOT_GEN_START_PARTITION_ID;
String version = versionOfExistingChunks == null ? maxVersion : versionOfExistingChunks;
String version = newSegmentVersion == null ? existingVersion : newSegmentVersion;
return new SegmentIdWithShardSpec(
dataSource,
interval,
version,
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
);
} else if (!maxId.getInterval().equals(interval) || maxId.getVersion().compareTo(maxVersion) > 0) {
} else if (!maxId.getInterval().equals(interval) || maxId.getVersion().compareTo(existingVersion) > 0) {
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: conflicting segment[%s].",
"Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].",
dataSource,
interval,
maxVersion,
existingVersion,
maxId
);
return null;
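To make the control flow above easier to follow, here is a compact restatement of the version-selection rule the new code implements: the existing chunk's version wins when there is a visible chunk; otherwise the version of the max pending id is reused; otherwise the caller-supplied existingVersion seeds the very first segment of the interval. The types below are hypothetical stand-ins, not Druid's SegmentIdWithShardSpec or TimelineObjectHolder, and the method is only a sketch of the rule, not the actual createNewSegment body.

import java.util.Comparator;
import java.util.Optional;
import java.util.Set;

public class NewSegmentVersionSketch
{
  // Hypothetical stand-in for a pending segment identifier.
  record PendingId(String version, int partitionNum) {}

  static String chooseVersion(
      Optional<String> versionOfExistingChunk, // version of the single existing time chunk, if any
      Set<PendingId> pendings,                 // pending ids for the interval, already merged with the max used id
      String existingVersion                   // version supplied by the caller (e.g. the lock version)
  )
  {
    if (versionOfExistingChunk.isPresent()) {
      // Visible used segments exist: their version overrides everything else.
      return versionOfExistingChunk.get();
    }
    // No visible chunk: fall back to the max pending id, comparing version first
    // and partition number second, mirroring the comparator in the patch.
    Optional<PendingId> maxPending = pendings.stream()
        .max(Comparator.comparing(PendingId::version).thenComparingInt(PendingId::partitionNum));
    if (maxPending.isPresent()) {
      return maxPending.get().version();
    }
    // No segments and no pendings: this is the very first segment of the interval.
    return existingVersion;
  }

  public static void main(String[] args)
  {
    // An existing chunk with version "A" wins even if a pending id carries version "B".
    System.out.println(chooseVersion(Optional.of("A"), Set.of(new PendingId("B", 1)), "Z")); // A
    // No chunk: reuse the version of the max pending id.
    System.out.println(chooseVersion(Optional.empty(), Set.of(new PendingId("B", 1)), "Z")); // B
    // Nothing at all: seed the interval with the caller's version.
    System.out.println(chooseVersion(Optional.empty(), Set.of(), "Z"));                      // Z
  }
}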
@@ -958,7 +1001,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return new SegmentIdWithShardSpec(
dataSource,
maxId.getInterval(),
Preconditions.checkNotNull(versionOfExistingChunks, "versionOfExistingChunks"),
Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"),
partialShardSpec.complete(
jsonMapper,
maxId.getShardSpec().getPartitionNum() + 1,
@@ -383,7 +383,7 @@ public abstract class BaseAppenderatorDriver implements Closeable
* @param sequenceName sequenceName for this row's segment
* @param committerSupplier supplier of a committer associated with all data that has been added, including this row
* if {@param allowIncrementalPersists} is set to false then this will not be used
* @param skipSegmentLineageCheck if true, perform lineage validation using previousSegmentId for this sequence.
* @param skipSegmentLineageCheck if false, perform lineage validation using previousSegmentId for this sequence.
* Should be set to false if replica tasks would index events in same order
* @param allowIncrementalPersists whether to allow persist to happen when maxRowsInMemory or intermediate persist period
* threshold is hit
@@ -37,7 +37,7 @@ public interface SegmentAllocator
* When skipSegmentLineageCheck is false, this can be null if it is the first call
* for the same sequenceName.
* When skipSegmentLineageCheck is true, this will be ignored.
* @param skipSegmentLineageCheck if true, perform lineage validation using previousSegmentId for this sequence.
* @param skipSegmentLineageCheck if false, perform lineage validation using previousSegmentId for this sequence.
* Should be set to false if replica tasks would index events in same order
*
* @return the pending segment identifier, or null if it was impossible to allocate a new segment
@@ -167,8 +167,10 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
* @param sequenceName sequenceName for this row's segment
* @param committerSupplier supplier of a committer associated with all data that has been added, including this row
* if {@param allowIncrementalPersists} is set to false then this will not be used
* @param skipSegmentLineageCheck if true, perform lineage validation using previousSegmentId for this sequence.
* Should be set to false if replica tasks would index events in same order
* @param skipSegmentLineageCheck Should be set {@code false} to perform lineage validation using previousSegmentId for this sequence.
* Note that for Kafka Streams we should disable this check and set this parameter to
* {@code true}.
* if {@code true}, skips (does not enforce) lineage validation.
* @param allowIncrementalPersists whether to allow persist to happen when maxRowsInMemory or intermediate persist period
* threshold is hit
*
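Since the three javadoc blocks above all correct the description of skipSegmentLineageCheck, a small stand-alone illustration of the intended semantics may help: lineage validation only runs when the flag is false, and streaming appends pass true to skip it. The allocator below is hypothetical and only models the parameter's contract; it is not Druid's SegmentAllocator.

import java.util.HashMap;
import java.util.Map;

public class LineageCheckSketch
{
  private final Map<String, String> lastAllocatedPerSequence = new HashMap<>();
  private int counter = 0;

  /**
   * @param sequenceName            sequence requesting a segment
   * @param previousSegmentId       id the caller believes was allocated last for this sequence
   * @param skipSegmentLineageCheck if false, reject the allocation (return null) unless
   *                                previousSegmentId matches our own record; if true, skip
   *                                the check entirely, as streaming appends do
   */
  public String allocate(String sequenceName, String previousSegmentId, boolean skipSegmentLineageCheck)
  {
    if (!skipSegmentLineageCheck) {
      final String expected = lastAllocatedPerSequence.get(sequenceName);
      if (expected != null && !expected.equals(previousSegmentId)) {
        return null; // lineage broken: replicas did not see events in the same order
      }
    }
    final String id = "segment_" + counter++;
    lastAllocatedPerSequence.put(sequenceName, id);
    return id;
  }

  public static void main(String[] args)
  {
    final LineageCheckSketch allocator = new LineageCheckSketch();
    final String first = allocator.allocate("seq", null, false);   // segment_0
    System.out.println(allocator.allocate("seq", "bogus", false)); // null: stale previousSegmentId, check enforced
    System.out.println(allocator.allocate("seq", "bogus", true));  // segment_1: check skipped (streaming case)
    System.out.println(first);
  }
}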
@@ -275,7 +275,12 @@ public class IndexerSQLMetadataStorageCoordinatorTest

private void markAllSegmentsUnused()
{
for (final DataSegment segment : SEGMENTS) {
markAllSegmentsUnused(SEGMENTS);
}

private void markAllSegmentsUnused(Set<DataSegment> segments)
{
for (final DataSegment segment : segments) {
Assert.assertEquals(
1,
(int) derbyConnector.getDBI().<Integer>withHandle(
@@ -296,6 +301,45 @@ public class IndexerSQLMetadataStorageCoordinatorTest
}
}

private void markAllSegmentsUsed(Set<DataSegment> segments)
{
for (final DataSegment segment : segments) {
Assert.assertEquals(
1,
(int) derbyConnector.getDBI().<Integer>withHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle)
{
String request = StringUtils.format(
"UPDATE %s SET used = true WHERE id = :id",
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable()
);
return handle.createStatement(request).bind("id", segment.getId().toString()).execute();
}
}
)
);
}
}

private List<String> retrievePendingSegmentIds()
{
final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getPendingSegmentsTable();
return derbyConnector.retryWithHandle(
new HandleCallback<List<String>>()
{
@Override
public List<String> withHandle(Handle handle)
{
return handle.createQuery("SELECT id FROM " + table + " ORDER BY id")
.map(StringMapper.FIRST)
.list();
}
}
);
}
private List<String> retrieveUsedSegmentIds()
{
final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
@@ -313,6 +357,24 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
}

private List<String> retrieveUnusedSegmentIds()
{
final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
return derbyConnector.retryWithHandle(
new HandleCallback<List<String>>()
{
@Override
public List<String> withHandle(Handle handle)
{
return handle.createQuery("SELECT id FROM " + table + " WHERE used = false ORDER BY id")
.map(StringMapper.FIRST)
.list();
}
}
);
}


private Boolean insertUsedSegments(Set<DataSegment> dataSegments)
{
final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
@@ -1203,6 +1265,349 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_3", identifier4.toString());
}

/**
* This test simulates an issue detected in the field consisting of the following sequence of events:
* - A Kafka stream segment was created on a given interval
* - Later, after the above was published, another segment on same interval was created by the stream
* - Later, after the above was published, another segment on same interval was created by the stream
* - Later a compaction was issued for the three segments above
* - Later, after the above was published, another segment on same interval was created by the stream
* - Later, the compacted segment got dropped due to a drop rule
* - Later, after the above was dropped, another segment on same interval was created by the stream but this
* time there was an integrity violation in the pending segments table because the
* {@link IndexerSQLMetadataStorageCoordinator#createNewSegment(Handle, String, Interval, PartialShardSpec, String)}
* method returned a segment id that already existed in the pending segments table
*/
@Test
public void testAllocatePendingSegmentAfterDroppingExistingSegment()
{
String maxVersion = "version_newer_newer";

// simulate one load using Kafka streaming
final PartialShardSpec partialShardSpec = NumberedPartialShardSpec.instance();
final String dataSource = "ds";
final Interval interval = Intervals.of("2017-01-01/2017-02-01");
final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment(
dataSource,
"seq",
null,
interval,
partialShardSpec,
"version",
true
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version", identifier.toString());

// simulate one more load using Kafka streaming (as if previous segment was published, note different sequence name)
final SegmentIdWithShardSpec identifier1 = coordinator.allocatePendingSegment(
dataSource,
"seq2",
identifier.toString(),
interval,
partialShardSpec,
maxVersion,
true
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_1", identifier1.toString());

// simulate one more load using Kafka streaming (as if previous segment was published, note different sequence name)
final SegmentIdWithShardSpec identifier2 = coordinator.allocatePendingSegment(
dataSource,
"seq3",
identifier1.toString(),
interval,
partialShardSpec,
maxVersion,
true
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2", identifier2.toString());

// now simulate that one compaction (batch ingestion) was done for the same interval (like a reindex of the previous three):
DataSegment segment = new DataSegment(
"ds",
Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
"version_new",
ImmutableMap.of(),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new LinearShardSpec(0),
9,
100
);
Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment)));
List<String> ids = retrieveUsedSegmentIds();
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new", ids.get(0));

// one more load on same interval:
final SegmentIdWithShardSpec identifier3 = coordinator.allocatePendingSegment(
dataSource,
"seq4",
identifier1.toString(),
interval,
partialShardSpec,
maxVersion,
true
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_1", identifier3.toString());

// now drop the used segment previously loaded:
markAllSegmentsUnused(ImmutableSet.of(segment));

// and a final load; this reproduces an issue that could happen with multiple streaming appends,
// followed by a reindex, followed by a drop, and more streaming data coming in for same interval
final SegmentIdWithShardSpec identifier4 = coordinator.allocatePendingSegment(
dataSource,
"seq5",
identifier1.toString(),
interval,
partialShardSpec,
maxVersion,
true
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_2", identifier4.toString());

}

/**
* Slightly different than the above test, but this one involves a reverted compaction:
* 1) used segments of version = A, id = 0, 1, 2
* 2) overwrote segments of version = B, id = 0 <= compaction
* 3) marked segments unused for version = A, id = 0, 1, 2 <= overshadowing
* 4) pending segment of version = B, id = 1 <= appending new data, aborted
* 5) reverted compaction, mark segments used for version = A, id = 0, 1, 2, and mark compacted segments unused
* 6) used segments of version = A, id = 0, 1, 2
* 7) pending segment of version = B, id = 1
*/
@Test
public void testAnotherAllocatePendingSegmentAfterRevertingCompaction()
{
String maxVersion = "Z";

// 1.0) simulate one append load
final PartialShardSpec partialShardSpec = NumberedPartialShardSpec.instance();
final String dataSource = "ds";
final Interval interval = Intervals.of("2017-01-01/2017-02-01");
final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment(
dataSource,
"seq",
null,
interval,
partialShardSpec,
"A",
true
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A", identifier.toString());
// Assume it publishes; create its corresponding segment
DataSegment segment = new DataSegment(
"ds",
Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
"A",
ImmutableMap.of(),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new LinearShardSpec(0),
9,
100
);
Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment)));
List<String> ids = retrieveUsedSegmentIds();
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A", ids.get(0));


// 1.1) simulate one more append load (as if previous segment was published, note different sequence name)
final SegmentIdWithShardSpec identifier1 = coordinator.allocatePendingSegment(
dataSource,
"seq2",
identifier.toString(),
interval,
partialShardSpec,
maxVersion,
true
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", identifier1.toString());
// Assume it publishes; create its corresponding segment
segment = new DataSegment(
"ds",
Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
"A",
ImmutableMap.of(),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new LinearShardSpec(1),
9,
100
);
Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment)));
ids = retrieveUsedSegmentIds();
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", ids.get(1));


// 1.2) simulate one more append load (as if previous segment was published, note different sequence name)
final SegmentIdWithShardSpec identifier2 = coordinator.allocatePendingSegment(
dataSource,
"seq3",
identifier1.toString(),
interval,
partialShardSpec,
maxVersion,
true
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_2", identifier2.toString());
// Assume it publishes; create its corresponding segment
segment = new DataSegment(
"ds",
Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
"A",
ImmutableMap.of(),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new LinearShardSpec(2),
9,
100
);
// state so far:
// pendings: A: 0,1,2
// used segments A: 0,1,2
// unused segments:
Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment)));
ids = retrieveUsedSegmentIds();
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_2", ids.get(2));

// 2)
// now simulate that one compaction (batch ingestion) was done for the same interval (like a reindex of the previous three):
DataSegment compactedSegment = new DataSegment(
"ds",
Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
"B",
ImmutableMap.of(),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new LinearShardSpec(0),
9,
100
);
Assert.assertTrue(insertUsedSegments(ImmutableSet.of(compactedSegment)));
ids = retrieveUsedSegmentIds();
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_B", ids.get(3));
// 3) When overshadowing, segments are still marked as "used" in the segments table
// state so far:
// pendings: A: 0,1,2
// used segments: A: 0,1,2; B: 0 <- new compacted segment, overshadows previous version A
// unused segment:

// 4) pending segment of version = B, id = 1 <= appending new data, aborted
final SegmentIdWithShardSpec identifier3 = coordinator.allocatePendingSegment(
dataSource,
"seq4",
identifier2.toString(),
interval,
partialShardSpec,
maxVersion,
true
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_B_1", identifier3.toString());
// no corresponding segment, pending aborted
// state so far:
// pendings: A: 0,1,2; B:1 (note that B_1 does not make it into segments since its task aborted)
// used segments: A: 0,1,2; B: 0 <- compacted segment, overshadows previous version A
// unused segment:

// 5) reverted compaction (by marking B_0 as unused)
// Reverting a compaction is a manual metadata update which is basically the following two steps:
markAllSegmentsUnused(ImmutableSet.of(compactedSegment)); // <- drop compacted segment
// pending: version = A, id = 0,1,2
// version = B, id = 1
//
// used segment: version = A, id = 0,1,2
// unused segment: version = B, id = 0
List<String> pendings = retrievePendingSegmentIds();
Assert.assertTrue(pendings.size() == 4);

List<String> used = retrieveUsedSegmentIds();
Assert.assertTrue(used.size() == 3);

List<String> unused = retrieveUnusedSegmentIds();
Assert.assertTrue(unused.size() == 1);

// Simulate one more append load
final SegmentIdWithShardSpec identifier4 = coordinator.allocatePendingSegment(
dataSource,
"seq5",
identifier1.toString(),
interval,
partialShardSpec,
maxVersion,
true
);
// maxid = B_1 -> new partno = 2
// versionofexistingchunk=A
// ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_2
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_3", identifier4.toString());
// Assume it publishes; create its corresponding segment
segment = new DataSegment(
"ds",
Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
"A",
ImmutableMap.of(),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new LinearShardSpec(3),
9,
100
);
// pending: version = A, id = 0,1,2,3
// version = B, id = 1
//
// used segment: version = A, id = 0,1,2,3
// unused segment: version = B, id = 0
Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment)));
ids = retrieveUsedSegmentIds();
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_3", ids.get(3));

}

@Test
public void testNoPendingSegmentsAndOneUsedSegment()
{
String maxVersion = "Z";

// create one used segment
DataSegment segment = new DataSegment(
"ds",
Intervals.of("2017-01-01T00Z/2017-02-01T00Z"),
"A",
ImmutableMap.of(),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new LinearShardSpec(0),
9,
100
);
Assert.assertTrue(insertUsedSegments(ImmutableSet.of(segment)));
List<String> ids = retrieveUsedSegmentIds();
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A", ids.get(0));


// simulate one aborted append load
final PartialShardSpec partialShardSpec = NumberedPartialShardSpec.instance();
final String dataSource = "ds";
final Interval interval = Intervals.of("2017-01-01/2017-02-01");
final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment(
dataSource,
"seq",
null,
interval,
partialShardSpec,
maxVersion,
true
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", identifier.toString());

}


@Test
public void testDeletePendingSegment() throws InterruptedException
{