Fix NPE in RemoteTaskRunner event handler causes JVM shutdown (#9610)

* Fix NPE in RemoteTaskRunner event handler causes JVM shutdown

* address comments

* fix compile

* fix checkstyle

* fix lgtm

* fix merge

* fix test

* fix tests

* change scope

* address comments

* address comments
This commit is contained in:
Maytas Monsereenusorn 2020-04-07 11:53:51 -10:00 committed by GitHub
parent 6e50d29b4e
commit b95a1b9878
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 192 additions and 106 deletions

View File

@ -131,6 +131,18 @@
<artifactId>JUnitParams</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>${codehaus.jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>${codehaus.jackson.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -45,6 +45,7 @@ import com.google.common.util.concurrent.SettableFuture;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.curator.CuratorUtils;
@ -969,110 +970,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
);
// Add status listener to the watcher for status changes
zkWorker.addListener(
(client, event) -> {
final String taskId;
final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
synchronized (statusLock) {
try {
switch (event.getType()) {
case CHILD_ADDED:
case CHILD_UPDATED:
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
final TaskAnnouncement announcement = jsonMapper.readValue(
event.getData().getData(), TaskAnnouncement.class
);
log.info(
"Worker[%s] wrote %s status for task [%s] on [%s]",
zkWorker.getWorker().getHost(),
announcement.getTaskStatus().getStatusCode(),
taskId,
announcement.getTaskLocation()
);
// Synchronizing state with ZK
statusLock.notifyAll();
final RemoteTaskRunnerWorkItem tmp;
if ((tmp = runningTasks.get(taskId)) != null) {
taskRunnerWorkItem = tmp;
} else {
final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
taskId,
announcement.getTaskType(),
zkWorker.getWorker(),
TaskLocation.unknown(),
announcement.getTaskDataSource()
);
final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
taskId,
newTaskRunnerWorkItem
);
if (existingItem == null) {
log.warn(
"Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
zkWorker.getWorker().getHost(),
taskId
);
taskRunnerWorkItem = newTaskRunnerWorkItem;
} else {
taskRunnerWorkItem = existingItem;
}
}
if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
}
if (announcement.getTaskStatus().isComplete()) {
taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
runPendingTasks();
}
break;
case CHILD_REMOVED:
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
taskRunnerWorkItem = runningTasks.remove(taskId);
if (taskRunnerWorkItem != null) {
log.info("Task[%s] just disappeared!", taskId);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
} else {
log.info("Task[%s] went bye bye.", taskId);
}
break;
case INITIALIZED:
if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
retVal.set(zkWorker);
} else {
final String message = StringUtils.format(
"WTF?! Tried to add already-existing worker[%s]",
worker.getHost()
);
log.makeAlert(message)
.addData("workerHost", worker.getHost())
.addData("workerIp", worker.getIp())
.emit();
retVal.setException(new IllegalStateException(message));
}
runPendingTasks();
break;
case CONNECTION_SUSPENDED:
case CONNECTION_RECONNECTED:
case CONNECTION_LOST:
// do nothing
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to handle new worker status")
.addData("worker", zkWorker.getWorker().getHost())
.addData("znode", event.getData().getPath())
.emit();
}
}
}
);
zkWorker.addListener(getStatusListener(worker, zkWorker, retVal));
zkWorker.start();
return retVal;
}
@ -1081,6 +979,134 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
}
@VisibleForTesting
PathChildrenCacheListener getStatusListener(final Worker worker, final ZkWorker zkWorker, final SettableFuture<ZkWorker> retVal)
{
return (client, event) -> {
final String taskId;
final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
synchronized (statusLock) {
try {
switch (event.getType()) {
case CHILD_ADDED:
case CHILD_UPDATED:
if (event.getData() == null) {
log.error("Unexpected null for event.getData() in handle new worker status for [%s]", event.getType().toString());
log.makeAlert("Unexpected null for event.getData() in handle new worker status")
.addData("worker", zkWorker.getWorker().getHost())
.addData("eventType", event.getType().toString())
.emit();
return;
}
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
final TaskAnnouncement announcement = jsonMapper.readValue(
event.getData().getData(), TaskAnnouncement.class
);
log.info(
"Worker[%s] wrote %s status for task [%s] on [%s]",
zkWorker.getWorker().getHost(),
announcement.getTaskStatus().getStatusCode(),
taskId,
announcement.getTaskLocation()
);
// Synchronizing state with ZK
statusLock.notifyAll();
final RemoteTaskRunnerWorkItem tmp;
if ((tmp = runningTasks.get(taskId)) != null) {
taskRunnerWorkItem = tmp;
} else {
final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
taskId,
announcement.getTaskType(),
zkWorker.getWorker(),
TaskLocation.unknown(),
announcement.getTaskDataSource()
);
final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
taskId,
newTaskRunnerWorkItem
);
if (existingItem == null) {
log.warn(
"Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
zkWorker.getWorker().getHost(),
taskId
);
taskRunnerWorkItem = newTaskRunnerWorkItem;
} else {
taskRunnerWorkItem = existingItem;
}
}
if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
}
if (announcement.getTaskStatus().isComplete()) {
taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
runPendingTasks();
}
break;
case CHILD_REMOVED:
if (event.getData() == null) {
log.error("Unexpected null for event.getData() in handle new worker status for [%s]", event.getType().toString());
log.makeAlert("Unexpected null for event.getData() in handle new worker status")
.addData("worker", zkWorker.getWorker().getHost())
.addData("eventType", event.getType().toString())
.emit();
return;
}
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
taskRunnerWorkItem = runningTasks.remove(taskId);
if (taskRunnerWorkItem != null) {
log.info("Task[%s] just disappeared!", taskId);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
} else {
log.info("Task[%s] went bye bye.", taskId);
}
break;
case INITIALIZED:
if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
retVal.set(zkWorker);
} else {
final String message = StringUtils.format(
"This should not happen...tried to add already-existing worker[%s]",
worker.getHost()
);
log.makeAlert(message)
.addData("workerHost", worker.getHost())
.addData("workerIp", worker.getIp())
.emit();
retVal.setException(new IllegalStateException(message));
}
runPendingTasks();
break;
case CONNECTION_SUSPENDED:
case CONNECTION_RECONNECTED:
case CONNECTION_LOST:
// do nothing
}
}
catch (Exception e) {
String znode = null;
if (event.getData() != null) {
znode = event.getData().getPath();
}
log.makeAlert(e, "Failed to handle new worker status")
.addData("worker", zkWorker.getWorker().getHost())
.addData("znode", znode)
.addData("eventType", event.getType().toString())
.emit();
}
}
};
}
/**
* We allow workers to change their own capacities and versions. They cannot change their own hosts or ips without
* dropping themselves and re-announcing.

View File

@ -29,6 +29,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IndexingServiceCondition;
@ -44,6 +45,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.testing.DeadlockDetectingTimeout;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
@ -55,6 +57,7 @@ import org.junit.rules.TestRule;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -944,4 +947,37 @@ public class RemoteTaskRunnerTest
Assert.assertTrue(taskFuture2.get().isSuccess());
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
}
@Test
public void testStatusListenerEventDataNullShouldNotThrowException() throws Exception
{
// Set up mock emitter to verify log alert when exception is thrown inside the status listener
Worker worker = EasyMock.createMock(Worker.class);
EasyMock.expect(worker.getHost()).andReturn("host").atLeastOnce();
EasyMock.replay(worker);
ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
Capture<EmittingLogger.EmittingAlertBuilder> capturedArgument = Capture.newInstance();
emitter.emit(EasyMock.capture(capturedArgument));
EasyMock.expectLastCall().atLeastOnce();
EmittingLogger.registerEmitter(emitter);
EasyMock.replay(emitter);
PathChildrenCache cache = new PathChildrenCache(cf, "/test", true);
testStartWithNoWorker();
cache.getListenable().addListener(remoteTaskRunner.getStatusListener(worker, new ZkWorker(worker, cache, jsonMapper), null));
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
// Status listener will recieve event with null data
Assert.assertTrue(
TestUtils.conditionValid(() -> cache.getCurrentData().size() == 1)
);
// Verify that the log emitter was called
EasyMock.verify(worker);
EasyMock.verify(emitter);
Map<String, Object> alertDataMap = capturedArgument.getValue().build(null).getDataMap();
Assert.assertTrue(alertDataMap.containsKey("znode"));
Assert.assertNull(alertDataMap.get("znode"));
// Status listener should successfully completes without throwing exception
}
}

View File

@ -1315,7 +1315,7 @@ name: Apache Curator
license_category: binary
module: java-core
license_name: Apache License version 2.0
version: 4.1.0
version: 4.3.0
libraries:
- org.apache.curator: curator-client
- org.apache.curator: curator-framework

View File

@ -76,7 +76,7 @@
<java.version>8</java.version>
<project.build.resourceEncoding>UTF-8</project.build.resourceEncoding>
<aether.version>0.9.0.M2</aether.version>
<apache.curator.version>4.1.0</apache.curator.version>
<apache.curator.version>4.3.0</apache.curator.version>
<apache.curator.test.version>2.12.0</apache.curator.test.version>
<apache.kafka.version>2.2.2</apache.kafka.version>
<apache.ranger.version>2.0.0</apache.ranger.version>

View File

@ -482,6 +482,18 @@ public class DiscoveryModule implements Module
{
return this;
}
@Override
public ServiceProviderBuilder<T> executorService(ExecutorService executorService)
{
return this;
}
@Override
public ServiceProviderBuilder<T> executorService(CloseableExecutorService closeableExecutorService)
{
return this;
}
}
private static class NoopServiceProvider<T> implements ServiceProvider<T>