fix merging problem with linear shard specs

This commit is contained in:
fjy 2013-06-10 14:07:36 -07:00
parent 12e78955db
commit 3312d9b802
2 changed files with 63 additions and 16 deletions

View File

@ -21,6 +21,7 @@ package com.metamx.druid.master;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@ -28,6 +29,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Ordering;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
@ -36,7 +38,6 @@ import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.indexing.IndexingServiceClient;
import com.metamx.druid.partition.PartitionChunk;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -96,13 +97,18 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
// Accumulate timelineObjects greedily until we reach our limits, then backtrack to the maximum complete set
SegmentsToMerge segmentsToMerge = new SegmentsToMerge();
for(int i = 0 ; i < timelineObjects.size() ; i++) {
for (int i = 0; i < timelineObjects.size(); i++) {
segmentsToMerge.add(timelineObjects.get(i));
try {
segmentsToMerge.add(timelineObjects.get(i));
}
catch (Exception e) {
log.error("Unable to merge segments for %s", entry.getKey());
throw Throwables.propagate(e);
}
if (segmentsToMerge.getByteCount() > params.getMergeBytesLimit()
|| segmentsToMerge.getSegmentCount() >= params.getMergeSegmentsLimit())
{
|| segmentsToMerge.getSegmentCount() >= params.getMergeSegmentsLimit()) {
i -= segmentsToMerge.backtrack(params.getMergeBytesLimit());
if (segmentsToMerge.getSegmentCount() > 1) {
@ -132,6 +138,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
/**
* Issue merge request for some list of segments.
*
* @return number of segments merged
*/
private int mergeSegments(SegmentsToMerge segmentsToMerge, String dataSource)
@ -212,7 +219,6 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
public void add(TimelineObjectHolder<String, DataSegment> timelineObject)
{
final Interval timelineObjectInterval = timelineObject.getInterval();
final Interval underlyingInterval = timelineObject.getObject().getChunk(0).getObject().getInterval();
if (timelineObjects.size() > 0) {
Preconditions.checkArgument(
@ -222,16 +228,22 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
);
}
for(final PartitionChunk<DataSegment> segment : timelineObject.getObject()) {
PartitionChunk<DataSegment> firstChunk = Iterables.getFirst(timelineObject.getObject(), null);
if (firstChunk == null) {
throw new ISE("Unable to find an underlying interval");
}
Interval underlyingInterval = firstChunk.getObject().getInterval();
for (final PartitionChunk<DataSegment> segment : timelineObject.getObject()) {
segments.add(segment.getObject());
if(segments.count(segment.getObject()) == 1) {
if (segments.count(segment.getObject()) == 1) {
byteCount += segment.getObject().getSize();
}
}
// Compute new underlying merged interval
final Interval mergedUnderlyingInterval = getMergedUnderlyingInterval();
if(mergedUnderlyingInterval == null) {
if (mergedUnderlyingInterval == null) {
timelineObjects.add(Pair.of(timelineObject, underlyingInterval));
} else {
final DateTime start = underlyingInterval.getStart().isBefore(mergedUnderlyingInterval.getStart())
@ -248,7 +260,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
public Interval getMergedTimelineInterval()
{
if(timelineObjects.isEmpty()) {
if (timelineObjects.isEmpty()) {
return null;
} else {
return new Interval(
@ -260,7 +272,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
public Interval getMergedUnderlyingInterval()
{
if(timelineObjects.isEmpty()) {
if (timelineObjects.isEmpty()) {
return null;
} else {
return timelineObjects.get(timelineObjects.size() - 1).rhs;
@ -279,6 +291,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
/**
* Does this set of segments fully cover union(all segment intervals)?
*
* @return true if this set is complete
*/
public boolean isComplete()
@ -288,6 +301,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
/**
* Remove timelineObjects from this holder until we have a complete set with total size <= maxSize.
*
* @return number of timeline object holders removed
*/
public int backtrack(long maxSize)
@ -296,12 +310,15 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
int removed = 0;
while (!isComplete() || byteCount > maxSize) {
removed ++;
removed++;
final TimelineObjectHolder<String, DataSegment> removedHolder = timelineObjects.remove(timelineObjects.size() - 1).lhs;
for(final PartitionChunk<DataSegment> segment : removedHolder.getObject()) {
final TimelineObjectHolder<String, DataSegment> removedHolder = timelineObjects.remove(
timelineObjects.size()
- 1
).lhs;
for (final PartitionChunk<DataSegment> segment : removedHolder.getObject()) {
segments.remove(segment.getObject());
if(segments.count(segment.getObject()) == 0) {
if (segments.count(segment.getObject()) == 0) {
byteCount -= segment.getObject().getSize();
}
}

View File

@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.indexing.IndexingServiceClient;
import com.metamx.druid.shard.LinearShardSpec;
import junit.framework.Assert;
import org.joda.time.Interval;
import org.junit.Test;
@ -190,7 +191,7 @@ public class DruidMasterSegmentMergerTest
ImmutableList.of(
ImmutableList.of(segments.get(0), segments.get(1)),
ImmutableList.of(segments.get(2), segments.get(3), segments.get(4), segments.get(5), segments.get(6))
), merge(segments)
), merge(segments)
);
}
@ -372,6 +373,35 @@ public class DruidMasterSegmentMergerTest
Assert.assertEquals(ImmutableList.of(ImmutableList.of(segments.get(4), segments.get(5))), merge(segments));
}
@Test
public void testMergeLinearShardSpecs()
{
final List<DataSegment> segments = ImmutableList.of(
DataSegment.builder()
.dataSource("foo")
.interval(new Interval("2012-01-01/P1D"))
.version("1")
.shardSpec(new LinearShardSpec(1))
.build(),
DataSegment.builder()
.dataSource("foo")
.interval(new Interval("2012-01-02/P1D"))
.version("1")
.shardSpec(new LinearShardSpec(7))
.build(),
DataSegment.builder().dataSource("foo")
.interval(new Interval("2012-01-03/P1D"))
.version("1")
.shardSpec(new LinearShardSpec(1500))
.build()
);
Assert.assertEquals(
ImmutableList.of(ImmutableList.of(segments.get(0), segments.get(1), segments.get(2))),
merge(segments)
);
}
/**
* Runs DruidMasterSegmentMerger on a particular set of segments and returns the list of requested merges.
*/