Merge pull request #1444 from druid-io/logging-improvement

Separate bootstrap threads from loading threads on historical startup
This commit is contained in:
Xavier Léauté 2015-06-17 15:24:07 -07:00
commit d276d2c90b
3 changed files with 67 additions and 59 deletions

View File

@ -44,6 +44,9 @@ public class SegmentLoaderConfig
@JsonProperty("numLoadingThreads")
private int numLoadingThreads = 1;
@JsonProperty("numBootstrapThreads")
private Integer numBootstrapThreads = null;
@JsonProperty
private File infoDir = null;
@ -72,6 +75,10 @@ public class SegmentLoaderConfig
return numLoadingThreads;
}
public int getNumBootstrapThreads() {
return numBootstrapThreads == null ? numLoadingThreads : numBootstrapThreads;
}
public File getInfoDir()
{
if (infoDir == null) {

View File

@ -53,7 +53,6 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
private volatile PathChildrenCache loadQueueCache;
private volatile boolean started = false;
private final ListeningExecutorService loadingExec;
public BaseZkCoordinator(
ObjectMapper jsonMapper,
@ -68,12 +67,6 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
this.config = config;
this.me = me;
this.curator = curator;
this.loadingExec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(
config.getNumLoadingThreads(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
)
);
}
@LifecycleStart
@ -95,7 +88,10 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
loadQueueLocation,
true,
true,
loadingExec
Executors.newFixedThreadPool(
config.getNumLoadingThreads(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
)
);
try {
@ -217,9 +213,4 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
public abstract void loadLocalCache();
public abstract DataSegmentChangeHandler getDataSegmentChangeHandler();
public ListeningExecutorService getLoadingExecutor()
{
return loadingExec;
}
}

View File

@ -20,12 +20,13 @@ package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.server.initialization.ZkPathsConfig;
@ -34,13 +35,18 @@ import org.apache.curator.framework.CuratorFramework;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
@ -86,8 +92,10 @@ public class ZkCoordinator extends BaseZkCoordinator
}
List<DataSegment> cachedSegments = Lists.newArrayList();
for (File file : baseDir.listFiles()) {
log.info("Loading segment cache file [%s]", file);
File[] segmentsToLoad = baseDir.listFiles();
for (int i = 0; i < segmentsToLoad.length; i++) {
File file = segmentsToLoad[i];
log.info("Loading segment cache file [%d/%d][%s].", i, segmentsToLoad.length, file);
try {
DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
if (serverManager.isSegmentCached(segment)) {
@ -179,69 +187,71 @@ public class ZkCoordinator extends BaseZkCoordinator
}
}
public void addSegments(Iterable<DataSegment> segments, final DataSegmentChangeCallback callback)
private void addSegments(Collection<DataSegment> segments, final DataSegmentChangeCallback callback)
{
try(final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
ExecutorService loadingExecutor = null;
try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
backgroundSegmentAnnouncer.startAnnouncing();
final List<ListenableFuture> segmentLoading = Lists.newArrayList();
loadingExecutor = Execs.multiThreaded(config.getNumBootstrapThreads(), "ZkCoordinator-loading-%s");
final int numSegments = segments.size();
final CountDownLatch latch = new CountDownLatch(numSegments);
final AtomicInteger counter = new AtomicInteger(0);
final CopyOnWriteArrayList<DataSegment> failedSegments = new CopyOnWriteArrayList<>();
for (final DataSegment segment : segments) {
segmentLoading.add(
getLoadingExecutor().submit(
new Callable<Void>()
{
loadingExecutor.submit(
new Runnable() {
@Override
public Void call() throws SegmentLoadingException
{
public void run() {
try {
log.info("Loading segment %s", segment.getIdentifier());
log.info("Loading segment[%d/%d][%s]", counter.getAndIncrement(), numSegments, segment.getIdentifier());
final boolean loaded = loadSegment(segment, callback);
if (loaded) {
try {
backgroundSegmentAnnouncer.announceSegment(segment);
}
catch (InterruptedException e) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SegmentLoadingException(e, "Loading Interrupted");
}
}
return null;
} catch(SegmentLoadingException e) {
} catch (SegmentLoadingException e) {
log.error(e, "[%s] failed to load", segment.getIdentifier());
throw e;
failedSegments.add(segment);
} finally {
latch.countDown();
}
}
}
)
);
}
int failed = 0;
for(ListenableFuture future : segmentLoading) {
try {
future.get();
try{
latch.await();
if(failedSegments.size() > 0) {
log.makeAlert("%,d errors seen while loading segments", failedSegments.size())
.addData("failedSegments", failedSegments);
}
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
throw new SegmentLoadingException(e, "Loading Interrupted");
} catch(ExecutionException e) {
failed++;
}
}
if(failed > 0) {
throw new SegmentLoadingException("%,d errors seen while loading segments", failed);
log.makeAlert(e, "LoadingInterrupted");
}
backgroundSegmentAnnouncer.finishAnnouncing();
}
catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segments")
.addData("segments", segments)
log.makeAlert(e, "Failed to load segments -- likely problem with announcing.")
.addData("numSegments", segments.size())
.emit();
}
finally {
callback.execute();
if (loadingExecutor != null) {
loadingExecutor.shutdownNow();
}
}
}