Revert "Use serializable exception in GCP listeners (#33657)"
This reverts commit 6dfe54c838
.
This commit is contained in:
parent
7e51b960fb
commit
e4eb631b8e
|
@ -21,7 +21,6 @@ package org.elasticsearch.index.shard;
|
||||||
|
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
||||||
|
|
||||||
|
@ -34,6 +33,7 @@ import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
|
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
|
||||||
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
|
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
|
||||||
|
@ -53,8 +53,7 @@ public class GlobalCheckpointListeners implements Closeable {
|
||||||
* Callback when the global checkpoint is updated or the shard is closed. If the shard is closed, the value of the global checkpoint
|
* Callback when the global checkpoint is updated or the shard is closed. If the shard is closed, the value of the global checkpoint
|
||||||
* will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null and an
|
* will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null and an
|
||||||
* instance of {@link IndexShardClosedException }. If the listener timed out waiting for notification then the exception will be
|
* instance of {@link IndexShardClosedException }. If the listener timed out waiting for notification then the exception will be
|
||||||
* non-null and an instance of {@link ElasticsearchTimeoutException}. If the global checkpoint is updated, the exception will be
|
* non-null and an instance of {@link TimeoutException}. If the global checkpoint is updated, the exception will be null.
|
||||||
* null.
|
|
||||||
*
|
*
|
||||||
* @param globalCheckpoint the updated global checkpoint
|
* @param globalCheckpoint the updated global checkpoint
|
||||||
* @param e if non-null, the shard is closed or the listener timed out
|
* @param e if non-null, the shard is closed or the listener timed out
|
||||||
|
@ -97,8 +96,8 @@ public class GlobalCheckpointListeners implements Closeable {
|
||||||
* shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global
|
* shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global
|
||||||
* checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated or the shard
|
* checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated or the shard
|
||||||
* is closed. A listener must re-register after one of these events to receive subsequent events. Callers may add a timeout to be
|
* is closed. A listener must re-register after one of these events to receive subsequent events. Callers may add a timeout to be
|
||||||
* notified after if the timeout elapses. In this case, the listener will be notified with a {@link ElasticsearchTimeoutException}.
|
* notified after if the timeout elapses. In this case, the listener will be notified with a {@link TimeoutException}. Passing null for
|
||||||
* Passing null for the timeout means no timeout will be associated to the listener.
|
* the timeout means no timeout will be associated to the listener.
|
||||||
*
|
*
|
||||||
* @param currentGlobalCheckpoint the current global checkpoint known to the listener
|
* @param currentGlobalCheckpoint the current global checkpoint known to the listener
|
||||||
* @param listener the listener
|
* @param listener the listener
|
||||||
|
@ -141,7 +140,7 @@ public class GlobalCheckpointListeners implements Closeable {
|
||||||
removed = listeners != null && listeners.remove(listener) != null;
|
removed = listeners != null && listeners.remove(listener) != null;
|
||||||
}
|
}
|
||||||
if (removed) {
|
if (removed) {
|
||||||
final ElasticsearchTimeoutException e = new ElasticsearchTimeoutException(timeout.getStringRep());
|
final TimeoutException e = new TimeoutException(timeout.getStringRep());
|
||||||
logger.trace("global checkpoint listener timed out", e);
|
logger.trace("global checkpoint listener timed out", e);
|
||||||
executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, e));
|
executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, e));
|
||||||
}
|
}
|
||||||
|
@ -226,7 +225,7 @@ public class GlobalCheckpointListeners implements Closeable {
|
||||||
} else if (e instanceof IndexShardClosedException) {
|
} else if (e instanceof IndexShardClosedException) {
|
||||||
logger.warn("error notifying global checkpoint listener of closed shard", caught);
|
logger.warn("error notifying global checkpoint listener of closed shard", caught);
|
||||||
} else {
|
} else {
|
||||||
assert e instanceof ElasticsearchTimeoutException : e;
|
assert e instanceof TimeoutException : e;
|
||||||
logger.warn("error notifying global checkpoint listener of timeout", caught);
|
logger.warn("error notifying global checkpoint listener of timeout", caught);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.index.shard;
|
||||||
|
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||||
|
@ -43,6 +42,7 @@ import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
@ -515,11 +515,10 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
|
||||||
try {
|
try {
|
||||||
notified.set(true);
|
notified.set(true);
|
||||||
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
|
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
|
||||||
assertThat(e, instanceOf(ElasticsearchTimeoutException.class));
|
assertThat(e, instanceOf(TimeoutException.class));
|
||||||
assertThat(e, hasToString(containsString(timeout.getStringRep())));
|
assertThat(e, hasToString(containsString(timeout.getStringRep())));
|
||||||
final ArgumentCaptor<String> message = ArgumentCaptor.forClass(String.class);
|
final ArgumentCaptor<String> message = ArgumentCaptor.forClass(String.class);
|
||||||
final ArgumentCaptor<ElasticsearchTimeoutException> t =
|
final ArgumentCaptor<TimeoutException> t = ArgumentCaptor.forClass(TimeoutException.class);
|
||||||
ArgumentCaptor.forClass(ElasticsearchTimeoutException.class);
|
|
||||||
verify(mockLogger).trace(message.capture(), t.capture());
|
verify(mockLogger).trace(message.capture(), t.capture());
|
||||||
assertThat(message.getValue(), equalTo("global checkpoint listener timed out"));
|
assertThat(message.getValue(), equalTo("global checkpoint listener timed out"));
|
||||||
assertThat(t.getValue(), hasToString(containsString(timeout.getStringRep())));
|
assertThat(t.getValue(), hasToString(containsString(timeout.getStringRep())));
|
||||||
|
@ -551,7 +550,7 @@ public class GlobalCheckpointListenersTests extends ESTestCase {
|
||||||
try {
|
try {
|
||||||
notified.set(true);
|
notified.set(true);
|
||||||
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
|
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
|
||||||
assertThat(e, instanceOf(ElasticsearchTimeoutException.class));
|
assertThat(e, instanceOf(TimeoutException.class));
|
||||||
} finally {
|
} finally {
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
package org.elasticsearch.index.shard;
|
package org.elasticsearch.index.shard;
|
||||||
|
|
||||||
import org.apache.lucene.store.LockObtainFailedException;
|
import org.apache.lucene.store.LockObtainFailedException;
|
||||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
@ -90,6 +89,7 @@ import java.util.Locale;
|
||||||
import java.util.concurrent.BrokenBarrierException;
|
import java.util.concurrent.BrokenBarrierException;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.CyclicBarrier;
|
import java.util.concurrent.CyclicBarrier;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
@ -798,7 +798,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
|
||||||
notified.set(true);
|
notified.set(true);
|
||||||
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
|
assertThat(g, equalTo(UNASSIGNED_SEQ_NO));
|
||||||
assertNotNull(e);
|
assertNotNull(e);
|
||||||
assertThat(e, instanceOf(ElasticsearchTimeoutException.class));
|
assertThat(e, instanceOf(TimeoutException.class));
|
||||||
assertThat(e.getMessage(), equalTo(timeout.getStringRep()));
|
assertThat(e.getMessage(), equalTo(timeout.getStringRep()));
|
||||||
} finally {
|
} finally {
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
|
|
Loading…
Reference in New Issue