diff --git a/client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java index 10938b660e4..46aedcee162 100644 --- a/client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java +++ b/client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java @@ -37,6 +37,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicLong; /** */ @@ -50,6 +51,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer private final String liveSegmentLocation; private final Object lock = new Object(); + private final AtomicLong counter = new AtomicLong(0); private final Set availableZNodes = new ConcurrentSkipListSet(); private final Map segmentLookup = Maps.newConcurrentMap(); @@ -118,15 +120,17 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer return; } - segmentZNode.removeSegment(segment); + synchronized (lock) { + segmentZNode.removeSegment(segment); - log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath()); - if (segmentZNode.getCount() == 0) { - availableZNodes.remove(segmentZNode); - announcer.unannounce(segmentZNode.getPath()); - } else { - announcer.update(segmentZNode.getPath(), segmentZNode.getBytes()); - availableZNodes.add(segmentZNode); + log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath()); + if (segmentZNode.getCount() == 0) { + availableZNodes.remove(segmentZNode); + announcer.unannounce(segmentZNode.getPath()); + } else { + announcer.update(segmentZNode.getPath(), segmentZNode.getBytes()); + availableZNodes.add(segmentZNode); + } } } @@ -176,7 +180,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer private String makeServedSegmentPath(String zNode) { - return ZKPaths.makePath(liveSegmentLocation, zNode); + return ZKPaths.makePath(liveSegmentLocation, String.format("%s%s", zNode, counter.getAndIncrement())); } private class SegmentZNode implements Comparable