Fix ConcurrentModificationException in IncrementalPublishingKafkaIndexTaskRunner (#5907)

* Fix ConcurrentModificationException in IncrementalPublishingKafkaIndexTaskRunner

* fix lock and add comments
Jihoon Son 2018-06-30 17:20:41 -07:00 committed by Gian Merlino
parent 933b25416c
commit b76a056c14
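
For context on the bug being fixed: SequenceMetadata.endOffsets is a plain HashMap that the main thread iterates (for example in toString() and in the committer's getMetadata()) while the task's HTTP endpoint can mutate it through setEndOffsets(). Below is a minimal, hypothetical sketch of that failure mode; it is not code from this commit, and the class name EndOffsetsRaceDemo and the thread roles are illustrative only.

import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

// Standalone demo (hypothetical, not part of this commit): a writer thread stands in for
// the HTTP thread calling setEndOffsets(), and a reader loop stands in for the main thread
// iterating endOffsets (e.g. toString() or the committer's getMetadata()). Without a shared
// lock, the concurrent structural modification usually surfaces as a
// ConcurrentModificationException in the reader.
public class EndOffsetsRaceDemo
{
  public static void main(String[] args) throws InterruptedException
  {
    final Map<Integer, Long> endOffsets = new HashMap<>();
    endOffsets.put(0, 0L);

    final Thread httpThread = new Thread(() -> {
      for (int partition = 1; partition < 1_000_000; partition++) {
        endOffsets.put(partition, (long) partition); // structural modification of the shared map
      }
    });
    httpThread.start();

    try {
      long sum = 0;
      while (httpThread.isAlive()) {
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
          sum += entry.getValue();
        }
      }
      System.out.println("No exception this run; sum = " + sum);
    }
    catch (ConcurrentModificationException e) {
      System.out.println("Race reproduced: " + e);
    }
    finally {
      httpThread.join();
    }
  }
}

With the ReentrantLock introduced in this commit guarding both the readers and setEndOffsets(), the two threads can no longer interleave a structural modification with an iteration.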


@@ -103,6 +103,8 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -1501,13 +1503,19 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
private static class SequenceMetadata
{
/**
* Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This lock is required because
* {@link #setEndOffsets} can be called by both the main thread and the HTTP thread.
*/
private final ReentrantLock lock = new ReentrantLock();
private final int sequenceId;
private final String sequenceName;
private final Map<Integer, Long> startOffsets;
private final Map<Integer, Long> endOffsets;
private final Set<Integer> assignments;
private final boolean sentinel;
private volatile boolean checkpointed;
private boolean checkpointed;
@JsonCreator
public SequenceMetadata(
@@ -1524,8 +1532,8 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
this.sequenceId = sequenceId;
this.sequenceName = sequenceName;
this.startOffsets = ImmutableMap.copyOf(startOffsets);
this.endOffsets = Maps.newHashMap(endOffsets);
this.assignments = Sets.newHashSet(startOffsets.keySet());
this.endOffsets = new HashMap<>(endOffsets);
this.assignments = new HashSet<>(startOffsets.keySet());
this.checkpointed = checkpointed;
this.sentinel = false;
}
@@ -1539,7 +1547,13 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
@JsonProperty
public boolean isCheckpointed()
{
return checkpointed;
lock.lock();
try {
return checkpointed;
}
finally {
lock.unlock();
}
}
@JsonProperty
@@ -1557,7 +1571,13 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
@JsonProperty
public Map<Integer, Long> getEndOffsets()
{
return endOffsets;
lock.lock();
try {
return endOffsets;
}
finally {
lock.unlock();
}
}
@JsonProperty
@@ -1568,19 +1588,30 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
void setEndOffsets(Map<Integer, Long> newEndOffsets)
{
endOffsets.putAll(newEndOffsets);
checkpointed = true;
lock.lock();
try {
endOffsets.putAll(newEndOffsets);
checkpointed = true;
}
finally {
lock.unlock();
}
}
void updateAssignments(Map<Integer, Long> nextPartitionOffset)
{
assignments.clear();
nextPartitionOffset.entrySet().forEach(partitionOffset -> {
if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey()))
> 0) {
assignments.add(partitionOffset.getKey());
}
});
lock.lock();
try {
assignments.clear();
nextPartitionOffset.forEach((key, value) -> {
if (Longs.compare(endOffsets.get(key), nextPartitionOffset.get(key)) > 0) {
assignments.add(key);
}
});
}
finally {
lock.unlock();
}
}
boolean isOpen()
@@ -1590,10 +1621,17 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
boolean canHandle(ConsumerRecord<byte[], byte[]> record)
{
return isOpen()
&& endOffsets.get(record.partition()) != null
&& record.offset() >= startOffsets.get(record.partition())
&& record.offset() < endOffsets.get(record.partition());
lock.lock();
try {
final Long partitionEndOffset = endOffsets.get(record.partition());
return isOpen()
&& partitionEndOffset != null
&& record.offset() >= startOffsets.get(record.partition())
&& record.offset() < partitionEndOffset;
}
finally {
lock.unlock();
}
}
private SequenceMetadata()
@@ -1615,15 +1653,21 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
@Override
public String toString()
{
return "SequenceMetadata{" +
"sequenceName='" + sequenceName + '\'' +
", sequenceId=" + sequenceId +
", startOffsets=" + startOffsets +
", endOffsets=" + endOffsets +
", assignments=" + assignments +
", sentinel=" + sentinel +
", checkpointed=" + checkpointed +
'}';
lock.lock();
try {
return "SequenceMetadata{" +
"sequenceName='" + sequenceName + '\'' +
", sequenceId=" + sequenceId +
", startOffsets=" + startOffsets +
", endOffsets=" + endOffsets +
", assignments=" + assignments +
", sentinel=" + sentinel +
", checkpointed=" + checkpointed +
'}';
}
finally {
lock.unlock();
}
}
Supplier<Committer> getCommitterSupplier(String topic, Map<Integer, Long> lastPersistedOffsets)
@@ -1635,28 +1679,40 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
@Override
public Object getMetadata()
{
Preconditions.checkState(
assignments.isEmpty(),
"This committer can be used only once all the records till offsets [%s] have been consumed, also make sure to call updateAssignments before using this committer",
endOffsets
);
lock.lock();
// merge endOffsets for this sequence with globally lastPersistedOffsets
// This is done because this committer would be persisting only sub set of segments
// corresponding to the current sequence. Generally, lastPersistedOffsets should already
// cover endOffsets but just to be sure take max of offsets and persist that
for (Map.Entry<Integer, Long> partitionOffset : endOffsets.entrySet()) {
lastPersistedOffsets.put(partitionOffset.getKey(), Math.max(
partitionOffset.getValue(),
lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)
));
try {
Preconditions.checkState(
assignments.isEmpty(),
"This committer can be used only once all the records till offsets [%s] have been consumed, also make"
+ " sure to call updateAssignments before using this committer",
endOffsets
);
// merge endOffsets for this sequence with globally lastPersistedOffsets
// This is done because this committer would be persisting only a subset of the segments
// corresponding to the current sequence. Generally, lastPersistedOffsets should already
// cover endOffsets but just to be sure take max of offsets and persist that
for (Map.Entry<Integer, Long> partitionOffset : endOffsets.entrySet()) {
lastPersistedOffsets.put(
partitionOffset.getKey(),
Math.max(
partitionOffset.getValue(),
lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)
)
);
}
// Publish metadata can be different from persist metadata as we are going to publish only
// a subset of the segments
return ImmutableMap.of(
METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets),
METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets)
);
}
finally {
lock.unlock();
}
// Publish metadata can be different from persist metadata as we are going to publish only
// subset of segments
return ImmutableMap.of(METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets),
METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets)
);
}
@Override