diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentAllocateAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentAllocateAction.java index c966131ab3c..54dae487ffc 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentAllocateAction.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentAllocateAction.java @@ -24,13 +24,12 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; - -import io.druid.java.util.common.granularity.Granularity; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import io.druid.java.util.common.IAE; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.granularity.Granularity; import io.druid.java.util.common.logger.Logger; import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.timeline.DataSegment; @@ -40,6 +39,7 @@ import org.joda.time.Interval; import java.io.IOException; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; /** * Allocates a pending segment for a given timestamp. The preferredSegmentGranularity is used if there are no prior @@ -148,54 +148,22 @@ public class SegmentAllocateAction implements TaskAction // 1) if something overlaps our timestamp, use that // 2) otherwise try preferredSegmentGranularity & going progressively smaller - final List tryIntervals = Lists.newArrayList(); - final Interval rowInterval = queryGranularity.bucket(timestamp); final Set usedSegmentsForRow = ImmutableSet.copyOf( msc.getUsedSegmentsForInterval(dataSource, rowInterval) ); - if (usedSegmentsForRow.isEmpty()) { - // No existing segments for this row, but there might still be nearby ones that conflict with our preferred - // segment granularity. Try that first, and then progressively smaller ones if it fails. - for (Granularity gran : Granularity.granularitiesFinerThan(preferredSegmentGranularity)) { - tryIntervals.add(gran.bucket(timestamp)); - } - } else { - // Existing segment(s) exist for this row; use the interval of the first one. - tryIntervals.add(usedSegmentsForRow.iterator().next().getInterval()); - } - - for (final Interval tryInterval : tryIntervals) { - if (tryInterval.contains(rowInterval)) { - log.debug( - "Trying to allocate pending segment for rowInterval[%s], segmentInterval[%s].", - rowInterval, - tryInterval - ); - final TaskLock tryLock = toolbox.getTaskLockbox().tryLock(task, tryInterval).orNull(); - if (tryLock != null) { - final SegmentIdentifier identifier = msc.allocatePendingSegment( - dataSource, - sequenceName, - previousSegmentId, - tryInterval, - tryLock.getVersion() - ); - if (identifier != null) { - return identifier; - } else { - log.debug( - "Could not allocate pending segment for rowInterval[%s], segmentInterval[%s].", - rowInterval, - tryInterval - ); - } - } else { - log.debug("Could not acquire lock for rowInterval[%s], segmentInterval[%s].", rowInterval, tryInterval); - } - } + final SegmentIdentifier identifier = usedSegmentsForRow.isEmpty() ? + tryAllocateFirstSegment(toolbox, task, rowInterval) : + tryAllocateSubsequentSegment( + toolbox, + task, + rowInterval, + usedSegmentsForRow.iterator().next() + ); + if (identifier != null) { + return identifier; } // Could not allocate a pending segment. There's a chance that this is because someone else inserted a segment @@ -232,6 +200,99 @@ public class SegmentAllocateAction implements TaskAction } } + private SegmentIdentifier tryAllocateFirstSegment( + TaskActionToolbox toolbox, + Task task, + Interval rowInterval + ) throws IOException + { + // No existing segments for this row, but there might still be nearby ones that conflict with our preferred + // segment granularity. Try that first, and then progressively smaller ones if it fails. + final List tryIntervals = Granularity.granularitiesFinerThan(preferredSegmentGranularity) + .stream() + .map(granularity -> granularity.bucket(timestamp)) + .collect(Collectors.toList()); + for (Interval tryInterval : tryIntervals) { + if (tryInterval.contains(rowInterval)) { + final SegmentIdentifier identifier = tryAllocate(toolbox, task, tryInterval, rowInterval, false); + if (identifier != null) { + return identifier; + } + } + } + return null; + } + + private SegmentIdentifier tryAllocateSubsequentSegment( + TaskActionToolbox toolbox, + Task task, + Interval rowInterval, + DataSegment usedSegment + ) throws IOException + { + // Existing segment(s) exist for this row; use the interval of the first one. + if (!usedSegment.getInterval().contains(rowInterval)) { + log.error("The interval of existing segment[%s] doesn't contain rowInterval[%s]", usedSegment, rowInterval); + return null; + } else { + // If segment allocation failed here, it is highly likely an unrecoverable error. We log here for easier + // debugging. + return tryAllocate(toolbox, task, usedSegment.getInterval(), rowInterval, true); + } + } + + private SegmentIdentifier tryAllocate( + TaskActionToolbox toolbox, + Task task, + Interval tryInterval, + Interval rowInterval, + boolean logOnFail + ) throws IOException + { + log.debug( + "Trying to allocate pending segment for rowInterval[%s], segmentInterval[%s].", + rowInterval, + tryInterval + ); + final TaskLock tryLock = toolbox.getTaskLockbox().tryLock(task, tryInterval).orNull(); + if (tryLock != null) { + final SegmentIdentifier identifier = toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment( + dataSource, + sequenceName, + previousSegmentId, + tryInterval, + tryLock.getVersion() + ); + if (identifier != null) { + return identifier; + } else { + final String msg = StringUtils.format( + "Could not allocate pending segment for rowInterval[%s], segmentInterval[%s].", + rowInterval, + tryInterval + ); + if (logOnFail) { + log.error(msg); + } else { + log.debug(msg); + } + return null; + } + } else { + final String msg = StringUtils.format( + "Could not acquire lock for rowInterval[%s], segmentInterval[%s].", + rowInterval, + tryInterval + ); + if (logOnFail) { + log.error(msg); + } else { + log.debug(msg); + } + return null; + } + } + @Override public boolean isAudited() {