parallelize segment loading

This commit is contained in:
Xavier Léauté 2014-09-22 16:33:25 -07:00
parent 815ebeee25
commit 12449481e3
5 changed files with 112 additions and 68 deletions

View File

@ -43,6 +43,9 @@ public class SegmentLoaderConfig
@JsonProperty("announceIntervalMillis")
private int announceIntervalMillis = 5 * 1000; // 5 seconds
@JsonProperty("numLoadingThreads")
private int numLoadingThreads = 1;
@JsonProperty
private File infoDir = null;
@ -66,6 +69,11 @@ public class SegmentLoaderConfig
return announceIntervalMillis;
}
public int getNumLoadingThreads()
{
return numLoadingThreads;
}
public File getInfoDir()
{
if (infoDir == null) {

View File

@ -26,6 +26,7 @@ import com.metamx.common.logger.Logger;
import io.druid.client.ServerView;
import io.druid.concurrent.Execs;
import io.druid.db.DatabaseSegmentManager;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.realtime.DbSegmentPublisher;
import io.druid.server.coordination.BaseZkCoordinator;
import io.druid.server.coordination.DataSegmentChangeCallback;
@ -53,6 +54,7 @@ public class BridgeZkCoordinator extends BaseZkCoordinator
public BridgeZkCoordinator(
ObjectMapper jsonMapper,
ZkPathsConfig zkPaths,
SegmentLoaderConfig config,
DruidServerMetadata me,
@Bridge CuratorFramework curator,
DbSegmentPublisher dbSegmentPublisher,
@ -60,7 +62,7 @@ public class BridgeZkCoordinator extends BaseZkCoordinator
ServerView serverView
)
{
super(jsonMapper, zkPaths, me, curator);
super(jsonMapper, zkPaths, config, me, curator);
this.dbSegmentPublisher = dbSegmentPublisher;
this.databaseSegmentManager = databaseSegmentManager;

View File

@ -21,10 +21,13 @@ package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
@ -34,6 +37,8 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
*/
@ -45,23 +50,33 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
private final ObjectMapper jsonMapper;
private final ZkPathsConfig zkPaths;
private final SegmentLoaderConfig config;
private final DruidServerMetadata me;
private final CuratorFramework curator;
private volatile PathChildrenCache loadQueueCache;
private volatile boolean started;
private final ListeningExecutorService loadingExec;
public BaseZkCoordinator(
ObjectMapper jsonMapper,
ZkPathsConfig zkPaths,
SegmentLoaderConfig config,
DruidServerMetadata me,
CuratorFramework curator
)
{
this.jsonMapper = jsonMapper;
this.zkPaths = zkPaths;
this.config = config;
this.me = me;
this.curator = curator;
this.loadingExec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(
config.getNumLoadingThreads(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
)
);
}
@LifecycleStart
@ -83,7 +98,7 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
loadQueueLocation,
true,
true,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
loadingExec
);
try {
@ -200,4 +215,9 @@ public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
public abstract void loadLocalCache();
public abstract DataSegmentChangeHandler getDataSegmentChangeHandler();
public ListeningExecutorService getLoadingExecutor()
{
return loadingExec;
}
}

View File

@ -22,6 +22,7 @@ package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import com.metamx.common.ISE;
@ -36,6 +37,7 @@ import org.apache.curator.framework.CuratorFramework;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
@ -66,7 +68,7 @@ public class ZkCoordinator extends BaseZkCoordinator
ScheduledExecutorFactory factory
)
{
super(jsonMapper, zkPaths, me, curator);
super(jsonMapper, zkPaths, config, me, curator);
this.jsonMapper = jsonMapper;
this.config = config;
@ -127,42 +129,47 @@ public class ZkCoordinator extends BaseZkCoordinator
return ZkCoordinator.this;
}
private boolean loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException
{
final boolean loaded;
try {
loaded = serverManager.loadSegment(segment);
}
catch (Exception e) {
removeSegment(segment, callback);
throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier());
}
if (loaded) {
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
if (!segmentInfoCacheFile.exists()) {
try {
jsonMapper.writeValue(segmentInfoCacheFile, segment);
}
catch (IOException e) {
removeSegment(segment, callback);
throw new SegmentLoadingException(
e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
);
}
}
}
return loaded;
}
@Override
public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
{
try {
log.info("Loading segment %s", segment.getIdentifier());
final boolean loaded;
try {
loaded = serverManager.loadSegment(segment);
}
catch (Exception e) {
removeSegment(segment, callback);
throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier());
}
if (loaded) {
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
if (!segmentInfoCacheFile.exists()) {
try {
jsonMapper.writeValue(segmentInfoCacheFile, segment);
}
catch (IOException e) {
removeSegment(segment, callback);
throw new SegmentLoadingException(
e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
);
}
}
if(loadSegment(segment, callback)) {
try {
announcer.announceSegment(segment);
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
}
}
};
}
catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segment for dataSource")
@ -174,54 +181,58 @@ public class ZkCoordinator extends BaseZkCoordinator
}
}
public void addSegments(Iterable<DataSegment> segments, DataSegmentChangeCallback callback)
public void addSegments(Iterable<DataSegment> segments, final DataSegmentChangeCallback callback)
{
try(final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
backgroundSegmentAnnouncer.startAnnouncing();
final List<String> segmentFailures = Lists.newArrayList();
for (DataSegment segment : segments) {
log.info("Loading segment %s", segment.getIdentifier());
final List<ListenableFuture<Boolean>> segmentLoading = Lists.newArrayList();
final boolean loaded;
try {
loaded = serverManager.loadSegment(segment);
}
catch (Exception e) {
log.error(e, "Exception loading segment[%s]", segment.getIdentifier());
removeSegment(segment, callback);
segmentFailures.add(segment.getIdentifier());
continue;
}
if (loaded) {
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
if (!segmentInfoCacheFile.exists()) {
try {
jsonMapper.writeValue(segmentInfoCacheFile, segment);
}
catch (IOException e) {
log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
removeSegment(segment, callback);
segmentFailures.add(segment.getIdentifier());
continue;
}
}
try {
backgroundSegmentAnnouncer.announceSegment(segment);
} catch(InterruptedException e) {
throw new SegmentLoadingException(e, "Loading Interrupted");
}
}
for (final DataSegment segment : segments) {
segmentLoading.add(
getLoadingExecutor().submit(
new Callable<Boolean>()
{
@Override
public Boolean call() throws SegmentLoadingException
{
try {
log.info("Loading segment %s", segment.getIdentifier());
final boolean loaded = loadSegment(segment, callback);
if (loaded) {
try {
backgroundSegmentAnnouncer.announceSegment(segment);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SegmentLoadingException(e, "Loading Interrupted");
}
}
return loaded;
} catch(SegmentLoadingException e) {
log.error(e, "[%s] failed to load", segment.getIdentifier());
throw e;
}
}
}
)
);
}
if (!segmentFailures.isEmpty()) {
for (String segmentFailure : segmentFailures) {
log.error("%s failed to load", segmentFailure);
int failed = 0;
for(ListenableFuture future : segmentLoading) {
try {
future.get();
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
throw new SegmentLoadingException(e, "Loading Interrupted");
} catch(ExecutionException e) {
failed++;
}
throw new SegmentLoadingException("%,d errors seen while loading segments", segmentFailures.size());
}
if(failed > 0) {
throw new SegmentLoadingException("%,d errors seen while loading segments", failed);
}
backgroundSegmentAnnouncer.finishAnnouncing();
@ -292,7 +303,7 @@ public class ZkCoordinator extends BaseZkCoordinator
private volatile ScheduledFuture startedAnnouncing = null;
private volatile ScheduledFuture nextAnnoucement = null;
private BackgroundSegmentAnnouncer(
public BackgroundSegmentAnnouncer(
DataSegmentAnnouncer announcer,
ScheduledExecutorService exec,
int intervalMillis
@ -366,6 +377,7 @@ public class ZkCoordinator extends BaseZkCoordinator
doneAnnouncing.get();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SegmentLoadingException(e, "Loading Interrupted");
}
catch (ExecutionException e) {

View File

@ -31,6 +31,7 @@ import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.announcement.Announcer;
import io.druid.db.DatabaseSegmentManager;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.realtime.DbSegmentPublisher;
import io.druid.server.DruidNode;
import io.druid.server.coordination.BatchDataSegmentAnnouncer;
@ -156,6 +157,7 @@ public class DruidClusterBridgeTest
BridgeZkCoordinator bridgeZkCoordinator = new BridgeZkCoordinator(
jsonMapper,
zkPathsConfig,
new SegmentLoaderConfig(),
metadata,
remoteCf,
dbSegmentPublisher,