diff --git a/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java b/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java new file mode 100644 index 00000000000..df9afd32ca2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java @@ -0,0 +1,89 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support; + +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class ListenerTimeouts { + + /** + * Wraps a listener with a listener that can timeout. After the timeout period the + * {@link ActionListener#onFailure(Exception)} will be called with a + * {@link ElasticsearchTimeoutException} if the listener has not already been completed. + * + * @param threadPool used to schedule the timeout + * @param listener to that can timeout + * @param timeout period before listener failed + * @param executor to use for scheduling timeout + * @param listenerName name of the listener for timeout exception + * @return the wrapped listener that will timeout + */ + public static ActionListener wrapWithTimeout(ThreadPool threadPool, ActionListener listener, + TimeValue timeout, String executor, String listenerName) { + TimeoutableListener wrappedListener = new TimeoutableListener<>(listener, timeout, listenerName); + wrappedListener.cancellable = threadPool.schedule(wrappedListener, timeout, executor); + return wrappedListener; + } + + private static class TimeoutableListener implements ActionListener, Runnable { + + private final AtomicBoolean isDone = new AtomicBoolean(false); + private final ActionListener delegate; + private final TimeValue timeout; + private final String listenerName; + private volatile Scheduler.ScheduledCancellable cancellable; + + private TimeoutableListener(ActionListener delegate, TimeValue timeout, String listenerName) { + this.delegate = delegate; + this.timeout = timeout; + this.listenerName = listenerName; + } + + @Override + public void onResponse(Response response) { + if (isDone.compareAndSet(false, true)) { + cancellable.cancel(); + delegate.onResponse(response); + } + } + + @Override + public void onFailure(Exception e) { + if (isDone.compareAndSet(false, true)) { + cancellable.cancel(); + delegate.onFailure(e); + } + } + + @Override + public void run() { + if (isDone.compareAndSet(false, true)) { + String timeoutMessage = "[" + listenerName + "]" + " timed out after [" + timeout + "]"; + delegate.onFailure(new ElasticsearchTimeoutException(timeoutMessage)); + } + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/support/ListenerTimeoutsTests.java b/server/src/test/java/org/elasticsearch/action/support/ListenerTimeoutsTests.java new file mode 100644 index 00000000000..d5e3f0031c7 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/support/ListenerTimeoutsTests.java @@ -0,0 +1,120 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support; + +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.Before; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.core.IsInstanceOf.instanceOf; + +public class ListenerTimeoutsTests extends ESTestCase { + + private final TimeValue timeout = TimeValue.timeValueMillis(10); + private final String generic = ThreadPool.Names.GENERIC; + private DeterministicTaskQueue taskQueue; + + @Before + public void setUp() throws Exception { + super.setUp(); + Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(); + taskQueue = new DeterministicTaskQueue(settings, random()); + } + + public void testListenerTimeout() { + AtomicBoolean success = new AtomicBoolean(false); + AtomicReference exception = new AtomicReference<>(); + ActionListener listener = wrap(success, exception); + + ActionListener wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test"); + assertTrue(taskQueue.hasDeferredTasks()); + taskQueue.advanceTime(); + taskQueue.runAllRunnableTasks(); + + wrapped.onResponse(null); + wrapped.onFailure(new IOException("incorrect exception")); + + assertFalse(success.get()); + assertThat(exception.get(), instanceOf(ElasticsearchTimeoutException.class)); + } + + public void testFinishNormallyBeforeTimeout() { + AtomicBoolean success = new AtomicBoolean(false); + AtomicReference exception = new AtomicReference<>(); + ActionListener listener = wrap(success, exception); + + ActionListener wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test"); + wrapped.onResponse(null); + wrapped.onFailure(new IOException("boom")); + wrapped.onResponse(null); + + assertTrue(taskQueue.hasDeferredTasks()); + taskQueue.advanceTime(); + taskQueue.runAllRunnableTasks(); + + assertTrue(success.get()); + assertNull(exception.get()); + } + + public void testFinishExceptionallyBeforeTimeout() { + AtomicBoolean success = new AtomicBoolean(false); + AtomicReference exception = new AtomicReference<>(); + ActionListener listener = wrap(success, exception); + + ActionListener wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test"); + wrapped.onFailure(new IOException("boom")); + + assertTrue(taskQueue.hasDeferredTasks()); + taskQueue.advanceTime(); + taskQueue.runAllRunnableTasks(); + + assertFalse(success.get()); + assertThat(exception.get(), instanceOf(IOException.class)); + } + + private ActionListener wrap(AtomicBoolean success, AtomicReference exception) { + return new ActionListener() { + + private final AtomicBoolean completed = new AtomicBoolean(); + + @Override + public void onResponse(Void aVoid) { + assertTrue(completed.compareAndSet(false, true)); + assertTrue(success.compareAndSet(false, true)); + } + + @Override + public void onFailure(Exception e) { + assertTrue(completed.compareAndSet(false, true)); + assertTrue(exception.compareAndSet(null, e)); + } + }; + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 89f1af49f0e..88f4e974bea 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.support.ListenerTimeouts; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -103,7 +104,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit private final ThreadPool threadPool; private final CounterMetric throttledTime = new CounterMetric(); - + public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings, CcrSettings ccrSettings, ThreadPool threadPool) { this.metadata = metadata; @@ -377,7 +378,8 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit protected void restoreFiles(List filesToRecover, Store store) throws IOException { logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover); - try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {})) { + try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> { + })) { final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); final AtomicReference> error = new AtomicReference<>(); @@ -403,8 +405,9 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId, fileInfo.name(), offset, bytesRequested); - remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, - ActionListener.wrap( + TimeValue timeout = ccrSettings.getRecoveryActionTimeout(); + ActionListener listener = + ListenerTimeouts.wrapWithTimeout(threadPool, ActionListener.wrap( r -> threadPool.generic().execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { @@ -428,7 +431,8 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e)); requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId); } - )); + ), timeout, ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME); + remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, listener); } catch (Exception e) { error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e)); requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index d39262208e7..45adec46a21 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -37,6 +37,7 @@ import org.elasticsearch.snapshots.RestoreInfo; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkAction; @@ -292,7 +293,6 @@ public class CcrRepositoryIT extends CcrIntegTestCase { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38027") public void testIndividualActionsTimeout() throws Exception { ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); TimeValue timeValue = TimeValue.timeValueMillis(100); @@ -315,7 +315,8 @@ public class CcrRepositoryIT extends CcrIntegTestCase { MockTransportService mockTransportService = (MockTransportService) transportService; transportServices.add(mockTransportService); mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { - if (action.equals(GetCcrRestoreFileChunkAction.NAME) == false) { + if (action.equals(GetCcrRestoreFileChunkAction.NAME) == false && + action.equals(TransportActionProxy.getProxyAction(GetCcrRestoreFileChunkAction.NAME)) == false) { connection.sendRequest(requestId, action, request, options); } }); @@ -337,33 +338,34 @@ public class CcrRepositoryIT extends CcrIntegTestCase { .renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS)) .indexSettings(settingsBuilder); - final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class); - final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); - PlainActionFuture future = PlainActionFuture.newFuture(); - restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); - - // Depending on when the timeout occurs this can fail in two ways. If it times-out when fetching - // metadata this will throw an exception. If it times-out when restoring a shard, the shard will - // be marked as failed. Either one is a success for the purpose of this test. try { - RestoreInfo restoreInfo = future.actionGet(); - assertThat(restoreInfo.failedShards(), greaterThan(0)); - assertThat(restoreInfo.successfulShards(), lessThan(restoreInfo.totalShards())); - assertEquals(numberOfPrimaryShards, restoreInfo.totalShards()); - } catch (Exception e) { - assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(ElasticsearchTimeoutException.class)); + final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class); + final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); + PlainActionFuture future = PlainActionFuture.newFuture(); + restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); + + // Depending on when the timeout occurs this can fail in two ways. If it times-out when fetching + // metadata this will throw an exception. If it times-out when restoring a shard, the shard will + // be marked as failed. Either one is a success for the purpose of this test. + try { + RestoreInfo restoreInfo = future.actionGet(); + assertThat(restoreInfo.failedShards(), greaterThan(0)); + assertThat(restoreInfo.successfulShards(), lessThan(restoreInfo.totalShards())); + assertEquals(numberOfPrimaryShards, restoreInfo.totalShards()); + } catch (Exception e) { + assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(ElasticsearchTimeoutException.class)); + } + } finally { + for (MockTransportService transportService : transportServices) { + transportService.clearAllRules(); + } + + settingsRequest = new ClusterUpdateSettingsRequest(); + TimeValue defaultValue = CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getDefault(Settings.EMPTY); + settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getKey(), + defaultValue)); + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); } - - - for (MockTransportService transportService : transportServices) { - transportService.clearAllRules(); - } - - settingsRequest = new ClusterUpdateSettingsRequest(); - TimeValue defaultValue = CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getDefault(Settings.EMPTY); - settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getKey(), - defaultValue)); - assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); } public void testFollowerMappingIsUpdated() throws IOException {