Merge pull request #206 from metamx/peontimeout

Add a timeout in LoadQueuePeon to fail an assign if it takes too long
cheddar 2013-07-30 15:08:46 -07:00
commit 843b6650a2
7 changed files with 85 additions and 24 deletions
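In outline: the peon's ZooKeeper-writing executor becomes a ScheduledExecutorService, and after writing the ephemeral assign node the peon schedules a follow-up check. If the node is still present once the configured delay (druid.master.load.timeout, 15 minutes by default) expires, the assign is failed, counted, and handed back so the master can give the segment to another server; DruidMasterLogger reports the count as a new master/loadQueue/failed metric. A condensed sketch of the timeout path, using the identifiers from the diffs below (error handling and the normal success path via the ZooKeeper watcher are omitted):

// After writing the load request as an ephemeral node...
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);

// ...schedule a check for the configured delay in the future. The target
// server deletes the node once it has processed the change request, so if
// the node is still there when the check runs, the assign has stalled.
zkWritingExecutor.schedule(
    new Runnable()
    {
      @Override
      public void run()
      {
        try {
          if (curator.checkExists().forPath(path) != null) {
            failAssign(new ISE("%s was never removed! Failing this assign!", path));
          }
        }
        catch (Exception e) {
          failAssign(e);
        }
      }
    },
    config.getLoadTimeoutDelay().getMillis(),   // 15 minutes unless overridden
    TimeUnit.MILLISECONDS
);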

MasterMain.java

@ -233,7 +233,10 @@ public class MasterMain
);
final LoadQueueTaskMaster taskMaster = new LoadQueueTaskMaster(
curatorFramework, jsonMapper, Execs.singleThreaded("Master-PeonExec--%d")
curatorFramework,
jsonMapper,
scheduledExecutorFactory.create(1, "Master-PeonExec--%d"),
druidMasterConfig
);
final DruidMaster master = new DruidMaster(
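The peon executor is now built through the scheduledExecutorFactory rather than Execs.singleThreaded, because the timeout check needs schedule() in addition to plain task submission. Functionally this is roughly a single-threaded scheduled pool with named (and, assuming the factory's usual behavior, daemon) threads; a JDK-only equivalent for illustration:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Roughly what scheduledExecutorFactory.create(1, "Master-PeonExec--%d") yields:
// one thread, named from the format string, usable for both execute() and schedule().
ScheduledExecutorService peonExec = Executors.newScheduledThreadPool(
    1,
    new ThreadFactory()
    {
      private final AtomicInteger threadCount = new AtomicInteger(0);

      @Override
      public Thread newThread(Runnable r)
      {
        Thread t = new Thread(r, String.format("Master-PeonExec--%d", threadCount.getAndIncrement()));
        t.setDaemon(true);
        return t;
      }
    }
);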

DruidMasterConfig.java

@ -89,4 +89,10 @@ public abstract class DruidMasterConfig
@Config("druid.master.replicant.throttleLimit")
@Default("10")
public abstract int getReplicantThrottleLimit();
@Config("druid.master.load.timeout")
public Duration getLoadTimeoutDelay()
{
return new Duration(15 * 60 * 1000);
}
}
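The default of new Duration(15 * 60 * 1000) is 900,000 ms, i.e. 15 minutes, and the @Config annotation ties it to the druid.master.load.timeout property so deployments can override it. Inside the peon the value is consumed as a plain millisecond delay; for illustration (timeoutCheck stands in for the anonymous Runnable added in LoadQueuePeon below):

long delayMillis = config.getLoadTimeoutDelay().getMillis();   // 900000 by default
zkWritingExecutor.schedule(timeoutCheck, delayMillis, TimeUnit.MILLISECONDS);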

DruidMasterLogger.java

@ -185,6 +185,12 @@ public class DruidMasterLogger implements DruidMasterHelper
"master/loadQueue/size", queuePeon.getLoadQueueSize()
)
);
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser1(serverName).build(
"master/loadQueue/failed", queuePeon.getAndResetFailedAssignCount()
)
);
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser1(serverName).build(
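The new master/loadQueue/failed metric is emitted once per master run alongside the existing load-queue gauges. Because the peon resets the counter when it is read, the value is a per-run delta rather than a lifetime total; in sketch form:

AtomicInteger failedAssignCount = new AtomicInteger(0);
failedAssignCount.getAndIncrement();                      // in failAssign(...)
int failedSinceLastRun = failedAssignCount.getAndSet(0);  // in getAndResetFailedAssignCount()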

LoadQueuePeon.java

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.guava.Comparators;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.coordination.DataSegmentChangeRequest;
@ -43,7 +44,9 @@ import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
@ -54,15 +57,6 @@ public class LoadQueuePeon
private static final int DROP = 0;
private static final int LOAD = 1;
private final Object lock = new Object();
private final CuratorFramework curator;
private final String basePath;
private final ObjectMapper jsonMapper;
private final ExecutorService zkWritingExecutor;
private final AtomicLong queuedSize = new AtomicLong(0);
private static Comparator<SegmentHolder> segmentHolderComparator = new Comparator<SegmentHolder>()
{
private Comparator<DataSegment> comparator = Comparators.inverse(DataSegment.bucketMonthComparator());
@ -74,6 +68,15 @@ public class LoadQueuePeon
}
};
private final CuratorFramework curator;
private final String basePath;
private final ObjectMapper jsonMapper;
private final ScheduledExecutorService zkWritingExecutor;
private final DruidMasterConfig config;
private final AtomicLong queuedSize = new AtomicLong(0);
private final AtomicInteger failedAssignCount = new AtomicInteger(0);
private final ConcurrentSkipListSet<SegmentHolder> segmentsToLoad = new ConcurrentSkipListSet<SegmentHolder>(
segmentHolderComparator
);
@ -81,19 +84,23 @@ public class LoadQueuePeon
segmentHolderComparator
);
private final Object lock = new Object();
private volatile SegmentHolder currentlyLoading = null;
LoadQueuePeon(
CuratorFramework curator,
String basePath,
ObjectMapper jsonMapper,
ExecutorService zkWritingExecutor
ScheduledExecutorService zkWritingExecutor,
DruidMasterConfig config
)
{
this.curator = curator;
this.basePath = basePath;
this.jsonMapper = jsonMapper;
this.zkWritingExecutor = zkWritingExecutor;
this.config = config;
}
public Set<DataSegment> getSegmentsToLoad()
@ -135,6 +142,11 @@ public class LoadQueuePeon
return queuedSize.get();
}
public int getAndResetFailedAssignCount()
{
return failedAssignCount.getAndSet(0);
}
public void loadSegment(
DataSegment segment,
LoadPeonCallback callback
@ -232,6 +244,26 @@ public class LoadQueuePeon
final byte[] payload = jsonMapper.writeValueAsBytes(currentlyLoading.getChangeRequest());
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
zkWritingExecutor.schedule(
new Runnable()
{
@Override
public void run()
{
try {
if (curator.checkExists().forPath(path) != null) {
failAssign(new ISE("%s was never removed! Failing this assign!", path));
}
}
catch (Exception e) {
failAssign(e);
}
}
},
config.getLoadTimeoutDelay().getMillis(),
TimeUnit.MILLISECONDS
);
final Stat stat = curator.checkExists().usingWatcher(
new CuratorWatcher()
{
@ -268,10 +300,7 @@ public class LoadQueuePeon
}
}
catch (Exception e) {
log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyLoading);
// Act like it was completed so that the master gives it to someone else
actionCompleted();
doNext();
failAssign(e);
}
}
}
@ -327,6 +356,7 @@ public class LoadQueuePeon
segmentsToLoad.clear();
queuedSize.set(0L);
failedAssignCount.set(0);
}
}
@ -351,6 +381,17 @@ public class LoadQueuePeon
doNext();
}
private void failAssign(Exception e)
{
synchronized (lock) {
log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyLoading);
failedAssignCount.getAndIncrement();
// Act like it was completed so that the master gives it to someone else
actionCompleted();
doNext();
}
}
private class SegmentHolder
{
private final DataSegment segment;
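If the target server removes the ephemeral node before the delay expires, the scheduled check finds checkExists() returning null and does nothing, so healthy assigns are unaffected. Only when the node lingers past getLoadTimeoutDelay() does failAssign(...) run, logging the error, bumping failedAssignCount, and calling actionCompleted()/doNext() so the master treats the assign as finished and can give the segment to another server.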

LoadQueueTaskMaster.java

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.framework.CuratorFramework;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
/**
* Provides LoadQueuePeons
@ -31,21 +32,24 @@ public class LoadQueueTaskMaster
{
private final CuratorFramework curator;
private final ObjectMapper jsonMapper;
private final ExecutorService peonExec;
private final ScheduledExecutorService peonExec;
private final DruidMasterConfig config;
public LoadQueueTaskMaster(
CuratorFramework curator,
ObjectMapper jsonMapper,
ExecutorService peonExec
ScheduledExecutorService peonExec,
DruidMasterConfig config
)
{
this.curator = curator;
this.jsonMapper = jsonMapper;
this.peonExec = peonExec;
this.config = config;
}
public LoadQueuePeon giveMePeon(String basePath)
{
return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec);
return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, config);
}
}
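For context, the call path as wired in MasterMain above; basePath is the per-server load-queue ZooKeeper path that the master supplies when it asks for a peon (shown here only as a placeholder):

LoadQueueTaskMaster taskMaster = new LoadQueueTaskMaster(
    curatorFramework,
    jsonMapper,
    scheduledExecutorFactory.create(1, "Master-PeonExec--%d"),
    druidMasterConfig
);
LoadQueuePeon peon = taskMaster.giveMePeon(basePath);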

DruidMasterTest.java

@ -36,6 +36,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
@ -46,7 +47,7 @@ public class DruidMasterTest
private LoadQueueTaskMaster taskMaster;
private DatabaseSegmentManager databaseSegmentManager;
private SingleServerInventoryView serverInventoryView;
private ScheduledExecutorFactory scheduledExecutorFactory;
private ScheduledExecutorService scheduledExecutorService;
private DruidServer druidServer;
private DataSegment segment;
private ConcurrentMap<String, LoadQueuePeon> loadManagementPeons;
@ -64,8 +65,8 @@ public class DruidMasterTest
databaseSegmentManager = EasyMock.createNiceMock(DatabaseSegmentManager.class);
EasyMock.replay(databaseSegmentManager);
scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class);
EasyMock.replay(scheduledExecutorFactory);
scheduledExecutorService = EasyMock.createNiceMock(ScheduledExecutorService.class);
EasyMock.replay(scheduledExecutorService);
master = new DruidMaster(
new DruidMasterConfig()
@ -138,7 +139,7 @@ public class DruidMasterTest
null,
curator,
new NoopServiceEmitter(),
scheduledExecutorFactory,
scheduledExecutorService,
null,
taskMaster,
loadManagementPeons

LoadQueuePeonTester.java

@ -10,7 +10,7 @@ public class LoadQueuePeonTester extends LoadQueuePeon
public LoadQueuePeonTester()
{
super(null, null, null, null);
super(null, null, null, null, null);
}
@Override