diff --git a/server/src/main/java/com/metamx/druid/coordination/ServerManager.java b/server/src/main/java/com/metamx/druid/coordination/ServerManager.java index 38e7d1e4a39..47a3f878b6d 100644 --- a/server/src/main/java/com/metamx/druid/coordination/ServerManager.java +++ b/server/src/main/java/com/metamx/druid/coordination/ServerManager.java @@ -104,6 +104,11 @@ public class ServerManager implements QuerySegmentWalker } } + public boolean isSegmentLoaded(final DataSegment segment) throws SegmentLoadingException + { + return segmentLoader.isSegmentLoaded(segment); + } + public void loadSegment(final DataSegment segment) throws SegmentLoadingException { final Segment adapter; diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java index 1951205975c..56e9e1a6596 100644 --- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java +++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java @@ -215,9 +215,12 @@ public class ZkCoordinator implements DataSegmentChangeHandler for (File file : baseDir.listFiles()) { log.info("Loading segment cache file [%s]", file); try { - addSegment(jsonMapper.readValue(file, DataSegment.class)); + DataSegment segment = jsonMapper.readValue(file, DataSegment.class); + if (serverManager.isSegmentLoaded(segment)) { + addSegment(segment); + } } - catch (IOException e) { + catch (Exception e) { log.error(e, "Exception occurred reading file [%s]", file); emitter.emit( new AlertEvent.Builder().build( diff --git a/server/src/main/java/com/metamx/druid/loading/DelegatingSegmentLoader.java b/server/src/main/java/com/metamx/druid/loading/DelegatingSegmentLoader.java index 0f8e1e7074f..f113051d506 100644 --- a/server/src/main/java/com/metamx/druid/loading/DelegatingSegmentLoader.java +++ b/server/src/main/java/com/metamx/druid/loading/DelegatingSegmentLoader.java @@ -43,6 +43,12 @@ public class DelegatingSegmentLoader implements SegmentLoader this.loaderTypes = loaderTypes; } + @Override + public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException + { + return getLoader(segment.getLoadSpec()).isSegmentLoaded(segment); + } + @Override public Segment getSegment(DataSegment segment) throws SegmentLoadingException { diff --git a/server/src/main/java/com/metamx/druid/loading/SegmentLoader.java b/server/src/main/java/com/metamx/druid/loading/SegmentLoader.java index 20fa5592ac2..75c7591a395 100644 --- a/server/src/main/java/com/metamx/druid/loading/SegmentLoader.java +++ b/server/src/main/java/com/metamx/druid/loading/SegmentLoader.java @@ -26,6 +26,7 @@ import com.metamx.druid.index.Segment; */ public interface SegmentLoader { + public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException; public Segment getSegment(DataSegment loadSpec) throws SegmentLoadingException; public void cleanup(DataSegment loadSpec) throws SegmentLoadingException; } diff --git a/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java b/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java index f40f2a52eac..bc7dbc236f2 100644 --- a/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java +++ b/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java @@ -55,6 +55,21 @@ public class SingleSegmentLoader implements SegmentLoader this.config = config; } + @Override + public boolean isSegmentLoaded(final DataSegment segment) + { + File localStorageDir = new File(config.getCacheDirectory(), DataSegmentPusherUtil.getStorageDir(segment)); + if (localStorageDir.exists()) { + return true; + } + + final File legacyStorageDir = new File( + config.getCacheDirectory(), + DataSegmentPusherUtil.getLegacyStorageDir(segment) + ); + return legacyStorageDir.exists(); + } + @Override public Segment getSegment(DataSegment segment) throws SegmentLoadingException { diff --git a/server/src/test/java/com/metamx/druid/coordination/ServerManagerTest.java b/server/src/test/java/com/metamx/druid/coordination/ServerManagerTest.java index d10566c5da4..be06a6b84c2 100644 --- a/server/src/test/java/com/metamx/druid/coordination/ServerManagerTest.java +++ b/server/src/test/java/com/metamx/druid/coordination/ServerManagerTest.java @@ -84,6 +84,12 @@ public class ServerManagerTest serverManager = new ServerManager( new SegmentLoader() { + @Override + public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException + { + return false; + } + @Override public Segment getSegment(final DataSegment segment) { diff --git a/server/src/test/java/com/metamx/druid/loading/NoopSegmentLoader.java b/server/src/test/java/com/metamx/druid/loading/NoopSegmentLoader.java index ca41c4dfec6..3a69cfbd8dc 100644 --- a/server/src/test/java/com/metamx/druid/loading/NoopSegmentLoader.java +++ b/server/src/test/java/com/metamx/druid/loading/NoopSegmentLoader.java @@ -29,6 +29,12 @@ import org.joda.time.Interval; */ public class NoopSegmentLoader implements SegmentLoader { + @Override + public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException + { + return false; + } + @Override public Segment getSegment(final DataSegment segment) throws SegmentLoadingException {