update HRTR to account for task known to be running on a worker when it shows up (#8427)

This commit is contained in:
Himanshu 2019-09-19 10:19:17 -07:00 committed by GitHub
parent e184d24a74
commit 62afbca7b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 107 additions and 37 deletions

View File

@ -83,6 +83,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@ -105,7 +106,7 @@ import java.util.stream.Collectors;
/**
* A Remote TaskRunner to manage tasks on Middle Manager nodes using internal-discovery({@link DruidNodeDiscoveryProvider})
* to discover them and Http.
* Middle Managers expose 3 HTTP endpoints
* Middle Managers manages list of assigned/completed tasks on disk and exposes 3 HTTP endpoints
* 1. POST request for assigning a task
* 2. POST request for shutting down a task
* 3. GET request for getting list of assigned, running, completed tasks on Middle Manager and its enable/disable status.
@ -496,7 +497,35 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
WorkerHolder holder = workers.get(worker.getHost());
if (holder == null) {
holder = createWorkerHolder(smileMapper, httpClient, config, workersSyncExec, this::taskAddedOrUpdated, worker);
List<TaskAnnouncement> expectedAnnouncements = new ArrayList<>();
synchronized (statusLock) {
// It might be a worker that existed before, temporarily went away and came back. We might have a set of
// tasks that we think are running on this worker. Provide that information to WorkerHolder that
// manages the task syncing with that worker.
for (Map.Entry<String, HttpRemoteTaskRunnerWorkItem> e : tasks.entrySet()) {
if (e.getValue().getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING) {
Worker w = e.getValue().getWorker();
if (w != null && w.getHost().equals(worker.getHost())) {
expectedAnnouncements.add(
TaskAnnouncement.create(
e.getValue().getTask(),
TaskStatus.running(e.getKey()),
e.getValue().getLocation()
)
);
}
}
}
}
holder = createWorkerHolder(
smileMapper,
httpClient,
config,
workersSyncExec,
this::taskAddedOrUpdated,
worker,
expectedAnnouncements
);
holder.start();
workers.put(worker.getHost(), holder);
} else {
@ -515,10 +544,11 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
HttpRemoteTaskRunnerConfig config,
ScheduledExecutorService workersSyncExec,
WorkerHolder.Listener listener,
Worker worker
Worker worker,
List<TaskAnnouncement> knownAnnouncements
)
{
return new WorkerHolder(smileMapper, httpClient, config, workersSyncExec, listener, worker);
return new WorkerHolder(smileMapper, httpClient, config, workersSyncExec, listener, worker, knownAnnouncements);
}
private void removeWorker(final Worker worker)

View File

@ -77,7 +77,7 @@ public class WorkerHolder
protected final AtomicBoolean disabled;
// Known list of tasks running/completed on this worker.
protected final AtomicReference<Map<String, TaskAnnouncement>> tasksSnapshotRef = new AtomicReference<>(new ConcurrentHashMap<>());
protected final AtomicReference<Map<String, TaskAnnouncement>> tasksSnapshotRef;
private final AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<>(DateTimes.nowUtc());
private final AtomicReference<DateTime> blacklistedUntil = new AtomicReference<>();
@ -97,7 +97,8 @@ public class WorkerHolder
HttpRemoteTaskRunnerConfig config,
ScheduledExecutorService workersSyncExec,
Listener listener,
Worker worker
Worker worker,
List<TaskAnnouncement> knownAnnouncements
)
{
this.smileMapper = smileMapper;
@ -119,6 +120,12 @@ public class WorkerHolder
config.getServerUnstabilityTimeout().toStandardDuration().getMillis(),
createSyncListener()
);
ConcurrentMap<String, TaskAnnouncement> announcements = new ConcurrentHashMap<>();
if (knownAnnouncements != null) {
knownAnnouncements.forEach(e -> announcements.put(e.getTaskId(), e));
}
tasksSnapshotRef = new AtomicReference<>(announcements);
}
public Worker getWorker()

View File

@ -117,7 +117,8 @@ public class HttpRemoteTaskRunnerTest
HttpRemoteTaskRunnerConfig config,
ScheduledExecutorService workersSyncExec,
WorkerHolder.Listener listener,
Worker worker
Worker worker,
List<TaskAnnouncement> knownAnnouncements
)
{
return HttpRemoteTaskRunnerTest.createWorkerHolder(
@ -128,6 +129,7 @@ public class HttpRemoteTaskRunnerTest
listener,
worker,
ImmutableList.of(),
ImmutableList.of(),
ImmutableMap.of(),
new AtomicInteger(),
ImmutableSet.of()
@ -212,7 +214,8 @@ public class HttpRemoteTaskRunnerTest
HttpRemoteTaskRunnerConfig config,
ScheduledExecutorService workersSyncExec,
WorkerHolder.Listener listener,
Worker worker
Worker worker,
List<TaskAnnouncement> knownAnnouncements
)
{
return HttpRemoteTaskRunnerTest.createWorkerHolder(
@ -223,6 +226,7 @@ public class HttpRemoteTaskRunnerTest
listener,
worker,
ImmutableList.of(),
ImmutableList.of(),
ImmutableMap.of(task1, ImmutableList.of()), //no announcements would be received for task1
new AtomicInteger(),
ImmutableSet.of()
@ -314,7 +318,8 @@ public class HttpRemoteTaskRunnerTest
HttpRemoteTaskRunnerConfig config,
ScheduledExecutorService workersSyncExec,
WorkerHolder.Listener listener,
Worker worker
Worker worker,
List<TaskAnnouncement> knownAnnouncements
)
{
if (workerHolders.containsKey(worker.getHost())) {
@ -324,7 +329,8 @@ public class HttpRemoteTaskRunnerTest
config,
workersSyncExec,
listener,
worker
worker,
knownAnnouncements
);
} else {
throw new ISE("No WorkerHolder for [%s].", worker.getHost());
@ -347,13 +353,14 @@ public class HttpRemoteTaskRunnerTest
workerHolders.put(
"host:1234",
(mapper, httpClient, config, exec, listener, worker) -> createWorkerHolder(
(mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder(
mapper,
httpClient,
config,
exec,
listener,
worker,
knownAnnouncements,
ImmutableList.of(
TaskAnnouncement.create(
task1,
@ -453,7 +460,8 @@ public class HttpRemoteTaskRunnerTest
HttpRemoteTaskRunnerConfig config,
ScheduledExecutorService workersSyncExec,
WorkerHolder.Listener listener,
Worker worker
Worker worker,
List<TaskAnnouncement> knownAnnouncements
)
{
if (workerHolders.containsKey(worker.getHost())) {
@ -463,7 +471,8 @@ public class HttpRemoteTaskRunnerTest
config,
workersSyncExec,
listener,
worker
worker,
knownAnnouncements
);
} else {
throw new ISE("No WorkerHolder for [%s].", worker.getHost());
@ -486,13 +495,14 @@ public class HttpRemoteTaskRunnerTest
workerHolders.put(
"host:1234",
(mapper, httpClient, config, exec, listener, worker) -> createWorkerHolder(
(mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder(
mapper,
httpClient,
config,
exec,
listener,
worker,
knownAnnouncements,
ImmutableList.of(),
ImmutableMap.of(
task1, ImmutableList.of(
@ -551,19 +561,15 @@ public class HttpRemoteTaskRunnerTest
workerHolders.put(
"host:1234",
(mapper, httpClient, config, exec, listener, worker) -> createWorkerHolder(
(mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder(
mapper,
httpClient,
config,
exec,
listener,
worker,
knownAnnouncements,
ImmutableList.of(
TaskAnnouncement.create(
task1,
TaskStatus.success(task1.getId()),
TaskLocation.create("host", 1234, 1235)
),
TaskAnnouncement.create(
task2,
TaskStatus.running(task2.getId()),
@ -629,7 +635,8 @@ public class HttpRemoteTaskRunnerTest
HttpRemoteTaskRunnerConfig config,
ScheduledExecutorService workersSyncExec,
WorkerHolder.Listener listener,
Worker worker
Worker worker,
List<TaskAnnouncement> knownAnnouncements
)
{
if (workerHolders.containsKey(worker.getHost())) {
@ -639,7 +646,8 @@ public class HttpRemoteTaskRunnerTest
config,
workersSyncExec,
listener,
worker
worker,
knownAnnouncements
);
} else {
throw new ISE("No WorkerHolder for [%s].", worker.getHost());
@ -660,13 +668,14 @@ public class HttpRemoteTaskRunnerTest
workerHolders.put(
"host:1234",
(mapper, httpClient, config, exec, listener, worker) -> createWorkerHolder(
(mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder(
mapper,
httpClient,
config,
exec,
listener,
worker,
knownAnnouncements,
ImmutableList.of(),
ImmutableMap.of(
task1, ImmutableList.of(
@ -726,13 +735,14 @@ public class HttpRemoteTaskRunnerTest
workerHolders.put(
"host:1234",
(mapper, httpClient, config, exec, listener, worker) -> createWorkerHolder(
(mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder(
mapper,
httpClient,
config,
exec,
listener,
worker,
knownAnnouncements,
ImmutableList.of(
TaskAnnouncement.create(
task1,
@ -806,7 +816,8 @@ public class HttpRemoteTaskRunnerTest
HttpRemoteTaskRunnerConfig config,
ScheduledExecutorService workersSyncExec,
WorkerHolder.Listener listener,
Worker worker
Worker worker,
List<TaskAnnouncement> knownAnnouncements
)
{
if (workerHolders.containsKey(worker.getHost())) {
@ -816,7 +827,8 @@ public class HttpRemoteTaskRunnerTest
config,
workersSyncExec,
listener,
worker
worker,
knownAnnouncements
);
} else {
throw new ISE("No WorkerHolder for [%s].", worker.getHost());
@ -838,13 +850,14 @@ public class HttpRemoteTaskRunnerTest
workerHolders.put(
"host1:8080",
(mapper, httpClient, config, exec, listener, worker) -> createWorkerHolder(
(mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder(
mapper,
httpClient,
config,
exec,
listener,
worker,
knownAnnouncements,
ImmutableList.of(),
ImmutableMap.of(
task1, ImmutableList.of(
@ -881,13 +894,14 @@ public class HttpRemoteTaskRunnerTest
workerHolders.put(
"host2:8080",
(mapper, httpClient, config, exec, listener, worker) -> createWorkerHolder(
(mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder(
mapper,
httpClient,
config,
exec,
listener,
worker,
knownAnnouncements,
ImmutableList.of(),
ImmutableMap.of(task2, ImmutableList.of()),
ticks,
@ -911,13 +925,14 @@ public class HttpRemoteTaskRunnerTest
workerHolders.put(
"host3:8080",
(mapper, httpClient, config, exec, listener, worker) -> createWorkerHolder(
(mapper, httpClient, config, exec, listener, worker, knownAnnouncements) -> createWorkerHolder(
mapper,
httpClient,
config,
exec,
listener,
worker,
knownAnnouncements,
ImmutableList.of(),
ImmutableMap.of(),
new AtomicInteger(),
@ -1267,6 +1282,7 @@ public class HttpRemoteTaskRunnerTest
ScheduledExecutorService workersSyncExec,
WorkerHolder.Listener listener,
Worker worker,
List<TaskAnnouncement> knownAnnouncements,
// simulates task announcements received from worker on first sync call for the tasks that are already
// running/completed on the worker.
@ -1284,7 +1300,7 @@ public class HttpRemoteTaskRunnerTest
Set<String> actualShutdowns
)
{
return new WorkerHolder(smileMapper, httpClient, config, workersSyncExec, listener, worker)
return new WorkerHolder(smileMapper, httpClient, config, workersSyncExec, listener, worker, knownAnnouncements)
{
private final String workerHost;
private final int workerPort;
@ -1428,7 +1444,8 @@ public class HttpRemoteTaskRunnerTest
HttpRemoteTaskRunnerConfig config,
ScheduledExecutorService workersSyncExec,
WorkerHolder.Listener listener,
Worker worker
Worker worker,
List<TaskAnnouncement> knownAnnouncements
);
}
}

View File

@ -48,23 +48,36 @@ public class WorkerHolderTest
{
List<TaskAnnouncement> updates = new ArrayList<>();
Task task0 = NoopTask.create("task0", 0);
Task task1 = NoopTask.create("task1", 0);
Task task2 = NoopTask.create("task2", 0);
Task task3 = NoopTask.create("task3", 0);
WorkerHolder workerHolder = new WorkerHolder(
TestHelper.makeJsonMapper(),
EasyMock.createNiceMock(HttpClient.class),
new HttpRemoteTaskRunnerConfig(),
EasyMock.createNiceMock(ScheduledExecutorService.class),
(taskAnnouncement, holder) -> updates.add(taskAnnouncement),
new Worker("http", "localhost", "127.0.0.1", 5, "v0")
new Worker("http", "localhost", "127.0.0.1", 5, "v0"),
ImmutableList.of(
TaskAnnouncement.create(
task0,
TaskStatus.running(task0.getId()),
TaskLocation.unknown()
),
TaskAnnouncement.create(
task1,
TaskStatus.running(task1.getId()),
TaskLocation.unknown()
)
)
);
ChangeRequestHttpSyncer.Listener<WorkerHistoryItem> syncListener = workerHolder.createSyncListener();
Assert.assertTrue(workerHolder.disabled.get());
Task task1 = NoopTask.create("task1", 0);
Task task2 = NoopTask.create("task2", 0);
Task task3 = NoopTask.create("task3", 0);
syncListener.fullSync(
ImmutableList.of(
new WorkerHistoryItem.Metadata(false),
@ -88,7 +101,7 @@ public class WorkerHolderTest
Assert.assertFalse(workerHolder.disabled.get());
Assert.assertEquals(3, updates.size());
Assert.assertEquals(4, updates.size());
Assert.assertEquals(task1.getId(), updates.get(0).getTaskId());
Assert.assertTrue(updates.get(0).getTaskStatus().isSuccess());
@ -99,6 +112,9 @@ public class WorkerHolderTest
Assert.assertEquals(task3.getId(), updates.get(2).getTaskId());
Assert.assertTrue(updates.get(2).getTaskStatus().isRunnable());
Assert.assertEquals(task0.getId(), updates.get(3).getTaskId());
Assert.assertTrue(updates.get(3).getTaskStatus().isFailure());
updates.clear();
syncListener.deltaSync(