diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java index 63521717e79..426660ee799 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterMain.java +++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java @@ -49,6 +49,7 @@ import com.metamx.druid.db.DatabaseSegmentManager; import com.metamx.druid.db.DatabaseSegmentManagerConfig; import com.metamx.druid.db.DbConnector; import com.metamx.druid.db.DbConnectorConfig; +import com.metamx.druid.initialization.CuratorConfig; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServiceDiscoveryConfig; @@ -226,8 +227,13 @@ public class MasterMain new ConfigManager(dbi, configManagerConfig), jsonMapper ); + final ScheduledExecutorService scheduledExecutorService = scheduledExecutorFactory.create(1, "Master-Exec--%d"); final LoadQueueTaskMaster taskMaster = new LoadQueueTaskMaster( - curatorFramework, jsonMapper, Execs.singleThreaded("Master-PeonExec--%d") + curatorFramework, + jsonMapper, + Execs.singleThreaded("Master-PeonExec--%d"), + scheduledExecutorService, + druidMasterConfig ); final DruidMaster master = new DruidMaster( @@ -239,7 +245,7 @@ public class MasterMain databaseRuleManager, curatorFramework, emitter, - scheduledExecutorFactory, + scheduledExecutorService, indexingServiceClient, taskMaster ); diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index 592e76f0d06..924096cc10c 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -30,7 +30,6 @@ import com.google.common.collect.Sets; import com.google.common.io.Closeables; import com.metamx.common.IAE; import com.metamx.common.Pair; -import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.guava.Comparators; import com.metamx.common.guava.FunctionalIterable; @@ -105,7 +104,7 @@ public class DruidMaster DatabaseRuleManager databaseRuleManager, CuratorFramework curator, ServiceEmitter emitter, - ScheduledExecutorFactory scheduledExecutorFactory, + ScheduledExecutorService scheduledExecutorService, IndexingServiceClient indexingServiceClient, LoadQueueTaskMaster taskMaster ) @@ -119,7 +118,7 @@ public class DruidMaster databaseRuleManager, curator, emitter, - scheduledExecutorFactory, + scheduledExecutorService, indexingServiceClient, taskMaster, Maps.newConcurrentMap() @@ -135,7 +134,7 @@ public class DruidMaster DatabaseRuleManager databaseRuleManager, CuratorFramework curator, ServiceEmitter emitter, - ScheduledExecutorFactory scheduledExecutorFactory, + ScheduledExecutorService scheduledExecutorService, IndexingServiceClient indexingServiceClient, LoadQueueTaskMaster taskMaster, ConcurrentMap loadQueuePeonMap @@ -153,7 +152,7 @@ public class DruidMaster this.indexingServiceClient = indexingServiceClient; this.taskMaster = taskMaster; - this.exec = scheduledExecutorFactory.create(1, "Master-Exec--%d"); + this.exec = scheduledExecutorService; this.leaderLatch = new AtomicReference(null); this.loadManagementPeons = loadQueuePeonMap; diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java index 662569e8485..d514b4d5c4f 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java @@ -89,4 +89,10 @@ public abstract class DruidMasterConfig @Config("druid.master.replicant.throttleLimit") @Default("10") public abstract int getReplicantThrottleLimit(); + + @Config("druid.master.load.timeout") + public Duration getLoadTimeoutDelay() + { + return new Duration(15 * 60 * 1000); + } } diff --git a/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java b/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java index c4ce6ac5a8c..d75e1f39959 100644 --- a/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java +++ b/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.collect.Collections2; import com.google.common.collect.Lists; +import com.metamx.common.ISE; +import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.guava.Comparators; import com.metamx.druid.client.DataSegment; import com.metamx.druid.coordination.DataSegmentChangeRequest; @@ -42,8 +44,10 @@ import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicLong; /** @@ -54,15 +58,6 @@ public class LoadQueuePeon private static final int DROP = 0; private static final int LOAD = 1; - private final Object lock = new Object(); - - private final CuratorFramework curator; - private final String basePath; - private final ObjectMapper jsonMapper; - private final ExecutorService zkWritingExecutor; - - private final AtomicLong queuedSize = new AtomicLong(0); - private static Comparator segmentHolderComparator = new Comparator() { private Comparator comparator = Comparators.inverse(DataSegment.bucketMonthComparator()); @@ -74,6 +69,15 @@ public class LoadQueuePeon } }; + private final CuratorFramework curator; + private final String basePath; + private final ObjectMapper jsonMapper; + private final ExecutorService zkWritingExecutor; + private final ScheduledExecutorService scheduledExecutorService; + private final DruidMasterConfig config; + + private final AtomicLong queuedSize = new AtomicLong(0); + private final ConcurrentSkipListSet segmentsToLoad = new ConcurrentSkipListSet( segmentHolderComparator ); @@ -81,19 +85,25 @@ public class LoadQueuePeon segmentHolderComparator ); + private final Object lock = new Object(); + private volatile SegmentHolder currentlyLoading = null; LoadQueuePeon( CuratorFramework curator, String basePath, ObjectMapper jsonMapper, - ExecutorService zkWritingExecutor + ExecutorService zkWritingExecutor, + ScheduledExecutorService scheduledExecutorService, + DruidMasterConfig config ) { this.curator = curator; this.basePath = basePath; this.jsonMapper = jsonMapper; this.zkWritingExecutor = zkWritingExecutor; + this.scheduledExecutorService = scheduledExecutorService; + this.config = config; } public Set getSegmentsToLoad() @@ -232,6 +242,22 @@ public class LoadQueuePeon final byte[] payload = jsonMapper.writeValueAsBytes(currentlyLoading.getChangeRequest()); curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload); + ScheduledExecutors.scheduleWithFixedDelay( + scheduledExecutorService, + config.getLoadTimeoutDelay(), + new Callable() + { + @Override + public ScheduledExecutors.Signal call() throws Exception + { + if (curator.checkExists().forPath(path) != null) { + throw new ISE("%s was never removed! Failing this assign!", path); + } + return ScheduledExecutors.Signal.STOP; + } + } + ); + final Stat stat = curator.checkExists().usingWatcher( new CuratorWatcher() { diff --git a/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java b/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java index 9ef5b61e5a0..254b83e32cd 100644 --- a/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java +++ b/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.curator.framework.CuratorFramework; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; /** * Provides LoadQueuePeons @@ -32,20 +33,26 @@ public class LoadQueueTaskMaster private final CuratorFramework curator; private final ObjectMapper jsonMapper; private final ExecutorService peonExec; + private final ScheduledExecutorService scheduledExecutorService; + private final DruidMasterConfig config; public LoadQueueTaskMaster( CuratorFramework curator, ObjectMapper jsonMapper, - ExecutorService peonExec + ExecutorService peonExec, + ScheduledExecutorService scheduledExecutorService, + DruidMasterConfig config ) { this.curator = curator; this.jsonMapper = jsonMapper; this.peonExec = peonExec; + this.scheduledExecutorService = scheduledExecutorService; + this.config = config; } public LoadQueuePeon giveMePeon(String basePath) { - return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec); + return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, scheduledExecutorService, config); } } diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java index a49dc85a582..e47afd8d109 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java @@ -36,6 +36,7 @@ import org.junit.Before; import org.junit.Test; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; /** */ @@ -46,7 +47,7 @@ public class DruidMasterTest private LoadQueueTaskMaster taskMaster; private DatabaseSegmentManager databaseSegmentManager; private SingleServerInventoryView serverInventoryView; - private ScheduledExecutorFactory scheduledExecutorFactory; + private ScheduledExecutorService scheduledExecutorService; private DruidServer druidServer; private DataSegment segment; private ConcurrentMap loadManagementPeons; @@ -64,8 +65,8 @@ public class DruidMasterTest databaseSegmentManager = EasyMock.createNiceMock(DatabaseSegmentManager.class); EasyMock.replay(databaseSegmentManager); - scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class); - EasyMock.replay(scheduledExecutorFactory); + scheduledExecutorService = EasyMock.createNiceMock(ScheduledExecutorService.class); + EasyMock.replay(scheduledExecutorService); master = new DruidMaster( new DruidMasterConfig() @@ -138,7 +139,7 @@ public class DruidMasterTest null, curator, new NoopServiceEmitter(), - scheduledExecutorFactory, + scheduledExecutorService, null, taskMaster, loadManagementPeons diff --git a/server/src/test/java/com/metamx/druid/master/LoadQueuePeonTester.java b/server/src/test/java/com/metamx/druid/master/LoadQueuePeonTester.java index b0e000d20ad..3594c660c09 100644 --- a/server/src/test/java/com/metamx/druid/master/LoadQueuePeonTester.java +++ b/server/src/test/java/com/metamx/druid/master/LoadQueuePeonTester.java @@ -10,7 +10,7 @@ public class LoadQueuePeonTester extends LoadQueuePeon public LoadQueuePeonTester() { - super(null, null, null, null); + super(null, null, null, null, null, null); } @Override