From b76a056c144ef7b440e0f94e0a0324aa2eb34c8a Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Sat, 30 Jun 2018 17:20:41 -0700
Subject: [PATCH] Fix ConcurrentModificationException in
 IncrementalPublishingKafkaIndexTaskRunner (#5907)

* Fix ConcurrentModificationException in IncrementalPublishingKafkaIndexTaskRunner

* fix lock and add comments
---
 ...ementalPublishingKafkaIndexTaskRunner.java | 150 ++++++++++++------
 1 file changed, 103 insertions(+), 47 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index df2f768662d..3caeedf905d 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -103,6 +103,8 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -1501,13 +1503,19 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTaskRunner
 
   private static class SequenceMetadata
   {
+    /**
+     * Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This lock is required because
+     * {@link #setEndOffsets} can be called by both the main thread and the HTTP thread.
+     */
+    private final ReentrantLock lock = new ReentrantLock();
+
     private final int sequenceId;
     private final String sequenceName;
     private final Map<Integer, Long> startOffsets;
     private final Map<Integer, Long> endOffsets;
     private final Set<Integer> assignments;
     private final boolean sentinel;
-    private volatile boolean checkpointed;
+    private boolean checkpointed;
 
     @JsonCreator
     public SequenceMetadata(
@@ -1524,8 +1532,8 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTaskRunner
       this.sequenceId = sequenceId;
       this.sequenceName = sequenceName;
       this.startOffsets = ImmutableMap.copyOf(startOffsets);
-      this.endOffsets = Maps.newHashMap(endOffsets);
-      this.assignments = Sets.newHashSet(startOffsets.keySet());
+      this.endOffsets = new HashMap<>(endOffsets);
+      this.assignments = new HashSet<>(startOffsets.keySet());
       this.checkpointed = checkpointed;
       this.sentinel = false;
     }
@@ -1539,7 +1547,13 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTaskRunner
     @JsonProperty
     public boolean isCheckpointed()
     {
-      return checkpointed;
+      lock.lock();
+      try {
+        return checkpointed;
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     @JsonProperty
@@ -1557,7 +1571,13 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTaskRunner
     @JsonProperty
     public Map<Integer, Long> getEndOffsets()
     {
-      return endOffsets;
+      lock.lock();
+      try {
+        return endOffsets;
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     @JsonProperty
@@ -1568,19 +1588,30 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTaskRunner
 
     void setEndOffsets(Map<Integer, Long> newEndOffsets)
     {
-      endOffsets.putAll(newEndOffsets);
-      checkpointed = true;
+      lock.lock();
+      try {
+        endOffsets.putAll(newEndOffsets);
+        checkpointed = true;
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     void updateAssignments(Map<Integer, Long> nextPartitionOffset)
     {
-      assignments.clear();
-      nextPartitionOffset.entrySet().forEach(partitionOffset -> {
-        if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey()))
-            > 0) {
-          assignments.add(partitionOffset.getKey());
-        }
-      });
+      lock.lock();
+      try {
+        assignments.clear();
+        nextPartitionOffset.forEach((key, value) -> {
+          if (Longs.compare(endOffsets.get(key), nextPartitionOffset.get(key)) > 0) {
+            assignments.add(key);
+          }
+        });
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     boolean isOpen()
@@ -1590,10 +1621,17 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTaskRunner
 
     boolean canHandle(ConsumerRecord<byte[], byte[]> record)
     {
-      return isOpen()
-             && endOffsets.get(record.partition()) != null
-             && record.offset() >= startOffsets.get(record.partition())
-             && record.offset() < endOffsets.get(record.partition());
+      lock.lock();
+      try {
+        final Long partitionEndOffset = endOffsets.get(record.partition());
+        return isOpen()
+               && partitionEndOffset != null
+               && record.offset() >= startOffsets.get(record.partition())
+               && record.offset() < partitionEndOffset;
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     private SequenceMetadata()
@@ -1615,15 +1653,21 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTaskRunner
     @Override
     public String toString()
     {
-      return "SequenceMetadata{" +
-             "sequenceName='" + sequenceName + '\'' +
-             ", sequenceId=" + sequenceId +
-             ", startOffsets=" + startOffsets +
-             ", endOffsets=" + endOffsets +
-             ", assignments=" + assignments +
-             ", sentinel=" + sentinel +
-             ", checkpointed=" + checkpointed +
-             '}';
+      lock.lock();
+      try {
+        return "SequenceMetadata{" +
+               "sequenceName='" + sequenceName + '\'' +
+               ", sequenceId=" + sequenceId +
+               ", startOffsets=" + startOffsets +
+               ", endOffsets=" + endOffsets +
+               ", assignments=" + assignments +
+               ", sentinel=" + sentinel +
+               ", checkpointed=" + checkpointed +
+               '}';
+      }
+      finally {
+        lock.unlock();
+      }
     }
 
     Supplier<Committer> getCommitterSupplier(String topic, Map<Integer, Long> lastPersistedOffsets)
@@ -1635,28 +1679,40 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTaskRunner
         @Override
         public Object getMetadata()
         {
-          Preconditions.checkState(
-              assignments.isEmpty(),
-              "This committer can be used only once all the records till offsets [%s] have been consumed, also make sure to call updateAssignments before using this committer",
-              endOffsets
-          );
+          lock.lock();
 
-          // merge endOffsets for this sequence with globally lastPersistedOffsets
-          // This is done because this committer would be persisting only sub set of segments
-          // corresponding to the current sequence. Generally, lastPersistedOffsets should already
-          // cover endOffsets but just to be sure take max of offsets and persist that
-          for (Map.Entry<Integer, Long> partitionOffset : endOffsets.entrySet()) {
-            lastPersistedOffsets.put(partitionOffset.getKey(), Math.max(
-                partitionOffset.getValue(),
-                lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)
-            ));
+          try {
+            Preconditions.checkState(
+                assignments.isEmpty(),
+                "This committer can be used only once all the records till offsets [%s] have been consumed, also make"
+                + " sure to call updateAssignments before using this committer",
+                endOffsets
+            );
+
+            // merge endOffsets for this sequence with globally lastPersistedOffsets
+            // This is done because this committer would be persisting only sub set of segments
+            // corresponding to the current sequence. Generally, lastPersistedOffsets should already
+            // cover endOffsets but just to be sure take max of offsets and persist that
+            for (Map.Entry<Integer, Long> partitionOffset : endOffsets.entrySet()) {
+              lastPersistedOffsets.put(
+                  partitionOffset.getKey(),
+                  Math.max(
+                      partitionOffset.getValue(),
+                      lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)
+                  )
+              );
+            }
+
+            // Publish metadata can be different from persist metadata as we are going to publish only
+            // subset of segments
+            return ImmutableMap.of(
+                METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets),
+                METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets)
+            );
+          }
+          finally {
+            lock.unlock();
+          }
           }
-
-          // Publish metadata can be different from persist metadata as we are going to publish only
-          // subset of segments
-          return ImmutableMap.of(METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets),
-                                 METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets)
-          );
         }
 
         @Override
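
The self-contained sketch below is not part of the patch; it uses hypothetical names (EndOffsetsLockSketch, describe) and only mirrors the pattern the commit introduces for SequenceMetadata: a plain HashMap of partition offsets is iterated on one thread (as happens when the sequence metadata is serialized or logged) while setEndOffsets adds entries from the HTTP thread, and unless both sides hold the same ReentrantLock the iteration can fail with ConcurrentModificationException.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Stand-alone illustration (hypothetical names, not Druid code) of guarding a
// HashMap that one thread mutates and another thread iterates with a single
// ReentrantLock, as the patch does for SequenceMetadata.endOffsets.
public class EndOffsetsLockSketch
{
  private final ReentrantLock lock = new ReentrantLock();
  private final Map<Integer, Long> endOffsets = new HashMap<>();

  // analogous to SequenceMetadata.setEndOffsets(), called from the HTTP thread
  void setEndOffsets(Map<Integer, Long> newEndOffsets)
  {
    lock.lock();
    try {
      endOffsets.putAll(newEndOffsets);
    }
    finally {
      lock.unlock();
    }
  }

  // analogous to the readers the patch wraps (getEndOffsets, canHandle, toString,
  // getMetadata); HashMap.toString() iterates the map, which is only safe while
  // no other thread can add or remove entries
  String describe()
  {
    lock.lock();
    try {
      return endOffsets.toString();
    }
    finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException
  {
    EndOffsetsLockSketch sketch = new EndOffsetsLockSketch();

    Thread httpThread = new Thread(() -> {
      for (int i = 0; i < 10_000; i++) {
        // each call adds a new key, i.e. a structural modification of the map
        sketch.setEndOffsets(Collections.singletonMap(i, (long) i));
      }
    });

    Thread mainThread = new Thread(() -> {
      for (int i = 0; i < 2_000; i++) {
        sketch.describe();
      }
    });

    httpThread.start();
    mainThread.start();
    httpThread.join();
    mainThread.join();
    // with the lock in place this always completes; without it, describe() can
    // fail intermittently with ConcurrentModificationException
    System.out.println("done, final map rendered to " + sketch.describe().length() + " chars");
  }
}

Dropping the lock.lock()/lock.unlock() pairs from either method reintroduces the unsynchronized access and the run can again fail intermittently; the patch therefore applies the same guard to every reader and writer of endOffsets and checkpointed so the HTTP thread's setEndOffsets can never interleave with an iteration on the main thread.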