Merge pull request #670 from metamx/fix-deadlock

fix deadlock condition that can exist in the coordinator
This commit is contained in:
xvrl 2014-08-07 16:40:16 -07:00
commit 3b6bc48a51
4 changed files with 31 additions and 6 deletions

View File

@ -45,6 +45,7 @@ import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -73,6 +74,7 @@ public class LoadQueuePeon
private final String basePath; private final String basePath;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final ScheduledExecutorService zkWritingExecutor; private final ScheduledExecutorService zkWritingExecutor;
private final ExecutorService callBackExecutor;
private final DruidCoordinatorConfig config; private final DruidCoordinatorConfig config;
private final AtomicLong queuedSize = new AtomicLong(0); private final AtomicLong queuedSize = new AtomicLong(0);
@ -94,12 +96,14 @@ public class LoadQueuePeon
String basePath, String basePath,
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
ScheduledExecutorService zkWritingExecutor, ScheduledExecutorService zkWritingExecutor,
ExecutorService callbackExecutor,
DruidCoordinatorConfig config DruidCoordinatorConfig config
) )
{ {
this.curator = curator; this.curator = curator;
this.basePath = basePath; this.basePath = basePath;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.callBackExecutor = callbackExecutor;
this.zkWritingExecutor = zkWritingExecutor; this.zkWritingExecutor = zkWritingExecutor;
this.config = config; this.config = config;
} }
@ -333,10 +337,20 @@ public class LoadQueuePeon
default: default:
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
callBackExecutor.execute(
new Runnable()
{
@Override
public void run()
{
currentlyProcessing.executeCallbacks(); currentlyProcessing.executeCallbacks();
currentlyProcessing = null; currentlyProcessing = null;
} }
} }
);
}
}
public void stop() public void stop()
{ {

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
/** /**
@ -33,6 +34,7 @@ public class LoadQueueTaskMaster
private final CuratorFramework curator; private final CuratorFramework curator;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final ScheduledExecutorService peonExec; private final ScheduledExecutorService peonExec;
private final ExecutorService callbackExec;
private final DruidCoordinatorConfig config; private final DruidCoordinatorConfig config;
@Inject @Inject
@ -40,17 +42,19 @@ public class LoadQueueTaskMaster
CuratorFramework curator, CuratorFramework curator,
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
ScheduledExecutorService peonExec, ScheduledExecutorService peonExec,
ExecutorService callbackExec,
DruidCoordinatorConfig config DruidCoordinatorConfig config
) )
{ {
this.curator = curator; this.curator = curator;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.peonExec = peonExec; this.peonExec = peonExec;
this.callbackExec = callbackExec;
this.config = config; this.config = config;
} }
public LoadQueuePeon giveMePeon(String basePath) public LoadQueuePeon giveMePeon(String basePath)
{ {
return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, config); return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, callbackExec, config);
} }
} }

View File

@ -29,7 +29,7 @@ public class LoadQueuePeonTester extends LoadQueuePeon
public LoadQueuePeonTester() public LoadQueuePeonTester()
{ {
super(null, null, null, null, null); super(null, null, null, null, null, null);
} }
@Override @Override

View File

@ -61,6 +61,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import java.util.List; import java.util.List;
import java.util.concurrent.Executors;
/** /**
*/ */
@ -128,10 +129,16 @@ public class CliCoordinator extends ServerRunnable
@Provides @Provides
@LazySingleton @LazySingleton
public LoadQueueTaskMaster getLoadQueueTaskMaster( public LoadQueueTaskMaster getLoadQueueTaskMaster(
CuratorFramework curator, ObjectMapper jsonMapper, ScheduledExecutorFactory factory, DruidCoordinatorConfig config CuratorFramework curator,
ObjectMapper jsonMapper,
ScheduledExecutorFactory factory,
DruidCoordinatorConfig config
) )
{ {
return new LoadQueueTaskMaster(curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"), config); return new LoadQueueTaskMaster(
curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"),
Executors.newSingleThreadExecutor(), config
);
} }
} }
); );