Fix EsAbortPolicy to conform to API (#29075)

The rejected execution handler API says that rejectedExecution(Runnable,
ThreadPoolExecutor) throws a RejectedExecutionException if the task must
be rejected due to capacity on the executor. We do throw something that
smells like a RejectedExecutionException (it is named
EsRejectedExecutionException) yet we violate the API because
EsRejectedExecutionException is not a RejectedExecutionException. This
has caused problems before where we try to catch RejectedExecution when
invoking rejectedExecution but this causes EsRejectedExecutionException
to go uncaught. This commit addresses this by modifying
EsRejectedExecutionException to extend
RejectedExecutionException.
This commit is contained in:
Jason Tedor 2018-03-16 14:34:36 -04:00 committed by GitHub
parent 9b41917266
commit 6bf742dd1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 52 additions and 29 deletions

View File

@ -115,6 +115,7 @@ import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ -330,7 +331,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 10, response);
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
assertThat(e.getMessage(), equalTo("EsRejectedExecutionException[test]"));
assertThat(e.getCause(), instanceOf(EsRejectedExecutionException.class));
assertThat(e.getCause(), hasToString(containsString("test")));
assertThat(client.scrollsCleared, contains(scrollId));
// When the task is rejected we don't increment the throttled timer

View File

@ -827,8 +827,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
org.elasticsearch.indices.IndexTemplateMissingException::new, 57, UNKNOWN_VERSION_ADDED),
SEND_REQUEST_TRANSPORT_EXCEPTION(org.elasticsearch.transport.SendRequestTransportException.class,
org.elasticsearch.transport.SendRequestTransportException::new, 58, UNKNOWN_VERSION_ADDED),
ES_REJECTED_EXECUTION_EXCEPTION(org.elasticsearch.common.util.concurrent.EsRejectedExecutionException.class,
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException::new, 59, UNKNOWN_VERSION_ADDED),
// 59 used to be EsRejectedExecutionException
// 60 used to be for EarlyTerminationException
// 61 used to be for RoutingValidationException
NOT_SERIALIZABLE_EXCEPTION_WRAPPER(org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper.class,

View File

@ -26,6 +26,7 @@ import org.apache.lucene.index.IndexFormatTooOldException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.Index;
import org.elasticsearch.rest.RestStatus;
@ -67,6 +68,8 @@ public final class ExceptionsHelper {
return ((ElasticsearchException) t).status();
} else if (t instanceof IllegalArgumentException) {
return RestStatus.BAD_REQUEST;
} else if (t instanceof EsRejectedExecutionException) {
return RestStatus.TOO_MANY_REQUESTS;
}
}
return RestStatus.INTERNAL_SERVER_ERROR;

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@ -747,6 +748,13 @@ public abstract class StreamInput extends InputStream {
switch (key) {
case 0:
final int ord = readVInt();
// TODO: remove the if branch when master is bumped to 8.0.0
assert Version.CURRENT.major < 8;
if (ord == 59) {
final ElasticsearchException ex = new ElasticsearchException(this);
final boolean isExecutorShutdown = readBoolean();
return (T) new EsRejectedExecutionException(ex.getMessage(), isExecutorShutdown);
}
return (T) ElasticsearchException.readException(this, ord);
case 1:
String msg1 = readOptionalString();
@ -831,6 +839,9 @@ public abstract class StreamInput extends InputStream {
return (T) readStackTrace(new InterruptedException(readOptionalString()), this);
case 17:
return (T) readStackTrace(new IOException(readOptionalString(), readException()), this);
case 18:
final boolean isExecutorShutdown = readBoolean();
return (T) readStackTrace(new EsRejectedExecutionException(readOptionalString(), isExecutorShutdown), this);
default:
throw new IOException("no such exception for id: " + key);
}

View File

@ -35,6 +35,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.stream.Writeable.Writer;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableInstant;
@ -852,8 +853,28 @@ public abstract class StreamOutput extends OutputStream {
writeCause = false;
} else if (throwable instanceof IOException) {
writeVInt(17);
} else if (throwable instanceof EsRejectedExecutionException) {
// TODO: remove the if branch when master is bumped to 8.0.0
assert Version.CURRENT.major < 8;
if (version.before(Version.V_7_0_0_alpha1)) {
/*
* This is a backwards compatibility layer when speaking to nodes that still treated EsRejectedExceutionException as an
* instance of ElasticsearchException. As such, we serialize this in a way that the receiving node would read this as an
* EsRejectedExecutionException.
*/
final ElasticsearchException ex = new ElasticsearchException(throwable.getMessage());
writeVInt(0);
writeVInt(59);
ex.writeTo(this);
writeBoolean(((EsRejectedExecutionException) throwable).isExecutorShutdown());
return;
} else {
writeVInt(18);
writeBoolean(((EsRejectedExecutionException) throwable).isExecutorShutdown());
writeCause = false;
}
} else {
ElasticsearchException ex;
final ElasticsearchException ex;
if (throwable instanceof ElasticsearchException && ElasticsearchException.isRegistered(throwable.getClass(), version)) {
ex = (ElasticsearchException) throwable;
} else {
@ -863,7 +884,6 @@ public abstract class StreamOutput extends OutputStream {
writeVInt(ElasticsearchException.getId(ex.getClass()));
ex.writeTo(this);
return;
}
if (writeMessage) {
writeOptionalString(throwable.getMessage());

View File

@ -19,14 +19,9 @@
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.rest.RestStatus;
import java.util.concurrent.RejectedExecutionException;
import java.io.IOException;
public class EsRejectedExecutionException extends ElasticsearchException {
public class EsRejectedExecutionException extends RejectedExecutionException {
private final boolean isExecutorShutdown;
@ -43,22 +38,6 @@ public class EsRejectedExecutionException extends ElasticsearchException {
this(null, false);
}
@Override
public RestStatus status() {
return RestStatus.TOO_MANY_REQUESTS;
}
public EsRejectedExecutionException(StreamInput in) throws IOException{
super(in);
isExecutorShutdown = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(isExecutorShutdown);
}
/**
* Checks if the thread pool that rejected the execution was terminated
* shortly after the rejection. Its possible that this returns false and the

View File

@ -35,6 +35,7 @@ import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -76,6 +77,7 @@ import static java.util.Collections.singletonList;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.startsWith;

View File

@ -728,7 +728,7 @@ public class ExceptionSerializationTests extends ESTestCase {
ids.put(56, org.elasticsearch.common.settings.SettingsException.class);
ids.put(57, org.elasticsearch.indices.IndexTemplateMissingException.class);
ids.put(58, org.elasticsearch.transport.SendRequestTransportException.class);
ids.put(59, org.elasticsearch.common.util.concurrent.EsRejectedExecutionException.class);
ids.put(59, null); // weas EsRejectedExecutionException, which is no longer an instance of ElasticsearchException
ids.put(60, null); // EarlyTerminationException was removed in 6.0
ids.put(61, null); // RoutingValidationException was removed in 5.0
ids.put(62, org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper.class);

View File

@ -20,6 +20,8 @@
package org.elasticsearch;
import org.apache.commons.codec.DecoderException;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import java.util.Optional;
@ -84,4 +86,9 @@ public class ExceptionsHelperTests extends ESTestCase {
assertThat(maybeError.get(), equalTo(error));
}
public void testStatus() {
assertThat(ExceptionsHelper.status(new IllegalArgumentException("illegal")), equalTo(RestStatus.BAD_REQUEST));
assertThat(ExceptionsHelper.status(new EsRejectedExecutionException("rejected")), equalTo(RestStatus.TOO_MANY_REQUESTS));
}
}