mirror of https://github.com/apache/druid.git
Merge pull request #2530 from himanshug/fix_plumber
In persistAndMerge, increase the scope of try-catch block so that any…
This commit is contained in:
commit
6ac32c5518
|
@ -48,8 +48,8 @@ import io.druid.common.guava.ThreadRenamingCallable;
|
|||
import io.druid.common.guava.ThreadRenamingRunnable;
|
||||
import io.druid.common.utils.VMUtils;
|
||||
import io.druid.concurrent.Execs;
|
||||
import io.druid.data.input.Committer;
|
||||
import io.druid.concurrent.TaskThreadPriority;
|
||||
import io.druid.data.input.Committer;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.query.MetricsEmittingQueryRunner;
|
||||
import io.druid.query.NoopQueryRunner;
|
||||
|
@ -59,7 +59,6 @@ import io.druid.query.QueryRunnerFactory;
|
|||
import io.druid.query.QueryRunnerFactoryConglomerate;
|
||||
import io.druid.query.QueryRunnerHelper;
|
||||
import io.druid.query.QueryToolChest;
|
||||
import io.druid.query.ReportTimelineMissingSegmentQueryRunner;
|
||||
import io.druid.query.SegmentDescriptor;
|
||||
import io.druid.query.spec.SpecificSegmentQueryRunner;
|
||||
import io.druid.query.spec.SpecificSegmentSpec;
|
||||
|
@ -69,7 +68,6 @@ import io.druid.segment.IndexSpec;
|
|||
import io.druid.segment.Metadata;
|
||||
import io.druid.segment.QueryableIndex;
|
||||
import io.druid.segment.QueryableIndexSegment;
|
||||
import io.druid.segment.ReferenceCountingSegment;
|
||||
import io.druid.segment.Segment;
|
||||
import io.druid.segment.incremental.IndexSizeExceededException;
|
||||
import io.druid.segment.indexing.DataSchema;
|
||||
|
@ -90,7 +88,6 @@ import org.joda.time.Interval;
|
|||
import org.joda.time.Period;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import javax.ws.rs.HEAD;
|
||||
import java.io.Closeable;
|
||||
import java.io.File;
|
||||
import java.io.FilenameFilter;
|
||||
|
@ -481,50 +478,52 @@ public class RealtimePlumber implements Plumber
|
|||
mergeExecutor.execute(
|
||||
new ThreadRenamingRunnable(threadName)
|
||||
{
|
||||
final Interval interval = sink.getInterval();
|
||||
Stopwatch mergeStopwatch = null;
|
||||
|
||||
@Override
|
||||
public void doRun()
|
||||
{
|
||||
final Interval interval = sink.getInterval();
|
||||
|
||||
// Bail out if this sink has been abandoned by a previously-executed task.
|
||||
if (sinks.get(truncatedTime) != sink) {
|
||||
log.info("Sink[%s] was abandoned, bailing out of persist-n-merge.", sink);
|
||||
return;
|
||||
}
|
||||
|
||||
// Use a file to indicate that pushing has completed.
|
||||
final File persistDir = computePersistDir(schema, interval);
|
||||
final File mergedTarget = new File(persistDir, "merged");
|
||||
final File isPushedMarker = new File(persistDir, "isPushedMarker");
|
||||
|
||||
if (!isPushedMarker.exists()) {
|
||||
removeSegment(sink, mergedTarget);
|
||||
if (mergedTarget.exists()) {
|
||||
log.wtf("Merged target[%s] exists?!", mergedTarget);
|
||||
try {
|
||||
// Bail out if this sink has been abandoned by a previously-executed task.
|
||||
if (sinks.get(truncatedTime) != sink) {
|
||||
log.info("Sink[%s] was abandoned, bailing out of persist-n-merge.", sink);
|
||||
return;
|
||||
}
|
||||
|
||||
// Use a file to indicate that pushing has completed.
|
||||
final File persistDir = computePersistDir(schema, interval);
|
||||
final File mergedTarget = new File(persistDir, "merged");
|
||||
final File isPushedMarker = new File(persistDir, "isPushedMarker");
|
||||
|
||||
if (!isPushedMarker.exists()) {
|
||||
removeSegment(sink, mergedTarget);
|
||||
if (mergedTarget.exists()) {
|
||||
log.wtf("Merged target[%s] exists?!", mergedTarget);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
log.info("Already pushed sink[%s]", sink);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
log.info("Already pushed sink[%s]", sink);
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
Note: it the plumber crashes after persisting a subset of hydrants then might duplicate data as these
|
||||
hydrants will be read but older commitMetadata will be used. fixing this possibly needs structural
|
||||
changes to plumber.
|
||||
*/
|
||||
for (FireHydrant hydrant : sink) {
|
||||
synchronized (hydrant) {
|
||||
if (!hydrant.hasSwapped()) {
|
||||
log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
|
||||
final int rowCount = persistHydrant(hydrant, schema, interval, null);
|
||||
metrics.incrementRowOutputCount(rowCount);
|
||||
for (FireHydrant hydrant : sink) {
|
||||
synchronized (hydrant) {
|
||||
if (!hydrant.hasSwapped()) {
|
||||
log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
|
||||
final int rowCount = persistHydrant(hydrant, schema, interval, null);
|
||||
metrics.incrementRowOutputCount(rowCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
final long mergeThreadCpuTime = VMUtils.safeGetThreadCpuTime();
|
||||
final Stopwatch mergeStopwatch = Stopwatch.createStarted();
|
||||
try {
|
||||
final long mergeThreadCpuTime = VMUtils.safeGetThreadCpuTime();
|
||||
mergeStopwatch = Stopwatch.createStarted();
|
||||
|
||||
List<QueryableIndex> indexes = Lists.newArrayList();
|
||||
for (FireHydrant fireHydrant : sink) {
|
||||
Segment segment = fireHydrant.getSegment();
|
||||
|
@ -575,7 +574,9 @@ public class RealtimePlumber implements Plumber
|
|||
}
|
||||
}
|
||||
finally {
|
||||
mergeStopwatch.stop();
|
||||
if (mergeStopwatch != null) {
|
||||
mergeStopwatch.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue