fix previous fix because multithread is not correct

This commit is contained in:
fjy 2013-08-12 18:53:50 -07:00
parent 5ff216838d
commit 5cb1475a91
1 changed files with 13 additions and 9 deletions

View File

@ -37,6 +37,7 @@ import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;
/** /**
*/ */
@ -50,6 +51,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
private final String liveSegmentLocation; private final String liveSegmentLocation;
private final Object lock = new Object(); private final Object lock = new Object();
private final AtomicLong counter = new AtomicLong(0);
private final Set<SegmentZNode> availableZNodes = new ConcurrentSkipListSet<SegmentZNode>(); private final Set<SegmentZNode> availableZNodes = new ConcurrentSkipListSet<SegmentZNode>();
private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newConcurrentMap(); private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newConcurrentMap();
@ -118,15 +120,17 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
return; return;
} }
segmentZNode.removeSegment(segment); synchronized (lock) {
segmentZNode.removeSegment(segment);
log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath()); log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
if (segmentZNode.getCount() == 0) { if (segmentZNode.getCount() == 0) {
availableZNodes.remove(segmentZNode); availableZNodes.remove(segmentZNode);
announcer.unannounce(segmentZNode.getPath()); announcer.unannounce(segmentZNode.getPath());
} else { } else {
announcer.update(segmentZNode.getPath(), segmentZNode.getBytes()); announcer.update(segmentZNode.getPath(), segmentZNode.getBytes());
availableZNodes.add(segmentZNode); availableZNodes.add(segmentZNode);
}
} }
} }
@ -176,7 +180,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
private String makeServedSegmentPath(String zNode) private String makeServedSegmentPath(String zNode)
{ {
return ZKPaths.makePath(liveSegmentLocation, zNode); return ZKPaths.makePath(liveSegmentLocation, String.format("%s%s", zNode, counter.getAndIncrement()));
} }
private class SegmentZNode implements Comparable<SegmentZNode> private class SegmentZNode implements Comparable<SegmentZNode>