From 62afbca7b9f535e00c75afce30963a8c3a81f669 Mon Sep 17 00:00:00 2001 From: Himanshu Date: Thu, 19 Sep 2019 10:19:17 -0700 Subject: [PATCH] update HRTR to account for task known to be running on a worker when it shows up (#8427) --- .../overlord/hrtr/HttpRemoteTaskRunner.java | 38 +++++++++-- .../indexing/overlord/hrtr/WorkerHolder.java | 11 ++- .../hrtr/HttpRemoteTaskRunnerTest.java | 67 ++++++++++++------- .../overlord/hrtr/WorkerHolderTest.java | 28 ++++++-- 4 files changed, 107 insertions(+), 37 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index e96c606b7a9..0657168f46c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -83,6 +83,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.net.URL; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -105,7 +106,7 @@ import java.util.stream.Collectors; /** * A Remote TaskRunner to manage tasks on Middle Manager nodes using internal-discovery({@link DruidNodeDiscoveryProvider}) * to discover them and Http. - * Middle Managers expose 3 HTTP endpoints + * Middle Managers manage the list of assigned/completed tasks on disk and expose 3 HTTP endpoints * 1. POST request for assigning a task * 2. POST request for shutting down a task * 3. GET request for getting list of assigned, running, completed tasks on Middle Manager and its enable/disable status. 
@@ -496,7 +497,35 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer WorkerHolder holder = workers.get(worker.getHost()); if (holder == null) { - holder = createWorkerHolder(smileMapper, httpClient, config, workersSyncExec, this::taskAddedOrUpdated, worker); + List expectedAnnouncements = new ArrayList<>(); + synchronized (statusLock) { + // It might be a worker that existed before, temporarily went away and came back. We might have a set of + // tasks that we think are running on this worker. Provide that information to WorkerHolder that + // manages the task syncing with that worker. + for (Map.Entry e : tasks.entrySet()) { + if (e.getValue().getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) { + Worker w = e.getValue().getWorker(); + if (w != null && w.getHost().equals(worker.getHost())) { + expectedAnnouncements.add( + TaskAnnouncement.create( + e.getValue().getTask(), + TaskStatus.running(e.getKey()), + e.getValue().getLocation() + ) + ); + } + } + } + } + holder = createWorkerHolder( + smileMapper, + httpClient, + config, + workersSyncExec, + this::taskAddedOrUpdated, + worker, + expectedAnnouncements + ); holder.start(); workers.put(worker.getHost(), holder); } else { @@ -515,10 +544,11 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer HttpRemoteTaskRunnerConfig config, ScheduledExecutorService workersSyncExec, WorkerHolder.Listener listener, - Worker worker + Worker worker, + List knownAnnouncements ) { - return new WorkerHolder(smileMapper, httpClient, config, workersSyncExec, listener, worker); + return new WorkerHolder(smileMapper, httpClient, config, workersSyncExec, listener, worker, knownAnnouncements); } private void removeWorker(final Worker worker) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java index b3241204b32..ee8421b5c44 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java @@ -77,7 +77,7 @@ public class WorkerHolder protected final AtomicBoolean disabled; // Known list of tasks running/completed on this worker. - protected final AtomicReference> tasksSnapshotRef = new AtomicReference<>(new ConcurrentHashMap<>()); + protected final AtomicReference> tasksSnapshotRef; private final AtomicReference lastCompletedTaskTime = new AtomicReference<>(DateTimes.nowUtc()); private final AtomicReference blacklistedUntil = new AtomicReference<>(); @@ -97,7 +97,8 @@ public class WorkerHolder HttpRemoteTaskRunnerConfig config, ScheduledExecutorService workersSyncExec, Listener listener, - Worker worker + Worker worker, + List knownAnnouncements ) { this.smileMapper = smileMapper; @@ -119,6 +120,12 @@ public class WorkerHolder config.getServerUnstabilityTimeout().toStandardDuration().getMillis(), createSyncListener() ); + + ConcurrentMap announcements = new ConcurrentHashMap<>(); + if (knownAnnouncements != null) { + knownAnnouncements.forEach(e -> announcements.put(e.getTaskId(), e)); + } + tasksSnapshotRef = new AtomicReference<>(announcements); } public Worker getWorker() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java index a94dd5e9475..4f73e49200f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java @@ -117,7 +117,8 @@ public class HttpRemoteTaskRunnerTest HttpRemoteTaskRunnerConfig config, ScheduledExecutorService workersSyncExec, WorkerHolder.Listener listener, - Worker worker + Worker worker, + List knownAnnouncements ) { return 
HttpRemoteTaskRunnerTest.createWorkerHolder( @@ -128,6 +129,7 @@ public class HttpRemoteTaskRunnerTest listener, worker, ImmutableList.of(), + ImmutableList.of(), ImmutableMap.of(), new AtomicInteger(), ImmutableSet.of() @@ -212,7 +214,8 @@ public class HttpRemoteTaskRunnerTest HttpRemoteTaskRunnerConfig config, ScheduledExecutorService workersSyncExec, WorkerHolder.Listener listener, - Worker worker + Worker worker, + List knownAnnouncements ) { return HttpRemoteTaskRunnerTest.createWorkerHolder( @@ -223,6 +226,7 @@ public class HttpRemoteTaskRunnerTest listener, worker, ImmutableList.of(), + ImmutableList.of(), ImmutableMap.of(task1, ImmutableList.of()), //no announcements would be received for task1 new AtomicInteger(), ImmutableSet.of() @@ -314,7 +318,8 @@ public class HttpRemoteTaskRunnerTest HttpRemoteTaskRunnerConfig config, ScheduledExecutorService workersSyncExec, WorkerHolder.Listener listener, - Worker worker + Worker worker, + List knownAnnouncements ) { if (workerHolders.containsKey(worker.getHost())) { @@ -324,7 +329,8 @@ public class HttpRemoteTaskRunnerTest config, workersSyncExec, listener, - worker + worker, + knownAnnouncements ); } else { throw new ISE("No WorkerHolder for [%s].", worker.getHost()); @@ -347,13 +353,14 @@ public class HttpRemoteTaskRunnerTest workerHolders.put( "host:1234", - (mapper, httpClient, config, exec, listener, worker) -> createWorkerHolder( + (mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder( mapper, httpClient, config, exec, listener, worker, + knownAnnouncements, ImmutableList.of( TaskAnnouncement.create( task1, @@ -453,7 +460,8 @@ public class HttpRemoteTaskRunnerTest HttpRemoteTaskRunnerConfig config, ScheduledExecutorService workersSyncExec, WorkerHolder.Listener listener, - Worker worker + Worker worker, + List knownAnnouncements ) { if (workerHolders.containsKey(worker.getHost())) { @@ -463,7 +471,8 @@ public class HttpRemoteTaskRunnerTest config, workersSyncExec, 
listener, - worker + worker, + knownAnnouncements ); } else { throw new ISE("No WorkerHolder for [%s].", worker.getHost()); @@ -486,13 +495,14 @@ public class HttpRemoteTaskRunnerTest workerHolders.put( "host:1234", - (mapper, httpClient, config, exec, listener, worker) -> createWorkerHolder( + (mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder( mapper, httpClient, config, exec, listener, worker, + knownAnnouncements, ImmutableList.of(), ImmutableMap.of( task1, ImmutableList.of( @@ -551,19 +561,15 @@ public class HttpRemoteTaskRunnerTest workerHolders.put( "host:1234", - (mapper, httpClient, config, exec, listener, worker) -> createWorkerHolder( + (mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder( mapper, httpClient, config, exec, listener, worker, + knownAnnouncements, ImmutableList.of( - TaskAnnouncement.create( - task1, - TaskStatus.success(task1.getId()), - TaskLocation.create("host", 1234, 1235) - ), TaskAnnouncement.create( task2, TaskStatus.running(task2.getId()), @@ -629,7 +635,8 @@ public class HttpRemoteTaskRunnerTest HttpRemoteTaskRunnerConfig config, ScheduledExecutorService workersSyncExec, WorkerHolder.Listener listener, - Worker worker + Worker worker, + List knownAnnouncements ) { if (workerHolders.containsKey(worker.getHost())) { @@ -639,7 +646,8 @@ public class HttpRemoteTaskRunnerTest config, workersSyncExec, listener, - worker + worker, + knownAnnouncements ); } else { throw new ISE("No WorkerHolder for [%s].", worker.getHost()); @@ -660,13 +668,14 @@ public class HttpRemoteTaskRunnerTest workerHolders.put( "host:1234", - (mapper, httpClient, config, exec, listener, worker) -> createWorkerHolder( + (mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder( mapper, httpClient, config, exec, listener, worker, + knownAnnouncements, ImmutableList.of(), ImmutableMap.of( task1, ImmutableList.of( @@ -726,13 +735,14 @@ public 
class HttpRemoteTaskRunnerTest workerHolders.put( "host:1234", - (mapper, httpClient, config, exec, listener, worker) -> createWorkerHolder( + (mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder( mapper, httpClient, config, exec, listener, worker, + knownAnnouncements, ImmutableList.of( TaskAnnouncement.create( task1, @@ -806,7 +816,8 @@ public class HttpRemoteTaskRunnerTest HttpRemoteTaskRunnerConfig config, ScheduledExecutorService workersSyncExec, WorkerHolder.Listener listener, - Worker worker + Worker worker, + List knownAnnouncements ) { if (workerHolders.containsKey(worker.getHost())) { @@ -816,7 +827,8 @@ public class HttpRemoteTaskRunnerTest config, workersSyncExec, listener, - worker + worker, + knownAnnouncements ); } else { throw new ISE("No WorkerHolder for [%s].", worker.getHost()); @@ -838,13 +850,14 @@ public class HttpRemoteTaskRunnerTest workerHolders.put( "host1:8080", - (mapper, httpClient, config, exec, listener, worker) -> createWorkerHolder( + (mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder( mapper, httpClient, config, exec, listener, worker, + knownAnnouncements, ImmutableList.of(), ImmutableMap.of( task1, ImmutableList.of( @@ -881,13 +894,14 @@ public class HttpRemoteTaskRunnerTest workerHolders.put( "host2:8080", - (mapper, httpClient, config, exec, listener, worker) -> createWorkerHolder( + (mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder( mapper, httpClient, config, exec, listener, worker, + knownAnnouncements, ImmutableList.of(), ImmutableMap.of(task2, ImmutableList.of()), ticks, @@ -911,13 +925,14 @@ public class HttpRemoteTaskRunnerTest workerHolders.put( "host3:8080", - (mapper, httpClient, config, exec, listener, worker) -> createWorkerHolder( + (mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder( mapper, httpClient, config, exec, listener, worker, + 
knownAnnouncements, ImmutableList.of(), ImmutableMap.of(), new AtomicInteger(), @@ -1267,6 +1282,7 @@ public class HttpRemoteTaskRunnerTest ScheduledExecutorService workersSyncExec, WorkerHolder.Listener listener, Worker worker, + List knownAnnouncements, // simulates task announcements received from worker on first sync call for the tasks that are already // running/completed on the worker. @@ -1284,7 +1300,7 @@ public class HttpRemoteTaskRunnerTest Set actualShutdowns ) { - return new WorkerHolder(smileMapper, httpClient, config, workersSyncExec, listener, worker) + return new WorkerHolder(smileMapper, httpClient, config, workersSyncExec, listener, worker, knownAnnouncements) { private final String workerHost; private final int workerPort; @@ -1428,7 +1444,8 @@ public class HttpRemoteTaskRunnerTest HttpRemoteTaskRunnerConfig config, ScheduledExecutorService workersSyncExec, WorkerHolder.Listener listener, - Worker worker + Worker worker, + List knownAnnouncements ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java index cc26b66859b..bc07e51f0cb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java @@ -48,23 +48,36 @@ public class WorkerHolderTest { List updates = new ArrayList<>(); + Task task0 = NoopTask.create("task0", 0); + Task task1 = NoopTask.create("task1", 0); + Task task2 = NoopTask.create("task2", 0); + Task task3 = NoopTask.create("task3", 0); + WorkerHolder workerHolder = new WorkerHolder( TestHelper.makeJsonMapper(), EasyMock.createNiceMock(HttpClient.class), new HttpRemoteTaskRunnerConfig(), EasyMock.createNiceMock(ScheduledExecutorService.class), (taskAnnouncement, holder) -> updates.add(taskAnnouncement), - new Worker("http", "localhost", 
"127.0.0.1", 5, "v0") + new Worker("http", "localhost", "127.0.0.1", 5, "v0"), + ImmutableList.of( + TaskAnnouncement.create( + task0, + TaskStatus.running(task0.getId()), + TaskLocation.unknown() + ), + TaskAnnouncement.create( + task1, + TaskStatus.running(task1.getId()), + TaskLocation.unknown() + ) + ) ); ChangeRequestHttpSyncer.Listener syncListener = workerHolder.createSyncListener(); Assert.assertTrue(workerHolder.disabled.get()); - Task task1 = NoopTask.create("task1", 0); - Task task2 = NoopTask.create("task2", 0); - Task task3 = NoopTask.create("task3", 0); - syncListener.fullSync( ImmutableList.of( new WorkerHistoryItem.Metadata(false), @@ -88,7 +101,7 @@ public class WorkerHolderTest Assert.assertFalse(workerHolder.disabled.get()); - Assert.assertEquals(3, updates.size()); + Assert.assertEquals(4, updates.size()); Assert.assertEquals(task1.getId(), updates.get(0).getTaskId()); Assert.assertTrue(updates.get(0).getTaskStatus().isSuccess()); @@ -99,6 +112,9 @@ public class WorkerHolderTest Assert.assertEquals(task3.getId(), updates.get(2).getTaskId()); Assert.assertTrue(updates.get(2).getTaskStatus().isRunnable()); + Assert.assertEquals(task0.getId(), updates.get(3).getTaskId()); + Assert.assertTrue(updates.get(3).getTaskStatus().isFailure()); + updates.clear(); syncListener.deltaSync(