Add get file chunk timeouts with listener timeouts (#38758)
This commit adds a `ListenerTimeouts` class that wraps an `ActionListener` in a listener with a timeout scheduled on the generic thread pool. If the timeout expires before the listener is completed, `onFailure` is called with an `ElasticsearchTimeoutException`. Timeouts for the get ccr file chunk action are implemented using this functionality. Additionally, this commit attempts to fix #38027 by also blocking proxied get ccr file chunk actions. The test muted for #38027 is un-muted, which also helps verify the timeout functionality.
This commit is contained in:
parent
d80325f288
commit
b1c1daa63f
|
@ -0,0 +1,89 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support;
|
||||
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.threadpool.Scheduler;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class ListenerTimeouts {
|
||||
|
||||
/**
|
||||
* Wraps a listener with a listener that can timeout. After the timeout period the
|
||||
* {@link ActionListener#onFailure(Exception)} will be called with a
|
||||
* {@link ElasticsearchTimeoutException} if the listener has not already been completed.
|
||||
*
|
||||
* @param threadPool used to schedule the timeout
|
||||
* @param listener to that can timeout
|
||||
* @param timeout period before listener failed
|
||||
* @param executor to use for scheduling timeout
|
||||
* @param listenerName name of the listener for timeout exception
|
||||
* @return the wrapped listener that will timeout
|
||||
*/
|
||||
public static <Response> ActionListener<Response> wrapWithTimeout(ThreadPool threadPool, ActionListener<Response> listener,
|
||||
TimeValue timeout, String executor, String listenerName) {
|
||||
TimeoutableListener<Response> wrappedListener = new TimeoutableListener<>(listener, timeout, listenerName);
|
||||
wrappedListener.cancellable = threadPool.schedule(wrappedListener, timeout, executor);
|
||||
return wrappedListener;
|
||||
}
|
||||
|
||||
private static class TimeoutableListener<Response> implements ActionListener<Response>, Runnable {
|
||||
|
||||
private final AtomicBoolean isDone = new AtomicBoolean(false);
|
||||
private final ActionListener<Response> delegate;
|
||||
private final TimeValue timeout;
|
||||
private final String listenerName;
|
||||
private volatile Scheduler.ScheduledCancellable cancellable;
|
||||
|
||||
private TimeoutableListener(ActionListener<Response> delegate, TimeValue timeout, String listenerName) {
|
||||
this.delegate = delegate;
|
||||
this.timeout = timeout;
|
||||
this.listenerName = listenerName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(Response response) {
|
||||
if (isDone.compareAndSet(false, true)) {
|
||||
cancellable.cancel();
|
||||
delegate.onResponse(response);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
if (isDone.compareAndSet(false, true)) {
|
||||
cancellable.cancel();
|
||||
delegate.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (isDone.compareAndSet(false, true)) {
|
||||
String timeoutMessage = "[" + listenerName + "]" + " timed out after [" + timeout + "]";
|
||||
delegate.onFailure(new ElasticsearchTimeoutException(timeoutMessage));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,120 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support;
|
||||
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
|
||||
import static org.hamcrest.core.IsInstanceOf.instanceOf;
|
||||
|
||||
public class ListenerTimeoutsTests extends ESTestCase {
|
||||
|
||||
private final TimeValue timeout = TimeValue.timeValueMillis(10);
|
||||
private final String generic = ThreadPool.Names.GENERIC;
|
||||
private DeterministicTaskQueue taskQueue;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
|
||||
taskQueue = new DeterministicTaskQueue(settings, random());
|
||||
}
|
||||
|
||||
public void testListenerTimeout() {
|
||||
AtomicBoolean success = new AtomicBoolean(false);
|
||||
AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
ActionListener<Void> listener = wrap(success, exception);
|
||||
|
||||
ActionListener<Void> wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test");
|
||||
assertTrue(taskQueue.hasDeferredTasks());
|
||||
taskQueue.advanceTime();
|
||||
taskQueue.runAllRunnableTasks();
|
||||
|
||||
wrapped.onResponse(null);
|
||||
wrapped.onFailure(new IOException("incorrect exception"));
|
||||
|
||||
assertFalse(success.get());
|
||||
assertThat(exception.get(), instanceOf(ElasticsearchTimeoutException.class));
|
||||
}
|
||||
|
||||
public void testFinishNormallyBeforeTimeout() {
|
||||
AtomicBoolean success = new AtomicBoolean(false);
|
||||
AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
ActionListener<Void> listener = wrap(success, exception);
|
||||
|
||||
ActionListener<Void> wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test");
|
||||
wrapped.onResponse(null);
|
||||
wrapped.onFailure(new IOException("boom"));
|
||||
wrapped.onResponse(null);
|
||||
|
||||
assertTrue(taskQueue.hasDeferredTasks());
|
||||
taskQueue.advanceTime();
|
||||
taskQueue.runAllRunnableTasks();
|
||||
|
||||
assertTrue(success.get());
|
||||
assertNull(exception.get());
|
||||
}
|
||||
|
||||
public void testFinishExceptionallyBeforeTimeout() {
|
||||
AtomicBoolean success = new AtomicBoolean(false);
|
||||
AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
ActionListener<Void> listener = wrap(success, exception);
|
||||
|
||||
ActionListener<Void> wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test");
|
||||
wrapped.onFailure(new IOException("boom"));
|
||||
|
||||
assertTrue(taskQueue.hasDeferredTasks());
|
||||
taskQueue.advanceTime();
|
||||
taskQueue.runAllRunnableTasks();
|
||||
|
||||
assertFalse(success.get());
|
||||
assertThat(exception.get(), instanceOf(IOException.class));
|
||||
}
|
||||
|
||||
private ActionListener<Void> wrap(AtomicBoolean success, AtomicReference<Exception> exception) {
|
||||
return new ActionListener<Void>() {
|
||||
|
||||
private final AtomicBoolean completed = new AtomicBoolean();
|
||||
|
||||
@Override
|
||||
public void onResponse(Void aVoid) {
|
||||
assertTrue(completed.compareAndSet(false, true));
|
||||
assertTrue(success.compareAndSet(false, true));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
assertTrue(completed.compareAndSet(false, true));
|
||||
assertTrue(exception.compareAndSet(null, e));
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -16,6 +16,7 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
|
||||
import org.elasticsearch.action.support.ListenerTimeouts;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -103,7 +104,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
|||
private final ThreadPool threadPool;
|
||||
|
||||
private final CounterMetric throttledTime = new CounterMetric();
|
||||
|
||||
|
||||
public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings,
|
||||
CcrSettings ccrSettings, ThreadPool threadPool) {
|
||||
this.metadata = metadata;
|
||||
|
@ -377,7 +378,8 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
|||
protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws IOException {
|
||||
logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
|
||||
|
||||
try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {})) {
|
||||
try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {
|
||||
})) {
|
||||
final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
|
||||
final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
|
||||
|
||||
|
@ -403,8 +405,9 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
|||
logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId,
|
||||
fileInfo.name(), offset, bytesRequested);
|
||||
|
||||
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request,
|
||||
ActionListener.wrap(
|
||||
TimeValue timeout = ccrSettings.getRecoveryActionTimeout();
|
||||
ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> listener =
|
||||
ListenerTimeouts.wrapWithTimeout(threadPool, ActionListener.wrap(
|
||||
r -> threadPool.generic().execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
|
@ -428,7 +431,8 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
|||
error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
|
||||
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
|
||||
}
|
||||
));
|
||||
), timeout, ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME);
|
||||
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, listener);
|
||||
} catch (Exception e) {
|
||||
error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
|
||||
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.elasticsearch.snapshots.RestoreInfo;
|
|||
import org.elasticsearch.snapshots.RestoreService;
|
||||
import org.elasticsearch.snapshots.Snapshot;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.TransportActionProxy;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.CcrIntegTestCase;
|
||||
import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkAction;
|
||||
|
@ -292,7 +293,6 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38027")
|
||||
public void testIndividualActionsTimeout() throws Exception {
|
||||
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
|
||||
TimeValue timeValue = TimeValue.timeValueMillis(100);
|
||||
|
@ -315,7 +315,8 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
|
|||
MockTransportService mockTransportService = (MockTransportService) transportService;
|
||||
transportServices.add(mockTransportService);
|
||||
mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
|
||||
if (action.equals(GetCcrRestoreFileChunkAction.NAME) == false) {
|
||||
if (action.equals(GetCcrRestoreFileChunkAction.NAME) == false &&
|
||||
action.equals(TransportActionProxy.getProxyAction(GetCcrRestoreFileChunkAction.NAME)) == false) {
|
||||
connection.sendRequest(requestId, action, request, options);
|
||||
}
|
||||
});
|
||||
|
@ -337,33 +338,34 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
|
|||
.renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS))
|
||||
.indexSettings(settingsBuilder);
|
||||
|
||||
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
|
||||
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
|
||||
PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
|
||||
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
|
||||
|
||||
// Depending on when the timeout occurs this can fail in two ways. If it times-out when fetching
|
||||
// metadata this will throw an exception. If it times-out when restoring a shard, the shard will
|
||||
// be marked as failed. Either one is a success for the purpose of this test.
|
||||
try {
|
||||
RestoreInfo restoreInfo = future.actionGet();
|
||||
assertThat(restoreInfo.failedShards(), greaterThan(0));
|
||||
assertThat(restoreInfo.successfulShards(), lessThan(restoreInfo.totalShards()));
|
||||
assertEquals(numberOfPrimaryShards, restoreInfo.totalShards());
|
||||
} catch (Exception e) {
|
||||
assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(ElasticsearchTimeoutException.class));
|
||||
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
|
||||
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
|
||||
PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
|
||||
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
|
||||
|
||||
// Depending on when the timeout occurs this can fail in two ways. If it times-out when fetching
|
||||
// metadata this will throw an exception. If it times-out when restoring a shard, the shard will
|
||||
// be marked as failed. Either one is a success for the purpose of this test.
|
||||
try {
|
||||
RestoreInfo restoreInfo = future.actionGet();
|
||||
assertThat(restoreInfo.failedShards(), greaterThan(0));
|
||||
assertThat(restoreInfo.successfulShards(), lessThan(restoreInfo.totalShards()));
|
||||
assertEquals(numberOfPrimaryShards, restoreInfo.totalShards());
|
||||
} catch (Exception e) {
|
||||
assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(ElasticsearchTimeoutException.class));
|
||||
}
|
||||
} finally {
|
||||
for (MockTransportService transportService : transportServices) {
|
||||
transportService.clearAllRules();
|
||||
}
|
||||
|
||||
settingsRequest = new ClusterUpdateSettingsRequest();
|
||||
TimeValue defaultValue = CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getDefault(Settings.EMPTY);
|
||||
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getKey(),
|
||||
defaultValue));
|
||||
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
|
||||
}
|
||||
|
||||
|
||||
for (MockTransportService transportService : transportServices) {
|
||||
transportService.clearAllRules();
|
||||
}
|
||||
|
||||
settingsRequest = new ClusterUpdateSettingsRequest();
|
||||
TimeValue defaultValue = CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getDefault(Settings.EMPTY);
|
||||
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getKey(),
|
||||
defaultValue));
|
||||
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
|
||||
}
|
||||
|
||||
public void testFollowerMappingIsUpdated() throws IOException {
|
||||
|
|
Loading…
Reference in New Issue