diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index 3804db09041..29578ed68ba 100644 --- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -34,6 +34,7 @@ import org.apache.curator.framework.CuratorFramework; import java.io.File; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -41,6 +42,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** */ @@ -86,8 +88,10 @@ public class ZkCoordinator extends BaseZkCoordinator } List cachedSegments = Lists.newArrayList(); - for (File file : baseDir.listFiles()) { - log.info("Loading segment cache file [%s]", file); + File[] segmentsToLoad = baseDir.listFiles(); + for (int i = 0; i < segmentsToLoad.length; i++) { + File file = segmentsToLoad[i]; + log.info("Loading segment cache file [%d/%d][%s].", i, segmentsToLoad.length, file); try { DataSegment segment = jsonMapper.readValue(file, DataSegment.class); if (serverManager.isSegmentCached(segment)) { @@ -179,7 +183,7 @@ public class ZkCoordinator extends BaseZkCoordinator } } - public void addSegments(Iterable segments, final DataSegmentChangeCallback callback) + public void addSegments(Collection segments, final DataSegmentChangeCallback callback) { try(final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer = new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) { @@ -187,6 +191,8 @@ public class ZkCoordinator extends BaseZkCoordinator final List segmentLoading = Lists.newArrayList(); + final int numSegments = segments.size(); + final AtomicLong counter = new AtomicLong(0); for (final DataSegment segment : segments) { segmentLoading.add( getLoadingExecutor().submit( @@ -196,7 +202,7 @@ public class ZkCoordinator extends BaseZkCoordinator public Void call() throws SegmentLoadingException { try { - log.info("Loading segment %s", segment.getIdentifier()); + log.info("Loading segment[%d/%d][%s]", counter.getAndIncrement(), numSegments, segment.getIdentifier()); final boolean loaded = loadSegment(segment, callback); if (loaded) { try {