fix unsafe concurrent access in StreamAppenderatorDriver (#9943)

During segment publishing we perform streaming operations on a collection
that is not safe for concurrent modification. To guarantee correct results
we must also guard any operations on the stream itself.

This may explain the issue seen in https://github.com/apache/druid/issues/9845
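
Background: Java Stream pipelines are lazily evaluated, so a Stream returned
from inside a synchronized block is only consumed after the lock has been
released, and the terminal operation then races with any concurrent writer of
the underlying collection. A minimal sketch of the pattern (hypothetical
class, not Druid code):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    class LazyStreamRace
    {
      private final Map<String, List<String>> segments = new HashMap<>();

      // Unsafe: a Stream is lazy, so the lock is already released by the
      // time the caller runs a terminal operation; traversal can then race
      // with concurrent writers of the underlying map.
      Stream<String> unsafeView()
      {
        synchronized (segments) {
          return segments.values().stream().flatMap(List::stream);
        }
      }

      // Safe: the terminal collect(...) runs while the lock is held, so the
      // caller receives an independent snapshot it can iterate freely.
      List<String> safeSnapshot()
      {
        synchronized (segments) {
          return segments.values().stream().flatMap(List::stream).collect(Collectors.toList());
        }
      }
    }

This is the shape of the fix below: the terminal collect(...) moves inside
synchronized (segments), and the methods return List/Set snapshots instead of
live streams.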
Xavier Léauté 2020-05-31 09:12:25 -07:00 committed by GitHub
parent c2c38f6ac2
commit acfcfd35b1
3 changed files with 11 additions and 12 deletions

server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java

@@ -426,9 +426,9 @@ public abstract class BaseAppenderatorDriver implements Closeable
   }
 
   /**
-   * Returns a stream of {@link SegmentWithState} for the given sequenceNames.
+   * Returns a list of {@link SegmentIdWithShardSpec} for the given sequenceNames.
    */
-  Stream<SegmentWithState> getSegmentWithStates(Collection<String> sequenceNames)
+  List<SegmentIdWithShardSpec> getSegmentIdsWithShardSpecs(Collection<String> sequenceNames)
   {
     synchronized (segments) {
       return sequenceNames
@@ -436,11 +436,13 @@ public abstract class BaseAppenderatorDriver implements Closeable
           .map(segments::get)
           .filter(Objects::nonNull)
           .flatMap(segmentsForSequence -> segmentsForSequence.intervalToSegmentStates.values().stream())
-          .flatMap(segmentsOfInterval -> segmentsOfInterval.getAllSegments().stream());
+          .flatMap(segmentsOfInterval -> segmentsOfInterval.getAllSegments().stream())
+          .map(SegmentWithState::getSegmentIdentifier)
+          .collect(Collectors.toList());
     }
   }
 
-  Stream<SegmentWithState> getAppendingSegments(Collection<String> sequenceNames)
+  Set<SegmentIdWithShardSpec> getAppendingSegments(Collection<String> sequenceNames)
   {
     synchronized (segments) {
       return sequenceNames
@@ -449,7 +451,9 @@ public abstract class BaseAppenderatorDriver implements Closeable
           .filter(Objects::nonNull)
           .flatMap(segmentsForSequence -> segmentsForSequence.intervalToSegmentStates.values().stream())
           .map(segmentsOfInterval -> segmentsOfInterval.appendingSegment)
-          .filter(Objects::nonNull);
+          .filter(Objects::nonNull)
+          .map(SegmentWithState::getSegmentIdentifier)
+          .collect(Collectors.toSet());
     }
   }

server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java

@@ -136,10 +136,7 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver
       long pushAndClearTimeoutMs
   ) throws InterruptedException, ExecutionException, TimeoutException
   {
-    final Set<SegmentIdWithShardSpec> requestedSegmentIdsForSequences = getAppendingSegments(sequenceNames)
-        .map(SegmentWithState::getSegmentIdentifier)
-        .collect(Collectors.toSet());
+    final Set<SegmentIdWithShardSpec> requestedSegmentIdsForSequences = getAppendingSegments(sequenceNames);
 
     final ListenableFuture<SegmentsAndCommitMetadata> future = ListenableFutures.transformAsync(
         pushInBackground(null, requestedSegmentIdsForSequences, false),

server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java

@@ -271,9 +271,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
       final Collection<String> sequenceNames
   )
   {
-    final List<SegmentIdWithShardSpec> theSegments = getSegmentWithStates(sequenceNames)
-        .map(SegmentWithState::getSegmentIdentifier)
-        .collect(Collectors.toList());
+    final List<SegmentIdWithShardSpec> theSegments = getSegmentIdsWithShardSpecs(sequenceNames);
 
     final ListenableFuture<SegmentsAndCommitMetadata> publishFuture = ListenableFutures.transformAsync(
         // useUniquePath=true prevents inconsistencies in segment data when task failures or replicas leads to a second