Fix for loss in segment announcements when segments do not fit in the znodes during compress mode.

Added unit test (from Navis).
This commit is contained in:
Anubhav Gupta 2016-01-13 19:17:51 -08:00
parent 4c014c1574
commit 6d09ab839f
2 changed files with 68 additions and 21 deletions

View File

@ -87,35 +87,45 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
}
synchronized (lock) {
// create new batch
if (availableZNodes.isEmpty()) {
SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath());
availableZNode.addSegment(segment);
log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode);
availableZNodes.add(availableZNode);
} else { // update existing batch
Iterator<SegmentZNode> iter = availableZNodes.iterator();
boolean done = false;
while (iter.hasNext() && !done) {
SegmentZNode availableZNode = iter.next();
if (availableZNode.getBytes().length + newBytesLen < config.getMaxBytesPerNode()) {
availableZNode.addSegment(segment);
if (!availableZNodes.isEmpty()) {
// update existing batch
Iterator<SegmentZNode> iter = availableZNodes.iterator();
while (iter.hasNext() && !done) {
SegmentZNode availableZNode = iter.next();
if (availableZNode.getBytes().length + newBytesLen < config.getMaxBytesPerNode()) {
availableZNode.addSegment(segment);
log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
log.info("Announcing segment[%s] at existing path[%s]", segment.getIdentifier(), availableZNode.getPath());
announcer.update(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode);
if (availableZNode.getCount() >= config.getSegmentsPerNode()) {
availableZNodes.remove(availableZNode);
}
done = true;
} else {
// We could have kept the znode around for later use, however we remove it since segment announcements should
// have similar size unless there are significant schema changes. Removing the znode reduces the number of
// znodes that would be scanned at each announcement.
availableZNodes.remove(availableZNode);
}
}
}
if (!done) {
assert (availableZNodes.isEmpty());
// create new batch
SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath());
availableZNode.addSegment(segment);
log.info("Announcing segment[%s] at new path[%s]", segment.getIdentifier(),
availableZNode.getPath());
announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode);
availableZNodes.add(availableZNode);
}
}
}

View File

@ -47,6 +47,7 @@ import org.junit.Test;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
@ -64,6 +65,8 @@ public class BatchDataSegmentAnnouncerTest
private BatchDataSegmentAnnouncer segmentAnnouncer;
private Set<DataSegment> testSegments;
private final AtomicInteger maxBytesPerNode = new AtomicInteger(512 * 1024);
@Before
public void setUp() throws Exception
{
@ -104,6 +107,12 @@ public class BatchDataSegmentAnnouncerTest
{
return 50;
}
@Override
public long getMaxBytesPerNode()
{
return maxBytesPerNode.get();
}
},
new ZkPathsConfig()
{
@ -168,6 +177,34 @@ public class BatchDataSegmentAnnouncerTest
Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
}
@Test
public void testSingleAnnounceManyTimes() throws Exception
{
int prevMax = maxBytesPerNode.get();
maxBytesPerNode.set(2048);
// each segment is about 317 bytes long and that makes 2048 / 317 = 6 segments included per node
// so 100 segments makes (100 / 6) + 1 = 17 nodes
try {
for (DataSegment segment : testSegments) {
segmentAnnouncer.announceSegment(segment);
}
}
finally {
maxBytesPerNode.set(prevMax);
}
List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);
Assert.assertEquals(17, zNodes.size());
Set<DataSegment> segments = Sets.newHashSet(testSegments);
for (String zNode : zNodes) {
for (DataSegment segment : segmentReader.read(joiner.join(testSegmentsPath, zNode))) {
Assert.assertTrue("Invalid segment " + segment, segments.remove(segment));
}
}
Assert.assertTrue("Failed to find segments " + segments, segments.isEmpty());
}
@Test
public void testBatchAnnounce() throws Exception
{
@ -175,7 +212,7 @@ public class BatchDataSegmentAnnouncerTest
List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);
Assert.assertTrue(zNodes.size() == 2);
Assert.assertEquals(2, zNodes.size());
Set<DataSegment> allSegments = Sets.newHashSet();
for (String zNode : zNodes) {
@ -192,7 +229,7 @@ public class BatchDataSegmentAnnouncerTest
public void testMultipleBatchAnnounce() throws Exception
{
for (int i = 0; i < 10; i++) {
testBatchAnnounce();
testBatchAnnounce();
}
}
@ -227,8 +264,8 @@ public class BatchDataSegmentAnnouncerTest
if (cf.checkExists().forPath(path) != null) {
return jsonMapper.readValue(
cf.getData().forPath(path), new TypeReference<Set<DataSegment>>()
{
}
{
}
);
}
}