Add a bit more information to the mapping logging logic.

This commit is contained in:
Eric Tschetter 2015-06-16 19:11:04 -07:00
parent 6763e3780a
commit 1aad7ce521
1 changed files with 10 additions and 4 deletions

View File

@ -34,6 +34,7 @@ import org.apache.curator.framework.CuratorFramework;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -41,6 +42,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/** /**
*/ */
@ -86,8 +88,10 @@ public class ZkCoordinator extends BaseZkCoordinator
} }
List<DataSegment> cachedSegments = Lists.newArrayList(); List<DataSegment> cachedSegments = Lists.newArrayList();
for (File file : baseDir.listFiles()) { File[] segmentsToLoad = baseDir.listFiles();
log.info("Loading segment cache file [%s]", file); for (int i = 0; i < segmentsToLoad.length; i++) {
File file = segmentsToLoad[i];
log.info("Loading segment cache file [%d/%d][%s].", i, segmentsToLoad.length, file);
try { try {
DataSegment segment = jsonMapper.readValue(file, DataSegment.class); DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
if (serverManager.isSegmentCached(segment)) { if (serverManager.isSegmentCached(segment)) {
@ -179,7 +183,7 @@ public class ZkCoordinator extends BaseZkCoordinator
} }
} }
public void addSegments(Iterable<DataSegment> segments, final DataSegmentChangeCallback callback) public void addSegments(Collection<DataSegment> segments, final DataSegmentChangeCallback callback)
{ {
try(final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer = try(final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) { new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
@ -187,6 +191,8 @@ public class ZkCoordinator extends BaseZkCoordinator
final List<ListenableFuture> segmentLoading = Lists.newArrayList(); final List<ListenableFuture> segmentLoading = Lists.newArrayList();
final int numSegments = segments.size();
final AtomicLong counter = new AtomicLong(0);
for (final DataSegment segment : segments) { for (final DataSegment segment : segments) {
segmentLoading.add( segmentLoading.add(
getLoadingExecutor().submit( getLoadingExecutor().submit(
@ -196,7 +202,7 @@ public class ZkCoordinator extends BaseZkCoordinator
public Void call() throws SegmentLoadingException public Void call() throws SegmentLoadingException
{ {
try { try {
log.info("Loading segment %s", segment.getIdentifier()); log.info("Loading segment[%d/%d][%s]", counter.getAndIncrement(), numSegments, segment.getIdentifier());
final boolean loaded = loadSegment(segment, callback); final boolean loaded = loadSegment(segment, callback);
if (loaded) { if (loaded) {
try { try {