fix according to code review comments

This commit is contained in:
fjy 2013-07-29 17:56:19 -07:00
parent 091dce11c8
commit 6a96c1fb76
5 changed files with 52 additions and 35 deletions

View File

@ -227,12 +227,10 @@ public class MasterMain
new ConfigManager(dbi, configManagerConfig), jsonMapper
);
final ScheduledExecutorService scheduledExecutorService = scheduledExecutorFactory.create(1, "Master-Exec--%d");
final LoadQueueTaskMaster taskMaster = new LoadQueueTaskMaster(
curatorFramework,
jsonMapper,
Execs.singleThreaded("Master-PeonExec--%d"),
scheduledExecutorService,
scheduledExecutorFactory.create(1, "Master-PeonExec--%d"),
druidMasterConfig
);
@ -245,7 +243,7 @@ public class MasterMain
databaseRuleManager,
curatorFramework,
emitter,
scheduledExecutorService,
scheduledExecutorFactory,
indexingServiceClient,
taskMaster
);

View File

@ -30,6 +30,7 @@ import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.metamx.common.IAE;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.FunctionalIterable;
@ -104,7 +105,7 @@ public class DruidMaster
DatabaseRuleManager databaseRuleManager,
CuratorFramework curator,
ServiceEmitter emitter,
ScheduledExecutorService scheduledExecutorService,
ScheduledExecutorFactory scheduledExecutorFactory,
IndexingServiceClient indexingServiceClient,
LoadQueueTaskMaster taskMaster
)
@ -118,7 +119,7 @@ public class DruidMaster
databaseRuleManager,
curator,
emitter,
scheduledExecutorService,
scheduledExecutorFactory,
indexingServiceClient,
taskMaster,
Maps.<String, LoadQueuePeon>newConcurrentMap()
@ -134,7 +135,7 @@ public class DruidMaster
DatabaseRuleManager databaseRuleManager,
CuratorFramework curator,
ServiceEmitter emitter,
ScheduledExecutorService scheduledExecutorService,
ScheduledExecutorFactory scheduledExecutorFactory,
IndexingServiceClient indexingServiceClient,
LoadQueueTaskMaster taskMaster,
ConcurrentMap<String, LoadQueuePeon> loadQueuePeonMap
@ -152,7 +153,7 @@ public class DruidMaster
this.indexingServiceClient = indexingServiceClient;
this.taskMaster = taskMaster;
this.exec = scheduledExecutorService;
this.exec = scheduledExecutorFactory.create(1, "Master-Exec--%d");
this.leaderLatch = new AtomicReference<LeaderLatch>(null);
this.loadManagementPeons = loadQueuePeonMap;

View File

@ -185,6 +185,12 @@ public class DruidMasterLogger implements DruidMasterHelper
"master/loadQueue/size", queuePeon.getLoadQueueSize()
)
);
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser1(serverName).build(
"master/loadQueue/failed", queuePeon.getFailedAssignCount()
)
);
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser1(serverName).build(

View File

@ -24,7 +24,6 @@ import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.Comparators;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.coordination.DataSegmentChangeRequest;
@ -44,10 +43,10 @@ import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
@ -72,11 +71,11 @@ public class LoadQueuePeon
private final CuratorFramework curator;
private final String basePath;
private final ObjectMapper jsonMapper;
private final ExecutorService zkWritingExecutor;
private final ScheduledExecutorService scheduledExecutorService;
private final ScheduledExecutorService zkWritingExecutor;
private final DruidMasterConfig config;
private final AtomicLong queuedSize = new AtomicLong(0);
private final AtomicInteger failedAssignCount = new AtomicInteger(0);
private final ConcurrentSkipListSet<SegmentHolder> segmentsToLoad = new ConcurrentSkipListSet<SegmentHolder>(
segmentHolderComparator
@ -93,8 +92,7 @@ public class LoadQueuePeon
CuratorFramework curator,
String basePath,
ObjectMapper jsonMapper,
ExecutorService zkWritingExecutor,
ScheduledExecutorService scheduledExecutorService,
ScheduledExecutorService zkWritingExecutor,
DruidMasterConfig config
)
{
@ -102,7 +100,6 @@ public class LoadQueuePeon
this.basePath = basePath;
this.jsonMapper = jsonMapper;
this.zkWritingExecutor = zkWritingExecutor;
this.scheduledExecutorService = scheduledExecutorService;
this.config = config;
}
@ -145,6 +142,11 @@ public class LoadQueuePeon
return queuedSize.get();
}
public int getFailedAssignCount()
{
return failedAssignCount.get();
}
public void loadSegment(
DataSegment segment,
LoadPeonCallback callback
@ -242,20 +244,24 @@ public class LoadQueuePeon
final byte[] payload = jsonMapper.writeValueAsBytes(currentlyLoading.getChangeRequest());
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
ScheduledExecutors.scheduleWithFixedDelay(
scheduledExecutorService,
config.getLoadTimeoutDelay(),
new Callable<ScheduledExecutors.Signal>()
zkWritingExecutor.schedule(
new Runnable()
{
@Override
public ScheduledExecutors.Signal call() throws Exception
public void run()
{
if (curator.checkExists().forPath(path) != null) {
throw new ISE("%s was never removed! Failing this assign!", path);
try {
if (curator.checkExists().forPath(path) != null) {
failAssign(new ISE("%s was never removed! Failing this assign!", path));
}
}
catch (Exception e) {
failAssign(e);
}
return ScheduledExecutors.Signal.STOP;
}
}
},
config.getLoadTimeoutDelay().getMillis(),
TimeUnit.MILLISECONDS
);
final Stat stat = curator.checkExists().usingWatcher(
@ -294,10 +300,7 @@ public class LoadQueuePeon
}
}
catch (Exception e) {
log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyLoading);
// Act like it was completed so that the master gives it to someone else
actionCompleted();
doNext();
failAssign(e);
}
}
}
@ -353,6 +356,7 @@ public class LoadQueuePeon
segmentsToLoad.clear();
queuedSize.set(0L);
failedAssignCount.set(0);
}
}
@ -377,6 +381,17 @@ public class LoadQueuePeon
doNext();
}
private void failAssign(Exception e)
{
synchronized (lock) {
log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyLoading);
failedAssignCount.getAndIncrement();
// Act like it was completed so that the master gives it to someone else
actionCompleted();
doNext();
}
}
private class SegmentHolder
{
private final DataSegment segment;

View File

@ -32,27 +32,24 @@ public class LoadQueueTaskMaster
{
private final CuratorFramework curator;
private final ObjectMapper jsonMapper;
private final ExecutorService peonExec;
private final ScheduledExecutorService scheduledExecutorService;
private final ScheduledExecutorService peonExec;
private final DruidMasterConfig config;
public LoadQueueTaskMaster(
CuratorFramework curator,
ObjectMapper jsonMapper,
ExecutorService peonExec,
ScheduledExecutorService scheduledExecutorService,
ScheduledExecutorService peonExec,
DruidMasterConfig config
)
{
this.curator = curator;
this.jsonMapper = jsonMapper;
this.peonExec = peonExec;
this.scheduledExecutorService = scheduledExecutorService;
this.config = config;
}
public LoadQueuePeon giveMePeon(String basePath)
{
return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, scheduledExecutorService, config);
return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, config);
}
}