diff --git a/common/src/main/java/com/metamx/druid/partition/PartitionHolder.java b/common/src/main/java/com/metamx/druid/partition/PartitionHolder.java index 00dc3e18a43..79d1b5dd96e 100644 --- a/common/src/main/java/com/metamx/druid/partition/PartitionHolder.java +++ b/common/src/main/java/com/metamx/druid/partition/PartitionHolder.java @@ -61,6 +61,11 @@ public class PartitionHolder implements Iterable> holderSet.add(chunk); } + public int size() + { + return holderSet.size(); + } + public PartitionChunk remove(PartitionChunk chunk) { // Somewhat funky implementation in order to return the removed object as it exists in the set diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterSegmentMerger.java b/server/src/main/java/com/metamx/druid/master/DruidMasterSegmentMerger.java index 8be132b9400..e46c2a06f51 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterSegmentMerger.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterSegmentMerger.java @@ -38,6 +38,7 @@ import com.metamx.druid.VersionedIntervalTimeline; import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.indexing.IndexingServiceClient; import com.metamx.druid.partition.PartitionChunk; +import com.metamx.druid.shard.NoneShardSpec; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -98,16 +99,8 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper SegmentsToMerge segmentsToMerge = new SegmentsToMerge(); for (int i = 0; i < timelineObjects.size(); i++) { - - try { - segmentsToMerge.add(timelineObjects.get(i)); - } - catch (Exception e) { - log.error("Unable to merge segments for %s", entry.getKey()); - throw Throwables.propagate(e); - } - - if (segmentsToMerge.getByteCount() > params.getMergeBytesLimit() + if (!segmentsToMerge.add(timelineObjects.get(i)) + || segmentsToMerge.getByteCount() > params.getMergeBytesLimit() || segmentsToMerge.getSegmentCount() >= params.getMergeSegmentsLimit()) { i -= segmentsToMerge.backtrack(params.getMergeBytesLimit()); @@ -216,7 +209,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper ).asList(); } - public void add(TimelineObjectHolder timelineObject) + public boolean add(TimelineObjectHolder timelineObject) { final Interval timelineObjectInterval = timelineObject.getInterval(); @@ -235,6 +228,10 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper Interval underlyingInterval = firstChunk.getObject().getInterval(); for (final PartitionChunk segment : timelineObject.getObject()) { + if (!(segment.getObject().getShardSpec() instanceof NoneShardSpec)) { + return false; + } + segments.add(segment.getObject()); if (segments.count(segment.getObject()) == 1) { byteCount += segment.getObject().getSize(); @@ -256,6 +253,8 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper timelineObjects.add(Pair.of(timelineObject, new Interval(start, end))); } + + return true; } public Interval getMergedTimelineInterval() diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterSegmentMergerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterSegmentMergerTest.java index 44cac2ad19e..f8bbdcc63f8 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterSegmentMergerTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterSegmentMergerTest.java @@ -397,7 +397,45 @@ public class DruidMasterSegmentMergerTest ); Assert.assertEquals( - ImmutableList.of(ImmutableList.of(segments.get(0), segments.get(1), segments.get(2))), + ImmutableList.of(), + merge(segments) + ); + } + + @Test + public void testMergeMixedShardSpecs() + { + final List segments = ImmutableList.of( + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-01/P1D")) + .version("1") + .build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-02/P1D")) + .version("1") + .build(), + DataSegment.builder().dataSource("foo") + .interval(new Interval("2012-01-03/P1D")) + .version("1") + .shardSpec(new LinearShardSpec(1500)) + .build(), + DataSegment.builder().dataSource("foo") + .interval(new Interval("2012-01-04/P1D")) + .version("1") + .build(), + DataSegment.builder().dataSource("foo") + .interval(new Interval("2012-01-05/P1D")) + .version("1") + .build() + ); + + Assert.assertEquals( + ImmutableList.of( + ImmutableList.of(segments.get(0), segments.get(1)), + ImmutableList.of(segments.get(3), segments.get(4)) + ), merge(segments) ); }