fix misleading error log due to a race in RTR and add a concurrency test (#2878)

Himanshu 2016-04-28 12:28:00 -05:00 committed by Fangjin Yang
parent 16080dc54f
commit 9669e79df2
4 changed files with 415 additions and 122 deletions

View File: RemoteTaskRunner.java

@@ -588,7 +588,11 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
String taskId = taskRunnerWorkItem.getTaskId();
if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
try {
if (tryAssignTask(pendingTaskPayloads.get(taskId), taskRunnerWorkItem)) {
//this can still be null due to race from explicit task shutdown request
//or if another thread steals and completes this task right after this thread makes copy
//of pending tasks. See https://github.com/druid-io/druid/issues/2842 .
Task task = pendingTaskPayloads.get(taskId);
if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {
pendingTaskPayloads.remove(taskId);
}
}
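The added comments describe a claim-then-recheck pattern: a pending-tasks thread first claims the task id with ConcurrentMap.putIfAbsent and then re-reads the payload, treating a missing payload as "already shut down or taken by another thread" rather than as a failure worth logging. A minimal sketch of that shape, assuming hypothetical names (claims, payloads, tryRun) rather than the actual RemoteTaskRunner fields:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch only: hypothetical names, not the Druid RemoteTaskRunner API.
public class ClaimThenRecheck
{
  private final ConcurrentMap<String, String> claims = new ConcurrentHashMap<>();
  private final ConcurrentMap<String, Runnable> payloads = new ConcurrentHashMap<>();

  public void submit(String taskId, Runnable work)
  {
    payloads.put(taskId, work);
  }

  public void shutdown(String taskId)
  {
    payloads.remove(taskId); // may race with tryRun() on another thread
  }

  public boolean tryRun(String taskId)
  {
    if (claims.putIfAbsent(taskId, taskId) != null) {
      return false; // another thread already claimed this task
    }
    try {
      Runnable work = payloads.get(taskId);
      if (work == null) {
        // payload vanished after the caller snapshotted the pending set:
        // shut down or completed elsewhere, so not an error condition
        return false;
      }
      work.run();
      payloads.remove(taskId);
      return true;
    }
    finally {
      claims.remove(taskId);
    }
  }
}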
@@ -597,8 +601,10 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
.addData("taskId", taskRunnerWorkItem.getTaskId())
.emit();
RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
if (workItem != null) {
taskComplete(workItem, null, TaskStatus.failure(taskId));
}
}
finally {
tryAssignTasks.remove(taskId);
}
@@ -677,8 +683,9 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
ZkWorker assignedWorker = null;
Optional<ImmutableWorkerInfo> immutableZkWorker = null;
try {
final Optional<ImmutableWorkerInfo> immutableZkWorker = strategy.findWorkerForTask(
immutableZkWorker = strategy.findWorkerForTask(
config,
ImmutableMap.copyOf(
Maps.transformEntries(
@@ -708,22 +715,38 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
task
);
if (immutableZkWorker.isPresent()
&&
workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.get().getWorker().getHost(), task.getId())
if (immutableZkWorker.isPresent()) {
if (workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.get().getWorker().getHost(), task.getId())
== null) {
assignedWorker = zkWorkers.get(immutableZkWorker.get().getWorker().getHost());
return announceTask(task, assignedWorker, taskRunnerWorkItem);
} else {
log.debug(
"Lost race to run task [%s] on worker [%s]. Workers to ack tasks are [%s].",
task.getId(),
immutableZkWorker.get().getWorker().getHost(),
workersWithUnacknowledgedTask
);
}
} else {
log.debug(
"Unsuccessful task-assign attempt for task [%s] on workers [%s]. Workers to ack tasks are [%s].",
task.getId(),
zkWorkers.values(),
workersWithUnacknowledgedTask
);
}
log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
return false;
}
finally {
if (assignedWorker != null) {
workersWithUnacknowledgedTask.remove(assignedWorker.getWorker().getHost());
// note that this is essential as a task might not get a worker because a worker was assigned another task.
// so this will ensure that other pending tasks are tried for assignment again.
}
if (immutableZkWorker.isPresent()) {
//if this attempt lost the race to run the task then there might be another worker available to try on.
//if this attempt won the race to run the task then other task might be able to use this worker now after task ack.
runPendingTasks();
}
}
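The finally block pairs the putIfAbsent claim on the chosen worker with a guaranteed release, and then re-triggers pending-task scheduling so that a lost race does not strand other queued tasks. A minimal sketch of that claim/release shape, assuming hypothetical names (busyWorkers, tryAssign, schedulePending) that are not the actual RemoteTaskRunner API:

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch only: hypothetical names, not the Druid RemoteTaskRunner API.
public class WorkerClaim
{
  private final ConcurrentMap<String, String> busyWorkers = new ConcurrentHashMap<>();

  public boolean tryAssign(String taskId, Optional<String> candidate, Runnable schedulePending)
  {
    String claimed = null;
    try {
      if (candidate.isPresent()) {
        if (busyWorkers.putIfAbsent(candidate.get(), taskId) == null) {
          claimed = candidate.get();
          return announce(taskId, claimed); // hand the task to the claimed worker
        }
        // lost the race: another task grabbed this worker first
      }
      return false;
    }
    finally {
      if (claimed != null) {
        busyWorkers.remove(claimed); // always release the claim once the attempt is over
      }
      if (candidate.isPresent()) {
        // win or lose, other pending tasks may now be assignable, so poke the scheduler
        schedulePending.run();
      }
    }
  }

  private boolean announce(String taskId, String worker)
  {
    return true; // placeholder for the actual announcement to the worker
  }
}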
@@ -1196,4 +1219,10 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
{
return config;
}
@VisibleForTesting
Map<String, String> getWorkersWithUnacknowledgedTask()
{
return workersWithUnacknowledgedTask;
}
}

View File: RemoteTaskRunnerRunPendingTasksConcurrencyTest.java

@@ -0,0 +1,150 @@
/*
*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package io.druid.indexing.overlord;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TestTasks;
import io.druid.indexing.common.task.Task;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
*/
public class RemoteTaskRunnerRunPendingTasksConcurrencyTest
{
private RemoteTaskRunner remoteTaskRunner;
private RemoteTaskRunnerTestUtils rtrTestUtils = new RemoteTaskRunnerTestUtils();
@Before
public void setUp() throws Exception
{
rtrTestUtils.setUp();
}
@After
public void tearDown() throws Exception
{
if (remoteTaskRunner != null) {
remoteTaskRunner.stop();
}
rtrTestUtils.tearDown();
}
// This test reproduces the races described in https://github.com/druid-io/druid/issues/2842
@Test(timeout = 60_000)
public void testConcurrency() throws Exception
{
rtrTestUtils.makeWorker("worker0");
rtrTestUtils.makeWorker("worker1");
remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner(
new TestRemoteTaskRunnerConfig(new Period("PT3600S"))
{
public int getPendingTasksRunnerNumThreads()
{
return 2;
}
}
);
int numTasks = 6;
ListenableFuture<TaskStatus>[] results = new ListenableFuture[numTasks];
Task[] tasks = new Task[numTasks];
//submit 2 tasks, one per worker
for (int i = 0; i < 2; i++) {
tasks[i] = TestTasks.unending("task" + i);
results[i] = (remoteTaskRunner.run(tasks[i]));
}
while (remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() < 2) {
Thread.sleep(5);
}
//3 more tasks, all of which get queued up
for (int i = 2; i < 5; i++) {
tasks[i] = TestTasks.unending("task" + i);
results[i] = (remoteTaskRunner.run(tasks[i]));
}
//simulate completion of task0 and task1
if (rtrTestUtils.taskAnnounced("worker0", tasks[0].getId())) {
rtrTestUtils.mockWorkerRunningTask("worker0", tasks[0]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[0]);
rtrTestUtils.mockWorkerRunningTask("worker1", tasks[1]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", tasks[1]);
} else {
rtrTestUtils.mockWorkerRunningTask("worker0", tasks[1]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[1]);
rtrTestUtils.mockWorkerRunningTask("worker1", tasks[0]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", tasks[0]);
}
Assert.assertEquals(TaskStatus.Status.SUCCESS, results[0].get().getStatusCode());
Assert.assertEquals(TaskStatus.Status.SUCCESS, results[1].get().getStatusCode());
// now both threads race to run the last 3 tasks. task2 and task3 are being assigned
while (remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() < 2) {
Thread.sleep(5);
}
//cancel task4, both executor threads should be able to ignore task4
remoteTaskRunner.shutdown("task4");
//simulate completion of task3 before task2 so that the executor thread with task2
//gets to task3 and ignores it
if (rtrTestUtils.taskAnnounced("worker0", tasks[3].getId())) {
rtrTestUtils.mockWorkerRunningTask("worker0", tasks[3]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[3]);
rtrTestUtils.mockWorkerRunningTask("worker1", tasks[2]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", tasks[2]);
} else {
rtrTestUtils.mockWorkerRunningTask("worker1", tasks[3]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", tasks[3]);
rtrTestUtils.mockWorkerRunningTask("worker0", tasks[2]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[2]);
}
Assert.assertEquals(TaskStatus.Status.SUCCESS, results[2].get().getStatusCode());
Assert.assertEquals(TaskStatus.Status.SUCCESS, results[3].get().getStatusCode());
//ensure that RTR is doing OK and still making progress
tasks[5] = TestTasks.unending("task5");
results[5] = remoteTaskRunner.run(tasks[5]);
while (remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() < 1) {
Thread.sleep(5);
}
if (rtrTestUtils.taskAnnounced("worker0", tasks[5].getId())) {
rtrTestUtils.mockWorkerRunningTask("worker0", tasks[5]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[5]);
} else {
rtrTestUtils.mockWorkerRunningTask("worker1", tasks[5]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", tasks[5]);
}
Assert.assertEquals(TaskStatus.Status.SUCCESS, results[5].get().getStatusCode());
}
}
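The test synchronizes with the two pending-tasks threads by polling getWorkersWithUnacknowledgedTask() in short sleep loops under the 60-second test timeout. That polling shape could be factored into a small helper; a sketch, assuming a hypothetical awaitTrue utility that is not part of the Druid test code:

import java.util.function.BooleanSupplier;

// Sketch only: hypothetical helper, not part of the Druid test utilities.
class Waits
{
  static void awaitTrue(BooleanSupplier condition, long timeoutMillis) throws InterruptedException
  {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (!condition.getAsBoolean()) {
      if (System.currentTimeMillis() > deadline) {
        throw new IllegalStateException("condition not met within " + timeoutMillis + " ms");
      }
      Thread.sleep(5); // same short poll interval the test uses
    }
  }
}

For example, the first wait in the test could then read: Waits.awaitTrue(() -> remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() >= 2, 60_000);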

View File: RemoteTaskRunnerTest.java

@@ -23,36 +23,22 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.common.guava.DSuppliers;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.indexing.common.IndexingServiceCondition;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TestRealtimeTask;
import io.druid.indexing.common.TestTasks;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.zookeeper.CreateMode;
import org.easymock.EasyMock;
import org.joda.time.Period;
@@ -65,46 +51,29 @@ import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class RemoteTaskRunnerTest
{
private static final Joiner joiner = Joiner.on("/");
private static final String basePath = "/test/druid";
private static final String announcementsPath = String.format("%s/indexer/announcements/worker", basePath);
private static final String tasksPath = String.format("%s/indexer/tasks/worker", basePath);
private static final String statusPath = String.format("%s/indexer/status/worker", basePath);
private static final Joiner joiner = RemoteTaskRunnerTestUtils.joiner;
private static final String workerHost = "worker";
private static final String announcementsPath = joiner.join(RemoteTaskRunnerTestUtils.announcementsPath, workerHost);
private static final String statusPath = joiner.join(RemoteTaskRunnerTestUtils.statusPath, workerHost);
private static final int TIMEOUT_SECONDS = 20;
private static final TaskLocation DUMMY_LOCATION = TaskLocation.create("dummy", 9000);
private ObjectMapper jsonMapper;
private TestingCluster testingCluster;
private CuratorFramework cf;
private RemoteTaskRunner remoteTaskRunner;
private RemoteTaskRunnerTestUtils rtrTestUtils = new RemoteTaskRunnerTestUtils();
private ObjectMapper jsonMapper;
private CuratorFramework cf;
private Task task;
private Worker worker;
@Before
public void setUp() throws Exception
{
TestUtils testUtils = new TestUtils();
jsonMapper = testUtils.getTestObjectMapper();
testingCluster = new TestingCluster(1);
testingCluster.start();
cf = CuratorFrameworkFactory.builder()
.connectString(testingCluster.getConnectString())
.retryPolicy(new ExponentialBackoffRetry(1, 10))
.compressionProvider(new PotentiallyGzippedCompressionProvider(false))
.build();
cf.start();
cf.blockUntilConnected();
cf.create().creatingParentsIfNeeded().forPath(basePath);
cf.create().creatingParentsIfNeeded().forPath(tasksPath);
rtrTestUtils.setUp();
jsonMapper = rtrTestUtils.getObjectMapper();
cf = rtrTestUtils.getCuratorFramework();
task = TestTasks.unending("task");
}
@@ -112,9 +81,10 @@ public class RemoteTaskRunnerTest
@After
public void tearDown() throws Exception
{
if (remoteTaskRunner != null) {
remoteTaskRunner.stop();
cf.close();
testingCluster.stop();
}
rtrTestUtils.tearDown();
}
@Test
@@ -461,80 +431,27 @@ public class RemoteTaskRunnerTest
private void makeRemoteTaskRunner(RemoteTaskRunnerConfig config) throws Exception
{
remoteTaskRunner = new RemoteTaskRunner(
jsonMapper,
config,
new IndexerZkConfig(
new ZkPathsConfig()
{
@Override
public String getBase()
{
return basePath;
}
}, null, null, null, null, null
),
cf,
new SimplePathChildrenCacheFactory.Builder().build(),
null,
DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig())),
ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d"),
new NoopResourceManagementStrategy<WorkerTaskRunner>()
);
remoteTaskRunner.start();
remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner(config);
}
private void makeWorker() throws Exception
{
worker = new Worker(
"worker",
"localhost",
3,
"0"
);
cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
announcementsPath,
jsonMapper.writeValueAsBytes(worker)
);
worker = rtrTestUtils.makeWorker(workerHost);
}
private void disableWorker() throws Exception
{
cf.setData().forPath(
announcementsPath,
jsonMapper.writeValueAsBytes(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), ""))
);
rtrTestUtils.disableWorker(worker);
}
private boolean taskAnnounced(final String taskId)
{
return pathExists(joiner.join(tasksPath, taskId));
return rtrTestUtils.taskAnnounced(workerHost, taskId);
}
private boolean workerRunningTask(final String taskId)
{
return pathExists(joiner.join(statusPath, taskId));
}
private boolean pathExists(final String path)
{
return TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
try {
return cf.checkExists().forPath(path) != null;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
return rtrTestUtils.workerRunningTask(workerHost, taskId);
}
private boolean workerCompletedTask(final ListenableFuture<TaskStatus> result)
@@ -553,18 +470,12 @@ public class RemoteTaskRunnerTest
private void mockWorkerRunningTask(final Task task) throws Exception
{
cf.delete().forPath(joiner.join(tasksPath, task.getId()));
TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.running(task.getId()), DUMMY_LOCATION);
cf.create()
.creatingParentsIfNeeded()
.forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement));
rtrTestUtils.mockWorkerRunningTask("worker", task);
}
private void mockWorkerCompleteSuccessfulTask(final Task task) throws Exception
{
TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.success(task.getId()), DUMMY_LOCATION);
cf.setData().forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement));
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker", task);
}
@Test

View File: RemoteTaskRunnerTestUtils.java

@@ -0,0 +1,203 @@
/*
*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.metamx.common.concurrent.ScheduledExecutors;
import io.druid.common.guava.DSuppliers;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.indexing.common.IndexingServiceCondition;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.autoscaling.NoopResourceManagementStrategy;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.zookeeper.CreateMode;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class RemoteTaskRunnerTestUtils
{
static final Joiner joiner = Joiner.on("/");
static final String basePath = "/test/druid";
static final String announcementsPath = String.format("%s/indexer/announcements", basePath);
static final String tasksPath = String.format("%s/indexer/tasks", basePath);
static final String statusPath = String.format("%s/indexer/status", basePath);
static final TaskLocation DUMMY_LOCATION = TaskLocation.create("dummy", 9000);
private TestingCluster testingCluster;
private CuratorFramework cf;
private ObjectMapper jsonMapper;
RemoteTaskRunnerTestUtils()
{
TestUtils testUtils = new TestUtils();
jsonMapper = testUtils.getTestObjectMapper();
}
CuratorFramework getCuratorFramework()
{
return cf;
}
ObjectMapper getObjectMapper()
{
return jsonMapper;
}
void setUp() throws Exception
{
testingCluster = new TestingCluster(1);
testingCluster.start();
cf = CuratorFrameworkFactory.builder()
.connectString(testingCluster.getConnectString())
.retryPolicy(new ExponentialBackoffRetry(1, 10))
.compressionProvider(new PotentiallyGzippedCompressionProvider(false))
.build();
cf.start();
cf.blockUntilConnected();
cf.create().creatingParentsIfNeeded().forPath(basePath);
cf.create().creatingParentsIfNeeded().forPath(tasksPath);
}
void tearDown() throws Exception
{
cf.close();
testingCluster.stop();
}
RemoteTaskRunner makeRemoteTaskRunner(RemoteTaskRunnerConfig config) throws Exception
{
RemoteTaskRunner remoteTaskRunner = new RemoteTaskRunner(
jsonMapper,
config,
new IndexerZkConfig(
new ZkPathsConfig()
{
@Override
public String getBase()
{
return basePath;
}
}, null, null, null, null, null
),
cf,
new SimplePathChildrenCacheFactory.Builder().build(),
null,
DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig())),
ScheduledExecutors.fixed(1, "Remote-Task-Runner-Cleanup--%d"),
new NoopResourceManagementStrategy<WorkerTaskRunner>()
);
remoteTaskRunner.start();
return remoteTaskRunner;
}
Worker makeWorker(final String workerId) throws Exception
{
Worker worker = new Worker(
workerId,
workerId,
3,
"0"
);
cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
joiner.join(announcementsPath, workerId),
jsonMapper.writeValueAsBytes(worker)
);
cf.create().creatingParentsIfNeeded().forPath(joiner.join(tasksPath, workerId));
return worker;
}
void disableWorker(Worker worker) throws Exception
{
cf.setData().forPath(
joiner.join(announcementsPath, worker.getHost()),
jsonMapper.writeValueAsBytes(new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), ""))
);
}
void mockWorkerRunningTask(final String workerId, final Task task) throws Exception
{
cf.delete().forPath(joiner.join(tasksPath, workerId, task.getId()));
TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.running(task.getId()), DUMMY_LOCATION);
cf.create()
.creatingParentsIfNeeded()
.forPath(joiner.join(statusPath, workerId, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement));
}
void mockWorkerCompleteSuccessfulTask(final String workerId, final Task task) throws Exception
{
TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.success(task.getId()), DUMMY_LOCATION);
cf.setData().forPath(joiner.join(statusPath, workerId, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement));
}
boolean workerRunningTask(final String workerId, final String taskId)
{
return pathExists(joiner.join(statusPath, workerId, taskId));
}
boolean taskAnnounced(final String workerId, final String taskId)
{
return pathExists(joiner.join(tasksPath, workerId, taskId));
}
boolean pathExists(final String path)
{
return TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
try {
return cf.checkExists().forPath(path) != null;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
}