mirror of https://github.com/apache/druid.git
make batch data segment announcer thread safe
This commit is contained in:
parent
1f4e0ea40c
commit
5ff216838d
|
@ -36,6 +36,7 @@ import java.io.IOException;
|
|||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -48,8 +49,10 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
|
|||
private final ObjectMapper jsonMapper;
|
||||
private final String liveSegmentLocation;
|
||||
|
||||
private final Set<SegmentZNode> availableZNodes = Sets.newHashSet();
|
||||
private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newHashMap();
|
||||
private final Object lock = new Object();
|
||||
|
||||
private final Set<SegmentZNode> availableZNodes = new ConcurrentSkipListSet<SegmentZNode>();
|
||||
private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newConcurrentMap();
|
||||
|
||||
public BatchDataSegmentAnnouncer(
|
||||
DruidServerMetadata server,
|
||||
|
@ -74,32 +77,34 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
|
|||
throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxNumBytes());
|
||||
}
|
||||
|
||||
// create new batch
|
||||
if (availableZNodes.isEmpty()) {
|
||||
SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
|
||||
availableZNode.addSegment(segment);
|
||||
synchronized (lock) {
|
||||
// create new batch
|
||||
if (availableZNodes.isEmpty()) {
|
||||
SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
|
||||
availableZNode.addSegment(segment);
|
||||
|
||||
log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
|
||||
announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
|
||||
segmentLookup.put(segment, availableZNode);
|
||||
availableZNodes.add(availableZNode);
|
||||
} else { // update existing batch
|
||||
Iterator<SegmentZNode> iter = availableZNodes.iterator();
|
||||
boolean done = false;
|
||||
while (iter.hasNext() && !done) {
|
||||
SegmentZNode availableZNode = iter.next();
|
||||
if (availableZNode.getBytes().length + newBytesLen < config.getMaxNumBytes()) {
|
||||
availableZNode.addSegment(segment);
|
||||
log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
|
||||
announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
|
||||
segmentLookup.put(segment, availableZNode);
|
||||
availableZNodes.add(availableZNode);
|
||||
} else { // update existing batch
|
||||
Iterator<SegmentZNode> iter = availableZNodes.iterator();
|
||||
boolean done = false;
|
||||
while (iter.hasNext() && !done) {
|
||||
SegmentZNode availableZNode = iter.next();
|
||||
if (availableZNode.getBytes().length + newBytesLen < config.getMaxNumBytes()) {
|
||||
availableZNode.addSegment(segment);
|
||||
|
||||
log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
|
||||
announcer.update(availableZNode.getPath(), availableZNode.getBytes());
|
||||
segmentLookup.put(segment, availableZNode);
|
||||
log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
|
||||
announcer.update(availableZNode.getPath(), availableZNode.getBytes());
|
||||
segmentLookup.put(segment, availableZNode);
|
||||
|
||||
if (availableZNode.getCount() >= config.getSegmentsPerNode()) {
|
||||
availableZNodes.remove(availableZNode);
|
||||
if (availableZNode.getCount() >= config.getSegmentsPerNode()) {
|
||||
availableZNodes.remove(availableZNode);
|
||||
}
|
||||
|
||||
done = true;
|
||||
}
|
||||
|
||||
done = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -109,6 +114,10 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
|
|||
public void unannounceSegment(DataSegment segment) throws IOException
|
||||
{
|
||||
final SegmentZNode segmentZNode = segmentLookup.remove(segment);
|
||||
if (segmentZNode == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
segmentZNode.removeSegment(segment);
|
||||
|
||||
log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
|
||||
|
@ -170,7 +179,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
|
|||
return ZKPaths.makePath(liveSegmentLocation, zNode);
|
||||
}
|
||||
|
||||
private class SegmentZNode
|
||||
private class SegmentZNode implements Comparable<SegmentZNode>
|
||||
{
|
||||
private final String path;
|
||||
|
||||
|
@ -286,5 +295,11 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
|
|||
{
|
||||
return path.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(SegmentZNode segmentZNode)
|
||||
{
|
||||
return path.compareTo(segmentZNode.getPath());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue