mirror of https://github.com/apache/druid.git
Merge pull request #288 from metamx/abandon-rt-segments
Realtime: Abandon segments that fail to push when shutting down
commit 6181114acd
@@ -58,7 +58,7 @@ public class S3Utils
   public static <T> T retryS3Operation(Callable<T> f) throws IOException, S3ServiceException, InterruptedException
   {
     int nTry = 0;
-    final int maxTries = 3;
+    final int maxTries = 10;
     while (true) {
       try {
         nTry++;
@@ -89,13 +89,10 @@ public class S3Utils
   private static void awaitNextRetry(Exception e, int nTry) throws InterruptedException
   {
     final long baseSleepMillis = 1000;
-    final double fuzziness = 0.2;
-    final long sleepMillis = Math.max(
-        baseSleepMillis,
-        (long) (baseSleepMillis * Math.pow(2, nTry) *
-                (1 + new Random().nextGaussian() * fuzziness))
-    );
-    log.info(e, "S3 fail on try %d, retrying in %,dms.", nTry, sleepMillis);
+    final long maxSleepMillis = 60000;
+    final double fuzzyMultiplier = Math.min(Math.max(1 + 0.2 * new Random().nextGaussian(), 0), 2);
+    final long sleepMillis = (long) (Math.min(maxSleepMillis, baseSleepMillis * Math.pow(2, nTry)) * fuzzyMultiplier);
+    log.warn("S3 fail on try %d, retrying in %,dms.", nTry, sleepMillis);
     Thread.sleep(sleepMillis);
   }
 
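For reference, a minimal standalone sketch of the retry policy after this change (hypothetical class and method names, not Druid's API, and retrying on any exception rather than only S3-transient ones): exponential backoff doubled per attempt, capped at 60 seconds, with a Gaussian fuzz multiplier clamped to [0, 2] so the sleep can never go negative and concurrent retries don't synchronize.

import java.util.Random;
import java.util.concurrent.Callable;

// Hypothetical sketch of the backoff policy above; not Druid's actual API.
public class RetrySketch
{
  private static final Random RANDOM = new Random();

  // Exponential backoff capped at maxSleepMillis, fuzzed by a Gaussian
  // multiplier clamped to [0, 2] so the result is never negative.
  static long nextSleepMillis(int nTry)
  {
    final long baseSleepMillis = 1000;
    final long maxSleepMillis = 60000;
    final double fuzzyMultiplier = Math.min(Math.max(1 + 0.2 * RANDOM.nextGaussian(), 0), 2);
    return (long) (Math.min(maxSleepMillis, baseSleepMillis * Math.pow(2, nTry)) * fuzzyMultiplier);
  }

  static <T> T callWithRetry(Callable<T> f, int maxTries) throws Exception
  {
    int nTry = 0;
    while (true) {
      try {
        nTry++;
        return f.call();
      }
      catch (Exception e) {
        if (nTry >= maxTries) {
          throw e; // out of retries; surface the last failure
        }
        Thread.sleep(nextSleepMillis(nTry));
      }
    }
  }
}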
@@ -98,6 +98,8 @@ public class RealtimePlumberSchool implements PlumberSchool
   private final IndexGranularity segmentGranularity;
   private final Object handoffCondition = new Object();
+
+  private volatile boolean shuttingDown = false;
 
   @JacksonInject
   @NotNull
   private volatile ServiceEmitter emitter;
@@ -410,15 +412,17 @@ public class RealtimePlumberSchool implements PlumberSchool
           log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
              .addData("interval", interval)
              .emit();
+          if (shuttingDown) {
+            // We're trying to shut down, and this segment failed to push. Let's just get rid of it.
+            abandonSegment(truncatedTime, sink);
+          }
         }
 
         if (mergedFile != null) {
           try {
-            if (mergedFile != null) {
               log.info("Deleting Index File[%s]", mergedFile);
               FileUtils.deleteDirectory(mergedFile);
-            }
           }
           catch (IOException e) {
             log.warn(e, "Error deleting directory[%s]", mergedFile);
           }
@@ -433,6 +437,8 @@ public class RealtimePlumberSchool implements PlumberSchool
   {
     log.info("Shutting down...");
+
+    shuttingDown = true;
 
     for (final Map.Entry<Long, Sink> entry : sinks.entrySet()) {
       persistAndMerge(entry.getKey(), entry.getValue());
     }
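The interaction between the shutdown path and the persist failure path reduces to a simple flag pattern; a condensed, hypothetical sketch (simplified names, not the actual Plumber interface):

// Hypothetical sketch of the shutdown interaction added above.
class ShutdownFlagSketch
{
  private volatile boolean shuttingDown = false;

  void finishJob()
  {
    shuttingDown = true;
    // ...then run the final persist-and-merge pass over all sinks.
  }

  void onPushFailure(long sinkKey)
  {
    // Normally a failed push is left in place for a later retry. During
    // shutdown there is no later, so the segment is abandoned instead of
    // leaving the process waiting on a handoff that will never complete.
    if (shuttingDown) {
      abandonSegment(sinkKey);
    }
  }

  void abandonSegment(long sinkKey)
  {
    // unannounce the segment and drop local state; see abandonSegment() below
  }
}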
@@ -613,26 +619,7 @@ public class RealtimePlumberSchool implements PlumberSchool
             final String sinkVersion = sink.getSegment().getVersion();
             if (segmentVersion.compareTo(sinkVersion) >= 0) {
               log.info("Segment version[%s] >= sink version[%s]", segmentVersion, sinkVersion);
-              try {
-                segmentAnnouncer.unannounceSegment(sink.getSegment());
-                FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval()));
-                log.info("Removing sinkKey %d for segment %s", sinkKey, sink.getSegment().getIdentifier());
-                sinks.remove(sinkKey);
-                sinkTimeline.remove(
-                    sink.getInterval(),
-                    sink.getVersion(),
-                    new SingleElementPartitionChunk<Sink>(sink)
-                );
-
-                synchronized (handoffCondition) {
-                  handoffCondition.notifyAll();
-                }
-              }
-              catch (IOException e) {
-                log.makeAlert(e, "Unable to delete old segment for dataSource[%s].", schema.getDataSource())
-                   .addData("interval", sink.getInterval())
-                   .emit();
-              }
+              abandonSegment(sinkKey, sink);
             }
           }
         }
@@ -706,6 +693,31 @@ public class RealtimePlumberSchool implements PlumberSchool
           }
       );
       }
 
+      /**
+       * Unannounces a given sink and removes all local references to it.
+       */
+      private void abandonSegment(final long truncatedTime, final Sink sink) {
+        try {
+          segmentAnnouncer.unannounceSegment(sink.getSegment());
+          FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval()));
+          log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier());
+          sinks.remove(truncatedTime);
+          sinkTimeline.remove(
+              sink.getInterval(),
+              sink.getVersion(),
+              new SingleElementPartitionChunk<>(sink)
+          );
+          synchronized (handoffCondition) {
+            handoffCondition.notifyAll();
+          }
+        }
+        catch (IOException e) {
+          log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource())
+             .addData("interval", sink.getInterval())
+             .emit();
+        }
+      }
+
     };
   }
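The notifyAll() above pairs with a waiting side of handoffCondition that is not part of this diff; a minimal hypothetical sketch of what such a waiter looks like:

import java.util.Map;

// Hypothetical sketch of a waiter on handoffCondition: block until every
// sink has been handed off or abandoned. abandonSegment() removes its sink
// and calls notifyAll(), waking this loop to re-check the condition.
class HandoffWaitSketch
{
  static void awaitAllSinksGone(Map<Long, ?> sinks, Object handoffCondition) throws InterruptedException
  {
    synchronized (handoffCondition) {
      while (!sinks.isEmpty()) {
        handoffCondition.wait();
      }
    }
  }
}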