Avoid stack overflow in auto-follow coordinator (#44421)
This commit avoids a situation where we might stack overflow in the auto-follower coordinator. In the face of repeated failures to get the remote cluster state, we would previously be called back on the same thread and then recurse to try again. If this failure persists, the repeated callbacks on the same thread would lead to a stack overflow. The repeated failures can occur, for example, if the connect queue is full when we attempt to make a connection to the remote cluster. This commit avoids this by truncating the call stack if we are called back on the same thread as the initial request was made on.
This commit is contained in:
parent c18a6402cd
commit 100cb89f3e
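The pattern at the heart of the fix is small: before issuing the asynchronous remote-cluster-state request, remember the requesting thread; when the callback fires, compare it against the current thread, and if they match (meaning the request failed synchronously, before anything was actually dispatched), schedule the retry on an executor instead of calling back into start() directly. The standalone sketch below illustrates that pattern against a simulated, always-failing request. It is not Elasticsearch code: the class and method names (StackTruncationSketch, fetchRemoteState) and the MAX_ATTEMPTS constant are illustrative only.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

public final class StackTruncationSketch {

    private static final int MAX_ATTEMPTS = 50_000; // deep enough to overflow the stack if we recursed
    private static final AtomicInteger attempts = new AtomicInteger();
    private static final CountDownLatch done = new CountDownLatch(1);
    private static final ExecutorService executor = Executors.newSingleThreadExecutor();

    public static void main(final String[] args) throws InterruptedException {
        start();
        done.await();
        executor.shutdown();
        System.out.println("completed " + attempts.get() + " attempts without a StackOverflowError");
    }

    static void start() {
        // remember which thread issued the request (mirrors the thread captured in AutoFollower#start)
        final Thread requestThread = Thread.currentThread();
        fetchRemoteState((response, error) -> {
            if (attempts.incrementAndGet() >= MAX_ATTEMPTS) {
                done.countDown();
                return;
            }
            if (Thread.currentThread() == requestThread) {
                // synchronous failure: truncate the call stack by retrying on another thread
                executor.execute(StackTruncationSketch::start);
            } else {
                // genuinely asynchronous callback: safe to retry directly
                start();
            }
        });
    }

    // stand-in for the remote cluster state call; here it always "fails" synchronously,
    // which is what happens when the local connect queue is full
    static void fetchRemoteState(final BiConsumer<Object, Exception> handler) {
        handler.accept(null, new RuntimeException("simulated: connect queue full"));
    }
}

Run as-is, the sketch works through tens of thousands of simulated synchronous failures with a bounded stack; without the same-thread check, each failure would add a few frames and the retry loop would be expected to blow the stack well before MAX_ATTEMPTS. The diff below applies the same idea to AutoFollowCoordinator and wires a CCR thread pool executor through from the plugin.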
@@ -181,16 +181,17 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
         CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(threadPool, ccrSettings);
         this.restoreSourceService.set(restoreSourceService);
         return Arrays.asList(
             ccrLicenseChecker,
-            restoreSourceService,
-            new CcrRepositoryManager(settings, clusterService, client),
-            new AutoFollowCoordinator(
-                settings,
-                client,
-                clusterService,
-                ccrLicenseChecker,
-                threadPool::relativeTimeInMillis,
-                threadPool::absoluteTimeInMillis));
+            restoreSourceService,
+            new CcrRepositoryManager(settings, clusterService, client),
+            new AutoFollowCoordinator(
+                settings,
+                client,
+                clusterService,
+                ccrLicenseChecker,
+                threadPool::relativeTimeInMillis,
+                threadPool::absoluteTimeInMillis,
+                threadPool.executor(Ccr.CCR_THREAD_POOL_NAME)));
     }

     @Override
@@ -55,6 +55,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.Executor;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -78,6 +79,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
     private final CcrLicenseChecker ccrLicenseChecker;
     private final LongSupplier relativeMillisTimeProvider;
     private final LongSupplier absoluteMillisTimeProvider;
+    private final Executor executor;

     private volatile TimeValue waitForMetadataTimeOut;
     private volatile Map<String, AutoFollower> autoFollowers = Collections.emptyMap();
@@ -89,18 +91,20 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
     private final LinkedHashMap<String, Tuple<Long, ElasticsearchException>> recentAutoFollowErrors;

     public AutoFollowCoordinator(
-            Settings settings,
-            Client client,
-            ClusterService clusterService,
-            CcrLicenseChecker ccrLicenseChecker,
-            LongSupplier relativeMillisTimeProvider,
-            LongSupplier absoluteMillisTimeProvider) {
+            final Settings settings,
+            final Client client,
+            final ClusterService clusterService,
+            final CcrLicenseChecker ccrLicenseChecker,
+            final LongSupplier relativeMillisTimeProvider,
+            final LongSupplier absoluteMillisTimeProvider,
+            final Executor executor) {

         this.client = client;
         this.clusterService = clusterService;
         this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker");
         this.relativeMillisTimeProvider = relativeMillisTimeProvider;
         this.absoluteMillisTimeProvider = absoluteMillisTimeProvider;
+        this.executor = Objects.requireNonNull(executor);
         this.recentAutoFollowErrors = new LinkedHashMap<String, Tuple<Long, ElasticsearchException>>() {
             @Override
             protected boolean removeEldestEntry(final Map.Entry<String, Tuple<Long, ElasticsearchException>> eldest) {
@@ -210,7 +214,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
         Map<String, AutoFollower> newAutoFollowers = new HashMap<>(newRemoteClusters.size());
         for (String remoteCluster : newRemoteClusters) {
             AutoFollower autoFollower =
-                new AutoFollower(remoteCluster, this::updateStats, clusterService::state, relativeMillisTimeProvider) {
+                new AutoFollower(remoteCluster, this::updateStats, clusterService::state, relativeMillisTimeProvider, executor) {

                     @Override
                     void getRemoteClusterState(final String remoteCluster,
@@ -332,6 +336,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
         private final Consumer<List<AutoFollowResult>> statsUpdater;
         private final Supplier<ClusterState> followerClusterStateSupplier;
         private final LongSupplier relativeTimeProvider;
+        private final Executor executor;

         private volatile long lastAutoFollowTimeInMillis = -1;
         private volatile long metadataVersion = 0;
@@ -344,11 +349,13 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
         AutoFollower(final String remoteCluster,
                      final Consumer<List<AutoFollowResult>> statsUpdater,
                      final Supplier<ClusterState> followerClusterStateSupplier,
-                     LongSupplier relativeTimeProvider) {
+                     final LongSupplier relativeTimeProvider,
+                     final Executor executor) {
             this.remoteCluster = remoteCluster;
             this.statsUpdater = statsUpdater;
             this.followerClusterStateSupplier = followerClusterStateSupplier;
             this.relativeTimeProvider = relativeTimeProvider;
+            this.executor = Objects.requireNonNull(executor);
         }

         void start() {
@@ -387,6 +394,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
             this.autoFollowPatternsCountDown = new CountDown(patterns.size());
             this.autoFollowResults = new AtomicArray<>(patterns.size());

+            final Thread thread = Thread.currentThread();
             getRemoteClusterState(remoteCluster, metadataVersion + 1, (remoteClusterStateResponse, remoteError) -> {
                 // Also check removed flag here, as it may take a while for this remote cluster state api call to return:
                 if (removed) {
@@ -403,7 +411,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
                     }
                     ClusterState remoteClusterState = remoteClusterStateResponse.getState();
                     metadataVersion = remoteClusterState.metaData().version();
-                    autoFollowIndices(autoFollowMetadata, clusterState, remoteClusterState, patterns);
+                    autoFollowIndices(autoFollowMetadata, clusterState, remoteClusterState, patterns, thread);
                 } else {
                     assert remoteError != null;
                     if (remoteError instanceof NoSuchRemoteClusterException) {
@@ -414,7 +422,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements

                     for (int i = 0; i < patterns.size(); i++) {
                         String autoFollowPatternName = patterns.get(i);
-                        finalise(i, new AutoFollowResult(autoFollowPatternName, remoteError));
+                        finalise(i, new AutoFollowResult(autoFollowPatternName, remoteError), thread);
                     }
                 }
             });
@@ -428,7 +436,8 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
         private void autoFollowIndices(final AutoFollowMetadata autoFollowMetadata,
                                        final ClusterState clusterState,
                                        final ClusterState remoteClusterState,
-                                       final List<String> patterns) {
+                                       final List<String> patterns,
+                                       final Thread thread) {
             int i = 0;
             for (String autoFollowPatternName : patterns) {
                 final int slot = i;
@@ -439,7 +448,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
                 final List<Index> leaderIndicesToFollow =
                     getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState, followedIndices);
                 if (leaderIndicesToFollow.isEmpty()) {
-                    finalise(slot, new AutoFollowResult(autoFollowPatternName));
+                    finalise(slot, new AutoFollowResult(autoFollowPatternName), thread);
                 } else {
                     List<Tuple<String, AutoFollowPattern>> patternsForTheSameRemoteCluster = autoFollowMetadata.getPatterns()
                         .entrySet().stream()
@@ -448,7 +457,7 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
                         .map(item -> new Tuple<>(item.getKey(), item.getValue()))
                         .collect(Collectors.toList());

-                    Consumer<AutoFollowResult> resultHandler = result -> finalise(slot, result);
+                    Consumer<AutoFollowResult> resultHandler = result -> finalise(slot, result, thread);
                     checkAutoFollowPattern(autoFollowPatternName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers,
                         patternsForTheSameRemoteCluster, remoteClusterState.metaData(), clusterState.metaData(), resultHandler);
                 }
@@ -561,11 +570,23 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
                 createAndFollow(headers, request, successHandler, onResult);
             }

-        private void finalise(int slot, AutoFollowResult result) {
+        private void finalise(int slot, AutoFollowResult result, final Thread thread) {
             assert autoFollowResults.get(slot) == null;
             autoFollowResults.set(slot, result);
             if (autoFollowPatternsCountDown.countDown()) {
                 statsUpdater.accept(autoFollowResults.asList());
+                /*
+                 * In the face of a failure, we could be called back on the same thread. That is, it could be that we
+                 * never fired off the asynchronous remote cluster state call, instead failing beforehand. In this case,
+                 * we will recurse on the same thread. If there are repeated failures, we could blow the stack and
+                 * overflow. A real-world scenario in which this can occur is if the local connect queue is full. To
+                 * avoid this, if we are called back on the same thread, then we truncate the stack by forking to
+                 * another thread.
+                 */
+                if (thread == Thread.currentThread()) {
+                    executor.execute(this::start);
+                    return;
+                }
                 start();
             }
         }
@@ -21,6 +21,9 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.test.ESTestCase;
@@ -41,6 +44,9 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -92,7 +98,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101"));
             assertThat(entries.get(0).getValue(), nullValue());
         };
-        AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(currentState), () -> 1L) {
+        AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(currentState), () -> 1L, Runnable::run) {
             @Override
             void getRemoteClusterState(String remoteCluster,
                                        long metadataVersion,
@@ -157,7 +163,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             assertThat(results.get(0).clusterStateFetchException, sameInstance(failure));
             assertThat(results.get(0).autoFollowExecutionResults.entrySet().size(), equalTo(0));
         };
-        AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState), () -> 1L) {
+        AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState), () -> 1L, Runnable::run) {
             @Override
             void getRemoteClusterState(String remoteCluster,
                                        long metadataVersion,
@@ -212,7 +218,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101"));
             assertThat(entries.get(0).getValue(), sameInstance(failure));
         };
-        AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState), () -> 1L) {
+        AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState), () -> 1L, Runnable::run) {
             @Override
             void getRemoteClusterState(String remoteCluster,
                                        long metadataVersion,
@@ -269,7 +275,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101"));
             assertThat(entries.get(0).getValue(), sameInstance(failure));
         };
-        AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState), () -> 1L) {
+        AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState), () -> 1L, Runnable::run) {
             @Override
             void getRemoteClusterState(String remoteCluster,
                                        long metadataVersion,
@@ -538,7 +544,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             null,
             mockClusterService(),
             new CcrLicenseChecker(() -> true, () -> false),
-            () -> 1L, () -> 1L);
+            () -> 1L,
+            () -> 1L,
+            Runnable::run);

         autoFollowCoordinator.updateStats(Collections.singletonList(
             new AutoFollowCoordinator.AutoFollowResult("_alias1"))
@@ -603,7 +611,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             null,
             clusterService,
             new CcrLicenseChecker(() -> true, () -> false),
-            () -> 1L, () -> 1L);
+            () -> 1L,
+            () -> 1L,
+            Runnable::run);
         // Add 3 patterns:
         Map<String, AutoFollowPattern> patterns = new HashMap<>();
         patterns.put("pattern1", new AutoFollowPattern("remote1", Collections.singletonList("logs-*"), null, null, null,
@@ -671,7 +681,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             null,
             mockClusterService(),
             new CcrLicenseChecker(() -> true, () -> false),
-            () -> 1L, () -> 1L);
+            () -> 1L,
+            () -> 1L,
+            Runnable::run);
         ClusterState clusterState = ClusterState.builder(new ClusterName("remote"))
             .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
                 new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())))
@@ -686,7 +698,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             null,
             mockClusterService(),
             new CcrLicenseChecker(() -> true, () -> false),
-            () -> 1L, () -> 1L);
+            () -> 1L,
+            () -> 1L,
+            Runnable::run);
         ClusterState clusterState = ClusterState.builder(new ClusterName("remote")).build();
         autoFollowCoordinator.updateAutoFollowers(clusterState);
         assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(0));
@@ -719,7 +733,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {

        List<AutoFollowCoordinator.AutoFollowResult> allResults = new ArrayList<>();
        Consumer<List<AutoFollowCoordinator.AutoFollowResult>> handler = allResults::addAll;
-       AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(states), () -> 1L) {
+       AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(states), () -> 1L, Runnable::run) {

            long previousRequestedMetadataVersion = 0;

@@ -777,7 +791,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
            fail("should not be invoked");
        };
        AtomicInteger counter = new AtomicInteger();
-       AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(states), () -> 1L) {
+       AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(states), () -> 1L, Runnable::run) {

            long previousRequestedMetadataVersion = 0;

@@ -832,7 +846,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {

        List<AutoFollowCoordinator.AutoFollowResult> results = new ArrayList<>();
        Consumer<List<AutoFollowCoordinator.AutoFollowResult>> handler = results::addAll;
-       AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(currentState), () -> 1L) {
+       AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(currentState), () -> 1L, Runnable::run) {
            @Override
            void getRemoteClusterState(String remoteCluster,
                                       long metadataVersion,
@@ -908,7 +922,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
        Consumer<List<AutoFollowCoordinator.AutoFollowResult>> handler = results -> {
            resultHolder[0] = results;
        };
-       AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(currentState), () -> 1L) {
+       AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(currentState), () -> 1L, Runnable::run) {
            @Override
            void getRemoteClusterState(String remoteCluster,
                                       long metadataVersion,
@@ -953,6 +967,88 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             assertThat(entries.get(0).getValue(), nullValue());
     }

+    /*
+     * This tests for a situation where in the face of repeated failures we would be called back on the same thread, and
+     * then recurse through the start method again, and eventually stack overflow. Now when we are called back on the
+     * same thread, we fork a new thread to avoid this. This test simulates a repeated failure to exercise this logic
+     * and ensures that we do not stack overflow. If we did stack overflow, it would go as an uncaught exception and
+     * fail the test. We have sufficiently high iterations here to ensure that we would indeed stack overflow were it
+     * not for this logic.
+     */
+    public void testRepeatedFailures() throws InterruptedException {
+        final ClusterState clusterState = mock(ClusterState.class);
+        final MetaData metaData = mock(MetaData.class);
+        when(clusterState.metaData()).thenReturn(metaData);
+        final AutoFollowPattern pattern = new AutoFollowPattern(
+            "remote",
+            Collections.singletonList("*"),
+            "{}",
+            0,
+            0,
+            0,
+            0,
+            ByteSizeValue.ZERO,
+            ByteSizeValue.ZERO,
+            0,
+            ByteSizeValue.ZERO,
+            TimeValue.ZERO,
+            TimeValue.ZERO);
+        final AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(
+            Collections.singletonMap("remote", pattern),
+            Collections.emptyMap(),
+            Collections.emptyMap());
+        when(metaData.custom(AutoFollowMetadata.TYPE)).thenReturn(autoFollowMetadata);
+
+        final int iterations = randomIntBetween(16384, 32768); // sufficiently large to exercise that we do not stack overflow
+        final AtomicInteger counter = new AtomicInteger();
+        final CountDownLatch latch = new CountDownLatch(1);
+        final ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            final AutoFollower autoFollower = new AutoFollower("remote", x -> {}, () -> clusterState, () -> 1, executor) {
+
+                @Override
+                void getRemoteClusterState(
+                    final String remoteCluster,
+                    final long metadataVersion,
+                    final BiConsumer<ClusterStateResponse, Exception> handler) {
+                    counter.incrementAndGet();
+                    if (counter.incrementAndGet() > iterations) {
+                        this.stop();
+                        latch.countDown();
+                        /*
+                         * Do not call back the handler here, when we unlatch the test thread it will shutdown the
+                         * executor which would lead to the execution of the callback facing a rejected execution
+                         * exception (from the executor being shutdown).
+                         */
+                        return;
+                    }
+                    handler.accept(null, new EsRejectedExecutionException());
+                }
+
+                @Override
+                void createAndFollow(
+                    final Map<String, String> headers,
+                    final PutFollowAction.Request followRequest,
+                    final Runnable successHandler,
+                    final Consumer<Exception> failureHandler) {
+
+                }
+
+                @Override
+                void updateAutoFollowMetadata(
+                    final Function<ClusterState, ClusterState> updateFunction,
+                    final Consumer<Exception> handler) {
+
+                }
+
+            };
+            autoFollower.start();
+            latch.await();
+        } finally {
+            executor.shutdown();
+        }
+    }
+
     private static ClusterState createRemoteClusterState(String indexName, Boolean enableSoftDeletes) {
         Settings.Builder indexSettings;
         if (enableSoftDeletes != null) {