mirror of https://github.com/apache/druid.git
Update RealtimePlumberSchool.java
There was an issue if you restart realtime node and merged dir is still there it will throw exceptions. To tackle this issue a check has been added. So now only persisted dir/files will be read and anything from "merged" will be avoided.
This commit is contained in:
parent
67be515db6
commit
a0462374b0
|
@ -70,6 +70,7 @@ import org.joda.time.Period;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FilenameFilter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
@ -377,7 +378,14 @@ public class RealtimePlumberSchool implements PlumberSchool
|
||||||
for (File sinkDir : computeBaseDir(schema).listFiles()) {
|
for (File sinkDir : computeBaseDir(schema).listFiles()) {
|
||||||
Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/"));
|
Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/"));
|
||||||
|
|
||||||
final File[] sinkFiles = sinkDir.listFiles();
|
//final File[] sinkFiles = sinkDir.listFiles();
|
||||||
|
// To avoid reading and listing of "merged" dir
|
||||||
|
final File[] sinkFiles = sinkDir.listFiles(new FilenameFilter() {
|
||||||
|
@Override
|
||||||
|
public boolean accept(File dir, String fileName) {
|
||||||
|
return !fileName.equalsIgnoreCase("merged");
|
||||||
|
}
|
||||||
|
});
|
||||||
Arrays.sort(
|
Arrays.sort(
|
||||||
sinkFiles,
|
sinkFiles,
|
||||||
new Comparator<File>()
|
new Comparator<File>()
|
||||||
|
@ -400,6 +408,13 @@ public class RealtimePlumberSchool implements PlumberSchool
|
||||||
List<FireHydrant> hydrants = Lists.newArrayList();
|
List<FireHydrant> hydrants = Lists.newArrayList();
|
||||||
for (File segmentDir : sinkFiles) {
|
for (File segmentDir : sinkFiles) {
|
||||||
log.info("Loading previously persisted segment at [%s]", segmentDir);
|
log.info("Loading previously persisted segment at [%s]", segmentDir);
|
||||||
|
|
||||||
|
// Although this is has been tackled at start of this method.
|
||||||
|
// Just a doubly-check added to skip "merged" dir. from being added to hydrants
|
||||||
|
// If 100% sure that this is not needed, this check can be removed.
|
||||||
|
if(segmentDir.getName().equalsIgnoreCase("merged"))
|
||||||
|
continue;
|
||||||
|
|
||||||
hydrants.add(
|
hydrants.add(
|
||||||
new FireHydrant(
|
new FireHydrant(
|
||||||
new QueryableIndexSegment(null, IndexIO.loadIndex(segmentDir)),
|
new QueryableIndexSegment(null, IndexIO.loadIndex(segmentDir)),
|
||||||
|
|
Loading…
Reference in New Issue