diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterSegmentMerger.java b/server/src/main/java/com/metamx/druid/master/DruidMasterSegmentMerger.java index ef4f9d1e6d3..8be132b9400 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterSegmentMerger.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterSegmentMerger.java @@ -21,6 +21,7 @@ package com.metamx.druid.master; import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.collect.HashMultiset; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; @@ -28,6 +29,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multiset; import com.google.common.collect.Ordering; +import com.metamx.common.ISE; import com.metamx.common.Pair; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; @@ -36,7 +38,6 @@ import com.metamx.druid.VersionedIntervalTimeline; import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.indexing.IndexingServiceClient; import com.metamx.druid.partition.PartitionChunk; - import org.joda.time.DateTime; import org.joda.time.Interval; @@ -96,13 +97,18 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper // Accumulate timelineObjects greedily until we reach our limits, then backtrack to the maximum complete set SegmentsToMerge segmentsToMerge = new SegmentsToMerge(); - for(int i = 0 ; i < timelineObjects.size() ; i++) { + for (int i = 0; i < timelineObjects.size(); i++) { - segmentsToMerge.add(timelineObjects.get(i)); + try { + segmentsToMerge.add(timelineObjects.get(i)); + } + catch (Exception e) { + log.error("Unable to merge segments for %s", entry.getKey()); + throw Throwables.propagate(e); + } if (segmentsToMerge.getByteCount() > params.getMergeBytesLimit() - || segmentsToMerge.getSegmentCount() >= params.getMergeSegmentsLimit()) - { + || segmentsToMerge.getSegmentCount() >= params.getMergeSegmentsLimit()) { i -= segmentsToMerge.backtrack(params.getMergeBytesLimit()); if (segmentsToMerge.getSegmentCount() > 1) { @@ -132,6 +138,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper /** * Issue merge request for some list of segments. + * * @return number of segments merged */ private int mergeSegments(SegmentsToMerge segmentsToMerge, String dataSource) @@ -212,7 +219,6 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper public void add(TimelineObjectHolder timelineObject) { final Interval timelineObjectInterval = timelineObject.getInterval(); - final Interval underlyingInterval = timelineObject.getObject().getChunk(0).getObject().getInterval(); if (timelineObjects.size() > 0) { Preconditions.checkArgument( @@ -222,16 +228,22 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper ); } - for(final PartitionChunk segment : timelineObject.getObject()) { + PartitionChunk firstChunk = Iterables.getFirst(timelineObject.getObject(), null); + if (firstChunk == null) { + throw new ISE("Unable to find an underlying interval"); + } + Interval underlyingInterval = firstChunk.getObject().getInterval(); + + for (final PartitionChunk segment : timelineObject.getObject()) { segments.add(segment.getObject()); - if(segments.count(segment.getObject()) == 1) { + if (segments.count(segment.getObject()) == 1) { byteCount += segment.getObject().getSize(); } } // Compute new underlying merged interval final Interval mergedUnderlyingInterval = getMergedUnderlyingInterval(); - if(mergedUnderlyingInterval == null) { + if (mergedUnderlyingInterval == null) { timelineObjects.add(Pair.of(timelineObject, underlyingInterval)); } else { final DateTime start = underlyingInterval.getStart().isBefore(mergedUnderlyingInterval.getStart()) @@ -248,7 +260,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper public Interval getMergedTimelineInterval() { - if(timelineObjects.isEmpty()) { + if (timelineObjects.isEmpty()) { return null; } else { return new Interval( @@ -260,7 +272,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper public Interval getMergedUnderlyingInterval() { - if(timelineObjects.isEmpty()) { + if (timelineObjects.isEmpty()) { return null; } else { return timelineObjects.get(timelineObjects.size() - 1).rhs; @@ -279,6 +291,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper /** * Does this set of segments fully cover union(all segment intervals)? + * * @return true if this set is complete */ public boolean isComplete() @@ -288,6 +301,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper /** * Remove timelineObjects from this holder until we have a complete set with total size <= maxSize. + * * @return number of timeline object holders removed */ public int backtrack(long maxSize) @@ -296,12 +310,15 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper int removed = 0; while (!isComplete() || byteCount > maxSize) { - removed ++; + removed++; - final TimelineObjectHolder removedHolder = timelineObjects.remove(timelineObjects.size() - 1).lhs; - for(final PartitionChunk segment : removedHolder.getObject()) { + final TimelineObjectHolder removedHolder = timelineObjects.remove( + timelineObjects.size() + - 1 + ).lhs; + for (final PartitionChunk segment : removedHolder.getObject()) { segments.remove(segment.getObject()); - if(segments.count(segment.getObject()) == 0) { + if (segments.count(segment.getObject()) == 0) { byteCount -= segment.getObject().getSize(); } } diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterSegmentMergerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterSegmentMergerTest.java index 1ad1f96d163..44cac2ad19e 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterSegmentMergerTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterSegmentMergerTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.indexing.IndexingServiceClient; +import com.metamx.druid.shard.LinearShardSpec; import junit.framework.Assert; import org.joda.time.Interval; import org.junit.Test; @@ -190,7 +191,7 @@ public class DruidMasterSegmentMergerTest ImmutableList.of( ImmutableList.of(segments.get(0), segments.get(1)), ImmutableList.of(segments.get(2), segments.get(3), segments.get(4), segments.get(5), segments.get(6)) - ), merge(segments) + ), merge(segments) ); } @@ -372,6 +373,35 @@ public class DruidMasterSegmentMergerTest Assert.assertEquals(ImmutableList.of(ImmutableList.of(segments.get(4), segments.get(5))), merge(segments)); } + @Test + public void testMergeLinearShardSpecs() + { + final List segments = ImmutableList.of( + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-01/P1D")) + .version("1") + .shardSpec(new LinearShardSpec(1)) + .build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-02/P1D")) + .version("1") + .shardSpec(new LinearShardSpec(7)) + .build(), + DataSegment.builder().dataSource("foo") + .interval(new Interval("2012-01-03/P1D")) + .version("1") + .shardSpec(new LinearShardSpec(1500)) + .build() + ); + + Assert.assertEquals( + ImmutableList.of(ImmutableList.of(segments.get(0), segments.get(1), segments.get(2))), + merge(segments) + ); + } + /** * Runs DruidMasterSegmentMerger on a particular set of segments and returns the list of requested merges. */