Merge pull request #1417 from metamx/rtr-fixes

fix behaviour of middle managers around ZK disconnects, fixes #709
Xavier Léauté 2015-06-10 14:42:49 -07:00
commit 6763e3780a
12 changed files with 369 additions and 59 deletions

View File

@ -70,6 +70,7 @@ The following configs only apply if the overlord is running in remote mode:
|`druid.indexer.runner.minWorkerVersion`|The minimum middle manager version to send tasks to. |"0"|
|`druid.indexer.runner.compressZnodes`|Indicates whether the overlord should expect middle managers to compress znodes.|true|
|`druid.indexer.runner.maxZnodeBytes`|The maximum size (in bytes) of a znode that can be created in ZooKeeper.|524288|
|`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a middle manager is disconnected from ZooKeeper.|PT15M|
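The timeout is an ISO-8601 period, parsed with Joda-Time like the runner's other duration configs. A minimal sketch of what the default means in absolute terms (the class name is illustrative):

import org.joda.time.Period;

public class TaskCleanupTimeoutExample
{
  public static void main(String[] args)
  {
    // druid.indexer.runner.taskCleanupTimeout defaults to PT15M, i.e. fifteen minutes
    Period timeout = new Period("PT15M");
    // the same conversion the runner applies when scheduling the cleanup
    System.out.println(timeout.toStandardDuration().getMillis() + " ms"); // prints "900000 ms"
  }
}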
There are additional configs for autoscaling (if it is enabled):

View File

@ -82,6 +82,8 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
@ -95,7 +97,8 @@ import java.util.concurrent.TimeUnit;
* fail. The RemoteTaskRunner depends on another component to create additional worker resources.
* For example, {@link io.druid.indexing.overlord.autoscaling.ResourceManagementScheduler} can take care of these duties.
* <p/>
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the worker.
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the
* worker after waiting for RemoteTaskRunnerConfig.taskCleanupTimeout for the worker to show up.
* <p/>
* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages.
*/
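At its core, the fix is a schedule-then-cancel pattern on a ScheduledExecutorService: a worker disconnect schedules a deferred cleanup, and a reconnect within the timeout cancels it. A minimal standalone sketch of that pattern (names are illustrative, not Druid's actual API):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class DisconnectCleanupSketch
{
  private final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
  private final ConcurrentMap<String, ScheduledFuture> cleanups = new ConcurrentHashMap<>();

  void onDisconnect(final String worker, long timeoutMillis)
  {
    cleanups.put(
        worker,
        exec.schedule(
            new Runnable()
            {
              @Override
              public void run()
              {
                try {
                  // fail the worker's tasks here; only reached if no reconnect happened in time
                }
                finally {
                  cleanups.remove(worker);
                }
              }
            },
            timeoutMillis,
            TimeUnit.MILLISECONDS
        )
    );
  }

  void onReconnect(String worker)
  {
    ScheduledFuture pending = cleanups.remove(worker);
    if (pending != null) {
      pending.cancel(false); // false: a cleanup that already started is allowed to finish
    }
  }
}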
@ -135,6 +138,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
private volatile boolean started = false;
private final ScheduledExecutorService cleanupExec;
private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
public RemoteTaskRunner(
ObjectMapper jsonMapper,
RemoteTaskRunnerConfig config,
@ -142,7 +149,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
CuratorFramework cf,
PathChildrenCacheFactory pathChildrenCacheFactory,
HttpClient httpClient,
Supplier<WorkerBehaviorConfig> workerConfigRef
Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorService cleanupExec
)
{
this.jsonMapper = jsonMapper;
@ -153,6 +161,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
this.workerPathCache = pathChildrenCacheFactory.make(cf, indexerZkConfig.getAnnouncementsPath());
this.httpClient = httpClient;
this.workerConfigRef = workerConfigRef;
this.cleanupExec = cleanupExec;
}
@LifecycleStart
@ -239,6 +248,18 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
waitingForMonitor.wait();
}
}
// Schedule cleanup of task statuses for workers that may have disconnected while the overlord was not running
for (String worker : cf.getChildren().forPath(indexerZkConfig.getStatusPath())) {
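// A status znode with no matching announcement znode means the worker vanished
// while the overlord was down, so its tasks get the same deferred cleanup.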
if (!zkWorkers.containsKey(worker)
&& cf.checkExists().forPath(JOINER.join(indexerZkConfig.getAnnouncementsPath(), worker)) == null) {
scheduleTasksCleanupForWorker(
worker,
cf.getChildren()
.forPath(JOINER.join(indexerZkConfig.getStatusPath(), worker))
);
}
}
started = true;
}
catch (Exception e) {
@ -675,6 +696,12 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
log.info("Worker[%s] reportin' for duty!", worker.getHost());
try {
ScheduledFuture previousCleanup = removedWorkerCleanups.remove(worker.getHost());
if (previousCleanup != null) {
log.info("Cancelling Worker[%s] scheduled task cleanup", worker.getHost());
previousCleanup.cancel(false);
}
final String workerStatusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker.getHost());
final PathChildrenCache statusCache = pathChildrenCacheFactory.make(cf, workerStatusPath);
final SettableFuture<ZkWorker> retVal = SettableFuture.create();
@ -819,22 +846,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
final ZkWorker zkWorker = zkWorkers.get(worker.getHost());
if (zkWorker != null) {
try {
List<String> tasksToFail = getAssignedTasks(worker);
for (String assignedTask : tasksToFail) {
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
if (taskRunnerWorkItem != null) {
String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker.getHost(), assignedTask);
if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath);
}
log.info("Failing task[%s]", assignedTask);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTaskId()));
} else {
log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
}
}
scheduleTasksCleanupForWorker(worker.getHost(), getAssignedTasks(worker));
}
catch (Exception e) {
throw Throwables.propagate(e);
@ -852,6 +864,53 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
lazyWorkers.remove(worker.getHost());
}
private void scheduleTasksCleanupForWorker(final String worker, final List<String> tasksToFail)
{
removedWorkerCleanups.put(
worker, cleanupExec.schedule(
new Runnable()
{
@Override
public void run()
{
log.info("Running scheduled cleanup for Worker[%s]", worker);
try {
for (String assignedTask : tasksToFail) {
String taskPath = JOINER.join(indexerZkConfig.getTasksPath(), worker, assignedTask);
String statusPath = JOINER.join(indexerZkConfig.getStatusPath(), worker, assignedTask);
try {
if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath);
}
if (cf.checkExists().forPath(statusPath) != null) {
cf.delete().guaranteed().forPath(statusPath);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
log.info("Failing task[%s]", assignedTask);
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
if (taskRunnerWorkItem != null) {
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTaskId()));
} else {
log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
}
}
}
finally {
removedWorkerCleanups.remove(worker);
}
}
},
config.getTaskCleanupTimeout().toStandardDuration().getMillis(),
TimeUnit.MILLISECONDS
)
);
}
private void taskComplete(
RemoteTaskRunnerWorkItem taskRunnerWorkItem,
ZkWorker zkWorker,
@ -881,7 +940,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
taskRunnerWorkItem.setResult(taskStatus);
}
public List<ZkWorker> markWokersLazy(Predicate<ZkWorker> isLazyWorker, int maxWorkers)
public List<ZkWorker> markWorkersLazy(Predicate<ZkWorker> isLazyWorker, int maxWorkers)
{
// status lock is used to prevent any tasks being assigned to the worker while we mark it lazy
synchronized (statusLock) {
@ -929,8 +988,14 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
return assignedTasks;
}
// Used for tests
public List<ZkWorker> getLazyWorkers()
{
return ImmutableList.copyOf(lazyWorkers.values());
}
ConcurrentMap<String, ScheduledFuture> getRemovedWorkerCleanups()
{
return removedWorkerCleanups;
}
}

View File

@ -20,6 +20,7 @@ package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.http.client.HttpClient;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.guice.annotations.Global;
@ -28,6 +29,8 @@ import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.server.initialization.IndexerZkConfig;
import org.apache.curator.framework.CuratorFramework;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
public class RemoteTaskRunnerFactory implements TaskRunnerFactory
@ -38,6 +41,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
private final ScheduledExecutorService cleanupExec;
@Inject
public RemoteTaskRunnerFactory(
@ -46,7 +50,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
final IndexerZkConfig zkPaths,
final ObjectMapper jsonMapper,
@Global final HttpClient httpClient,
final Supplier<WorkerBehaviorConfig> workerConfigRef
final Supplier<WorkerBehaviorConfig> workerConfigRef,
ScheduledExecutorFactory factory
)
{
this.curator = curator;
@ -55,6 +60,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.workerConfigRef = workerConfigRef;
this.cleanupExec = factory.create(1, "RemoteTaskRunner-Scheduled-Cleanup--%d");
}
@Override
@ -70,7 +76,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
.withCompressed(true)
.build(),
httpClient,
workerConfigRef
workerConfigRef,
cleanupExec
);
}
}
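For context, the injected ScheduledExecutorFactory yields a fixed-size scheduled pool whose threads are named from a format string; roughly equivalent, in plain java.util.concurrent (a sketch under that assumption, not the metamx implementation):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

class NamedScheduledExecutors
{
  static ScheduledExecutorService fixed(int corePoolSize, final String nameFormat)
  {
    final AtomicLong counter = new AtomicLong(0);
    return Executors.newScheduledThreadPool(
        corePoolSize,
        new ThreadFactory()
        {
          @Override
          public Thread newThread(Runnable r)
          {
            // e.g. nameFormat = "RemoteTaskRunner-Scheduled-Cleanup--%d"
            Thread t = new Thread(r, String.format(nameFormat, counter.getAndIncrement()));
            t.setDaemon(true); // assumption: cleanup threads should not block JVM shutdown
            return t;
          }
        }
    );
  }
}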

View File

@ -187,7 +187,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
final Predicate<ZkWorker> isLazyWorker = createLazyWorkerPredicate(config);
final List<String> laziestWorkerIps =
Lists.transform(
runner.markWokersLazy(isLazyWorker, excessWorkers),
runner.markWorkersLazy(isLazyWorker, excessWorkers),
new Function<ZkWorker, String>()
{
@Override

View File

@ -31,6 +31,10 @@ public class RemoteTaskRunnerConfig
@NotNull
private Period taskAssignmentTimeout = new Period("PT5M");
@JsonProperty
@NotNull
private Period taskCleanupTimeout = new Period("PT15M");
@JsonProperty
private String minWorkerVersion = "0";
@ -43,6 +47,11 @@ public class RemoteTaskRunnerConfig
return taskAssignmentTimeout;
}
@JsonProperty
public Period getTaskCleanupTimeout()
{
return taskCleanupTimeout;
}
public String getMinWorkerVersion()
{
return minWorkerVersion;

View File

@ -34,8 +34,13 @@ public class TaskAnnouncement
public static TaskAnnouncement create(Task task, TaskStatus status)
{
Preconditions.checkArgument(status.getId().equals(task.getId()), "task id == status id");
return new TaskAnnouncement(null, null, status, task.getTaskResource());
return create(task.getId(), task.getTaskResource(), status);
}
public static TaskAnnouncement create(String taskId, TaskResource resource, TaskStatus status)
{
Preconditions.checkArgument(status.getId().equals(taskId), "task id == status id");
return new TaskAnnouncement(null, null, status, resource);
}
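// The overload above exists because a restarted worker no longer holds its
// original Task objects, only announcements read back from ZK; WorkerTaskMonitor's
// startup cleanup uses it as:
//   TaskAnnouncement.create(
//       announcement.getTaskId(),
//       announcement.getTaskResource(),
//       TaskStatus.failure(announcement.getTaskId())
//   )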
@JsonCreator

View File

@ -18,9 +18,11 @@
package io.druid.indexing.worker;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.ISE;
@ -34,7 +36,9 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
/**
* The CuratorCoordinator provides methods to use Curator. Persistent ZK paths are created on {@link #start()}.
@ -190,7 +194,7 @@ public class WorkerCuratorCoordinator
}
}
public void announceTastAnnouncement(TaskAnnouncement announcement)
public void announceTaskAnnouncement(TaskAnnouncement announcement)
{
synchronized (lock) {
if (!started) {
@ -206,7 +210,7 @@ public class WorkerCuratorCoordinator
}
curatorFramework.create()
.withMode(CreateMode.EPHEMERAL)
.withMode(CreateMode.PERSISTENT)
.forPath(
getStatusPathForId(announcement.getTaskStatus().getId()), rawBytes
);
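// PERSISTENT status znodes survive the worker's ZK session, so a transient
// disconnect no longer erases running-task state. The trade-off: the nodes must
// now be deleted explicitly, by the worker itself or by the overlord's
// scheduled cleanup when the worker never returns.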
@ -226,7 +230,7 @@ public class WorkerCuratorCoordinator
try {
if (curatorFramework.checkExists().forPath(getStatusPathForId(announcement.getTaskStatus().getId())) == null) {
announceTastAnnouncement(announcement);
announceTaskAnnouncement(announcement);
return;
}
byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement);
@ -247,6 +251,30 @@ public class WorkerCuratorCoordinator
}
}
public List<TaskAnnouncement> getAnnouncements()
{
try {
return Lists.transform(
curatorFramework.getChildren().forPath(getStatusPathForWorker()), new Function<String, TaskAnnouncement>()
{
@Nullable
@Override
public TaskAnnouncement apply(String input)
{
try {
return jsonMapper.readValue(curatorFramework.getData().forPath(getStatusPathForId(input)), TaskAnnouncement.class);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
public void updateWorkerAnnouncement(Worker newWorker) throws Exception
{
synchronized (lock) {

View File

@ -84,6 +84,19 @@ public class WorkerTaskMonitor
public void start()
{
try {
// cleanup any old running task announcements which are invalid after restart
for (TaskAnnouncement announcement : workerCuratorCoordinator.getAnnouncements()) {
if (announcement.getTaskStatus().isRunnable()) {
workerCuratorCoordinator.updateAnnouncement(
TaskAnnouncement.create(
announcement.getTaskId(),
announcement.getTaskResource(),
TaskStatus.failure(announcement.getTaskId())
)
);
}
}
pathChildrenCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@ -122,7 +135,7 @@ public class WorkerTaskMonitor
TaskStatus taskStatus;
try {
workerCuratorCoordinator.unannounceTask(task.getId());
workerCuratorCoordinator.announceTastAnnouncement(
workerCuratorCoordinator.announceTaskAnnouncement(
TaskAnnouncement.create(
task,
TaskStatus.running(task.getId())

View File

@ -26,6 +26,9 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.common.guava.DSuppliers;
@ -51,6 +54,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.zookeeper.CreateMode;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -59,6 +63,7 @@ import org.junit.Test;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -79,6 +84,7 @@ public class RemoteTaskRunnerTest
private TestMergeTask task;
private Worker worker;
private RemoteTaskRunnerConfig config;
@Before
public void setUp() throws Exception
@ -361,6 +367,18 @@ public class RemoteTaskRunnerTest
TaskStatus status = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
Assert.assertEquals(TaskStatus.Status.FAILED, status.getStatusCode());
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
return remoteTaskRunner.getRemovedWorkerCleanups().isEmpty();
}
}
)
);
}
@Test
@ -389,12 +407,11 @@ public class RemoteTaskRunnerTest
private void doSetup() throws Exception
{
makeWorker();
makeRemoteTaskRunner();
makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT1S")));
}
private void makeRemoteTaskRunner() throws Exception
private void makeRemoteTaskRunner(RemoteTaskRunnerConfig config) throws Exception
{
RemoteTaskRunnerConfig config = new TestRemoteTaskRunnerConfig();
remoteTaskRunner = new RemoteTaskRunner(
jsonMapper,
config,
@ -411,7 +428,8 @@ public class RemoteTaskRunnerTest
cf,
new SimplePathChildrenCacheFactory.Builder().build(),
null,
DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig()))
DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig())),
ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d")
);
remoteTaskRunner.start();
@ -505,7 +523,7 @@ public class RemoteTaskRunnerTest
remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
mockWorkerRunningTask(task);
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWokersLazy(
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWorkersLazy(
new Predicate<ZkWorker>()
{
@Override
@ -526,7 +544,7 @@ public class RemoteTaskRunnerTest
doSetup();
remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWokersLazy(
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWorkersLazy(
new Predicate<ZkWorker>()
{
@Override
@ -545,7 +563,7 @@ public class RemoteTaskRunnerTest
public void testFindLazyWorkerNotRunningAnyTask() throws Exception
{
doSetup();
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWokersLazy(
Collection<ZkWorker> lazyworkers = remoteTaskRunner.markWorkersLazy(
new Predicate<ZkWorker>()
{
@Override
@ -559,4 +577,54 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(1, remoteTaskRunner.getLazyWorkers().size());
}
@Test
public void testWorkerZKReconnect() throws Exception
{
makeWorker();
makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT5M")));
Future<TaskStatus> future = remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
mockWorkerRunningTask(task);
Assert.assertTrue(workerRunningTask(task.getId()));
byte[] bytes = cf.getData().forPath(announcementsPath);
cf.delete().forPath(announcementsPath);
// worker task cleanup scheduled
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
return remoteTaskRunner.getRemovedWorkerCleanups().containsKey(worker.getHost());
}
}
)
);
// Worker got reconnected
cf.create().forPath(announcementsPath, bytes);
// worker task cleanup should get cancelled and removed
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
return !remoteTaskRunner.getRemovedWorkerCleanups().containsKey(worker.getHost());
}
}
)
);
mockWorkerCompleteSuccessfulTask(task);
TaskStatus status = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
Assert.assertEquals(TaskStatus.Status.SUCCESS, status.getStatusCode());
}
}

View File

@ -24,10 +24,23 @@ import org.joda.time.Period;
*/
public class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
{
private final Period timeout;
public TestRemoteTaskRunnerConfig(Period timeout)
{
this.timeout = timeout;
}
@Override
public Period getTaskAssignmentTimeout()
{
return new Period("PT1S");
return timeout;
}
@Override
public Period getTaskCleanupTimeout()
{
return timeout;
}
@Override

View File

@ -275,7 +275,7 @@ public class SimpleResourceManagementStrategyTest
new TestZkWorker(testTask)
)
).times(2);
EasyMock.expect(runner.markWokersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
EasyMock.expect(runner.markWorkersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
)
@ -318,7 +318,7 @@ public class SimpleResourceManagementStrategyTest
)
).times(2);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList()).times(2);
EasyMock.expect(runner.markWokersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
EasyMock.expect(runner.markWorkersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
Arrays.<ZkWorker>asList(
new TestZkWorker(testTask)
)
@ -368,7 +368,7 @@ public class SimpleResourceManagementStrategyTest
)
).times(2);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList());
EasyMock.expect(runner.markWokersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
EasyMock.expect(runner.markWorkersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
Collections.<ZkWorker>emptyList()
);
EasyMock.replay(runner);
@ -412,7 +412,7 @@ public class SimpleResourceManagementStrategyTest
)
).times(3);
EasyMock.expect(runner.getLazyWorkers()).andReturn(Lists.<ZkWorker>newArrayList());
EasyMock.expect(runner.markWokersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
EasyMock.expect(runner.markWorkersLazy((Predicate<ZkWorker>) EasyMock.anyObject(), EasyMock.anyInt())).andReturn(
Collections.<ZkWorker>emptyList()
);
EasyMock.replay(runner);

View File

@ -17,6 +17,7 @@
package io.druid.indexing.worker;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Joiner;
@ -45,6 +46,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -96,30 +98,40 @@ public class WorkerTaskMonitorTest
workerCuratorCoordinator = new WorkerCuratorCoordinator(
jsonMapper,
new IndexerZkConfig(
new ZkPathsConfig()
{
@Override
public String getBase()
{
return basePath;
}
},null,null,null,null,null),
new TestRemoteTaskRunnerConfig(),
new ZkPathsConfig()
{
@Override
public String getBase()
{
return basePath;
}
}, null, null, null, null, null
),
new TestRemoteTaskRunnerConfig(new Period("PT1S")),
cf,
worker
);
workerCuratorCoordinator.start();
final File tmp = Files.createTempDir();
// Start a task monitor
workerTaskMonitor = new WorkerTaskMonitor(
workerTaskMonitor = createTaskMonitor();
jsonMapper.registerSubtypes(new NamedType(TestMergeTask.class, "test"));
jsonMapper.registerSubtypes(new NamedType(TestRealtimeTask.class, "test_realtime"));
workerTaskMonitor.start();
task = TestMergeTask.createDummyTask("test");
}
private WorkerTaskMonitor createTaskMonitor()
{
return new WorkerTaskMonitor(
jsonMapper,
cf,
workerCuratorCoordinator,
new ThreadPoolTaskRunner(
new TaskToolboxFactory(
new TaskConfig(tmp.toString(), null, null, 0, null),
new TaskConfig(Files.createTempDir().toString(), null, null, 0, null),
null, null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(
null,
@ -131,18 +143,14 @@ public class WorkerTaskMonitorTest
return Lists.newArrayList();
}
}
, jsonMapper)
, jsonMapper
)
), jsonMapper
),
null
),
new WorkerConfig().setCapacity(1)
);
jsonMapper.registerSubtypes(new NamedType(TestMergeTask.class, "test"));
jsonMapper.registerSubtypes(new NamedType(TestRealtimeTask.class, "test_realtime"));
workerTaskMonitor.start();
task = TestMergeTask.createDummyTask("test");
}
@After
@ -178,7 +186,6 @@ public class WorkerTaskMonitorTest
)
);
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
@ -204,4 +211,98 @@ public class WorkerTaskMonitorTest
Assert.assertEquals(task.getId(), taskAnnouncement.getTaskStatus().getId());
Assert.assertEquals(TaskStatus.Status.RUNNING, taskAnnouncement.getTaskStatus().getStatusCode());
}
@Test
public void testGetAnnouncements() throws Exception
{
cf.create()
.creatingParentsIfNeeded()
.forPath(joiner.join(tasksPath, task.getId()), jsonMapper.writeValueAsBytes(task));
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
try {
return cf.checkExists().forPath(joiner.join(statusPath, task.getId())) != null;
}
catch (Exception e) {
return false;
}
}
}
)
);
List<TaskAnnouncement> announcements = workerCuratorCoordinator.getAnnouncements();
Assert.assertEquals(1, announcements.size());
Assert.assertEquals(task.getId(), announcements.get(0).getTaskStatus().getId());
Assert.assertEquals(TaskStatus.Status.RUNNING, announcements.get(0).getTaskStatus().getStatusCode());
}
@Test
public void testRestartCleansOldStatus() throws Exception
{
cf.create()
.creatingParentsIfNeeded()
.forPath(joiner.join(tasksPath, task.getId()), jsonMapper.writeValueAsBytes(task));
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
try {
return cf.checkExists().forPath(joiner.join(statusPath, task.getId())) != null;
}
catch (Exception e) {
return false;
}
}
}
)
);
// simulate node restart
workerTaskMonitor.stop();
workerTaskMonitor = createTaskMonitor();
workerTaskMonitor.start();
List<TaskAnnouncement> announcements = workerCuratorCoordinator.getAnnouncements();
Assert.assertEquals(1, announcements.size());
Assert.assertEquals(task.getId(), announcements.get(0).getTaskStatus().getId());
Assert.assertEquals(TaskStatus.Status.FAILED, announcements.get(0).getTaskStatus().getStatusCode());
}
@Test
public void testStatusAnnouncementsArePersistent() throws Exception
{
cf.create()
.creatingParentsIfNeeded()
.forPath(joiner.join(tasksPath, task.getId()), jsonMapper.writeValueAsBytes(task));
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
try {
return cf.checkExists().forPath(joiner.join(statusPath, task.getId())) != null;
}
catch (Exception e) {
return false;
}
}
}
)
);
// ephemeralOwner is 0 if the created node is PERSISTENT
Assert.assertEquals(0, cf.checkExists().forPath(joiner.join(statusPath, task.getId())).getEphemeralOwner());
}
}