From 6dfe54c8381e2b9046de836fb07fabaa03a02452 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 13 Sep 2018 06:35:36 -0400 Subject: [PATCH] Use serializable exception in GCP listeners (#33657) We used TimeoutException here but that's not serializable. This commit switches to a serializable exception so that we can test for the exception type on the remote side. --- .../index/shard/GlobalCheckpointListeners.java | 13 +++++++------ .../index/shard/GlobalCheckpointListenersTests.java | 9 +++++---- .../org/elasticsearch/index/shard/IndexShardIT.java | 4 ++-- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index 224d5be17e1..e738ebac160 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.shard; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.FutureUtils; @@ -33,7 +34,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -53,7 +53,8 @@ public class GlobalCheckpointListeners implements Closeable { * Callback when the global checkpoint is updated or the shard is closed. If the shard is closed, the value of the global checkpoint * will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null and an * instance of {@link IndexShardClosedException }. If the listener timed out waiting for notification then the exception will be - * non-null and an instance of {@link TimeoutException}. If the global checkpoint is updated, the exception will be null. + * non-null and an instance of {@link ElasticsearchTimeoutException}. If the global checkpoint is updated, the exception will be + * null. * * @param globalCheckpoint the updated global checkpoint * @param e if non-null, the shard is closed or the listener timed out @@ -96,8 +97,8 @@ public class GlobalCheckpointListeners implements Closeable { * shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global * checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated or the shard * is closed. A listener must re-register after one of these events to receive subsequent events. Callers may add a timeout to be - * notified after if the timeout elapses. In this case, the listener will be notified with a {@link TimeoutException}. Passing null for - * the timeout means no timeout will be associated to the listener. + * notified after if the timeout elapses. In this case, the listener will be notified with a {@link ElasticsearchTimeoutException}. + * Passing null for the timeout means no timeout will be associated to the listener. * * @param currentGlobalCheckpoint the current global checkpoint known to the listener * @param listener the listener @@ -140,7 +141,7 @@ public class GlobalCheckpointListeners implements Closeable { removed = listeners != null && listeners.remove(listener) != null; } if (removed) { - final TimeoutException e = new TimeoutException(timeout.getStringRep()); + final ElasticsearchTimeoutException e = new ElasticsearchTimeoutException(timeout.getStringRep()); logger.trace("global checkpoint listener timed out", e); executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, e)); } @@ -225,7 +226,7 @@ public class GlobalCheckpointListeners implements Closeable { } else if (e instanceof IndexShardClosedException) { logger.warn("error notifying global checkpoint listener of closed shard", caught); } else { - assert e instanceof TimeoutException : e; + assert e instanceof ElasticsearchTimeoutException : e; logger.warn("error notifying global checkpoint listener of timeout", caught); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java index e5e2453682f..8a1070d56d5 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/GlobalCheckpointListenersTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.shard; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -42,7 +43,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -512,10 +512,11 @@ public class GlobalCheckpointListenersTests extends ESTestCase { try { notified.set(true); assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); - assertThat(e, instanceOf(TimeoutException.class)); + assertThat(e, instanceOf(ElasticsearchTimeoutException.class)); assertThat(e, hasToString(containsString(timeout.getStringRep()))); final ArgumentCaptor message = ArgumentCaptor.forClass(String.class); - final ArgumentCaptor t = ArgumentCaptor.forClass(TimeoutException.class); + final ArgumentCaptor t = + ArgumentCaptor.forClass(ElasticsearchTimeoutException.class); verify(mockLogger).trace(message.capture(), t.capture()); assertThat(message.getValue(), equalTo("global checkpoint listener timed out")); assertThat(t.getValue(), hasToString(containsString(timeout.getStringRep()))); @@ -547,7 +548,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase { try { notified.set(true); assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); - assertThat(e, instanceOf(TimeoutException.class)); + assertThat(e, instanceOf(ElasticsearchTimeoutException.class)); } finally { latch.countDown(); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 8fe1daefe6d..715860e6ffa 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.shard; import org.apache.lucene.store.LockObtainFailedException; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -89,7 +90,6 @@ import java.util.Locale; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -798,7 +798,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { notified.set(true); assertThat(g, equalTo(UNASSIGNED_SEQ_NO)); assertNotNull(e); - assertThat(e, instanceOf(TimeoutException.class)); + assertThat(e, instanceOf(ElasticsearchTimeoutException.class)); assertThat(e.getMessage(), equalTo(timeout.getStringRep())); } finally { latch.countDown();