Master: Add segment limit to merge selection algo

If we attempt to merge too many segments at once, we risk creating an
oversized indexing task.
This commit is contained in:
Gian Merlino 2012-11-19 14:42:57 -08:00
parent aa073e4e73
commit 16835a1f84
6 changed files with 88 additions and 30 deletions

View File

@ -563,7 +563,8 @@ public class DruidMaster
.withLoadManagementPeons(loadManagementPeons)
.withMillisToWaitBeforeDeleting(config.getMillisToWaitBeforeDeleting())
.withEmitter(emitter)
.withMergeThreshold(config.getMergeThreshold())
.withMergeBytesLimit(config.getMergeBytesLimit())
.withMergeSegmentsLimit(config.getMergeSegmentsLimit())
.build();
for (DruidMasterHelper helper : helpers) {

View File

@ -70,8 +70,14 @@ public abstract class DruidMasterConfig
}
@Config("druid.master.merge.threshold")
public long getMergeThreshold()
public long getMergeBytesLimit()
{
return 100000000L;
}
@Config("druid.master.merge.maxSegments")
public int getMergeSegmentsLimit()
{
return Integer.MAX_VALUE;
}
}

View File

@ -56,7 +56,8 @@ public class DruidMasterRuntimeParams
private final int movedCount;
private final int createdReplicantCount;
private final int destroyedReplicantCount;
private final long mergeThreshold;
private final long mergeBytesLimit;
private final int mergeSegmentsLimit;
private final int mergedSegmentCount;
public DruidMasterRuntimeParams(
@ -78,7 +79,8 @@ public class DruidMasterRuntimeParams
int movedCount,
int createdReplicantCount,
int destroyedReplicantCount,
long mergeThreshold,
long mergeBytesLimit,
int mergeSegmentsLimit,
int mergedSegmentCount
)
{
@ -100,7 +102,8 @@ public class DruidMasterRuntimeParams
this.movedCount = movedCount;
this.createdReplicantCount = createdReplicantCount;
this.destroyedReplicantCount = destroyedReplicantCount;
this.mergeThreshold = mergeThreshold;
this.mergeBytesLimit = mergeBytesLimit;
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.mergedSegmentCount = mergedSegmentCount;
}
@ -194,9 +197,14 @@ public class DruidMasterRuntimeParams
return destroyedReplicantCount;
}
public long getMergeThreshold()
public long getMergeBytesLimit()
{
return mergeThreshold;
return mergeBytesLimit;
}
public int getMergeSegmentsLimit()
{
return mergeSegmentsLimit;
}
public int getMergedSegmentCount()
@ -230,7 +238,8 @@ public class DruidMasterRuntimeParams
movedCount,
createdReplicantCount,
destroyedReplicantCount,
mergeThreshold,
mergeBytesLimit,
mergeSegmentsLimit,
mergedSegmentCount
);
}
@ -255,7 +264,8 @@ public class DruidMasterRuntimeParams
private int movedCount;
private int createdReplicantCount;
private int destroyedReplicantCount;
private long mergeThreshold;
private long mergeBytesLimit;
private int mergeSegmentsLimit;
private int mergedSegmentCount;
Builder()
@ -278,7 +288,8 @@ public class DruidMasterRuntimeParams
this.movedCount = 0;
this.createdReplicantCount = 0;
this.destroyedReplicantCount = 0;
this.mergeThreshold = 0;
this.mergeBytesLimit = 0;
this.mergeSegmentsLimit = 0;
this.mergedSegmentCount = 0;
}
@ -301,7 +312,8 @@ public class DruidMasterRuntimeParams
int movedCount,
int createdReplicantCount,
int destroyedReplicantCount,
long mergeThreshold,
long mergeBytesLimit,
int mergeSegmentsLimit,
int mergedSegmentCount
)
{
@ -323,7 +335,8 @@ public class DruidMasterRuntimeParams
this.movedCount = movedCount;
this.createdReplicantCount = createdReplicantCount;
this.destroyedReplicantCount = destroyedReplicantCount;
this.mergeThreshold = mergeThreshold;
this.mergeBytesLimit = mergeBytesLimit;
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.mergedSegmentCount = mergedSegmentCount;
}
@ -348,7 +361,8 @@ public class DruidMasterRuntimeParams
movedCount,
createdReplicantCount,
destroyedReplicantCount,
mergeThreshold,
mergeBytesLimit,
mergeSegmentsLimit,
mergedSegmentCount
);
}
@ -467,9 +481,15 @@ public class DruidMasterRuntimeParams
return this;
}
public Builder withMergeThreshold(long mergeThreshold)
public Builder withMergeBytesLimit(long mergeBytesLimit)
{
this.mergeThreshold = mergeThreshold;
this.mergeBytesLimit = mergeBytesLimit;
return this;
}
public Builder withMergeSegmentsLimit(int mergeSegmentsLimit)
{
this.mergeSegmentsLimit = mergeSegmentsLimit;
return this;
}

