mirror of https://github.com/apache/druid.git
push sinks after bootstrap. fix #570
This commit is contained in:
parent
12ee17c4d2
commit
a4003966b2
|
@ -152,6 +152,8 @@ public class RealtimePlumber implements Plumber
|
|||
bootstrapSinksFromDisk();
|
||||
registerServerViewCallback();
|
||||
startPersistThread();
|
||||
// Push pending sinks bootstrapped from previous run
|
||||
mergeAndPush();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -595,6 +597,25 @@ public class RealtimePlumber implements Plumber
|
|||
return ScheduledExecutors.Signal.STOP;
|
||||
}
|
||||
|
||||
mergeAndPush();
|
||||
|
||||
if (stopped) {
|
||||
log.info("Stopping merge-n-push overseer thread");
|
||||
return ScheduledExecutors.Signal.STOP;
|
||||
} else {
|
||||
return ScheduledExecutors.Signal.REPEAT;
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private void mergeAndPush()
|
||||
{
|
||||
final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
|
||||
final Period windowPeriod = config.getWindowPeriod();
|
||||
|
||||
final long windowMillis = windowPeriod.toStandardDuration().getMillis();
|
||||
log.info("Starting merge and push.");
|
||||
|
||||
DateTime minTimestampAsDate = segmentGranularity.truncate(
|
||||
|
@ -624,16 +645,6 @@ public class RealtimePlumber implements Plumber
|
|||
for (final Map.Entry<Long, Sink> entry : sinksToPush) {
|
||||
persistAndMerge(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
if (stopped) {
|
||||
log.info("Stopping merge-n-push overseer thread");
|
||||
return ScheduledExecutors.Signal.STOP;
|
||||
} else {
|
||||
return ScheduledExecutors.Signal.REPEAT;
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue