fix deadlock condition that can exist in the coordinator

This commit is contained in:
fjy 2014-08-07 15:55:25 -07:00
parent 91ebe45b4e
commit 08c7aabb34
4 changed files with 31 additions and 6 deletions

View File

@ -45,6 +45,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -73,6 +74,7 @@ public class LoadQueuePeon
private final String basePath;
private final ObjectMapper jsonMapper;
private final ScheduledExecutorService zkWritingExecutor;
private final ExecutorService callBackExecutor;
private final DruidCoordinatorConfig config;
private final AtomicLong queuedSize = new AtomicLong(0);
@ -94,12 +96,14 @@ public class LoadQueuePeon
String basePath,
ObjectMapper jsonMapper,
ScheduledExecutorService zkWritingExecutor,
ExecutorService callbackExecutor,
DruidCoordinatorConfig config
)
{
this.curator = curator;
this.basePath = basePath;
this.jsonMapper = jsonMapper;
this.callBackExecutor = callbackExecutor;
this.zkWritingExecutor = zkWritingExecutor;
this.config = config;
}
@ -333,8 +337,18 @@ public class LoadQueuePeon
default:
throw new UnsupportedOperationException();
}
currentlyProcessing.executeCallbacks();
currentlyProcessing = null;
callBackExecutor.execute(
new Runnable()
{
@Override
public void run()
{
currentlyProcessing.executeCallbacks();
currentlyProcessing = null;
}
}
);
}
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.curator.framework.CuratorFramework;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
/**
@ -33,6 +34,7 @@ public class LoadQueueTaskMaster
private final CuratorFramework curator;
private final ObjectMapper jsonMapper;
private final ScheduledExecutorService peonExec;
private final ExecutorService callbackExec;
private final DruidCoordinatorConfig config;
@Inject
@ -40,17 +42,19 @@ public class LoadQueueTaskMaster
CuratorFramework curator,
ObjectMapper jsonMapper,
ScheduledExecutorService peonExec,
ExecutorService callbackExec,
DruidCoordinatorConfig config
)
{
this.curator = curator;
this.jsonMapper = jsonMapper;
this.peonExec = peonExec;
this.callbackExec = callbackExec;
this.config = config;
}
public LoadQueuePeon giveMePeon(String basePath)
{
return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, config);
return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, callbackExec, config);
}
}

View File

@ -29,7 +29,7 @@ public class LoadQueuePeonTester extends LoadQueuePeon
public LoadQueuePeonTester()
{
super(null, null, null, null, null);
super(null, null, null, null, null, null);
}
@Override

View File

@ -61,6 +61,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.eclipse.jetty.server.Server;
import java.util.List;
import java.util.concurrent.Executors;
/**
*/
@ -128,10 +129,16 @@ public class CliCoordinator extends ServerRunnable
@Provides
@LazySingleton
public LoadQueueTaskMaster getLoadQueueTaskMaster(
CuratorFramework curator, ObjectMapper jsonMapper, ScheduledExecutorFactory factory, DruidCoordinatorConfig config
CuratorFramework curator,
ObjectMapper jsonMapper,
ScheduledExecutorFactory factory,
DruidCoordinatorConfig config
)
{
return new LoadQueueTaskMaster(curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"), config);
return new LoadQueueTaskMaster(
curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"),
Executors.newSingleThreadExecutor(), config
);
}
}
);