diff --git a/merger/src/main/java/com/metamx/druid/merger/common/index/YeOldePlumberSchool.java b/merger/src/main/java/com/metamx/druid/merger/common/index/YeOldePlumberSchool.java index 4ece129f49b..c77dfbd9e80 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/index/YeOldePlumberSchool.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/index/YeOldePlumberSchool.java @@ -81,10 +81,12 @@ public class YeOldePlumberSchool implements PlumberSchool // There can be only one. final Sink theSink = new Sink(interval, schema); + // Temporary directory to hold spilled segments. final File persistDir = new File( tmpSegmentDir, theSink.getSegment().withVersion(version).getIdentifier() ); + // Set of spilled segments. Will be merged at the end. final Set spilled = Sets.newHashSet(); return new Plumber() diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java index 82da5561e24..37187904ace 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java @@ -163,6 +163,17 @@ public class IndexGeneratorTask extends AbstractTask plumber.persist(firehose.commit()); plumber.finishJob(); + // Output metrics + log.info( + "Task[%s] took in %,d rows (%,d processed, %,d unparseable, %,d thrown away) and output %,d rows", + getId(), + metrics.processed() + metrics.unparseable() + metrics.thrownAway(), + metrics.processed(), + metrics.unparseable(), + metrics.thrownAway(), + metrics.rowOutput() + ); + // Done return TaskStatus.success(getId(), ImmutableList.copyOf(pushedSegments)); } diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterSegmentMerger.java b/server/src/main/java/com/metamx/druid/master/DruidMasterSegmentMerger.java index 2c34134c9f0..e23d44f2499 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterSegmentMerger.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterSegmentMerger.java @@ -108,16 +108,16 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper segmentsToMerge.add(timelineObjects.get(i)); - if (segmentsToMerge.getMergedSize() > params.getMergeBytesLimit() - || segmentsToMerge.size() >= params.getMergeSegmentsLimit()) + if (segmentsToMerge.getByteCount() > params.getMergeBytesLimit() + || segmentsToMerge.getSegmentCount() >= params.getMergeSegmentsLimit()) { i -= segmentsToMerge.backtrack(params.getMergeBytesLimit()); - if (segmentsToMerge.size() > 1) { + if (segmentsToMerge.getSegmentCount() > 1) { count += mergeSegments(segmentsToMerge, entry.getKey()); } - if (segmentsToMerge.size() == 0) { + if (segmentsToMerge.getSegmentCount() == 0) { // Backtracked all the way to zero. Increment by one so we continue to make progress. i++; } @@ -128,7 +128,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper // Finish any timelineObjects to merge that may have not hit threshold segmentsToMerge.backtrack(params.getMergeBytesLimit()); - if (segmentsToMerge.size() > 1) { + if (segmentsToMerge.getSegmentCount() > 1) { count += mergeSegments(segmentsToMerge, entry.getKey()); } } @@ -182,13 +182,13 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper // (timeline object, union interval of underlying segments up to this point in the list) private final List, Interval>> timelineObjects; - private long mergedSize; + private long byteCount; private SegmentsToMerge() { this.timelineObjects = Lists.newArrayList(); this.segments = HashMultiset.create(); - this.mergedSize = 0; + this.byteCount = 0; } public List getSegments() @@ -233,7 +233,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper for(final PartitionChunk segment : timelineObject.getObject()) { segments.add(segment.getObject()); if(segments.count(segment.getObject()) == 1) { - mergedSize += segment.getObject().getSize(); + byteCount += segment.getObject().getSize(); } } @@ -275,12 +275,12 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper } } - public long getMergedSize() + public long getByteCount() { - return mergedSize; + return byteCount; } - public int size() + public int getSegmentCount() { return timelineObjects.size(); } @@ -303,14 +303,14 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper Preconditions.checkArgument(maxSize >= 0, "maxSize >= 0"); int removed = 0; - while (!isComplete() || mergedSize > maxSize) { + while (!isComplete() || byteCount > maxSize) { removed ++; final TimelineObjectHolder removedHolder = timelineObjects.remove(timelineObjects.size() - 1).lhs; for(final PartitionChunk segment : removedHolder.getObject()) { segments.remove(segment.getObject()); if(segments.count(segment.getObject()) == 0) { - mergedSize -= segment.getObject().getSize(); + byteCount -= segment.getObject().getSize(); } } } diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterSegmentMergerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterSegmentMergerTest.java index c47ffab8fb5..8e19ed5d330 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterSegmentMergerTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterSegmentMergerTest.java @@ -101,6 +101,22 @@ public class DruidMasterSegmentMergerTest ); } + @Test + public void testMergeNoncontiguous() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(10).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("2").size(10).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("2").size(10).build() + ); + + Assert.assertEquals( + ImmutableList.of( + ImmutableList.of(segments.get(0), segments.get(1), segments.get(2)) + ), merge(segments) + ); + } + @Test public void testMergeSeriesByteLimited() { @@ -176,6 +192,40 @@ public class DruidMasterSegmentMergerTest ); } + @Test + public void testOverlappingMergeWithGapsAlignedStart() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P8D")).version("2").size(80).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("3").size(8).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("3").size(8).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-09/P1D")).version("3").size(8).build() + ); + + Assert.assertEquals( + ImmutableList.of( + ImmutableList.of(segments.get(1), segments.get(0), segments.get(2)) + ), merge(segments) + ); + } + + @Test + public void testOverlappingMergeWithGapsNonalignedStart() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P8D")).version("2").size(80).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P1D")).version("3").size(8).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("3").size(8).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-09/P1D")).version("3").size(8).build() + ); + + Assert.assertEquals( + ImmutableList.of( + ImmutableList.of(segments.get(0), segments.get(1), segments.get(2)) + ), merge(segments) + ); + } + @Test public void testOverlappingMerge1() {