announcer thread fixes

This commit is contained in:
fjy 2013-07-09 17:16:45 -07:00
parent adda1488dc
commit dd203f969b
2 changed files with 33 additions and 28 deletions

View File

@ -81,26 +81,33 @@ public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnno
throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxNumBytes());
}
// create new batch
if (availableZNodes.isEmpty()) {
availableZNodes.add(new SegmentZNode(makeServedSegmentPath(new DateTime().toString())));
}
SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
availableZNode.addSegment(segment);
Iterator<SegmentZNode> iter = availableZNodes.iterator();
boolean done = false;
while (iter.hasNext() && !done) {
SegmentZNode availableZNode = iter.next();
if (availableZNode.getBytes().length + newBytesLen < config.getMaxNumBytes()) {
availableZNode.addSegment(segment);
log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode);
availableZNodes.add(availableZNode);
} else { // update existing batch
Iterator<SegmentZNode> iter = availableZNodes.iterator();
boolean done = false;
while (iter.hasNext() && !done) {
SegmentZNode availableZNode = iter.next();
if (availableZNode.getBytes().length + newBytesLen < config.getMaxNumBytes()) {
availableZNode.addSegment(segment);
log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
announcer.update(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode);
log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
announcer.update(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode);
if (availableZNode.getCount() >= config.getSegmentsPerNode()) {
availableZNodes.remove(availableZNode);
if (availableZNode.getCount() >= config.getSegmentsPerNode()) {
availableZNodes.remove(availableZNode);
}
done = true;
}
done = true;
}
}
}
@ -138,7 +145,7 @@ public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnno
if (count >= config.getSegmentsPerNode() || byteSize + newBytesLen > config.getMaxNumBytes()) {
segmentZNode.addSegments(batch);
announcer.update(segmentZNode.getPath(), segmentZNode.getBytes());
announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
segmentZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
batch = Sets.newHashSet();
@ -154,7 +161,7 @@ public class BatchingCuratorDataSegmentAnnouncer extends AbstractDataSegmentAnno
}
segmentZNode.addSegments(batch);
announcer.update(segmentZNode.getPath(), segmentZNode.getBytes());
announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
}
@Override

View File

@ -25,6 +25,7 @@ import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
@ -64,8 +65,6 @@ public class Announcer
private final ConcurrentMap<String, PathChildrenCache> listeners = new MapMaker().makeMap();
private final ConcurrentMap<String, ConcurrentMap<String, byte[]>> announcements = new MapMaker().makeMap();
private final Object lock = new Object();
private boolean started = false;
public Announcer(
@ -251,19 +250,18 @@ public class Announcer
public void update(final String path, final byte[] bytes)
{
synchronized (lock) {
final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
final String parentPath = pathAndNode.getPath();
final String nodePath = pathAndNode.getNode();
final String parentPath = pathAndNode.getPath();
final String nodePath = pathAndNode.getNode();
ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);
ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);
if (subPaths == null || subPaths.get(nodePath) == null) {
announce(path, bytes);
return;
}
if (subPaths == null || subPaths.get(nodePath) == null) {
throw new ISE("Cannot update a path[%s] that hasn't been announced!", path);
}
synchronized (subPaths) {
try {
byte[] oldBytes = subPaths.get(nodePath);