fix task status issues on zk outages

docs

review comments

fix test

review comments

Review comments

fix compilation

fix typo
nishant 2015-06-02 23:03:37 +05:30
parent 78d468700b
commit e9afec4a2b
12 changed files with 369 additions and 59 deletions

View File

@@ -70,6 +70,7 @@ The following configs only apply if the overlord is running in remote mode:
|`druid.indexer.runner.minWorkerVersion`|The minimum middle manager version to send tasks to.|"0"|
|`druid.indexer.runner.compressZnodes`|Indicates whether or not the overlord should expect middle managers to compress Znodes.|true|
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288|
|`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a middle manager is disconnected from Zookeeper.|PT15M|
There are additional configs for autoscaling (if it is enabled):
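For example, an overlord that should wait only five minutes before failing the tasks of a disconnected middle manager could set the property in its runtime.properties as shown below (PT5M is purely illustrative; the default remains PT15M, and the value is an ISO-8601 period):

```properties
# Illustrative overlord runtime.properties entry (hypothetical value, ISO-8601 period format)
druid.indexer.runner.taskCleanupTimeout=PT5M
```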

View File

@@ -82,6 +82,8 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
@@ -95,7 +97,8 @@ import java.util.concurrent.TimeUnit;
* fail. The RemoteTaskRunner depends on another component to create additional worker resources.
* For example, {@link io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler} can take care of these duties.
* <p/>
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the worker.
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the
* worker after waiting for RemoteTaskRunnerConfig.taskCleanupTimeout for the worker to show up.
* <p/>
* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages.
*/
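The behaviour this comment describes comes down to scheduling a deferred cleanup when a worker's ZK announcement disappears and cancelling it if the worker re-announces itself in time. A minimal standalone sketch of that schedule-then-cancel pattern, using a plain ScheduledExecutorService and joda-time Period; the class and the failTasksOf method are hypothetical names for illustration, not the actual RemoteTaskRunner internals:

```java
import org.joda.time.Period;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class WorkerCleanupSketch
{
  private final ScheduledExecutorService cleanupExec = Executors.newSingleThreadScheduledExecutor();
  private final Map<String, ScheduledFuture<?>> pendingCleanups = new ConcurrentHashMap<>();
  private final Period taskCleanupTimeout = new Period("PT15M");

  // Called when a worker's ZK announcement disappears: defer failing its tasks.
  void workerDisconnected(final String worker)
  {
    pendingCleanups.put(
        worker,
        cleanupExec.schedule(
            new Runnable()
            {
              @Override
              public void run()
              {
                try {
                  failTasksOf(worker); // hypothetical: mark the worker's tasks as failed
                }
                finally {
                  pendingCleanups.remove(worker);
                }
              }
            },
            taskCleanupTimeout.toStandardDuration().getMillis(),
            TimeUnit.MILLISECONDS
        )
    );
  }

  // Called when the worker re-announces itself before the timeout: cancel the pending cleanup.
  void workerReconnected(String worker)
  {
    ScheduledFuture<?> pending = pendingCleanups.remove(worker);
    if (pending != null) {
      pending.cancel(false);
    }
  }

  private void failTasksOf(String worker)
  {
    // placeholder for the real cleanup logic
  }
}
```

Cancelling with cancel(false) mirrors the change below: a cleanup that has already started is allowed to finish, while one that is still pending is simply dropped when the worker reappears.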
@@ -135,6 +138,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
private volatile boolean started = false;
private final ScheduledExecutorService cleanupExec;
private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
public RemoteTaskRunner(
ObjectMapper jsonMapper,
RemoteTaskRunnerConfig config,
@@ -142,7 +149,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
CuratorFramework cf,
PathChildrenCacheFactory pathChildrenCacheFactory,
HttpClient httpClient,
Supplier<WorkerBehaviorConfig> workerConfigRef
Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorService cleanupExec
)
{
this.jsonMapper = jsonMapper;
@@ -153,6 +161,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
this.workerPathCache = pathChildrenCacheFactory.make(cf, indexerZkConfig.getAnnouncementsPath());
this.httpClient = httpClient;
this.workerConfigRef = workerConfigRef;
this.cleanupExec = cleanupExec;
}
@LifecycleStart
@@ -239,6 +248,18 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
waitingForMonitor.wait();
}
}
// Schedule cleanup for task status of the workers that might have disconnected while overlord was not running
for (String worker : cf.getChildren().forPath(indexerZkConfig.getStatusPath())) {
if (!zkWorkers.containsKey(worker)
&& cf.checkExists().forPath(JOINER.join(indexerZkConfig.getAnnouncementsPath(), worker)) == null) {
scheduleTasksCleanupForWorker(
worker,
cf.getChildren()
.forPath(JOINER.join(indexerZkConfig.getStatusPath(), worker))
);
}
}
started = true;
}
catch (Exception e) {
@@ -675,6 +696,12 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
log.info("Worker[%s] reportin' for duty!", worker.getHost());
try {
ScheduledFuture previousCleanup = removedWorkerCleanups.remove(worker.getHost());
if (previousCleanup != null) {
log.info("Cancelling Worker[%s] scheduled task cleanup", worker.getHost());
previousCleanup.cancel(false);
}
final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker.getHost());
final PathChildrenCache statusCache = pathChildrenCacheFactory.make(cf, workerStatusPath);
final SettableFuture<ZkWorker> retVal = SettableFuture.create();
@@ -819,22 +846,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
final ZkWorker zkWorker = zkWorkers.get(worker.getHost());
if (zkWorker != null) {
try {
scheduleTasksCleanupForWorker(worker.getHost(), getAssignedTasks(worker));
List<String> tasksToFail = getAssignedTasks(worker);
for (String assignedTask : tasksToFail) {
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
if (taskRunnerWorkItem != null) {
String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker.getHost(), assignedTask);
if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath);
}
log.info("Failing task[%s]", assignedTask);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTaskId()));
} else {
log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
}
}
}
catch (Exception e) {
throw Throwables.propagate(e);
@@ -852,6 +864,53 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
lazyWorkers.remove(worker.getHost());
}
private void scheduleTasksCleanupForWorker(final String worker, final List<String> tasksToFail)
{
removedWorkerCleanups.put(
worker, cleanupExec.schedule(
new Runnable()
{
@Override
public void run()
{
log.info("Running scheduled cleanup for Worker[%s]", worker);
try {
for (String assignedTask : tasksToFail) {
String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, assignedTask);
String statusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker, assignedTask);
try {
if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath);
}
if (cf.checkExists().forPath(statusPath) != null) {
cf.delete().guaranteed().forPath(statusPath);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
log.info("Failing task[%s]", assignedTask);
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
if (taskRunnerWorkItem != null) {
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTaskId()));
} else {
log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
}
}
}
finally {
removedWorkerCleanups.remove(worker);
}
}
},
config.getTaskCleanupTimeout().toStandardDuration().getMillis(),
TimeUnit.MILLISECONDS
)
);
}
private void taskComplete(
RemoteTaskRunnerWorkItem taskRunnerWorkItem,
ZkWorker zkWorker,
@@ -881,7 +940,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
taskRunnerWorkItem.setResult(taskStatus);
}
public List<ZkWorker> markWokersLazy(Predicate<ZkWorker> isLazyWorker, int maxWorkers)
public List<ZkWorker> markWorkersLazy(Predicate<ZkWorker> isLazyWorker, int maxWorkers)
{
// status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
synchronized (statusLock) {
@@ -929,8 +988,14 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
return assignedTasks;
}
// Used for tests
public List<ZkWorker> getLazyWorkers()
{
return ImmutableList.copyOf(lazyWorkers.values());
}
ConcurrentMap<String, ScheduledFuture> getRemovedWorkerCleanups()
{
return removedWorkerCleanups;
}
}

View File

@@ -20,6 +20,7 @@ package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.http.client.HttpClient;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.guice.annotations.Global;
@@ -28,6 +29,8 @@ import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.server.initialization.IndexerZkConfig;
import org.apache.curator.framework.CuratorFramework;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
public class RemoteTaskRunnerFactory implements TaskRunnerFactory
@@ -38,6 +41,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
private final ScheduledExecutorService cleanupExec;
@Inject
public RemoteTaskRunnerFactory(
@@ -46,7 +50,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
final IndexerZkConfig zkPaths,
final ObjectMapper jsonMapper,
@Global final HttpClient httpClient,
final Supplier<WorkerBehaviorConfig> workerConfigRef
final Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorFactory factory
)
{
this.curator = curator;
@@ -55,6 +60,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.workerConfigRef = workerConfigRef;
this.cleanupExec = factory.create(1,"RemoteTaskRunner-Scheduled-Cleanup--%d");
}
@Override
@@ -70,7 +76,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
.withCompressed(true)
.build(),
httpClient,
workerConfigRef
workerConfigRef,
cleanupExec
);
}
}

View File

@@ -187,7 +187,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config);
final List<String> laziestWorkerIps =
Lists.transform(
runner.markWokersLazy(isLazyWorker, excessWorkers),
runner.markWorkersLazy(isLazyWorker, excessWorkers),
new Function<ZkWorker, String>()
{
@Override

View File

@@ -31,6 +31,10 @@ public class RemoteTaskRunnerConfig
@NotNull
private Period taskAssignmentTimeout = new Period("PT5M");
@JsonProperty
@NotNull
private Period taskCleanupTimeout = new Period("PT15M");
@JsonProperty
private String minWorkerVersion = "0";
@@ -43,6 +47,11 @@ public class RemoteTaskRunnerConfig
return taskAssignmentTimeout;
}
@JsonProperty
public Period getTaskCleanupTimeout(){
return taskCleanupTimeout;
}
public String getMinWorkerVersion()
{
return minWorkerVersion;

View File

@@ -34,8 +34,13 @@ public class TaskAnnouncement
public static TaskAnnouncement create(Task task, TaskStatus status)
{
Preconditions.checkArgument(status.getId().equals(task.getId()), "task id == status id");
return new TaskAnnouncement(null, null, status, task.getTaskResource());
return create(task.getId(), task.getTaskResource(), status);
}
public static TaskAnnouncement create(String taskId, TaskResource resource, TaskStatus status)
{
Preconditions.checkArgument(status.getId().equals(taskId), "task id == status id");
return new TaskAnnouncement(null, null, status, resource);
}
@JsonCreator

View File

@@ -18,9 +18,11 @@
package io.druid.indexing.worker;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.ISE;
@@ -34,7 +36,9 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
/**
* The CuratorCoordinator provides methods to use Curator. Persistent ZK paths are created on {@link #start()}.
@@ -190,7 +194,7 @@ public class WorkerCuratorCoordinator
}
}
public void announceTastAnnouncement(TaskAnnouncement announcement)
public void announceTaskAnnouncement(TaskAnnouncement announcement)
{
synchronized (lock) {
if (!started) {
@@ -206,7 +210,7 @@ public class WorkerCuratorCoordinator
}
curatorFramework.create()
.withMode(CreateMode.EPHEMERAL)
.withMode(CreateMode.PERSISTENT)
.forPath(
getStatusPathForId(announcement.getTaskStatus().getId()), rawBytes
);
@@ -226,7 +230,7 @@ public class WorkerCuratorCoordinator
try {
if (curatorFramework.checkExists().forPath(getStatusPathForId(announcement.getTaskStatus().getId())) == null) {
announceTastAnnouncement(announcement);
announceTaskAnnouncement(announcement);
return;
}
byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement);
@@ -247,6 +251,30 @@ public class WorkerCuratorCoordinator
}
}
public List<TaskAnnouncement> getAnnouncements(){
try {
return Lists.transform(
curatorFramework.getChildren().forPath(getStatusPathForWorker()), new Function<String, TaskAnnouncement>()
{
@Nullable
@Override
public TaskAnnouncement apply(String input)
{
try {
return jsonMapper.readValue(curatorFramework.getData().forPath(getStatusPathForId(input)),TaskAnnouncement.class);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
public void updateWorkerAnnouncement(Worker newWorker) throws Exception
{
synchronized (lock) {

View File

@@ -84,6 +84,19 @@ public class WorkerTaskMonitor
public void start()
{
try {
// cleanup any old running task announcements which are invalid after restart
for (TaskAnnouncement announcement : workerCuratorCoordinator.getAnnouncements()){
if(announcement.getTaskStatus().isRunnable()) {
workerCuratorCoordinator.updateAnnouncement(
TaskAnnouncement.create(
announcement.getTaskId(),
announcement.getTaskResource(),
TaskStatus.failure(announcement.getTaskId())
)
);
}
}
pathChildrenCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@@ -122,7 +135,7 @@ public class WorkerTaskMonitor
TaskStatus taskStatus;
try {
workerCuratorCoordinator.unannounceTask(task.getId());
workerCuratorCoordinator.announceTastAnnouncement(
workerCuratorCoordinator.announceTaskAnnouncement(
TaskAnnouncement.create(
task,
TaskStatus.running(task.getId())

View File

@@ -26,6 +26,9 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.common.guava.DSuppliers;
@@ -51,6 +54,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.zookeeper.CreateMode;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -59,6 +63,7 @@ import org.junit.Test;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -79,6 +84,7 @@ public class RemoteTaskRunnerTest
private TestMergeTask task;
private Worker worker;
private RemoteTaskRunnerConfig config;
@Before
public void setUp() throws Exception
@@ -361,6 +367,18 @@ public class RemoteTaskRunnerTest
TaskStatus status = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
Assert.assertEquals(TaskStatus.Status.FAILED, status.getStatusCode());
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
return remoteTaskRunner.getRemovedWorkerCleanups().isEmpty();
}
}
)
);
}
@Test
@@ -389,12 +407,11 @@ public class RemoteTaskRunnerTest
private void doSetup() throws Exception
{
makeWorker();
makeRemoteTaskRunner();
makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT1S")));
}
private void makeRemoteTaskRunner() throws Exception
private void makeRemoteTaskRunner(RemoteTaskRunnerConfig config) throws Exception
{
RemoteTaskRunnerConfig config = new TestRemoteTaskRunnerConfig();
remoteTaskRunner = new RemoteTaskRunner(
jsonMapper,
config,
@@ -411,7 +428,8 @@ public class RemoteTaskRunnerTest
cf,
new SimplePathChildrenCacheFactory.Builder().build(),
null,
DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig()))
DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig())),
ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d")
);
remoteTaskRunner.start();
@@ -505,7 +523,7 @@ public class RemoteTaskRunnerTest
remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
mockWorkerRunningTask(task);
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWokersLazy(
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWorkersLazy(
new Predicate<ZkWorker>()
{
@Override
@@ -526,7 +544,7 @@ public class RemoteTaskRunnerTest
doSetup();
remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWokersLazy(
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWorkersLazy(
new Predicate<ZkWorker>()
{
@Override
@@ -545,7 +563,7 @@ public class RemoteTaskRunnerTest
public void testFindLazyWorkerNotRunningAnyTask() throws Exception
{
doSetup();
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWokersLazy(
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWorkersLazy(
new Predicate<ZkWorker>()
{
@Override
@@ -559,4 +577,54 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(1, remoteTaskRunner.getLazyWorkers().size());
}
@Test
public void testWorkerZKReconnect() throws Exception
{
makeWorker();
makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT5M")));
Future<TaskStatus> future = remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
mockWorkerRunningTask(task);
Assert.assertTrue(workerRunningTask(task.getId()));
byte[] bytes = cf.getData().forPath(announcementsPath);
cf.delete().forPath(announcementsPath);
// worker task cleanup scheduled
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
return remoteTaskRunner.getRemovedWorkerCleanups().containsKey(worker.getHost());
}
}
)
);
// Worker got reconnected
cf.create().forPath(announcementsPath, bytes);
// worker task cleanup should get cancelled and removed
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
return !remoteTaskRunner.getRemovedWorkerCleanups().containsKey(worker.getHost());
}
}
)
);
mockWorkerCompleteSuccessfulTask(task);
TaskStatus status = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
Assert.assertEquals(TaskStatus.Status.SUCCESS, status.getStatusCode());
}
}

View File

@@ -24,10 +24,23 @@ import org.joda.time.Period;
*/
public class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
{
private final Period timeout;
public TestRemoteTaskRunnerConfig(Period timeout)
{
this.timeout = timeout;
}
@Override
public Period getTaskAssignmentTimeout()
{
return new Period("PT1S");
return timeout;
}
@Override
public Period getTaskCleanupTimeout()
{
return timeout;
}
@Override

View File

@@ -275,7 +275,7 @@ public class SimpleResourceManagementStrategyTest
new TestZkWorker(testTask)
)
).times(2);
EasyMock.expect(runner.markWokersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
EasyMock.expect(runner.markWorkersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
)
@@ -318,7 +318,7 @@ public class SimpleResourceManagementStrategyTest
)
).times(2);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList()).times(2);
EasyMock.expect(runner.markWokersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
EasyMock.expect(runner.markWorkersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
)
@@ -368,7 +368,7 @@ public class SimpleResourceManagementStrategyTest
)
).times(2);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList());
EasyMock.expect(runner.markWokersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
EasyMock.expect(runner.markWorkersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
Collections.<ZkWorker>emptyList()
);
EasyMock.replay(runner);
@@ -412,7 +412,7 @@ public class SimpleResourceManagementStrategyTest
)
).times(3);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList());
EasyMock.expect(runner.markWokersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
EasyMock.expect(runner.markWorkersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
Collections.<ZkWorker>emptyList()
);
EasyMock.replay(runner);

View File

@@ -17,6 +17,7 @@
package io.druid.indexing.worker;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Joiner;
@@ -45,6 +46,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -96,30 +98,40 @@ public class WorkerTaskMonitorTest
workerCuratorCoordinator = new WorkerCuratorCoordinator(
jsonMapper,
new IndexerZkConfig(
new ZkPathsConfig()
{
@Override
public String getBase()
{
return basePath;
}
},null,null,null,null,null),
new TestRemoteTaskRunnerConfig(),
}, null, null, null, null, null
),
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
cf,
worker
);
workerCuratorCoordinator.start();
final File tmp = Files.createTempDir();
// Start a task monitor
workerTaskMonitor = new WorkerTaskMonitor(
workerTaskMonitor = createTaskMonitor();
jsonMapper.registerSubtypes(new NamedType(TestMergeTask.class, "test"));
jsonMapper.registerSubtypes(new NamedType(TestRealtimeTask.class, "test_realtime"));
workerTaskMonitor.start();
task = TestMergeTask.createDummyTask("test");
}
private WorkerTaskMonitor createTaskMonitor()
{
return new WorkerTaskMonitor(
jsonMapper,
cf,
workerCuratorCoordinator,
new ThreadPoolTaskRunner(
new TaskToolboxFactory(
new TaskConfig(tmp.toString(), null, null, 0, null),
new TaskConfig(Files.createTempDir().toString(), null, null, 0, null),
null, null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(
null,
@@ -131,18 +143,14 @@ public class WorkerTaskMonitorTest
return Lists.newArrayList();
}
}
, jsonMapper)
, jsonMapper
)
), jsonMapper
),
null
),
new WorkerConfig().setCapacity(1)
);
jsonMapper.registerSubtypes(new NamedType(TestMergeTask.class, "test"));
jsonMapper.registerSubtypes(new NamedType(TestRealtimeTask.class, "test_realtime"));
workerTaskMonitor.start();
task = TestMergeTask.createDummyTask("test");
}
@After
@@ -178,7 +186,6 @@ public class WorkerTaskMonitorTest
)
);
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
@@ -204,4 +211,98 @@ public class WorkerTaskMonitorTest
Assert.assertEquals(task.getId(), taskAnnouncement.getTaskStatus().getId());
Assert.assertEquals(TaskStatus.Status.RUNNING, taskAnnouncement.getTaskStatus().getStatusCode());
}
@Test
public void testGetAnnouncements() throws Exception
{
cf.create()
.creatingParentsIfNeeded()
.forPath(joiner.join(tasksPath, task.getId()), jsonMapper.writeValueAsBytes(task));
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
try {
return cf.checkExists().forPath(joiner.join(statusPath, task.getId())) != null;
}
catch (Exception e) {
return false;
}
}
}
)
);
List<TaskAnnouncement> announcements = workerCuratorCoordinator.getAnnouncements();
Assert.assertEquals(1, announcements.size());
Assert.assertEquals(task.getId(), announcements.get(0).getTaskStatus().getId());
Assert.assertEquals(TaskStatus.Status.RUNNING, announcements.get(0).getTaskStatus().getStatusCode());
}
@Test
public void testRestartCleansOldStatus() throws Exception
{
cf.create()
.creatingParentsIfNeeded()
.forPath(joiner.join(tasksPath, task.getId()), jsonMapper.writeValueAsBytes(task));
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
try {
return cf.checkExists().forPath(joiner.join(statusPath, task.getId())) != null;
}
catch (Exception e) {
return false;
}
}
}
)
);
// simulate node restart
workerTaskMonitor.stop();
workerTaskMonitor = createTaskMonitor();
workerTaskMonitor.start();
List<TaskAnnouncement> announcements = workerCuratorCoordinator.getAnnouncements();
Assert.assertEquals(1, announcements.size());
Assert.assertEquals(task.getId(), announcements.get(0).getTaskStatus().getId());
Assert.assertEquals(TaskStatus.Status.FAILED, announcements.get(0).getTaskStatus().getStatusCode());
}
@Test
public void testStatusAnnouncementsArePersistent() throws Exception
{
cf.create()
.creatingParentsIfNeeded()
.forPath(joiner.join(tasksPath, task.getId()), jsonMapper.writeValueAsBytes(task));
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
try {
return cf.checkExists().forPath(joiner.join(statusPath, task.getId())) != null;
}
catch (Exception e) {
return false;
}
}
}
)
);
// ephemeral owner is 0 if the created node is PERSISTENT
Assert.assertEquals(0, cf.checkExists().forPath(joiner.join(statusPath, task.getId())).getEphemeralOwner());
}
}