Add a timeout in LQP to fail an assign if it takes too long

This commit is contained in:
fjy 2013-07-29 15:47:05 -07:00
parent b0090a1de6
commit 091dce11c8
7 changed files with 69 additions and 24 deletions

View File

@ -49,6 +49,7 @@ import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.druid.db.DatabaseSegmentManagerConfig; import com.metamx.druid.db.DatabaseSegmentManagerConfig;
import com.metamx.druid.db.DbConnector; import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig; import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.initialization.ServiceDiscoveryConfig;
@ -226,8 +227,13 @@ public class MasterMain
new ConfigManager(dbi, configManagerConfig), jsonMapper new ConfigManager(dbi, configManagerConfig), jsonMapper
); );
final ScheduledExecutorService scheduledExecutorService = scheduledExecutorFactory.create(1, "Master-Exec--%d");
final LoadQueueTaskMaster taskMaster = new LoadQueueTaskMaster( final LoadQueueTaskMaster taskMaster = new LoadQueueTaskMaster(
curatorFramework, jsonMapper, Execs.singleThreaded("Master-PeonExec--%d") curatorFramework,
jsonMapper,
Execs.singleThreaded("Master-PeonExec--%d"),
scheduledExecutorService,
druidMasterConfig
); );
final DruidMaster master = new DruidMaster( final DruidMaster master = new DruidMaster(
@ -239,7 +245,7 @@ public class MasterMain
databaseRuleManager, databaseRuleManager,
curatorFramework, curatorFramework,
emitter, emitter,
scheduledExecutorFactory, scheduledExecutorService,
indexingServiceClient, indexingServiceClient,
taskMaster taskMaster
); );

View File

@ -30,7 +30,6 @@ import com.google.common.collect.Sets;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.Comparators; import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterable;
@ -105,7 +104,7 @@ public class DruidMaster
DatabaseRuleManager databaseRuleManager, DatabaseRuleManager databaseRuleManager,
CuratorFramework curator, CuratorFramework curator,
ServiceEmitter emitter, ServiceEmitter emitter,
ScheduledExecutorFactory scheduledExecutorFactory, ScheduledExecutorService scheduledExecutorService,
IndexingServiceClient indexingServiceClient, IndexingServiceClient indexingServiceClient,
LoadQueueTaskMaster taskMaster LoadQueueTaskMaster taskMaster
) )
@ -119,7 +118,7 @@ public class DruidMaster
databaseRuleManager, databaseRuleManager,
curator, curator,
emitter, emitter,
scheduledExecutorFactory, scheduledExecutorService,
indexingServiceClient, indexingServiceClient,
taskMaster, taskMaster,
Maps.<String, LoadQueuePeon>newConcurrentMap() Maps.<String, LoadQueuePeon>newConcurrentMap()
@ -135,7 +134,7 @@ public class DruidMaster
DatabaseRuleManager databaseRuleManager, DatabaseRuleManager databaseRuleManager,
CuratorFramework curator, CuratorFramework curator,
ServiceEmitter emitter, ServiceEmitter emitter,
ScheduledExecutorFactory scheduledExecutorFactory, ScheduledExecutorService scheduledExecutorService,
IndexingServiceClient indexingServiceClient, IndexingServiceClient indexingServiceClient,
LoadQueueTaskMaster taskMaster, LoadQueueTaskMaster taskMaster,
ConcurrentMap<String, LoadQueuePeon> loadQueuePeonMap ConcurrentMap<String, LoadQueuePeon> loadQueuePeonMap
@ -153,7 +152,7 @@ public class DruidMaster
this.indexingServiceClient = indexingServiceClient; this.indexingServiceClient = indexingServiceClient;
this.taskMaster = taskMaster; this.taskMaster = taskMaster;
this.exec = scheduledExecutorFactory.create(1, "Master-Exec--%d"); this.exec = scheduledExecutorService;
this.leaderLatch = new AtomicReference<LeaderLatch>(null); this.leaderLatch = new AtomicReference<LeaderLatch>(null);
this.loadManagementPeons = loadQueuePeonMap; this.loadManagementPeons = loadQueuePeonMap;

View File

@ -89,4 +89,10 @@ public abstract class DruidMasterConfig
@Config("druid.master.replicant.throttleLimit") @Config("druid.master.replicant.throttleLimit")
@Default("10") @Default("10")
public abstract int getReplicantThrottleLimit(); public abstract int getReplicantThrottleLimit();
@Config("druid.master.load.timeout")
public Duration getLoadTimeoutDelay()
{
return new Duration(15 * 60 * 1000);
}
} }

View File

