From b95a1b9878588ccc975fd6724a02b18b89c4d46f Mon Sep 17 00:00:00 2001
From: Maytas Monsereenusorn <52679095+maytasm@users.noreply.github.com>
Date: Tue, 7 Apr 2020 11:53:51 -1000
Subject: [PATCH] Fix NPE in RemoteTaskRunner event handler causes JVM shutdown
(#9610)
* Fix NPE in RemoteTaskRunner event handler causes JVM shutdown
* address comments
* fix compile
* fix checkstyle
* fix lgtm
* fix merge
* fix test
* fix tests
* change scope
* address comments
* address comments
---
.../ambari-metrics-emitter/pom.xml | 12 +
.../indexing/overlord/RemoteTaskRunner.java | 234 ++++++++++--------
.../overlord/RemoteTaskRunnerTest.java | 36 +++
licenses.yaml | 2 +-
pom.xml | 2 +-
.../curator/discovery/DiscoveryModule.java | 12 +
6 files changed, 192 insertions(+), 106 deletions(-)
diff --git a/extensions-contrib/ambari-metrics-emitter/pom.xml b/extensions-contrib/ambari-metrics-emitter/pom.xml
index 72786ed80c1..aadf49d948e 100644
--- a/extensions-contrib/ambari-metrics-emitter/pom.xml
+++ b/extensions-contrib/ambari-metrics-emitter/pom.xml
@@ -131,6 +131,18 @@
JUnitParams
test
+
+ org.codehaus.jackson
+ jackson-core-asl
+ ${codehaus.jackson.version}
+ test
+
+
+ org.codehaus.jackson
+ jackson-mapper-asl
+ ${codehaus.jackson.version}
+ test
+
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index fe6e8be1767..dbaadf98e9d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -45,6 +45,7 @@ import com.google.common.util.concurrent.SettableFuture;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.curator.CuratorUtils;
@@ -969,110 +970,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
);
// Add status listener to the watcher for status changes
- zkWorker.addListener(
- (client, event) -> {
- final String taskId;
- final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
- synchronized (statusLock) {
- try {
- switch (event.getType()) {
- case CHILD_ADDED:
- case CHILD_UPDATED:
- taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
- final TaskAnnouncement announcement = jsonMapper.readValue(
- event.getData().getData(), TaskAnnouncement.class
- );
-
- log.info(
- "Worker[%s] wrote %s status for task [%s] on [%s]",
- zkWorker.getWorker().getHost(),
- announcement.getTaskStatus().getStatusCode(),
- taskId,
- announcement.getTaskLocation()
- );
-
- // Synchronizing state with ZK
- statusLock.notifyAll();
-
- final RemoteTaskRunnerWorkItem tmp;
- if ((tmp = runningTasks.get(taskId)) != null) {
- taskRunnerWorkItem = tmp;
- } else {
- final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
- taskId,
- announcement.getTaskType(),
- zkWorker.getWorker(),
- TaskLocation.unknown(),
- announcement.getTaskDataSource()
- );
- final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
- taskId,
- newTaskRunnerWorkItem
- );
- if (existingItem == null) {
- log.warn(
- "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
- zkWorker.getWorker().getHost(),
- taskId
- );
- taskRunnerWorkItem = newTaskRunnerWorkItem;
- } else {
- taskRunnerWorkItem = existingItem;
- }
- }
-
- if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
- taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
- TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
- }
-
- if (announcement.getTaskStatus().isComplete()) {
- taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
- runPendingTasks();
- }
- break;
- case CHILD_REMOVED:
- taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
- taskRunnerWorkItem = runningTasks.remove(taskId);
- if (taskRunnerWorkItem != null) {
- log.info("Task[%s] just disappeared!", taskId);
- taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
- TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
- } else {
- log.info("Task[%s] went bye bye.", taskId);
- }
- break;
- case INITIALIZED:
- if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
- retVal.set(zkWorker);
- } else {
- final String message = StringUtils.format(
- "WTF?! Tried to add already-existing worker[%s]",
- worker.getHost()
- );
- log.makeAlert(message)
- .addData("workerHost", worker.getHost())
- .addData("workerIp", worker.getIp())
- .emit();
- retVal.setException(new IllegalStateException(message));
- }
- runPendingTasks();
- break;
- case CONNECTION_SUSPENDED:
- case CONNECTION_RECONNECTED:
- case CONNECTION_LOST:
- // do nothing
- }
- }
- catch (Exception e) {
- log.makeAlert(e, "Failed to handle new worker status")
- .addData("worker", zkWorker.getWorker().getHost())
- .addData("znode", event.getData().getPath())
- .emit();
- }
- }
- }
- );
+ zkWorker.addListener(getStatusListener(worker, zkWorker, retVal));
zkWorker.start();
return retVal;
}
@@ -1081,6 +979,134 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
}
+ @VisibleForTesting
+ PathChildrenCacheListener getStatusListener(final Worker worker, final ZkWorker zkWorker, final SettableFuture retVal)
+ {
+ return (client, event) -> {
+ final String taskId;
+ final RemoteTaskRunnerWorkItem taskRunnerWorkItem;
+ synchronized (statusLock) {
+ try {
+ switch (event.getType()) {
+ case CHILD_ADDED:
+ case CHILD_UPDATED:
+ if (event.getData() == null) {
+ log.error("Unexpected null for event.getData() in handle new worker status for [%s]", event.getType().toString());
+ log.makeAlert("Unexpected null for event.getData() in handle new worker status")
+ .addData("worker", zkWorker.getWorker().getHost())
+ .addData("eventType", event.getType().toString())
+ .emit();
+ return;
+ }
+ taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
+ final TaskAnnouncement announcement = jsonMapper.readValue(
+ event.getData().getData(), TaskAnnouncement.class
+ );
+
+ log.info(
+ "Worker[%s] wrote %s status for task [%s] on [%s]",
+ zkWorker.getWorker().getHost(),
+ announcement.getTaskStatus().getStatusCode(),
+ taskId,
+ announcement.getTaskLocation()
+ );
+
+ // Synchronizing state with ZK
+ statusLock.notifyAll();
+
+ final RemoteTaskRunnerWorkItem tmp;
+ if ((tmp = runningTasks.get(taskId)) != null) {
+ taskRunnerWorkItem = tmp;
+ } else {
+ final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
+ taskId,
+ announcement.getTaskType(),
+ zkWorker.getWorker(),
+ TaskLocation.unknown(),
+ announcement.getTaskDataSource()
+ );
+ final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
+ taskId,
+ newTaskRunnerWorkItem
+ );
+ if (existingItem == null) {
+ log.warn(
+ "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
+ zkWorker.getWorker().getHost(),
+ taskId
+ );
+ taskRunnerWorkItem = newTaskRunnerWorkItem;
+ } else {
+ taskRunnerWorkItem = existingItem;
+ }
+ }
+
+ if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
+ taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
+ TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
+ }
+
+ if (announcement.getTaskStatus().isComplete()) {
+ taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
+ runPendingTasks();
+ }
+ break;
+ case CHILD_REMOVED:
+ if (event.getData() == null) {
+ log.error("Unexpected null for event.getData() in handle new worker status for [%s]", event.getType().toString());
+ log.makeAlert("Unexpected null for event.getData() in handle new worker status")
+ .addData("worker", zkWorker.getWorker().getHost())
+ .addData("eventType", event.getType().toString())
+ .emit();
+ return;
+ }
+ taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
+ taskRunnerWorkItem = runningTasks.remove(taskId);
+ if (taskRunnerWorkItem != null) {
+ log.info("Task[%s] just disappeared!", taskId);
+ taskRunnerWorkItem.setResult(TaskStatus.failure(taskId));
+ TaskRunnerUtils.notifyStatusChanged(listeners, taskId, TaskStatus.failure(taskId));
+ } else {
+ log.info("Task[%s] went bye bye.", taskId);
+ }
+ break;
+ case INITIALIZED:
+ if (zkWorkers.putIfAbsent(worker.getHost(), zkWorker) == null) {
+ retVal.set(zkWorker);
+ } else {
+ final String message = StringUtils.format(
+ "This should not happen...tried to add already-existing worker[%s]",
+ worker.getHost()
+ );
+ log.makeAlert(message)
+ .addData("workerHost", worker.getHost())
+ .addData("workerIp", worker.getIp())
+ .emit();
+ retVal.setException(new IllegalStateException(message));
+ }
+ runPendingTasks();
+ break;
+ case CONNECTION_SUSPENDED:
+ case CONNECTION_RECONNECTED:
+ case CONNECTION_LOST:
+ // do nothing
+ }
+ }
+ catch (Exception e) {
+ String znode = null;
+ if (event.getData() != null) {
+ znode = event.getData().getPath();
+ }
+ log.makeAlert(e, "Failed to handle new worker status")
+ .addData("worker", zkWorker.getWorker().getHost())
+ .addData("znode", znode)
+ .addData("eventType", event.getType().toString())
+ .emit();
+ }
+ }
+ };
+ }
+
/**
* We allow workers to change their own capacities and versions. They cannot change their own hosts or ips without
* dropping themselves and re-announcing.
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index 060164887b9..b23e0638c22 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IndexingServiceCondition;
@@ -44,6 +45,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.testing.DeadlockDetectingTimeout;
+import org.easymock.Capture;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
@@ -55,6 +57,7 @@ import org.junit.rules.TestRule;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -944,4 +947,37 @@ public class RemoteTaskRunnerTest
Assert.assertTrue(taskFuture2.get().isSuccess());
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
}
+
+ @Test
+ public void testStatusListenerEventDataNullShouldNotThrowException() throws Exception
+ {
+ // Set up mock emitter to verify log alert when exception is thrown inside the status listener
+ Worker worker = EasyMock.createMock(Worker.class);
+ EasyMock.expect(worker.getHost()).andReturn("host").atLeastOnce();
+ EasyMock.replay(worker);
+ ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
+ Capture capturedArgument = Capture.newInstance();
+ emitter.emit(EasyMock.capture(capturedArgument));
+ EasyMock.expectLastCall().atLeastOnce();
+ EmittingLogger.registerEmitter(emitter);
+ EasyMock.replay(emitter);
+
+ PathChildrenCache cache = new PathChildrenCache(cf, "/test", true);
+ testStartWithNoWorker();
+ cache.getListenable().addListener(remoteTaskRunner.getStatusListener(worker, new ZkWorker(worker, cache, jsonMapper), null));
+ cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+
+ // Status listener will receive event with null data
+ Assert.assertTrue(
+ TestUtils.conditionValid(() -> cache.getCurrentData().size() == 1)
+ );
+
+ // Verify that the log emitter was called
+ EasyMock.verify(worker);
+ EasyMock.verify(emitter);
+ Map alertDataMap = capturedArgument.getValue().build(null).getDataMap();
+ Assert.assertTrue(alertDataMap.containsKey("znode"));
+ Assert.assertNull(alertDataMap.get("znode"));
+ // Status listener should complete successfully without throwing an exception
+ }
}
diff --git a/licenses.yaml b/licenses.yaml
index ad81dc4aa54..bb51741488b 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -1315,7 +1315,7 @@ name: Apache Curator
license_category: binary
module: java-core
license_name: Apache License version 2.0
-version: 4.1.0
+version: 4.3.0
libraries:
- org.apache.curator: curator-client
- org.apache.curator: curator-framework
diff --git a/pom.xml b/pom.xml
index 0305e8797d3..e26d58fd52d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,7 +76,7 @@
8
UTF-8
0.9.0.M2
- 4.1.0
+ 4.3.0
2.12.0
2.2.2
2.0.0
diff --git a/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java
index fd899640c67..78c56691967 100644
--- a/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java
+++ b/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java
@@ -482,6 +482,18 @@ public class DiscoveryModule implements Module
{
return this;
}
+
+ @Override
+ public ServiceProviderBuilder executorService(ExecutorService executorService)
+ {
+ return this;
+ }
+
+ @Override
+ public ServiceProviderBuilder executorService(CloseableExecutorService closeableExecutorService)
+ {
+ return this;
+ }
}
private static class NoopServiceProvider implements ServiceProvider