diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java index db259de4111..17345f5c85b 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -115,6 +115,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -330,7 +331,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null); simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 10, response); ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get()); - assertThat(e.getMessage(), equalTo("EsRejectedExecutionException[test]")); + assertThat(e.getCause(), instanceOf(EsRejectedExecutionException.class)); + assertThat(e.getCause(), hasToString(containsString("test"))); assertThat(client.scrollsCleared, contains(scrollId)); // When the task is rejected we don't increment the throttled timer diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index ed20f52754d..bfa37808402 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -827,8 +827,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte org.elasticsearch.indices.IndexTemplateMissingException::new, 57, UNKNOWN_VERSION_ADDED), SEND_REQUEST_TRANSPORT_EXCEPTION(org.elasticsearch.transport.SendRequestTransportException.class, org.elasticsearch.transport.SendRequestTransportException::new, 58, UNKNOWN_VERSION_ADDED), - ES_REJECTED_EXECUTION_EXCEPTION(org.elasticsearch.common.util.concurrent.EsRejectedExecutionException.class, - org.elasticsearch.common.util.concurrent.EsRejectedExecutionException::new, 59, UNKNOWN_VERSION_ADDED), + // 59 used to be EsRejectedExecutionException // 60 used to be for EarlyTerminationException // 61 used to be for RoutingValidationException NOT_SERIALIZABLE_EXCEPTION_WRAPPER(org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper.class, diff --git a/server/src/main/java/org/elasticsearch/ExceptionsHelper.java b/server/src/main/java/org/elasticsearch/ExceptionsHelper.java index 05ac4d942b3..0427685b8ef 100644 --- a/server/src/main/java/org/elasticsearch/ExceptionsHelper.java +++ b/server/src/main/java/org/elasticsearch/ExceptionsHelper.java @@ -26,6 +26,7 @@ import org.apache.lucene.index.IndexFormatTooOldException; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.Index; import org.elasticsearch.rest.RestStatus; @@ -67,6 +68,8 @@ public final class ExceptionsHelper { return ((ElasticsearchException) t).status(); } else if (t instanceof IllegalArgumentException) { return RestStatus.BAD_REQUEST; + } else if (t instanceof EsRejectedExecutionException) { + return RestStatus.TOO_MANY_REQUESTS; } } return RestStatus.INTERNAL_SERVER_ERROR; diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index a5f01f74ed7..886a61b29c1 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -747,6 +748,13 @@ public abstract class StreamInput extends InputStream { switch (key) { case 0: final int ord = readVInt(); + // TODO: remove the if branch when master is bumped to 8.0.0 + assert Version.CURRENT.major < 8; + if (ord == 59) { + final ElasticsearchException ex = new ElasticsearchException(this); + final boolean isExecutorShutdown = readBoolean(); + return (T) new EsRejectedExecutionException(ex.getMessage(), isExecutorShutdown); + } return (T) ElasticsearchException.readException(this, ord); case 1: String msg1 = readOptionalString(); @@ -831,6 +839,9 @@ public abstract class StreamInput extends InputStream { return (T) readStackTrace(new InterruptedException(readOptionalString()), this); case 17: return (T) readStackTrace(new IOException(readOptionalString(), readException()), this); + case 18: + final boolean isExecutorShutdown = readBoolean(); + return (T) readStackTrace(new EsRejectedExecutionException(readOptionalString(), isExecutorShutdown), this); default: throw new IOException("no such exception for id: " + key); } diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index 98a126e75e5..f52869c5e80 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.io.stream.Writeable.Writer; import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.joda.time.DateTimeZone; import org.joda.time.ReadableInstant; @@ -852,8 +853,28 @@ public abstract class StreamOutput extends OutputStream { writeCause = false; } else if (throwable instanceof IOException) { writeVInt(17); + } else if (throwable instanceof EsRejectedExecutionException) { + // TODO: remove the if branch when master is bumped to 8.0.0 + assert Version.CURRENT.major < 8; + if (version.before(Version.V_7_0_0_alpha1)) { + /* + * This is a backwards compatibility layer when speaking to nodes that still treated EsRejectedExceutionException as an + * instance of ElasticsearchException. As such, we serialize this in a way that the receiving node would read this as an + * EsRejectedExecutionException. + */ + final ElasticsearchException ex = new ElasticsearchException(throwable.getMessage()); + writeVInt(0); + writeVInt(59); + ex.writeTo(this); + writeBoolean(((EsRejectedExecutionException) throwable).isExecutorShutdown()); + return; + } else { + writeVInt(18); + writeBoolean(((EsRejectedExecutionException) throwable).isExecutorShutdown()); + writeCause = false; + } } else { - ElasticsearchException ex; + final ElasticsearchException ex; if (throwable instanceof ElasticsearchException && ElasticsearchException.isRegistered(throwable.getClass(), version)) { ex = (ElasticsearchException) throwable; } else { @@ -863,7 +884,6 @@ public abstract class StreamOutput extends OutputStream { writeVInt(ElasticsearchException.getId(ex.getClass())); ex.writeTo(this); return; - } if (writeMessage) { writeOptionalString(throwable.getMessage()); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java index a38bbf452b7..7174058ab78 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsRejectedExecutionException.java @@ -19,14 +19,9 @@ package org.elasticsearch.common.util.concurrent; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.rest.RestStatus; +import java.util.concurrent.RejectedExecutionException; -import java.io.IOException; - -public class EsRejectedExecutionException extends ElasticsearchException { +public class EsRejectedExecutionException extends RejectedExecutionException { private final boolean isExecutorShutdown; @@ -43,22 +38,6 @@ public class EsRejectedExecutionException extends ElasticsearchException { this(null, false); } - @Override - public RestStatus status() { - return RestStatus.TOO_MANY_REQUESTS; - } - - public EsRejectedExecutionException(StreamInput in) throws IOException{ - super(in); - isExecutorShutdown = in.readBoolean(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(isExecutorShutdown); - } - /** * Checks if the thread pool that rejected the execution was terminated * shortly after the rejection. Its possible that this returns false and the diff --git a/server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java b/server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java index 9bce92cb481..4c095efbbf8 100644 --- a/server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java +++ b/server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -76,6 +77,7 @@ import static java.util.Collections.singletonList; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.startsWith; diff --git a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index b794ded7f8d..f9d90ffd7fb 100644 --- a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -728,7 +728,7 @@ public class ExceptionSerializationTests extends ESTestCase { ids.put(56, org.elasticsearch.common.settings.SettingsException.class); ids.put(57, org.elasticsearch.indices.IndexTemplateMissingException.class); ids.put(58, org.elasticsearch.transport.SendRequestTransportException.class); - ids.put(59, org.elasticsearch.common.util.concurrent.EsRejectedExecutionException.class); + ids.put(59, null); // weas EsRejectedExecutionException, which is no longer an instance of ElasticsearchException ids.put(60, null); // EarlyTerminationException was removed in 6.0 ids.put(61, null); // RoutingValidationException was removed in 5.0 ids.put(62, org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper.class); diff --git a/server/src/test/java/org/elasticsearch/ExceptionsHelperTests.java b/server/src/test/java/org/elasticsearch/ExceptionsHelperTests.java index 011f5b380ec..5a36b3b5e85 100644 --- a/server/src/test/java/org/elasticsearch/ExceptionsHelperTests.java +++ b/server/src/test/java/org/elasticsearch/ExceptionsHelperTests.java @@ -20,6 +20,8 @@ package org.elasticsearch; import org.apache.commons.codec.DecoderException; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import java.util.Optional; @@ -84,4 +86,9 @@ public class ExceptionsHelperTests extends ESTestCase { assertThat(maybeError.get(), equalTo(error)); } + public void testStatus() { + assertThat(ExceptionsHelper.status(new IllegalArgumentException("illegal")), equalTo(RestStatus.BAD_REQUEST)); + assertThat(ExceptionsHelper.status(new EsRejectedExecutionException("rejected")), equalTo(RestStatus.TOO_MANY_REQUESTS)); + } + }