Merge pull request #216 from metamx/announce-fix

Make batch data segment announcer thread safe
This commit is contained in:
fjy 2013-08-12 18:54:23 -07:00
commit 08e2b94e41

View File

@ -36,6 +36,8 @@ import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;
/** /**
*/ */
@ -48,8 +50,11 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final String liveSegmentLocation; private final String liveSegmentLocation;
private final Set<SegmentZNode> availableZNodes = Sets.newHashSet(); private final Object lock = new Object();
private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newHashMap(); private final AtomicLong counter = new AtomicLong(0);
private final Set<SegmentZNode> availableZNodes = new ConcurrentSkipListSet<SegmentZNode>();
private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newConcurrentMap();
public BatchDataSegmentAnnouncer( public BatchDataSegmentAnnouncer(
DruidServerMetadata server, DruidServerMetadata server,
@ -74,32 +79,34 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxNumBytes()); throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxNumBytes());
} }
// create new batch synchronized (lock) {
if (availableZNodes.isEmpty()) { // create new batch
SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString())); if (availableZNodes.isEmpty()) {
availableZNode.addSegment(segment); SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
availableZNode.addSegment(segment);
log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath()); log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
announcer.announce(availableZNode.getPath(), availableZNode.getBytes()); announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode); segmentLookup.put(segment, availableZNode);
availableZNodes.add(availableZNode); availableZNodes.add(availableZNode);
} else { // update existing batch } else { // update existing batch
Iterator<SegmentZNode> iter = availableZNodes.iterator(); Iterator<SegmentZNode> iter = availableZNodes.iterator();
boolean done = false; boolean done = false;
while (iter.hasNext() && !done) { while (iter.hasNext() && !done) {
SegmentZNode availableZNode = iter.next(); SegmentZNode availableZNode = iter.next();
if (availableZNode.getBytes().length + newBytesLen < config.getMaxNumBytes()) { if (availableZNode.getBytes().length + newBytesLen < config.getMaxNumBytes()) {
availableZNode.addSegment(segment); availableZNode.addSegment(segment);
log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath()); log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
announcer.update(availableZNode.getPath(), availableZNode.getBytes()); announcer.update(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode); segmentLookup.put(segment, availableZNode);
if (availableZNode.getCount() >= config.getSegmentsPerNode()) { if (availableZNode.getCount() >= config.getSegmentsPerNode()) {
availableZNodes.remove(availableZNode); availableZNodes.remove(availableZNode);
}
done = true;
} }
done = true;
} }
} }
} }
@ -109,15 +116,21 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
public void unannounceSegment(DataSegment segment) throws IOException public void unannounceSegment(DataSegment segment) throws IOException
{ {
final SegmentZNode segmentZNode = segmentLookup.remove(segment); final SegmentZNode segmentZNode = segmentLookup.remove(segment);
segmentZNode.removeSegment(segment); if (segmentZNode == null) {
return;
}
log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath()); synchronized (lock) {
if (segmentZNode.getCount() == 0) { segmentZNode.removeSegment(segment);
availableZNodes.remove(segmentZNode);
announcer.unannounce(segmentZNode.getPath()); log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
} else { if (segmentZNode.getCount() == 0) {
announcer.update(segmentZNode.getPath(), segmentZNode.getBytes()); availableZNodes.remove(segmentZNode);
availableZNodes.add(segmentZNode); announcer.unannounce(segmentZNode.getPath());
} else {
announcer.update(segmentZNode.getPath(), segmentZNode.getBytes());
availableZNodes.add(segmentZNode);
}
} }
} }
@ -167,10 +180,10 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
private String makeServedSegmentPath(String zNode) private String makeServedSegmentPath(String zNode)
{ {
return ZKPaths.makePath(liveSegmentLocation, zNode); return ZKPaths.makePath(liveSegmentLocation, String.format("%s%s", zNode, counter.getAndIncrement()));
} }
private class SegmentZNode private class SegmentZNode implements Comparable<SegmentZNode>
{ {
private final String path; private final String path;
@ -286,5 +299,11 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
{ {
return path.hashCode(); return path.hashCode();
} }
@Override
public int compareTo(SegmentZNode segmentZNode)
{
return path.compareTo(segmentZNode.getPath());
}
} }
} }