Better logging for SegmentAllocateAction (#4884)

* Better logging for SegmentAllocateAction

* Split methods
This commit is contained in:
Jihoon Son 2017-10-03 01:29:21 +09:00 committed by Gian Merlino
parent 1f2074c247
commit ee7eaccbab
1 changed files with 106 additions and 45 deletions

View File

@ -24,13 +24,12 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.timeline.DataSegment;
@ -40,6 +39,7 @@ import org.joda.time.Interval;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Allocates a pending segment for a given timestamp. The preferredSegmentGranularity is used if there are no prior
@ -148,54 +148,22 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdentifier>
// 1) if something overlaps our timestamp, use that
// 2) otherwise try preferredSegmentGranularity & going progressively smaller
final List<Interval> tryIntervals = Lists.newArrayList();
final Interval rowInterval = queryGranularity.bucket(timestamp);
final Set<DataSegment> usedSegmentsForRow = ImmutableSet.copyOf(
msc.getUsedSegmentsForInterval(dataSource, rowInterval)
);
if (usedSegmentsForRow.isEmpty()) {
// No existing segments for this row, but there might still be nearby ones that conflict with our preferred
// segment granularity. Try that first, and then progressively smaller ones if it fails.
for (Granularity gran : Granularity.granularitiesFinerThan(preferredSegmentGranularity)) {
tryIntervals.add(gran.bucket(timestamp));
}
} else {
// Existing segment(s) exist for this row; use the interval of the first one.
tryIntervals.add(usedSegmentsForRow.iterator().next().getInterval());
}
for (final Interval tryInterval : tryIntervals) {
if (tryInterval.contains(rowInterval)) {
log.debug(
"Trying to allocate pending segment for rowInterval[%s], segmentInterval[%s].",
final SegmentIdentifier identifier = usedSegmentsForRow.isEmpty() ?
tryAllocateFirstSegment(toolbox, task, rowInterval) :
tryAllocateSubsequentSegment(
toolbox,
task,
rowInterval,
tryInterval
);
final TaskLock tryLock = toolbox.getTaskLockbox().tryLock(task, tryInterval).orNull();
if (tryLock != null) {
final SegmentIdentifier identifier = msc.allocatePendingSegment(
dataSource,
sequenceName,
previousSegmentId,
tryInterval,
tryLock.getVersion()
usedSegmentsForRow.iterator().next()
);
if (identifier != null) {
return identifier;
} else {
log.debug(
"Could not allocate pending segment for rowInterval[%s], segmentInterval[%s].",
rowInterval,
tryInterval
);
}
} else {
log.debug("Could not acquire lock for rowInterval[%s], segmentInterval[%s].", rowInterval, tryInterval);
}
}
}
// Could not allocate a pending segment. There's a chance that this is because someone else inserted a segment
@ -232,6 +200,99 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdentifier>
}
}
private SegmentIdentifier tryAllocateFirstSegment(
TaskActionToolbox toolbox,
Task task,
Interval rowInterval
) throws IOException
{
// No existing segments for this row, but there might still be nearby ones that conflict with our preferred
// segment granularity. Try that first, and then progressively smaller ones if it fails.
final List<Interval> tryIntervals = Granularity.granularitiesFinerThan(preferredSegmentGranularity)
.stream()
.map(granularity -> granularity.bucket(timestamp))
.collect(Collectors.toList());
for (Interval tryInterval : tryIntervals) {
if (tryInterval.contains(rowInterval)) {
final SegmentIdentifier identifier = tryAllocate(toolbox, task, tryInterval, rowInterval, false);
if (identifier != null) {
return identifier;
}
}
}
return null;
}
private SegmentIdentifier tryAllocateSubsequentSegment(
TaskActionToolbox toolbox,
Task task,
Interval rowInterval,
DataSegment usedSegment
) throws IOException
{
// Existing segment(s) exist for this row; use the interval of the first one.
if (!usedSegment.getInterval().contains(rowInterval)) {
log.error("The interval of existing segment[%s] doesn't contain rowInterval[%s]", usedSegment, rowInterval);
return null;
} else {
// If segment allocation failed here, it is highly likely an unrecoverable error. We log here for easier
// debugging.
return tryAllocate(toolbox, task, usedSegment.getInterval(), rowInterval, true);
}
}
private SegmentIdentifier tryAllocate(
TaskActionToolbox toolbox,
Task task,
Interval tryInterval,
Interval rowInterval,
boolean logOnFail
) throws IOException
{
log.debug(
"Trying to allocate pending segment for rowInterval[%s], segmentInterval[%s].",
rowInterval,
tryInterval
);
final TaskLock tryLock = toolbox.getTaskLockbox().tryLock(task, tryInterval).orNull();
if (tryLock != null) {
final SegmentIdentifier identifier = toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment(
dataSource,
sequenceName,
previousSegmentId,
tryInterval,
tryLock.getVersion()
);
if (identifier != null) {
return identifier;
} else {
final String msg = StringUtils.format(
"Could not allocate pending segment for rowInterval[%s], segmentInterval[%s].",
rowInterval,
tryInterval
);
if (logOnFail) {
log.error(msg);
} else {
log.debug(msg);
}
return null;
}
} else {
final String msg = StringUtils.format(
"Could not acquire lock for rowInterval[%s], segmentInterval[%s].",
rowInterval,
tryInterval
);
if (logOnFail) {
log.error(msg);
} else {
log.debug(msg);
}
return null;
}
}
@Override
public boolean isAudited()
{