DruidMasterSegmentMerger: Clearer method names and more tests

This commit is contained in:
Gian Merlino 2012-11-27 15:45:56 -08:00
parent cc8a14d5a1
commit 5902ccd0cb
2 changed files with 63 additions and 13 deletions

View File

@ -108,16 +108,16 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
segmentsToMerge.add(timelineObjects.get(i));
if (segmentsToMerge.getMergedSize() > params.getMergeBytesLimit()
|| segmentsToMerge.size() >= params.getMergeSegmentsLimit())
if (segmentsToMerge.getByteCount() > params.getMergeBytesLimit()
|| segmentsToMerge.getSegmentCount() >= params.getMergeSegmentsLimit())
{
i -= segmentsToMerge.backtrack(params.getMergeBytesLimit());
if (segmentsToMerge.size() > 1) {
if (segmentsToMerge.getSegmentCount() > 1) {
count += mergeSegments(segmentsToMerge, entry.getKey());
}
if (segmentsToMerge.size() == 0) {
if (segmentsToMerge.getSegmentCount() == 0) {
// Backtracked all the way to zero. Increment by one so we continue to make progress.
i++;
}
@ -128,7 +128,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
// Finish any timelineObjects to merge that may have not hit threshold
segmentsToMerge.backtrack(params.getMergeBytesLimit());
if (segmentsToMerge.size() > 1) {
if (segmentsToMerge.getSegmentCount() > 1) {
count += mergeSegments(segmentsToMerge, entry.getKey());
}
}
@ -182,13 +182,13 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
// (timeline object, union interval of underlying segments up to this point in the list)
private final List<Pair<TimelineObjectHolder<String, DataSegment>, Interval>> timelineObjects;
private long mergedSize;
private long byteCount;
private SegmentsToMerge()
{
this.timelineObjects = Lists.newArrayList();
this.segments = HashMultiset.create();
this.mergedSize = 0;
this.byteCount = 0;
}
public List<DataSegment> getSegments()
@ -233,7 +233,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
for(final PartitionChunk<DataSegment> segment : timelineObject.getObject()) {
segments.add(segment.getObject());
if(segments.count(segment.getObject()) == 1) {
mergedSize += segment.getObject().getSize();
byteCount += segment.getObject().getSize();
}
}
@ -275,12 +275,12 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
}
}
public long getMergedSize()
public long getByteCount()
{
return mergedSize;
return byteCount;
}
public int size()
public int getSegmentCount()
{
return timelineObjects.size();
}
@ -303,14 +303,14 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
Preconditions.checkArgument(maxSize >= 0, "maxSize >= 0");
int removed = 0;
while (!isComplete() || mergedSize > maxSize) {
while (!isComplete() || byteCount > maxSize) {
removed ++;
final TimelineObjectHolder<String, DataSegment> removedHolder = timelineObjects.remove(timelineObjects.size() - 1).lhs;
for(final PartitionChunk<DataSegment> segment : removedHolder.getObject()) {
segments.remove(segment.getObject());
if(segments.count(segment.getObject()) == 0) {
mergedSize -= segment.getObject().getSize();
byteCount -= segment.getObject().getSize();
}
}
}

View File

@ -101,6 +101,22 @@ public class DruidMasterSegmentMergerTest
);
}
@Test
public void testMergeNoncontiguous()
{
final List<DataSegment> segments = ImmutableList.of(
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(10).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("2").size(10).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("2").size(10).build()
);
Assert.assertEquals(
ImmutableList.of(
ImmutableList.of(segments.get(0), segments.get(1), segments.get(2))
), merge(segments)
);
}
@Test
public void testMergeSeriesByteLimited()
{
@ -176,6 +192,40 @@ public class DruidMasterSegmentMergerTest
);
}
@Test
public void testOverlappingMergeWithGapsAlignedStart()
{
final List<DataSegment> segments = ImmutableList.of(
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P8D")).version("2").size(80).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("3").size(8).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("3").size(8).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-09/P1D")).version("3").size(8).build()
);
Assert.assertEquals(
ImmutableList.of(
ImmutableList.of(segments.get(1), segments.get(0), segments.get(2))
), merge(segments)
);
}
@Test
public void testOverlappingMergeWithGapsNonalignedStart()
{
final List<DataSegment> segments = ImmutableList.of(
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P8D")).version("2").size(80).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P1D")).version("3").size(8).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("3").size(8).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-09/P1D")).version("3").size(8).build()
);
Assert.assertEquals(
ImmutableList.of(
ImmutableList.of(segments.get(0), segments.get(1), segments.get(2))
), merge(segments)
);
}
@Test
public void testOverlappingMerge1()
{