diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java index 426660ee799..1c63eee2ef7 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterMain.java +++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java @@ -227,12 +227,10 @@ public class MasterMain new ConfigManager(dbi, configManagerConfig), jsonMapper ); - final ScheduledExecutorService scheduledExecutorService = scheduledExecutorFactory.create(1, "Master-Exec--%d"); final LoadQueueTaskMaster taskMaster = new LoadQueueTaskMaster( curatorFramework, jsonMapper, - Execs.singleThreaded("Master-PeonExec--%d"), - scheduledExecutorService, + scheduledExecutorFactory.create(1, "Master-PeonExec--%d"), druidMasterConfig ); @@ -245,7 +243,7 @@ public class MasterMain databaseRuleManager, curatorFramework, emitter, - scheduledExecutorService, + scheduledExecutorFactory, indexingServiceClient, taskMaster ); diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index 924096cc10c..592e76f0d06 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -30,6 +30,7 @@ import com.google.common.collect.Sets; import com.google.common.io.Closeables; import com.metamx.common.IAE; import com.metamx.common.Pair; +import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.guava.Comparators; import com.metamx.common.guava.FunctionalIterable; @@ -104,7 +105,7 @@ public class DruidMaster DatabaseRuleManager databaseRuleManager, CuratorFramework curator, ServiceEmitter emitter, - ScheduledExecutorService scheduledExecutorService, + ScheduledExecutorFactory scheduledExecutorFactory, IndexingServiceClient indexingServiceClient, LoadQueueTaskMaster taskMaster ) @@ -118,7 +119,7 @@ public class DruidMaster databaseRuleManager, curator, emitter, - scheduledExecutorService, + scheduledExecutorFactory, indexingServiceClient, taskMaster, Maps.newConcurrentMap() @@ -134,7 +135,7 @@ public class DruidMaster DatabaseRuleManager databaseRuleManager, CuratorFramework curator, ServiceEmitter emitter, - ScheduledExecutorService scheduledExecutorService, + ScheduledExecutorFactory scheduledExecutorFactory, IndexingServiceClient indexingServiceClient, LoadQueueTaskMaster taskMaster, ConcurrentMap loadQueuePeonMap @@ -152,7 +153,7 @@ public class DruidMaster this.indexingServiceClient = indexingServiceClient; this.taskMaster = taskMaster; - this.exec = scheduledExecutorService; + this.exec = scheduledExecutorFactory.create(1, "Master-Exec--%d"); this.leaderLatch = new AtomicReference(null); this.loadManagementPeons = loadQueuePeonMap; diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java b/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java index 382d03966b6..c2992921f30 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java @@ -185,6 +185,12 @@ public class DruidMasterLogger implements DruidMasterHelper "master/loadQueue/size", queuePeon.getLoadQueueSize() ) ); + emitter.emit( + new ServiceMetricEvent.Builder() + .setUser1(serverName).build( + "master/loadQueue/failed", queuePeon.getFailedAssignCount() + ) + ); emitter.emit( new ServiceMetricEvent.Builder() .setUser1(serverName).build( diff --git a/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java b/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java index d75e1f39959..189a48efc30 100644 --- a/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java +++ b/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java @@ -24,7 +24,6 @@ import com.google.common.base.Function; import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import com.metamx.common.ISE; -import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.guava.Comparators; import com.metamx.druid.client.DataSegment; import com.metamx.druid.coordination.DataSegmentChangeRequest; @@ -44,10 +43,10 @@ import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** @@ -72,11 +71,11 @@ public class LoadQueuePeon private final CuratorFramework curator; private final String basePath; private final ObjectMapper jsonMapper; - private final ExecutorService zkWritingExecutor; - private final ScheduledExecutorService scheduledExecutorService; + private final ScheduledExecutorService zkWritingExecutor; private final DruidMasterConfig config; private final AtomicLong queuedSize = new AtomicLong(0); + private final AtomicInteger failedAssignCount = new AtomicInteger(0); private final ConcurrentSkipListSet segmentsToLoad = new ConcurrentSkipListSet( segmentHolderComparator @@ -93,8 +92,7 @@ public class LoadQueuePeon CuratorFramework curator, String basePath, ObjectMapper jsonMapper, - ExecutorService zkWritingExecutor, - ScheduledExecutorService scheduledExecutorService, + ScheduledExecutorService zkWritingExecutor, DruidMasterConfig config ) { @@ -102,7 +100,6 @@ public class LoadQueuePeon this.basePath = basePath; this.jsonMapper = jsonMapper; this.zkWritingExecutor = zkWritingExecutor; - this.scheduledExecutorService = scheduledExecutorService; this.config = config; } @@ -145,6 +142,11 @@ public class LoadQueuePeon return queuedSize.get(); } + public int getFailedAssignCount() + { + return failedAssignCount.get(); + } + public void loadSegment( DataSegment segment, LoadPeonCallback callback @@ -242,20 +244,24 @@ public class LoadQueuePeon final byte[] payload = jsonMapper.writeValueAsBytes(currentlyLoading.getChangeRequest()); curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload); - ScheduledExecutors.scheduleWithFixedDelay( - scheduledExecutorService, - config.getLoadTimeoutDelay(), - new Callable() + zkWritingExecutor.schedule( + new Runnable() { @Override - public ScheduledExecutors.Signal call() throws Exception + public void run() { - if (curator.checkExists().forPath(path) != null) { - throw new ISE("%s was never removed! Failing this assign!", path); + try { + if (curator.checkExists().forPath(path) != null) { + failAssign(new ISE("%s was never removed! Failing this assign!", path)); + } + } + catch (Exception e) { + failAssign(e); } - return ScheduledExecutors.Signal.STOP; } - } + }, + config.getLoadTimeoutDelay().getMillis(), + TimeUnit.MILLISECONDS ); final Stat stat = curator.checkExists().usingWatcher( @@ -294,10 +300,7 @@ public class LoadQueuePeon } } catch (Exception e) { - log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyLoading); - // Act like it was completed so that the master gives it to someone else - actionCompleted(); - doNext(); + failAssign(e); } } } @@ -353,6 +356,7 @@ public class LoadQueuePeon segmentsToLoad.clear(); queuedSize.set(0L); + failedAssignCount.set(0); } } @@ -377,6 +381,17 @@ public class LoadQueuePeon doNext(); } + private void failAssign(Exception e) + { + synchronized (lock) { + log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyLoading); + failedAssignCount.getAndIncrement(); + // Act like it was completed so that the master gives it to someone else + actionCompleted(); + doNext(); + } + } + private class SegmentHolder { private final DataSegment segment; diff --git a/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java b/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java index 254b83e32cd..2547127bc5a 100644 --- a/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java +++ b/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java @@ -32,27 +32,24 @@ public class LoadQueueTaskMaster { private final CuratorFramework curator; private final ObjectMapper jsonMapper; - private final ExecutorService peonExec; - private final ScheduledExecutorService scheduledExecutorService; + private final ScheduledExecutorService peonExec; private final DruidMasterConfig config; public LoadQueueTaskMaster( CuratorFramework curator, ObjectMapper jsonMapper, - ExecutorService peonExec, - ScheduledExecutorService scheduledExecutorService, + ScheduledExecutorService peonExec, DruidMasterConfig config ) { this.curator = curator; this.jsonMapper = jsonMapper; this.peonExec = peonExec; - this.scheduledExecutorService = scheduledExecutorService; this.config = config; } public LoadQueuePeon giveMePeon(String basePath) { - return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, scheduledExecutorService, config); + return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, config); } }