Fix infinite iteration in http sync monitoring (#13731)

* Fix infinite iteration in http task runner

* Fix infinite iteration in http server view

* Add tests
This commit is contained in:
AmatyaAvadhanula 2023-02-08 15:14:11 +05:30 committed by GitHub
parent d925ebdc9e
commit 34c04daa9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 182 additions and 44 deletions

View File

@ -31,6 +31,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@ -574,7 +575,8 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
);
}
private void addWorker(final Worker worker)
@VisibleForTesting
void addWorker(final Worker worker)
{
synchronized (workers) {
log.info("Worker[%s] reportin' for duty!", worker.getHost());
@ -752,7 +754,28 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
log.debug("Running the Sync Monitoring.");
try {
for (Map.Entry<String, WorkerHolder> e : workers.entrySet()) {
syncMonitoring();
}
catch (Exception ex) {
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
} else {
log.makeAlert(ex, "Exception in sync monitoring.").emit();
}
}
},
1,
5,
TimeUnit.MINUTES
);
}
@VisibleForTesting
void syncMonitoring()
{
// Ensure that the collection is not being modified during iteration. Iterate over a copy
final Set<Map.Entry<String, WorkerHolder>> workerEntrySet = ImmutableSet.copyOf(workers.entrySet());
for (Map.Entry<String, WorkerHolder> e : workerEntrySet) {
WorkerHolder workerHolder = e.getValue();
if (!workerHolder.getUnderlyingSyncer().isOK()) {
synchronized (workers) {
@ -770,19 +793,6 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
}
}
catch (Exception ex) {
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
} else {
log.makeAlert(ex, "Exception in sync monitoring.").emit();
}
}
},
1,
5,
TimeUnit.MINUTES
);
}
/**
* This method returns the debugging information exposed by {@link HttpRemoteTaskRunnerResource} and meant

View File

@ -58,6 +58,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.ChangeRequestHttpSyncer;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
@ -70,6 +71,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -1668,6 +1670,52 @@ public class HttpRemoteTaskRunnerTest
}
@Test(timeout = 60_000L)
public void testSyncMonitoring_finiteIteration()
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
TestHelper.makeJsonMapper(),
new HttpRemoteTaskRunnerConfig(),
EasyMock.createNiceMock(HttpClient.class),
DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
new NoopProvisioningStrategy<>(),
druidNodeDiscoveryProvider,
EasyMock.createMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
new NoopServiceEmitter()
)
{
@Override
protected WorkerHolder createWorkerHolder(
ObjectMapper smileMapper,
HttpClient httpClient,
HttpRemoteTaskRunnerConfig config,
ScheduledExecutorService workersSyncExec,
WorkerHolder.Listener listener,
Worker worker,
List<TaskAnnouncement> knownAnnouncements
)
{
return createNonSyncingWorkerHolder(worker);
}
};
taskRunner.start();
taskRunner.addWorker(createWorker("abc"));
taskRunner.addWorker(createWorker("xyz"));
taskRunner.addWorker(createWorker("lol"));
Assert.assertEquals(3, taskRunner.getWorkerSyncerDebugInfo().size());
taskRunner.syncMonitoring();
Assert.assertEquals(3, taskRunner.getWorkerSyncerDebugInfo().size());
}
public static HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated(
TaskStorage taskStorage,
List<Object> listenerNotificationsAccumulator
@ -1730,6 +1778,30 @@ public class HttpRemoteTaskRunnerTest
return taskRunner;
}
private Worker createWorker(String host)
{
Worker worker = EasyMock.createMock(Worker.class);
EasyMock.expect(worker.getHost()).andReturn(host).anyTimes();
EasyMock.replay(worker);
return worker;
}
private WorkerHolder createNonSyncingWorkerHolder(Worker worker)
{
ChangeRequestHttpSyncer syncer = EasyMock.createMock(ChangeRequestHttpSyncer.class);
EasyMock.expect(syncer.isOK()).andReturn(false).anyTimes();
EasyMock.expect(syncer.getDebugInfo()).andReturn(Collections.emptyMap()).anyTimes();
WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class);
EasyMock.expect(workerHolder.getUnderlyingSyncer()).andReturn(syncer).anyTimes();
EasyMock.expect(workerHolder.getWorker()).andReturn(worker).anyTimes();
workerHolder.start();
EasyMock.expectLastCall();
workerHolder.stop();
EasyMock.expectLastCall();
EasyMock.replay(syncer, workerHolder);
return workerHolder;
}
private static WorkerHolder createWorkerHolder(
ObjectMapper smileMapper,
HttpClient httpClient,

View File

@ -21,11 +21,13 @@ package org.apache.druid.client;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.net.HostAndPort;
import org.apache.druid.concurrent.LifecycleLock;
@ -58,6 +60,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
@ -375,7 +378,8 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
);
}
private void serverAdded(DruidServer server)
@VisibleForTesting
void serverAdded(DruidServer server)
{
synchronized (servers) {
DruidServerHolder holder = servers.get(server.getName());
@ -430,7 +434,28 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
log.debug("Running the Sync Monitoring.");
try {
for (Map.Entry<String, DruidServerHolder> e : servers.entrySet()) {
syncMonitoring();
}
catch (Exception ex) {
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
} else {
log.makeAlert(ex, "Exception in sync monitoring.").emit();
}
}
},
1,
5,
TimeUnit.MINUTES
);
}
@VisibleForTesting
void syncMonitoring()
{
// Ensure that the collection is not being modified during iteration. Iterate over a copy
final Set<Map.Entry<String, DruidServerHolder>> serverEntrySet = ImmutableSet.copyOf(servers.entrySet());
for (Map.Entry<String, DruidServerHolder> e : serverEntrySet) {
DruidServerHolder serverHolder = e.getValue();
if (!serverHolder.syncer.isOK()) {
synchronized (servers) {
@ -456,19 +481,6 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
}
}
}
catch (Exception ex) {
if (ex instanceof InterruptedException) {
Thread.currentThread().interrupt();
} else {
log.makeAlert(ex, "Exception in sync monitoring.").emit();
}
}
},
1,
5,
TimeUnit.MINUTES
);
}
@Override
public boolean isStarted()

View File

@ -273,6 +273,50 @@ public class HttpServerInventoryViewTest
httpServerInventoryView.stop();
}
@Test(timeout = 60_000L)
public void testSyncMonitoring()
{
ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForService(DataNodeService.DISCOVERY_SERVICE_KEY))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
TestHttpClient httpClient = new TestHttpClient(ImmutableList.of());
HttpServerInventoryView httpServerInventoryView = new HttpServerInventoryView(
jsonMapper,
httpClient,
druidNodeDiscoveryProvider,
(pair) -> !pair.rhs.getDataSource().equals("non-loading-datasource"),
new HttpServerInventoryViewConfig(null, null, null),
"test"
);
httpServerInventoryView.start();
httpServerInventoryView.serverAdded(makeServer("abc.com:8080"));
httpServerInventoryView.serverAdded(makeServer("xyz.com:8080"));
httpServerInventoryView.serverAdded(makeServer("lol.com:8080"));
Assert.assertEquals(3, httpServerInventoryView.getDebugInfo().size());
httpServerInventoryView.syncMonitoring();
Assert.assertEquals(3, httpServerInventoryView.getDebugInfo().size());
}
private DruidServer makeServer(String host)
{
return new DruidServer(
host,
host,
host,
100_000_000L,
ServerType.HISTORICAL,
"__default_tier",
50
);
}
private static class TestDruidNodeDiscovery implements DruidNodeDiscovery
{
Listener listener;