Disable merging of segments that use a linear shard spec

This commit is contained in:
fjy 2013-08-06 16:17:49 -07:00
parent ebf1ac47f0
commit 5f292143e0
3 changed files with 54 additions and 12 deletions

View File

@ -61,6 +61,11 @@ public class PartitionHolder<T> implements Iterable<PartitionChunk<T>>
holderSet.add(chunk);
}
// Returns the number of partition chunks currently held in this holder's set.
public int size()
{
return holderSet.size();
}
public PartitionChunk<T> remove(PartitionChunk<T> chunk)
{
// Somewhat funky implementation in order to return the removed object as it exists in the set

View File

@ -38,6 +38,7 @@ import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.indexing.IndexingServiceClient;
import com.metamx.druid.partition.PartitionChunk;
import com.metamx.druid.shard.NoneShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -98,16 +99,8 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
SegmentsToMerge segmentsToMerge = new SegmentsToMerge();
for (int i = 0; i < timelineObjects.size(); i++) {
try {
segmentsToMerge.add(timelineObjects.get(i));
}
catch (Exception e) {
log.error("Unable to merge segments for %s", entry.getKey());
throw Throwables.propagate(e);
}
if (segmentsToMerge.getByteCount() > params.getMergeBytesLimit()
if (!segmentsToMerge.add(timelineObjects.get(i))
|| segmentsToMerge.getByteCount() > params.getMergeBytesLimit()
|| segmentsToMerge.getSegmentCount() >= params.getMergeSegmentsLimit()) {
i -= segmentsToMerge.backtrack(params.getMergeBytesLimit());
@ -216,7 +209,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
).asList();
}
public void add(TimelineObjectHolder<String, DataSegment> timelineObject)
public boolean add(TimelineObjectHolder<String, DataSegment> timelineObject)
{
final Interval timelineObjectInterval = timelineObject.getInterval();
@ -235,6 +228,10 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
Interval underlyingInterval = firstChunk.getObject().getInterval();
for (final PartitionChunk<DataSegment> segment : timelineObject.getObject()) {
if (!(segment.getObject().getShardSpec() instanceof NoneShardSpec)) {
return false;
}
segments.add(segment.getObject());
if (segments.count(segment.getObject()) == 1) {
byteCount += segment.getObject().getSize();
@ -256,6 +253,8 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
timelineObjects.add(Pair.of(timelineObject, new Interval(start, end)));
}
return true;
}
public Interval getMergedTimelineInterval()

View File

@ -397,7 +397,45 @@ public class DruidMasterSegmentMergerTest
);
Assert.assertEquals(
ImmutableList.of(ImmutableList.of(segments.get(0), segments.get(1), segments.get(2))),
ImmutableList.of(),
merge(segments)
);
}
@Test
public void testMergeMixedShardSpecs()
{
  // Five consecutive daily segments: indices 0, 1, 3, 4 use the default
  // NoneShardSpec and are mergeable, while index 2 carries a LinearShardSpec.
  final List<DataSegment> input = ImmutableList.of(
      DataSegment.builder()
                 .dataSource("foo")
                 .interval(new Interval("2012-01-01/P1D"))
                 .version("1")
                 .build(),
      DataSegment.builder()
                 .dataSource("foo")
                 .interval(new Interval("2012-01-02/P1D"))
                 .version("1")
                 .build(),
      DataSegment.builder()
                 .dataSource("foo")
                 .interval(new Interval("2012-01-03/P1D"))
                 .version("1")
                 .shardSpec(new LinearShardSpec(1500))
                 .build(),
      DataSegment.builder()
                 .dataSource("foo")
                 .interval(new Interval("2012-01-04/P1D"))
                 .version("1")
                 .build(),
      DataSegment.builder()
                 .dataSource("foo")
                 .interval(new Interval("2012-01-05/P1D"))
                 .version("1")
                 .build()
  );

  // The linear-shard segment must split the run: segments 0-1 merge together,
  // segments 3-4 merge together, and segment 2 is excluded entirely.
  final List<List<DataSegment>> expected = ImmutableList.<List<DataSegment>>of(
      ImmutableList.of(input.get(0), input.get(1)),
      ImmutableList.of(input.get(3), input.get(4))
  );
  Assert.assertEquals(expected, merge(input));
}