View File

@ -101,21 +101,23 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
List<TimelineObjectHolder<String, DataSegment>> timelineObjects =
timeline.lookup(new Interval(new DateTime(0), new DateTime("3000-01-01")));
// Accumulate timelineObjects greedily until we surpass our size threshold, then backtrack to the maximum complete set
// Accumulate timelineObjects greedily until we reach our limits, then backtrack to the maximum complete set
SegmentsToMerge segmentsToMerge = new SegmentsToMerge();
for(int i = 0 ; i < timelineObjects.size() ; i++) {
segmentsToMerge.add(timelineObjects.get(i));
if(segmentsToMerge.getMergedSize() > params.getMergeThreshold()) {
i -= segmentsToMerge.backtrack(params.getMergeThreshold());
if (segmentsToMerge.getMergedSize() > params.getMergeBytesLimit()
|| segmentsToMerge.size() >= params.getMergeSegmentsLimit())
{
i -= segmentsToMerge.backtrack(params.getMergeBytesLimit());
if(segmentsToMerge.size() > 1) {
if (segmentsToMerge.size() > 1) {
count += mergeSegments(segmentsToMerge, entry.getKey());
}
if(segmentsToMerge.size() == 0) {
if (segmentsToMerge.size() == 0) {
// Backtracked all the way to zero. Increment by one so we continue to make progress.
i++;
}
@ -125,7 +127,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
}
// Finish any timelineObjects to merge that may have not hit threshold
segmentsToMerge.backtrack(params.getMergeThreshold());
segmentsToMerge.backtrack(params.getMergeBytesLimit());
if (segmentsToMerge.size() > 1) {
count += mergeSegments(segmentsToMerge, entry.getKey());
}

View File

@ -32,7 +32,8 @@ import java.util.List;
public class DruidMasterSegmentMergerTest
{
private static final long mergeThreshold = 100;
private static final long mergeBytesLimit = 100;
private static final int mergeSegmentsLimit = 8;
@Test
public void testNoMerges()
@ -101,7 +102,7 @@ public class DruidMasterSegmentMergerTest
}
@Test
public void testMergeSeries()
public void testMergeSeriesByteLimited()
{
final List<DataSegment> segments = ImmutableList.of(
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(40).build(),
@ -121,6 +122,39 @@ public class DruidMasterSegmentMergerTest
);
}
@Test
public void testMergeSeriesSegmentLimited()
{
final List<DataSegment> segments = ImmutableList.of(
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(1).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P1D")).version("2").size(1).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("2").size(1).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("2").size(1).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-05/P1D")).version("2").size(1).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-06/P1D")).version("2").size(1).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-07/P1D")).version("2").size(1).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-08/P1D")).version("2").size(1).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-09/P1D")).version("2").size(1).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-10/P1D")).version("2").size(1).build()
);
Assert.assertEquals(
ImmutableList.of(
ImmutableList.of(
segments.get(0),
segments.get(1),
segments.get(2),
segments.get(3),
segments.get(4),
segments.get(5),
segments.get(6),
segments.get(7)
),
ImmutableList.of(segments.get(8), segments.get(9))
), merge(segments)
);
}
@Test
public void testOverlappingMergeWithBacktracking()
{
@ -308,7 +342,8 @@ public class DruidMasterSegmentMergerTest
final DruidMasterSegmentMerger merger = new DruidMasterSegmentMerger(mergerClient);
final DruidMasterRuntimeParams params = DruidMasterRuntimeParams.newBuilder()
.withAvailableSegments(ImmutableSet.copyOf(segments))
.withMergeThreshold(mergeThreshold)
.withMergeBytesLimit(mergeBytesLimit)
.withMergeSegmentsLimit(mergeSegmentsLimit)
.build();
merger.run(params);

View File

@ -126,12 +126,6 @@ public class DruidMasterTest
{
return "";
}
@Override
public long getMergeThreshold()
{
return super.getMergeThreshold();
}
},
null,
null,