mirror of https://github.com/apache/druid.git
Merge pull request #673 from metamx/rt-plumber-merge-behavior
Change merged-segment-removing behavior in RealtimePlumber.
This commit is contained in:
commit
9803be7529
|
@ -328,7 +328,6 @@ public class RealtimePlumber implements Plumber
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
File mergedFile = null;
|
|
||||||
try {
|
try {
|
||||||
List<QueryableIndex> indexes = Lists.newArrayList();
|
List<QueryableIndex> indexes = Lists.newArrayList();
|
||||||
for (FireHydrant fireHydrant : sink) {
|
for (FireHydrant fireHydrant : sink) {
|
||||||
|
@ -338,7 +337,7 @@ public class RealtimePlumber implements Plumber
|
||||||
indexes.add(queryableIndex);
|
indexes.add(queryableIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
mergedFile = IndexMerger.mergeQueryableIndex(
|
final File mergedFile = IndexMerger.mergeQueryableIndex(
|
||||||
indexes,
|
indexes,
|
||||||
schema.getAggregators(),
|
schema.getAggregators(),
|
||||||
mergedTarget
|
mergedTarget
|
||||||
|
@ -353,23 +352,17 @@ public class RealtimePlumber implements Plumber
|
||||||
|
|
||||||
segmentPublisher.publishSegment(segment);
|
segmentPublisher.publishSegment(segment);
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (Exception e) {
|
||||||
log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
|
log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
|
||||||
.addData("interval", interval)
|
.addData("interval", interval)
|
||||||
.emit();
|
.emit();
|
||||||
if (shuttingDown) {
|
if (shuttingDown) {
|
||||||
// We're trying to shut down, and this segment failed to push. Let's just get rid of it.
|
// We're trying to shut down, and this segment failed to push. Let's just get rid of it.
|
||||||
|
// This call will also delete possibly-partially-written files, so we don't need to do it explicitly.
|
||||||
abandonSegment(truncatedTime, sink);
|
abandonSegment(truncatedTime, sink);
|
||||||
}
|
} else {
|
||||||
}
|
// Delete any possibly-partially-written files, so we can try again on the next push cycle.
|
||||||
|
removeMergedSegment(sink);
|
||||||
if (mergedFile != null) {
|
|
||||||
try {
|
|
||||||
log.info("Deleting Index File[%s]", mergedFile);
|
|
||||||
FileUtils.deleteDirectory(mergedFile);
|
|
||||||
}
|
|
||||||
catch (IOException e) {
|
|
||||||
log.warn(e, "Error deleting directory[%s]", mergedFile);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -648,13 +641,15 @@ public class RealtimePlumber implements Plumber
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unannounces a given sink and removes all local references to it.
|
* Unannounces a given sink and removes all local references to it. It is important that this is only called
|
||||||
|
* from the single-threaded mergeExecutor, since otherwise chaos may ensue if merged segments are deleted while
|
||||||
|
* being created.
|
||||||
*/
|
*/
|
||||||
protected void abandonSegment(final long truncatedTime, final Sink sink)
|
protected void abandonSegment(final long truncatedTime, final Sink sink)
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
segmentAnnouncer.unannounceSegment(sink.getSegment());
|
segmentAnnouncer.unannounceSegment(sink.getSegment());
|
||||||
FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval()));
|
removeMergedSegment(sink);
|
||||||
log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier());
|
log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier());
|
||||||
sinks.remove(truncatedTime);
|
sinks.remove(truncatedTime);
|
||||||
sinkTimeline.remove(
|
sinkTimeline.remove(
|
||||||
|
@ -666,7 +661,7 @@ public class RealtimePlumber implements Plumber
|
||||||
handoffCondition.notifyAll();
|
handoffCondition.notifyAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (Exception e) {
|
||||||
log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource())
|
log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource())
|
||||||
.addData("interval", sink.getInterval())
|
.addData("interval", sink.getInterval())
|
||||||
.emit();
|
.emit();
|
||||||
|
@ -802,4 +797,20 @@ public class RealtimePlumber implements Plumber
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void removeMergedSegment(final Sink sink)
|
||||||
|
{
|
||||||
|
final File mergedTarget = new File(computePersistDir(schema, sink.getInterval()), "merged");
|
||||||
|
if (mergedTarget.exists()) {
|
||||||
|
try {
|
||||||
|
log.info("Deleting Index File[%s]", mergedTarget);
|
||||||
|
FileUtils.deleteDirectory(mergedTarget);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.makeAlert(e, "Unable to remove merged segment for dataSource[%s]", schema.getDataSource())
|
||||||
|
.addData("interval", sink.getInterval())
|
||||||
|
.emit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue