mirror of https://github.com/apache/druid.git
fix some rt cleanup and failure edge cases
This commit is contained in:
parent
4e3f4fbc22
commit
96d5d4a59a
|
@ -59,6 +59,7 @@ import javax.annotation.Nullable;
|
|||
import java.io.File;
|
||||
import java.io.FilenameFilter;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
|
@ -312,6 +313,12 @@ public class RealtimePlumber implements Plumber
|
|||
{
|
||||
final Interval interval = sink.getInterval();
|
||||
|
||||
// use a marker file to indicate that merging has completed
|
||||
final File marker = new File(computePersistDir(schema, sink.getInterval()), "marker");
|
||||
if (marker.exists()) {
|
||||
removeMergedSegment(sink);
|
||||
}
|
||||
|
||||
for (FireHydrant hydrant : sink) {
|
||||
synchronized (hydrant) {
|
||||
if (!hydrant.hasSwapped()) {
|
||||
|
@ -342,6 +349,9 @@ public class RealtimePlumber implements Plumber
|
|||
schema.getAggregators(),
|
||||
mergedTarget
|
||||
);
|
||||
if (!marker.createNewFile()) {
|
||||
log.makeAlert("Unable to make marker file[%s]! WTF?!", marker).emit();
|
||||
}
|
||||
|
||||
QueryableIndex index = IndexIO.loadIndex(mergedFile);
|
||||
|
||||
|
@ -350,6 +360,10 @@ public class RealtimePlumber implements Plumber
|
|||
sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
|
||||
);
|
||||
|
||||
if (!marker.delete()) {
|
||||
log.makeAlert("Unable to remove merged marker", marker).emit();
|
||||
}
|
||||
|
||||
segmentPublisher.publishSegment(segment);
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
@ -649,7 +663,7 @@ public class RealtimePlumber implements Plumber
|
|||
{
|
||||
try {
|
||||
segmentAnnouncer.unannounceSegment(sink.getSegment());
|
||||
removeMergedSegment(sink);
|
||||
removeSegment(sink, computePersistDir(schema, sink.getInterval()));
|
||||
log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier());
|
||||
sinks.remove(truncatedTime);
|
||||
sinkTimeline.remove(
|
||||
|
@ -785,13 +799,13 @@ public class RealtimePlumber implements Plumber
|
|||
&& config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum()
|
||||
&& Iterables.any(
|
||||
sinks.keySet(), new Predicate<Long>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(Long sinkKey)
|
||||
{
|
||||
return segment.getInterval().contains(sinkKey);
|
||||
}
|
||||
}
|
||||
{
|
||||
@Override
|
||||
public boolean apply(Long sinkKey)
|
||||
{
|
||||
return segment.getInterval().contains(sinkKey);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -801,10 +815,15 @@ public class RealtimePlumber implements Plumber
|
|||
private void removeMergedSegment(final Sink sink)
|
||||
{
|
||||
final File mergedTarget = new File(computePersistDir(schema, sink.getInterval()), "merged");
|
||||
if (mergedTarget.exists()) {
|
||||
removeSegment(sink, mergedTarget);
|
||||
}
|
||||
|
||||
private void removeSegment(final Sink sink, final File target)
|
||||
{
|
||||
if (target.exists()) {
|
||||
try {
|
||||
log.info("Deleting Index File[%s]", mergedTarget);
|
||||
FileUtils.deleteDirectory(mergedTarget);
|
||||
log.info("Deleting Index File[%s]", target);
|
||||
FileUtils.deleteDirectory(target);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.makeAlert(e, "Unable to remove merged segment for dataSource[%s]", schema.getDataSource())
|
||||
|
|
Loading…
Reference in New Issue