@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.collect.Collections2; import com.google.common.collect.Collections2;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.Comparators; import com.metamx.common.guava.Comparators;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.coordination.DataSegmentChangeRequest; import com.metamx.druid.coordination.DataSegmentChangeRequest;
@ -42,8 +44,10 @@ import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
/** /**
@ -54,15 +58,6 @@ public class LoadQueuePeon
private static final int DROP = 0; private static final int DROP = 0;
private static final int LOAD = 1; private static final int LOAD = 1;
private final Object lock = new Object();
private final CuratorFramework curator;
private final String basePath;
private final ObjectMapper jsonMapper;
private final ExecutorService zkWritingExecutor;
private final AtomicLong queuedSize = new AtomicLong(0);
private static Comparator<SegmentHolder> segmentHolderComparator = new Comparator<SegmentHolder>() private static Comparator<SegmentHolder> segmentHolderComparator = new Comparator<SegmentHolder>()
{ {
private Comparator<DataSegment> comparator = Comparators.inverse(DataSegment.bucketMonthComparator()); private Comparator<DataSegment> comparator = Comparators.inverse(DataSegment.bucketMonthComparator());
@ -74,6 +69,15 @@ public class LoadQueuePeon
} }
}; };
private final CuratorFramework curator;
private final String basePath;
private final ObjectMapper jsonMapper;
private final ExecutorService zkWritingExecutor;
private final ScheduledExecutorService scheduledExecutorService;
private final DruidMasterConfig config;
private final AtomicLong queuedSize = new AtomicLong(0);
private final ConcurrentSkipListSet<SegmentHolder> segmentsToLoad = new ConcurrentSkipListSet<SegmentHolder>( private final ConcurrentSkipListSet<SegmentHolder> segmentsToLoad = new ConcurrentSkipListSet<SegmentHolder>(
segmentHolderComparator segmentHolderComparator
); );
@ -81,19 +85,25 @@ public class LoadQueuePeon
segmentHolderComparator segmentHolderComparator
); );
private final Object lock = new Object();
private volatile SegmentHolder currentlyLoading = null; private volatile SegmentHolder currentlyLoading = null;
LoadQueuePeon( LoadQueuePeon(
CuratorFramework curator, CuratorFramework curator,
String basePath, String basePath,
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
ExecutorService zkWritingExecutor ExecutorService zkWritingExecutor,
ScheduledExecutorService scheduledExecutorService,
DruidMasterConfig config
) )
{ {
this.curator = curator; this.curator = curator;
this.basePath = basePath; this.basePath = basePath;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.zkWritingExecutor = zkWritingExecutor; this.zkWritingExecutor = zkWritingExecutor;
this.scheduledExecutorService = scheduledExecutorService;
this.config = config;
} }
public Set<DataSegment> getSegmentsToLoad() public Set<DataSegment> getSegmentsToLoad()
@ -232,6 +242,22 @@ public class LoadQueuePeon
final byte[] payload = jsonMapper.writeValueAsBytes(currentlyLoading.getChangeRequest()); final byte[] payload = jsonMapper.writeValueAsBytes(currentlyLoading.getChangeRequest());
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload); curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
ScheduledExecutors.scheduleWithFixedDelay(
scheduledExecutorService,
config.getLoadTimeoutDelay(),
new Callable<ScheduledExecutors.Signal>()
{
@Override
public ScheduledExecutors.Signal call() throws Exception
{
if (curator.checkExists().forPath(path) != null) {
throw new ISE("%s was never removed! Failing this assign!", path);
}
return ScheduledExecutors.Signal.STOP;
}
}
);
final Stat stat = curator.checkExists().usingWatcher( final Stat stat = curator.checkExists().usingWatcher(
new CuratorWatcher() new CuratorWatcher()
{ {

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
/** /**
* Provides LoadQueuePeons * Provides LoadQueuePeons
@ -32,20 +33,26 @@ public class LoadQueueTaskMaster
private final CuratorFramework curator; private final CuratorFramework curator;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final ExecutorService peonExec; private final ExecutorService peonExec;
private final ScheduledExecutorService scheduledExecutorService;
private final DruidMasterConfig config;
public LoadQueueTaskMaster( public LoadQueueTaskMaster(
CuratorFramework curator, CuratorFramework curator,
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
ExecutorService peonExec ExecutorService peonExec,
ScheduledExecutorService scheduledExecutorService,
DruidMasterConfig config
) )
{ {
this.curator = curator; this.curator = curator;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.peonExec = peonExec; this.peonExec = peonExec;
this.scheduledExecutorService = scheduledExecutorService;
this.config = config;
} }
public LoadQueuePeon giveMePeon(String basePath) public LoadQueuePeon giveMePeon(String basePath)
{ {
return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec); return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, scheduledExecutorService, config);
} }
} }

View File

@ -36,6 +36,7 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
/** /**
*/ */
@ -46,7 +47,7 @@ public class DruidMasterTest
private LoadQueueTaskMaster taskMaster; private LoadQueueTaskMaster taskMaster;
private DatabaseSegmentManager databaseSegmentManager; private DatabaseSegmentManager databaseSegmentManager;
private SingleServerInventoryView serverInventoryView; private SingleServerInventoryView serverInventoryView;
private ScheduledExecutorFactory scheduledExecutorFactory; private ScheduledExecutorService scheduledExecutorService;
private DruidServer druidServer; private DruidServer druidServer;
private DataSegment segment; private DataSegment segment;
private ConcurrentMap<String, LoadQueuePeon> loadManagementPeons; private ConcurrentMap<String, LoadQueuePeon> loadManagementPeons;
@ -64,8 +65,8 @@ public class DruidMasterTest
databaseSegmentManager = EasyMock.createNiceMock(DatabaseSegmentManager.class); databaseSegmentManager = EasyMock.createNiceMock(DatabaseSegmentManager.class);
EasyMock.replay(databaseSegmentManager); EasyMock.replay(databaseSegmentManager);
scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class); scheduledExecutorService = EasyMock.createNiceMock(ScheduledExecutorService.class);
EasyMock.replay(scheduledExecutorFactory); EasyMock.replay(scheduledExecutorService);
master = new DruidMaster( master = new DruidMaster(
new DruidMasterConfig() new DruidMasterConfig()
@ -138,7 +139,7 @@ public class DruidMasterTest
null, null,
curator, curator,
new NoopServiceEmitter(), new NoopServiceEmitter(),
scheduledExecutorFactory, scheduledExecutorService,
null, null,
taskMaster, taskMaster,
loadManagementPeons loadManagementPeons

View File

@ -10,7 +10,7 @@ public class LoadQueuePeonTester extends LoadQueuePeon
public LoadQueuePeonTester() public LoadQueuePeonTester()
{ {
super(null, null, null, null); super(null, null, null, null, null, null);
} }
@Override @Override