Do not catch throwable

Today throughout the codebase, catch throwable is used with reckless
abandon. This is dangerous because the throwable could be a fatal
virtual machine error resulting from an internal error in the JVM, or an
out of memory error or a stack overflow error that leaves the virtual
machine in an unstable and unpredictable state. This commit removes
catch throwable from the codebase and removes the temptation to use it
by modifying listener APIs to receive instances of Exception instead of
the top-level Throwable.

Relates #19231
This commit is contained in:
Jason Tedor 2016-07-04 08:41:06 -04:00 committed by GitHub
parent 86d2e88362
commit 3343ceeae4
355 changed files with 2146 additions and 2142 deletions

View File

@ -100,7 +100,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
}
public ElasticsearchException(StreamInput in) throws IOException {
super(in.readOptionalString(), in.readThrowable());
super(in.readOptionalString(), in.readException());
readStackTrace(this, in);
int numKeys = in.readVInt();
for (int i = 0; i < numKeys; i++) {
@ -162,7 +162,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
* Unwraps the actual cause from the exception for cases when the exception is a
* {@link ElasticsearchWrapperException}.
*
* @see org.elasticsearch.ExceptionsHelper#unwrapCause(Throwable)
* @see ExceptionsHelper#unwrapCause(Throwable)
*/
public Throwable unwrapCause() {
return ExceptionsHelper.unwrapCause(this);
@ -415,7 +415,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
int numSuppressed = in.readVInt();
for (int i = 0; i < numSuppressed; i++) {
throwable.addSuppressed(in.readThrowable());
throwable.addSuppressed(in.readException());
}
return throwable;
}
@ -794,9 +794,9 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
return null;
}
public static void renderThrowable(XContentBuilder builder, Params params, Throwable t) throws IOException {
public static void renderException(XContentBuilder builder, Params params, Exception e) throws IOException {
builder.startObject("error");
final ElasticsearchException[] rootCauses = ElasticsearchException.guessRootCauses(t);
final ElasticsearchException[] rootCauses = ElasticsearchException.guessRootCauses(e);
builder.field("root_cause");
builder.startArray();
for (ElasticsearchException rootCause : rootCauses) {
@ -806,7 +806,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
builder.endObject();
}
builder.endArray();
ElasticsearchException.toXContent(builder, params, t);
ElasticsearchException.toXContent(builder, params, e);
builder.endObject();
}

View File

@ -36,7 +36,7 @@ public class ElasticsearchSecurityException extends ElasticsearchException {
this.status = status ;
}
public ElasticsearchSecurityException(String msg, Throwable cause, Object... args) {
public ElasticsearchSecurityException(String msg, Exception cause, Object... args) {
this(msg, ExceptionsHelper.status(cause), cause, args);
}

View File

@ -37,25 +37,22 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
*
*/
public final class ExceptionsHelper {
private static final ESLogger logger = Loggers.getLogger(ExceptionsHelper.class);
public static RuntimeException convertToRuntime(Throwable t) {
if (t instanceof RuntimeException) {
return (RuntimeException) t;
public static RuntimeException convertToRuntime(Exception e) {
if (e instanceof RuntimeException) {
return (RuntimeException) e;
}
return new ElasticsearchException(t);
return new ElasticsearchException(e);
}
public static ElasticsearchException convertToElastic(Throwable t) {
if (t instanceof ElasticsearchException) {
return (ElasticsearchException) t;
public static ElasticsearchException convertToElastic(Exception e) {
if (e instanceof ElasticsearchException) {
return (ElasticsearchException) e;
}
return new ElasticsearchException(t);
return new ElasticsearchException(e);
}
public static RestStatus status(Throwable t) {
@ -209,7 +206,6 @@ public final class ExceptionsHelper {
return true;
}
/**
* Deduplicate the failures by exception message and index.
*/

View File

@ -32,5 +32,5 @@ public interface ActionListener<Response> {
/**
* A failure caused by an exception at some phase of the task.
*/
void onFailure(Throwable e);
void onFailure(Exception e);
}

View File

@ -22,7 +22,7 @@ package org.elasticsearch.action;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
/**
* Base class for {@link Runnable}s that need to call {@link ActionListener#onFailure(Throwable)} in case an uncaught
* Base class for {@link Runnable}s that need to call {@link ActionListener#onFailure(Exception)} in case an uncaught
* exception or error is thrown while the actual action is run.
*/
public abstract class ActionRunnable<Response> extends AbstractRunnable {
@ -34,11 +34,11 @@ public abstract class ActionRunnable<Response> extends AbstractRunnable {
}
/**
* Calls the action listeners {@link ActionListener#onFailure(Throwable)} method with the given exception.
* Calls the action listeners {@link ActionListener#onFailure(Exception)} method with the given exception.
* This method is invoked for all exception thrown by {@link #doRun()}
*/
@Override
public void onFailure(Throwable t) {
listener.onFailure(t);
public void onFailure(Exception e) {
listener.onFailure(e);
}
}

View File

@ -45,7 +45,7 @@ public class LatchedActionListener<T> implements ActionListener<T> {
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
try {
delegate.onFailure(e);
} finally {

View File

@ -43,15 +43,15 @@ public final class TaskOperationFailure implements Writeable, ToXContent {
private final long taskId;
private final Throwable reason;
private final Exception reason;
private final RestStatus status;
public TaskOperationFailure(String nodeId, long taskId, Throwable t) {
public TaskOperationFailure(String nodeId, long taskId, Exception e) {
this.nodeId = nodeId;
this.taskId = taskId;
this.reason = t;
status = ExceptionsHelper.status(t);
this.reason = e;
status = ExceptionsHelper.status(e);
}
/**
@ -60,7 +60,7 @@ public final class TaskOperationFailure implements Writeable, ToXContent {
public TaskOperationFailure(StreamInput in) throws IOException {
nodeId = in.readString();
taskId = in.readLong();
reason = in.readThrowable();
reason = in.readException();
status = RestStatus.readFrom(in);
}

View File

@ -145,7 +145,7 @@ public class TransportClusterAllocationExplainAction
// No copies of the data
storeCopy = ClusterAllocationExplanation.StoreCopy.NONE;
} else {
final Throwable storeErr = storeStatus.getStoreException();
final Exception storeErr = storeStatus.getStoreException();
if (storeErr != null) {
if (ExceptionsHelper.unwrapCause(storeErr) instanceof CorruptIndexException) {
storeCopy = ClusterAllocationExplanation.StoreCopy.CORRUPT;
@ -323,7 +323,7 @@ public class TransportClusterAllocationExplainAction
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});

View File

@ -104,9 +104,9 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
listener.onFailure(t);
public void onFailure(String source, Exception e) {
logger.error("unexpected failure during [{}]", e, source);
listener.onFailure(e);
}
@Override

View File

@ -154,8 +154,8 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
}
@Override
public void onFailure(Throwable t) {
listener.onFailure(t);
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
} else {
@ -179,7 +179,7 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
/*
* We couldn't load the task from the task index. Instead of 404 we should use the snapshot we took after it finished. If
* the error isn't a 404 then we'll just throw it back to the user.
@ -207,13 +207,13 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
public void onResponse(GetResponse getResponse) {
try {
onGetFinishedTaskFromIndex(getResponse, listener);
} catch (Throwable e) {
} catch (Exception e) {
listener.onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
if (ExceptionsHelper.unwrap(e, IndexNotFoundException.class) != null) {
// We haven't yet created the index for the task results so it can't be found.
listener.onFailure(new ResourceNotFoundException("task [{}] isn't running or persisted", e, request.getTaskId()));

View File

@ -76,7 +76,7 @@ public class TransportDeleteRepositoryAction extends TransportMasterNodeAction<D
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});

View File

@ -80,7 +80,7 @@ public class TransportPutRepositoryAction extends TransportMasterNodeAction<PutR
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});

View File

@ -78,7 +78,7 @@ public class TransportVerifyRepositoryAction extends TransportMasterNodeAction<V
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});

View File

@ -102,9 +102,9 @@ public class TransportClusterRerouteAction extends TransportMasterNodeAction<Clu
}
@Override
public void onFailure(String source, Throwable t) {
logger.debug("failed to perform [{}]", t, source);
super.onFailure(source, t);
public void onFailure(String source, Exception e) {
logger.debug("failed to perform [{}]", e, source);
super.onFailure(source, e);
}
@Override

View File

@ -93,11 +93,11 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
public void onAllNodesAcked(@Nullable Exception e) {
if (changed) {
reroute(true);
} else {
super.onAllNodesAcked(t);
super.onAllNodesAcked(e);
}
}
@ -146,10 +146,10 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct
}
@Override
public void onFailure(String source, Throwable t) {
public void onFailure(String source, Exception e) {
//if the reroute fails we only log
logger.debug("failed to perform [{}]", t, source);
listener.onFailure(new ElasticsearchException("reroute after update settings failed", t));
logger.debug("failed to perform [{}]", e, source);
listener.onFailure(new ElasticsearchException("reroute after update settings failed", e));
}
@Override
@ -165,9 +165,9 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct
}
@Override
public void onFailure(String source, Throwable t) {
logger.debug("failed to perform [{}]", t, source);
super.onFailure(source, t);
public void onFailure(String source, Exception e) {
logger.debug("failed to perform [{}]", e, source);
super.onFailure(source, e);
}
@Override

View File

@ -94,10 +94,10 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeAction<Cre
}
@Override
public void onSnapshotFailure(Snapshot snapshot, Throwable t) {
public void onSnapshotFailure(Snapshot snapshot, Exception e) {
if (snapshot.getRepository().equals(request.repository()) &&
snapshot.getSnapshotId().getName().equals(request.snapshot())) {
listener.onFailure(t);
listener.onFailure(e);
snapshotsService.removeListener(this);
}
}
@ -108,8 +108,8 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeAction<Cre
}
@Override
public void onFailure(Throwable t) {
listener.onFailure(t);
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}

View File

@ -72,8 +72,8 @@ public class TransportDeleteSnapshotAction extends TransportMasterNodeAction<Del
}
@Override
public void onFailure(Throwable t) {
listener.onFailure(t);
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}

View File

@ -120,8 +120,8 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
snapshotInfoBuilder.addAll(snapshotsService.snapshots(repository, new ArrayList<>(toResolve), request.ignoreUnavailable()));
}
listener.onResponse(new GetSnapshotsResponse(snapshotInfoBuilder));
} catch (Throwable t) {
listener.onFailure(t);
} catch (Exception e) {
listener.onFailure(e);
}
}

View File

@ -94,7 +94,7 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeAction<Re
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
@ -104,7 +104,7 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeAction<Re
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
listener.onFailure(t);
}
});

View File

@ -125,13 +125,13 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
List<SnapshotsInProgress.Entry> currentSnapshots =
snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots()));
listener.onResponse(buildResponse(request, currentSnapshots, nodeSnapshotStatuses));
} catch (Throwable e) {
} catch (Exception e) {
listener.onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});

View File

@ -119,7 +119,7 @@ public class TransportIndicesAliasesAction extends TransportMasterNodeAction<Ind
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
logger.debug("failed to perform aliases", t);
listener.onFailure(t);
}

View File

@ -107,7 +107,7 @@ public class TransportCloseIndexAction extends TransportMasterNodeAction<CloseIn
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
logger.debug("failed to close indices [{}]", t, (Object)concreteIndices);
listener.onFailure(t);
}

View File

@ -87,7 +87,7 @@ public class TransportCreateIndexAction extends TransportMasterNodeAction<Create
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
if (t instanceof IndexAlreadyExistsException) {
logger.trace("[{}] failed to create", t, request.index());
} else {

View File

@ -99,7 +99,7 @@ public class TransportDeleteIndexAction extends TransportMasterNodeAction<Delete
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
logger.debug("failed to delete indices [{}]", t, concreteIndices);
listener.onFailure(t);
}

View File

@ -77,7 +77,7 @@ public class TransportGetFieldMappingsAction extends HandledTransportAction<GetF
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
int index = indexCounter.getAndIncrement();
indexResponses.set(index, e);
if (completionCounter.decrementAndGet() == 0) {

View File

@ -91,7 +91,7 @@ public class TransportPutMappingAction extends TransportMasterNodeAction<PutMapp
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
logger.debug("failed to put mappings on indices [{}], type [{}]", t, concreteIndices, request.type());
listener.onFailure(t);
}

View File

@ -92,7 +92,7 @@ public class TransportOpenIndexAction extends TransportMasterNodeAction<OpenInde
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
logger.debug("failed to open indices [{}]", t, (Object)concreteIndices);
listener.onFailure(t);
}

View File

@ -43,7 +43,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -132,14 +131,14 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
listener.onFailure(t);
}
});
@ -152,7 +151,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
}

View File

@ -91,7 +91,7 @@ public class TransportUpdateSettingsAction extends TransportMasterNodeAction<Upd
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
logger.debug("failed to update settings on indices [{}]", t, (Object)concreteIndices);
listener.onFailure(t);
}

View File

@ -57,7 +57,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
private DiscoveryNode node;
private long legacyVersion;
private String allocationId;
private Throwable storeException;
private Exception storeException;
private AllocationStatus allocationStatus;
/**
@ -116,7 +116,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
private StoreStatus() {
}
public StoreStatus(DiscoveryNode node, long legacyVersion, String allocationId, AllocationStatus allocationStatus, Throwable storeException) {
public StoreStatus(DiscoveryNode node, long legacyVersion, String allocationId, AllocationStatus allocationStatus, Exception storeException) {
this.node = node;
this.legacyVersion = legacyVersion;
this.allocationId = allocationId;
@ -150,7 +150,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
* Exception while trying to open the
* shard index or from when the shard failed
*/
public Throwable getStoreException() {
public Exception getStoreException() {
return storeException;
}
@ -177,7 +177,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
allocationId = in.readOptionalString();
allocationStatus = AllocationStatus.readFrom(in);
if (in.readBoolean()) {
storeException = in.readThrowable();
storeException = in.readException();
}
}

View File

@ -100,7 +100,7 @@ public class TransportShrinkAction extends TransportMasterNodeAction<ShrinkReque
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
if (t instanceof IndexAlreadyExistsException) {
logger.trace("[{}] failed to create shrink index", t, updateRequest.index());
} else {
@ -112,7 +112,7 @@ public class TransportShrinkAction extends TransportMasterNodeAction<ShrinkReque
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});

View File

@ -72,9 +72,9 @@ public class TransportDeleteIndexTemplateAction extends TransportMasterNodeActio
}
@Override
public void onFailure(Throwable t) {
logger.debug("failed to delete templates [{}]", t, request.name());
listener.onFailure(t);
public void onFailure(Exception e) {
logger.debug("failed to delete templates [{}]", e, request.name());
listener.onFailure(e);
}
});
}

View File

@ -93,9 +93,9 @@ public class TransportPutIndexTemplateAction extends TransportMasterNodeAction<P
}
@Override
public void onFailure(Throwable t) {
logger.debug("failed to put template [{}]", t, request.name());
listener.onFailure(t);
public void onFailure(Exception e) {
logger.debug("failed to put template [{}]", e, request.name());
listener.onFailure(e);
}
});
}

View File

@ -190,13 +190,13 @@ public class TransportUpgradeAction extends TransportBroadcastByNodeAction<Upgra
} else {
updateSettings(upgradeResponse, listener);
}
} catch (Throwable t) {
listener.onFailure(t);
} catch (Exception e) {
listener.onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
};
@ -212,7 +212,7 @@ public class TransportUpgradeAction extends TransportBroadcastByNodeAction<Upgra
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});

View File

@ -78,7 +78,7 @@ public class TransportUpgradeSettingsAction extends TransportMasterNodeAction<Up
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
logger.debug("failed to upgrade minimum compatibility version settings on indices [{}]", t, request.versions().keySet());
listener.onFailure(t);
}

View File

@ -91,12 +91,12 @@ public class BulkItemResponse implements Streamable, StatusToXContent {
private final Throwable cause;
private final RestStatus status;
public Failure(String index, String type, String id, Throwable t) {
public Failure(String index, String type, String id, Throwable cause) {
this.index = index;
this.type = type;
this.id = id;
this.cause = t;
this.status = ExceptionsHelper.status(t);
this.cause = cause;
this.status = ExceptionsHelper.status(cause);
}
/**
@ -106,7 +106,7 @@ public class BulkItemResponse implements Streamable, StatusToXContent {
index = in.readString();
type = in.readString();
id = in.readOptionalString();
cause = in.readThrowable();
cause = in.readException();
status = ExceptionsHelper.status(cause);
}

View File

@ -80,10 +80,10 @@ abstract class BulkRequestHandler {
if (!afterCalled) {
listener.afterBulk(executionId, bulkRequest, e);
}
} catch (Throwable t) {
logger.warn("Failed to execute bulk request {}.", t, executionId);
} catch (Exception e) {
logger.warn("Failed to execute bulk request {}.", e, executionId);
if (!afterCalled) {
listener.afterBulk(executionId, bulkRequest, t);
listener.afterBulk(executionId, bulkRequest, e);
}
}
}
@ -131,7 +131,7 @@ abstract class BulkRequestHandler {
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
try {
listener.afterBulk(executionId, bulkRequest, e);
} finally {
@ -144,9 +144,9 @@ abstract class BulkRequestHandler {
Thread.currentThread().interrupt();
logger.info("Bulk request {} has been cancelled.", e, executionId);
listener.afterBulk(executionId, bulkRequest, e);
} catch (Throwable t) {
logger.warn("Failed to execute bulk request {}.", t, executionId);
listener.afterBulk(executionId, bulkRequest, t);
} catch (Exception e) {
logger.warn("Failed to execute bulk request {}.", e, executionId);
listener.afterBulk(executionId, bulkRequest, e);
} finally {
if (!bulkRequestSetupSuccessful && acquired) { // if we fail on client.bulk() release the semaphore
semaphore.release();

View File

@ -130,7 +130,7 @@ public class Retry {
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
try {
listener.onFailure(e);
} finally {
@ -163,8 +163,8 @@ public class Retry {
}
for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
if (bulkItemResponse.isFailed()) {
Throwable cause = bulkItemResponse.getFailure().getCause();
Throwable rootCause = ExceptionsHelper.unwrapCause(cause);
final Throwable cause = bulkItemResponse.getFailure().getCause();
final Throwable rootCause = ExceptionsHelper.unwrapCause(cause);
if (!rootCause.getClass().equals(retryOnThrowable)) {
return false;
}

View File

@ -150,14 +150,14 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
if (counter.decrementAndGet() == 0) {
try {
executeBulk(task, bulkRequest, startTime, listener, responses);
} catch (Throwable t) {
listener.onFailure(t);
} catch (Exception e) {
listener.onFailure(e);
}
}
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
if (!(ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException)) {
// fail all requests involving this index, if create didnt work
for (int i = 0; i < bulkRequest.requests.size(); i++) {
@ -170,8 +170,9 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
if (counter.decrementAndGet() == 0) {
try {
executeBulk(task, bulkRequest, startTime, listener, responses);
} catch (Throwable t) {
listener.onFailure(t);
} catch (Exception inner) {
inner.addSuppressed(e);
listener.onFailure(inner);
}
}
}
@ -195,7 +196,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
return autoCreateIndex.shouldAutoCreate(index, state);
}
private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, ActionRequest request, String index, Throwable e) {
private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, ActionRequest request, String index, Exception e) {
if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request;
if (index.equals(indexRequest.index())) {
@ -367,7 +368,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();

View File

@ -158,7 +158,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
// add the response
IndexResponse indexResponse = result.getResponse();
setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse));
} catch (Throwable e) {
} catch (Exception e) {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(e)) {
// restore updated versions...
@ -181,11 +181,11 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
return location;
}
private <ReplicationRequestT extends ReplicationRequest<ReplicationRequestT>> void logFailure(Throwable e, String operation, ShardId shardId, ReplicationRequest<ReplicationRequestT> request) {
if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
logger.trace("{} failed to execute bulk item ({}) {}", e, shardId, operation, request);
private <ReplicationRequestT extends ReplicationRequest<ReplicationRequestT>> void logFailure(Throwable t, String operation, ShardId shardId, ReplicationRequest<ReplicationRequestT> request) {
if (ExceptionsHelper.status(t) == RestStatus.CONFLICT) {
logger.trace("{} failed to execute bulk item ({}) {}", t, shardId, operation, request);
} else {
logger.debug("{} failed to execute bulk item ({}) {}", e, shardId, operation, request);
logger.debug("{} failed to execute bulk item ({}) {}", t, shardId, operation, request);
}
}
@ -200,7 +200,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
DeleteResponse deleteResponse = writeResult.getResponse();
location = locationToSync(location, writeResult.getLocation());
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE, deleteResponse));
} catch (Throwable e) {
} catch (Exception e) {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(e)) {
// restore updated versions...
@ -232,7 +232,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
UpdateResult updateResult;
try {
updateResult = shardUpdateOperation(metaData, request, updateRequest, indexShard);
} catch (Throwable t) {
} catch (Exception t) {
updateResult = new UpdateResult(null, null, false, t, null);
}
if (updateResult.success()) {
@ -275,43 +275,43 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
// NOTE: Breaking out of the retry_on_conflict loop!
break;
} else if (updateResult.failure()) {
Throwable t = updateResult.error;
Throwable e = updateResult.error;
if (updateResult.retry) {
// updateAttemptCount is 0 based and marks current attempt, if it's equal to retryOnConflict we are going out of the iteration
if (updateAttemptsCount >= updateRequest.retryOnConflict()) {
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE,
new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), t)));
new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), e)));
}
} else {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(t)) {
if (retryPrimaryException(e)) {
// restore updated versions...
for (int j = 0; j < requestIndex; j++) {
applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]);
}
throw (ElasticsearchException) t;
throw (ElasticsearchException) e;
}
// if its a conflict failure, and we already executed the request on a primary (and we execute it
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
// then just use the response we got from the successful execution
if (item.getPrimaryResponse() != null && isConflictException(t)) {
if (item.getPrimaryResponse() != null && isConflictException(e)) {
setResponse(item, item.getPrimaryResponse());
} else if (updateResult.result == null) {
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), t)));
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), e)));
} else {
switch (updateResult.result.operation()) {
case UPSERT:
case INDEX:
IndexRequest indexRequest = updateResult.request();
logFailure(t, "index", request.shardId(), indexRequest);
logFailure(e, "index", request.shardId(), indexRequest);
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE,
new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), t)));
new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), e)));
break;
case DELETE:
DeleteRequest deleteRequest = updateResult.request();
logFailure(t, "delete", request.shardId(), deleteRequest);
logFailure(e, "delete", request.shardId(), deleteRequest);
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE,
new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), t)));
new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), e)));
break;
}
}
@ -335,7 +335,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
private WriteResult<IndexResponse> shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, IndexMetaData metaData,
IndexShard indexShard, boolean processed) throws Throwable {
IndexShard indexShard, boolean processed) throws Exception {
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
if (!processed) {
@ -406,26 +406,26 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
try {
WriteResult result = shardIndexOperation(bulkShardRequest, indexRequest, metaData, indexShard, false);
return new UpdateResult(translate, indexRequest, result);
} catch (Throwable t) {
t = ExceptionsHelper.unwrapCause(t);
} catch (Exception e) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
boolean retry = false;
if (t instanceof VersionConflictEngineException) {
if (cause instanceof VersionConflictEngineException) {
retry = true;
}
return new UpdateResult(translate, indexRequest, retry, t, null);
return new UpdateResult(translate, indexRequest, retry, cause, null);
}
case DELETE:
DeleteRequest deleteRequest = translate.action();
try {
WriteResult<DeleteResponse> result = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard);
return new UpdateResult(translate, deleteRequest, result);
} catch (Throwable t) {
t = ExceptionsHelper.unwrapCause(t);
} catch (Exception e) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
boolean retry = false;
if (t instanceof VersionConflictEngineException) {
if (cause instanceof VersionConflictEngineException) {
retry = true;
}
return new UpdateResult(translate, deleteRequest, retry, t, null);
return new UpdateResult(translate, deleteRequest, retry, cause, null);
}
case NONE:
UpdateResponse updateResponse = translate.action();
@ -449,7 +449,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
try {
Engine.Index operation = TransportIndexAction.executeIndexRequestOnReplica(indexRequest, indexShard);
location = locationToSync(location, operation.getTranslogLocation());
} catch (Throwable e) {
} catch (Exception e) {
// if its not an ignore replica failure, we need to make sure to bubble up the failure
// so we will fail the shard
if (!ignoreReplicaException(e)) {
@ -462,7 +462,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
Engine.Delete delete = TransportDeleteAction.executeDeleteRequestOnReplica(deleteRequest, indexShard);
indexShard.delete(delete);
location = locationToSync(location, delete.getTranslogLocation());
} catch (Throwable e) {
} catch (Exception e) {
// if its not an ignore replica failure, we need to make sure to bubble up the failure
// so we will fail the shard
if (!ignoreReplicaException(e)) {

View File

@ -77,7 +77,7 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
// we have the index, do it
innerExecute(task, request, listener);

View File

@ -40,17 +40,17 @@ public class MultiGetResponse extends ActionResponse implements Iterable<MultiGe
private String index;
private String type;
private String id;
private Throwable throwable;
private Exception exception;
Failure() {
}
public Failure(String index, String type, String id, Throwable throwable) {
public Failure(String index, String type, String id, Exception exception) {
this.index = index;
this.type = type;
this.id = id;
this.throwable = throwable;
this.exception = exception;
}
/**
@ -78,7 +78,7 @@ public class MultiGetResponse extends ActionResponse implements Iterable<MultiGe
* The failure message.
*/
public String getMessage() {
return throwable != null ? throwable.getMessage() : null;
return exception != null ? exception.getMessage() : null;
}
public static Failure readFailure(StreamInput in) throws IOException {
@ -92,7 +92,7 @@ public class MultiGetResponse extends ActionResponse implements Iterable<MultiGe
index = in.readString();
type = in.readOptionalString();
id = in.readString();
throwable = in.readThrowable();
exception = in.readException();
}
@Override
@ -100,11 +100,11 @@ public class MultiGetResponse extends ActionResponse implements Iterable<MultiGe
out.writeString(index);
out.writeOptionalString(type);
out.writeString(id);
out.writeThrowable(throwable);
out.writeThrowable(exception);
}
public Throwable getFailure() {
return throwable;
public Exception getFailure() {
return exception;
}
}
@ -136,7 +136,7 @@ public class MultiGetResponse extends ActionResponse implements Iterable<MultiGe
builder.field(Fields._INDEX, failure.getIndex());
builder.field(Fields._TYPE, failure.getType());
builder.field(Fields._ID, failure.getId());
ElasticsearchException.renderThrowable(builder, params, failure.getFailure());
ElasticsearchException.renderException(builder, params, failure.getFailure());
builder.endObject();
} else {
GetResponse getResponse = response.getResponse();

View File

@ -105,7 +105,7 @@ public class TransportMultiGetAction extends HandledTransportAction<MultiGetRequ
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
// create failures for all relevant requests
for (int i = 0; i < shardRequest.locations.size(); i++) {
MultiGetRequest.Item item = shardRequest.items.get(i);

View File

@ -88,12 +88,12 @@ public class TransportShardMultiGetAction extends TransportSingleShardAction<Mul
try {
GetResult getResult = indexShard.getService().get(item.type(), item.id(), item.fields(), request.realtime(), item.version(), item.versionType(), item.fetchSourceContext(), request.ignoreErrorsOnGeneratedFields());
response.add(request.locations.get(i), new GetResponse(getResult));
} catch (Throwable t) {
if (TransportActions.isShardNotAvailableException(t)) {
throw (ElasticsearchException) t;
} catch (Exception e) {
if (TransportActions.isShardNotAvailableException(e)) {
throw (ElasticsearchException) e;
} else {
logger.debug("{} failed to execute multi_get for [{}]/[{}]", t, shardId, item.type(), item.id());
response.add(request.locations.get(i), new MultiGetResponse.Failure(request.index(), item.type(), item.id(), t));
logger.debug("{} failed to execute multi_get for [{}]/[{}]", e, shardId, item.type(), item.id());
response.add(request.locations.get(i), new MultiGetResponse.Failure(request.index(), item.type(), item.id(), e));
}
}
}

View File

@ -101,13 +101,14 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
// we have the index, do it
try {
innerExecute(task, request, listener);
} catch (Throwable e1) {
listener.onFailure(e1);
} catch (Exception inner) {
inner.addSuppressed(e);
listener.onFailure(inner);
}
} else {
listener.onFailure(e);

View File

@ -104,13 +104,13 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
void processBulkIndexRequest(Task task, BulkRequest original, String action, ActionFilterChain chain, ActionListener<BulkResponse> listener) {
long ingestStartTimeInNanos = System.nanoTime();
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
executionService.executeBulkRequest(() -> bulkRequestModifier, (indexRequest, throwable) -> {
logger.debug("failed to execute pipeline [{}] for document [{}/{}/{}]", throwable, indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id());
bulkRequestModifier.markCurrentItemAsFailed(throwable);
}, (throwable) -> {
if (throwable != null) {
logger.error("failed to execute pipeline for a bulk request", throwable);
listener.onFailure(throwable);
executionService.executeBulkRequest(() -> bulkRequestModifier, (indexRequest, exception) -> {
logger.debug("failed to execute pipeline [{}] for document [{}/{}/{}]", exception, indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id());
bulkRequestModifier.markCurrentItemAsFailed(exception);
}, (exception) -> {
if (exception != null) {
logger.error("failed to execute pipeline for a bulk request", exception);
listener.onFailure(exception);
} else {
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
@ -188,7 +188,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
actionListener.onFailure(e);
}
};
@ -197,7 +197,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
}
}
void markCurrentItemAsFailed(Throwable e) {
void markCurrentItemAsFailed(Exception e) {
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(currentSlot);
// We hit a error during preprocessing a request, so we:
// 1) Remember the request item slot from the bulk, so that we're done processing all requests we know what failed
@ -233,7 +233,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
actionListener.onFailure(e);
}
}

View File

@ -90,7 +90,7 @@ public class PutPipelineTransportAction extends TransportMasterNodeAction<PutPip
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});

View File

@ -49,7 +49,7 @@ public final class SimulateDocumentBaseResult implements SimulateDocumentResult
public SimulateDocumentBaseResult(StreamInput in) throws IOException {
if (in.readBoolean()) {
ingestDocument = null;
failure = in.readThrowable();
failure = in.readException();
} else {
ingestDocument = new WriteableIngestDocument(in);
failure = null;
@ -84,7 +84,7 @@ public final class SimulateDocumentBaseResult implements SimulateDocumentResult
if (failure == null) {
ingestDocument.toXContent(builder, params);
} else {
ElasticsearchException.renderThrowable(builder, params, failure);
ElasticsearchException.renderException(builder, params, failure);
}
builder.endObject();
return builder;

View File

@ -52,7 +52,7 @@ public class SimulateProcessorResult implements Writeable, ToXContent {
public SimulateProcessorResult(StreamInput in) throws IOException {
this.processorTag = in.readString();
if (in.readBoolean()) {
this.failure = in.readThrowable();
this.failure = in.readException();
this.ingestDocument = null;
} else {
this.ingestDocument = new WriteableIngestDocument(in);
@ -96,7 +96,7 @@ public class SimulateProcessorResult implements Writeable, ToXContent {
if (failure == null) {
ingestDocument.toXContent(builder, params);
} else {
ElasticsearchException.renderThrowable(builder, params, failure);
ElasticsearchException.renderException(builder, params, failure);
}
builder.endObject();
return builder;

View File

@ -167,7 +167,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
onFirstPhaseResult(shardIndex, shard, node.getId(), shardIt, t);
}
});
@ -188,7 +188,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
if (xTotalOps == expectedTotalOps) {
try {
innerMoveToSecondPhase();
} catch (Throwable e) {
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("{}: Failed to execute [{}] while moving to second phase", e, shardIt.shardId(), request);
}
@ -201,33 +201,34 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
}
void onFirstPhaseResult(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId,
final ShardIterator shardIt, Throwable t) {
final ShardIterator shardIt, Exception e) {
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId().getIndex(), shardIt.shardId().getId());
addShardFailure(shardIndex, shardTarget, t);
addShardFailure(shardIndex, shardTarget, e);
if (totalOps.incrementAndGet() == expectedTotalOps) {
if (logger.isDebugEnabled()) {
if (t != null && !TransportActions.isShardNotAvailableException(t)) {
logger.debug("{}: Failed to execute [{}]", t, shard != null ? shard.shortSummary() : shardIt.shardId(), request);
if (e != null && !TransportActions.isShardNotAvailableException(e)) {
logger.debug("{}: Failed to execute [{}]", e, shard != null ? shard.shortSummary() : shardIt.shardId(), request);
} else if (logger.isTraceEnabled()) {
logger.trace("{}: Failed to execute [{}]", t, shard, request);
logger.trace("{}: Failed to execute [{}]", e, shard, request);
}
}
final ShardSearchFailure[] shardSearchFailures = buildShardFailures();
if (successfulOps.get() == 0) {
if (logger.isDebugEnabled()) {
logger.debug("All shards failed for phase: [{}]", t, firstPhaseName());
logger.debug("All shards failed for phase: [{}]", e, firstPhaseName());
}
// no successful ops, raise an exception
raiseEarlyFailure(new SearchPhaseExecutionException(firstPhaseName(), "all shards failed", t, shardSearchFailures));
raiseEarlyFailure(new SearchPhaseExecutionException(firstPhaseName(), "all shards failed", e, shardSearchFailures));
} else {
try {
innerMoveToSecondPhase();
} catch (Throwable e) {
raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, shardSearchFailures));
} catch (Exception inner) {
inner.addSuppressed(e);
raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", inner, shardSearchFailures));
}
}
} else {
@ -235,20 +236,21 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
final boolean lastShard = nextShard == null;
// trace log this exception
if (logger.isTraceEnabled()) {
logger.trace("{}: Failed to execute [{}] lastShard [{}]", t, shard != null ? shard.shortSummary() : shardIt.shardId(),
logger.trace("{}: Failed to execute [{}] lastShard [{}]", e, shard != null ? shard.shortSummary() : shardIt.shardId(),
request, lastShard);
}
if (!lastShard) {
try {
performFirstPhase(shardIndex, shardIt, nextShard);
} catch (Throwable t1) {
onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, t1);
} catch (Exception inner) {
inner.addSuppressed(e);
onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, inner);
}
} else {
// no more shards active, add a failure
if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception
if (t != null && !TransportActions.isShardNotAvailableException(t)) {
logger.debug("{}: Failed to execute [{}] lastShard [{}]", t,
if (e != null && !TransportActions.isShardNotAvailableException(e)) {
logger.debug("{}: Failed to execute [{}] lastShard [{}]", e,
shard != null ? shard.shortSummary() : shardIt.shardId(), request, lastShard);
}
}
@ -269,9 +271,9 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
return failures;
}
protected final void addShardFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Throwable t) {
protected final void addShardFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Exception e) {
// we don't aggregate shard failures on non active shards (but do keep the header counts right)
if (TransportActions.isShardNotAvailableException(t)) {
if (TransportActions.isShardNotAvailableException(e)) {
return;
}
@ -285,26 +287,27 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
}
ShardSearchFailure failure = shardFailures.get(shardIndex);
if (failure == null) {
shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget));
shardFailures.set(shardIndex, new ShardSearchFailure(e, shardTarget));
} else {
// the failure is already present, try and not override it with an exception that is less meaningless
// for example, getting illegal shard state
if (TransportActions.isReadOverrideException(t)) {
shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget));
if (TransportActions.isReadOverrideException(e)) {
shardFailures.set(shardIndex, new ShardSearchFailure(e, shardTarget));
}
}
}
private void raiseEarlyFailure(Throwable t) {
private void raiseEarlyFailure(Exception e) {
for (AtomicArray.Entry<FirstResult> entry : firstResults.asList()) {
try {
DiscoveryNode node = nodes.get(entry.value.shardTarget().nodeId());
sendReleaseSearchContext(entry.value.id(), node);
} catch (Throwable t1) {
logger.trace("failed to release context", t1);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.trace("failed to release context", inner);
}
}
listener.onFailure(t);
listener.onFailure(e);
}
/**
@ -324,8 +327,8 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
try {
DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId());
sendReleaseSearchContext(entry.value.queryResult().id(), node);
} catch (Throwable t1) {
logger.trace("failed to release context", t1);
} catch (Exception e) {
logger.trace("failed to release context", e);
}
}
}

View File

@ -29,7 +29,6 @@ import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.Arrays;
@ -45,22 +44,22 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
*/
public static class Item implements Streamable {
private SearchResponse response;
private Throwable throwable;
private Exception exception;
Item() {
}
public Item(SearchResponse response, Throwable throwable) {
public Item(SearchResponse response, Exception exception) {
this.response = response;
this.throwable = throwable;
this.exception = exception;
}
/**
* Is it a failed search?
*/
public boolean isFailure() {
return throwable != null;
return exception != null;
}
/**
@ -68,7 +67,7 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
*/
@Nullable
public String getFailureMessage() {
return throwable == null ? null : throwable.getMessage();
return exception == null ? null : exception.getMessage();
}
/**
@ -91,7 +90,7 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
this.response = new SearchResponse();
response.readFrom(in);
} else {
throwable = in.readThrowable();
exception = in.readException();
}
}
@ -102,12 +101,12 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
response.writeTo(out);
} else {
out.writeBoolean(false);
out.writeThrowable(throwable);
out.writeThrowable(exception);
}
}
public Throwable getFailure() {
return throwable;
public Exception getFailure() {
return exception;
}
}
@ -156,7 +155,7 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
for (Item item : items) {
builder.startObject();
if (item.isFailure()) {
ElasticsearchException.renderThrowable(builder, params, item.getFailure());
ElasticsearchException.renderException(builder, params, item.getFailure());
builder.field(Fields.STATUS, ExceptionsHelper.status(item.getFailure()).getStatus());
} else {
item.getResponse().toXContent(builder, params);

View File

@ -89,7 +89,7 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
try {
onSecondPhaseFailure(t, querySearchRequest, shardIndex, dfsResult, counter);
} finally {
@ -102,12 +102,12 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
});
}
void onSecondPhaseFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult,
void onSecondPhaseFailure(Exception e, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult,
AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
logger.debug("[{}] Failed to execute query phase", e, querySearchRequest.id());
}
this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
this.addShardFailure(shardIndex, dfsResult.shardTarget(), e);
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
@ -130,12 +130,12 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
}
@Override
public void onFailure(Throwable t) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", t, buildShardFailures());
public void onFailure(Exception e) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", e, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}
super.onFailure(t);
super.onFailure(e);
}
});

View File

@ -97,7 +97,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
try {
onQueryFailure(t, querySearchRequest, shardIndex, dfsResult, counter);
} finally {
@ -110,12 +110,12 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
});
}
void onQueryFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult,
void onQueryFailure(Exception e, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult,
AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
logger.debug("[{}] Failed to execute query phase", e, querySearchRequest.id());
}
this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
this.addShardFailure(shardIndex, dfsResult.shardTarget(), e);
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
if (successfulOps.get() == 0) {
@ -129,7 +129,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
void executeFetchPhase() {
try {
innerExecuteFetchPhase();
} catch (Throwable e) {
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("query", "", e, buildShardFailures()));
}
}
@ -169,7 +169,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
// the search context might not be cleared on the node where the fetch was executed for example
// because the action was rejected by the thread pool. in this case we need to send a dedicated
// request to clear the search context. by setting docIdsToLoad to null, the context will be cleared
@ -180,12 +180,12 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
});
}
void onFetchFailure(Throwable t, ShardFetchSearchRequest fetchSearchRequest, int shardIndex,
void onFetchFailure(Exception e, ShardFetchSearchRequest fetchSearchRequest, int shardIndex,
SearchShardTarget shardTarget, AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
logger.debug("[{}] Failed to execute fetch phase", e, fetchSearchRequest.id());
}
this.addShardFailure(shardIndex, shardTarget, t);
this.addShardFailure(shardIndex, shardTarget, e);
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
@ -208,9 +208,9 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception e) {
try {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures());
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}

View File

@ -32,9 +32,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
*
*/
public class SearchPhaseExecutionException extends ElasticsearchException {
private final String phaseName;
private final ShardSearchFailure[] shardFailures;

View File

@ -73,8 +73,8 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<QueryFetc
}
@Override
public void onFailure(Throwable t) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures());
public void onFailure(Exception e) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}

View File

@ -102,7 +102,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
// the search context might not be cleared on the node where the fetch was executed for example
// because the action was rejected by the thread pool. in this case we need to send a dedicated
// request to clear the search context. by setting docIdsToLoad to null, the context will be cleared
@ -113,12 +113,12 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
});
}
void onFetchFailure(Throwable t, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget,
void onFetchFailure(Exception e, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget,
AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
logger.debug("[{}] Failed to execute fetch phase", e, fetchSearchRequest.id());
}
this.addShardFailure(shardIndex, shardTarget, t);
this.addShardFailure(shardIndex, shardTarget, e);
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
@ -141,9 +141,9 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception e) {
try {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", t, buildShardFailures());
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}

View File

@ -138,21 +138,21 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
onPhaseFailure(t, searchId, shardIndex);
}
});
}
private void onPhaseFailure(Throwable t, long searchId, int shardIndex) {
private void onPhaseFailure(Exception e, long searchId, int shardIndex) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
logger.debug("[{}] Failed to execute query phase", e, searchId);
}
addShardFailure(shardIndex, new ShardSearchFailure(t));
addShardFailure(shardIndex, new ShardSearchFailure(e));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
if (successfulOps.get() == 0) {
listener.onFailure(new SearchPhaseExecutionException("query_fetch", "all shards failed", t, buildShardFailures()));
listener.onFailure(new SearchPhaseExecutionException("query_fetch", "all shards failed", e, buildShardFailures()));
} else {
finishHim();
}
@ -162,7 +162,7 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
private void finishHim() {
try {
innerFinishHim();
} catch (Throwable e) {
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
}
}

View File

@ -113,7 +113,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
if (counter.decrementAndGet() == 0) {
try {
executeFetchPhase();
} catch (Throwable e) {
} catch (Exception e) {
listener.onFailure(new SearchPhaseExecutionException("query", "Fetch failed", e, ShardSearchFailure.EMPTY_ARRAY));
return;
}
@ -131,32 +131,33 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
if (counter.decrementAndGet() == 0) {
try {
executeFetchPhase();
} catch (Throwable e) {
} catch (Exception e) {
onFailure(e);
}
}
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
onQueryPhaseFailure(shardIndex, counter, searchId, t);
}
});
}
void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, final long searchId, Throwable t) {
void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, final long searchId, Exception failure) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
logger.debug("[{}] Failed to execute query phase", failure, searchId);
}
addShardFailure(shardIndex, new ShardSearchFailure(t));
addShardFailure(shardIndex, new ShardSearchFailure(failure));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
if (successfulOps.get() == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", t, buildShardFailures()));
listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", failure, buildShardFailures()));
} else {
try {
executeFetchPhase();
} catch (Throwable e) {
} catch (Exception e) {
e.addSuppressed(failure);
listener.onFailure(new SearchPhaseExecutionException("query", "Fetch failed", e, ShardSearchFailure.EMPTY_ARRAY));
}
}
@ -193,7 +194,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to execute fetch phase", t);
}
@ -209,8 +210,8 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
private void finishHim() {
try {
innerFinishHim();
} catch (Throwable e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "inner finish failed", e, buildShardFailures()));
}
}

View File

@ -48,19 +48,19 @@ public class ShardSearchFailure implements ShardOperationFailedException {
}
public ShardSearchFailure(Throwable t) {
this(t, null);
public ShardSearchFailure(Exception e) {
this(e, null);
}
public ShardSearchFailure(Throwable t, @Nullable SearchShardTarget shardTarget) {
Throwable actual = ExceptionsHelper.unwrapCause(t);
public ShardSearchFailure(Exception e, @Nullable SearchShardTarget shardTarget) {
final Throwable actual = ExceptionsHelper.unwrapCause(e);
if (actual != null && actual instanceof SearchException) {
this.shardTarget = ((SearchException) actual).shard();
} else if (shardTarget != null) {
this.shardTarget = shardTarget;
}
status = ExceptionsHelper.status(actual);
this.reason = ExceptionsHelper.detailedMessage(t);
this.reason = ExceptionsHelper.detailedMessage(e);
this.cause = actual;
}
@ -135,7 +135,7 @@ public class ShardSearchFailure implements ShardOperationFailedException {
}
reason = in.readString();
status = RestStatus.readFrom(in);
cause = in.readThrowable();
cause = in.readException();
}
@Override

View File

@ -103,7 +103,7 @@ public class TransportClearScrollAction extends HandledTransportAction<ClearScro
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
onFailedFreedContext(e, node);
}
});
@ -124,7 +124,7 @@ public class TransportClearScrollAction extends HandledTransportAction<ClearScro
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
onFailedFreedContext(e, node);
}
});

View File

@ -30,7 +30,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -119,7 +118,7 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
responses.set(request.responseSlot, new MultiSearchResponse.Item(null, e));
handleResponse();
}

View File

@ -74,7 +74,7 @@ public class TransportSearchScrollAction extends HandledTransportAction<SearchSc
throw new IllegalArgumentException("Scroll id type [" + scrollId.getType() + "] unrecognized");
}
action.start();
} catch (Throwable e) {
} catch (Exception e) {
listener.onFailure(e);
}
}

View File

@ -102,7 +102,7 @@ public abstract class AbstractListenableActionFuture<T, L> extends AdapterAction
// we use a timeout of 0 to by pass assertion forbidding to call actionGet() (blocking) on a network thread.
// here we know we will never block
listener.onResponse(actionGet(0));
} catch (Throwable e) {
} catch (Exception e) {
listener.onFailure(e);
}
}

View File

@ -98,7 +98,7 @@ public abstract class AdapterActionFuture<T, L> extends BaseFuture<T> implements
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
setException(e);
}

View File

@ -54,11 +54,11 @@ public class DefaultShardOperationFailedException implements ShardOperationFaile
this.status = e.status();
}
public DefaultShardOperationFailedException(String index, int shardId, Throwable t) {
public DefaultShardOperationFailedException(String index, int shardId, Throwable reason) {
this.index = index;
this.shardId = shardId;
this.reason = t;
status = ExceptionsHelper.status(t);
this.reason = reason;
this.status = ExceptionsHelper.status(reason);
}
@Override
@ -98,7 +98,7 @@ public class DefaultShardOperationFailedException implements ShardOperationFaile
index = in.readString();
}
shardId = in.readVInt();
reason = in.readThrowable();
reason = in.readException();
status = RestStatus.readFrom(in);
}

View File

@ -41,7 +41,7 @@ public abstract class DelegatingActionListener<Instigator extends ActionResponse
}
@Override
public final void onFailure(Throwable e) {
public final void onFailure(Exception e) {
delegatedActionListener.onFailure(e);
}
}

View File

@ -65,13 +65,13 @@ public abstract class HandledTransportAction<Request extends ActionRequest<Reque
public void onResponse(Response response) {
try {
channel.sendResponse(response);
} catch (Throwable e) {
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {

View File

@ -21,9 +21,6 @@ package org.elasticsearch.action.support;
import org.elasticsearch.threadpool.ThreadPool;
/**
*
*/
public class PlainListenableActionFuture<T> extends AbstractListenableActionFuture<T, T> {
public PlainListenableActionFuture(ThreadPool threadPool) {
@ -34,4 +31,5 @@ public class PlainListenableActionFuture<T> extends AbstractListenableActionFutu
protected T convert(T response) {
return response;
}
}

View File

@ -97,14 +97,14 @@ public final class ThreadedActionListener<Response> implements ActionListener<Re
}
@Override
public void onFailure(Throwable t) {
listener.onFailure(t);
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
@Override
public void onFailure(final Throwable e) {
public void onFailure(final Exception e) {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
public boolean isForceExecution() {
@ -117,8 +117,8 @@ public final class ThreadedActionListener<Response> implements ActionListener<Re
}
@Override
public void onFailure(Throwable t) {
logger.warn("failed to execute failure callback on [{}], failure [{}]", t, listener, e);
public void onFailure(Exception e) {
logger.warn("failed to execute failure callback on [{}], failure [{}]", e, listener, e);
}
});
}

View File

@ -92,7 +92,7 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
taskManager.unregister(task);
listener.onFailure(e);
}
@ -113,7 +113,7 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
if (task != null) {
taskManager.unregister(task);
}
@ -140,9 +140,9 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
if (filters.length == 0) {
try {
doExecute(task, request, listener);
} catch(Throwable t) {
logger.trace("Error during transport action execution.", t);
listener.onFailure(t);
} catch(Exception e) {
logger.trace("Error during transport action execution.", e);
listener.onFailure(e);
}
} else {
RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);
@ -180,9 +180,9 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
} else {
listener.onFailure(new IllegalStateException("proceed was called too many times"));
}
} catch(Throwable t) {
logger.trace("Error during transport action execution.", t);
listener.onFailure(t);
} catch(Exception e) {
logger.trace("Error during transport action execution.", e);
listener.onFailure(e);
}
}
@ -221,9 +221,9 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
} else {
listener.onFailure(new IllegalStateException("proceed was called too many times"));
}
} catch (Throwable t) {
logger.trace("Error during transport action execution.", t);
listener.onFailure(t);
} catch (Exception e) {
logger.trace("Error during transport action execution.", e);
listener.onFailure(e);
}
}
}
@ -246,7 +246,7 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
@ -269,17 +269,18 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
public void onResponse(Response response) {
try {
taskManager.persistResult(task, response, delegate);
} catch (Throwable e) {
} catch (Exception e) {
delegate.onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
try {
taskManager.persistResult(task, e, delegate);
} catch (Throwable e1) {
delegate.onFailure(e1);
} catch (Exception inner) {
inner.addSuppressed(e);
delegate.onFailure(inner);
}
}
}

View File

@ -27,30 +27,23 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.ShardNotFoundException;
/**
*/
public class TransportActions {
public static boolean isShardNotAvailableException(Throwable t) {
Throwable actual = ExceptionsHelper.unwrapCause(t);
if (actual instanceof ShardNotFoundException ||
public static boolean isShardNotAvailableException(final Throwable e) {
final Throwable actual = ExceptionsHelper.unwrapCause(e);
return (actual instanceof ShardNotFoundException ||
actual instanceof IndexNotFoundException ||
actual instanceof IllegalIndexShardStateException ||
actual instanceof NoShardAvailableActionException ||
actual instanceof UnavailableShardsException ||
actual instanceof AlreadyClosedException) {
return true;
}
return false;
actual instanceof AlreadyClosedException);
}
/**
* If a failure is already present, should this failure override it or not for read operations.
*/
public static boolean isReadOverrideException(Throwable t) {
if (isShardNotAvailableException(t)) {
return false;
}
return true;
public static boolean isReadOverrideException(Exception e) {
return !isShardNotAvailableException(e);
}
}

View File

@ -144,7 +144,7 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
// no shards
try {
listener.onResponse(newResponse(request, new AtomicReferenceArray(0), clusterState));
} catch (Throwable e) {
} catch (Exception e) {
listener.onFailure(e);
}
return;
@ -199,7 +199,7 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
}
});
}
} catch (Throwable e) {
} catch (Exception e) {
onOperation(shard, shardIt, shardIndex, e);
}
}
@ -215,25 +215,25 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
}
@SuppressWarnings({"unchecked"})
void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int shardIndex, Throwable t) {
void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int shardIndex, Exception e) {
// we set the shard failure always, even if its the first in the replication group, and the next one
// will work (it will just override it...)
setFailure(shardIt, shardIndex, t);
setFailure(shardIt, shardIndex, e);
ShardRouting nextShard = shardIt.nextOrNull();
if (nextShard != null) {
if (t != null) {
if (e != null) {
if (logger.isTraceEnabled()) {
if (!TransportActions.isShardNotAvailableException(t)) {
logger.trace("{}: failed to execute [{}]", t, shard != null ? shard.shortSummary() : shardIt.shardId(), request);
if (!TransportActions.isShardNotAvailableException(e)) {
logger.trace("{}: failed to execute [{}]", e, shard != null ? shard.shortSummary() : shardIt.shardId(), request);
}
}
}
performOperation(shardIt, nextShard, shardIndex);
} else {
if (logger.isDebugEnabled()) {
if (t != null) {
if (!TransportActions.isShardNotAvailableException(t)) {
logger.debug("{}: failed to execute [{}]", t, shard != null ? shard.shortSummary() : shardIt.shardId(), request);
if (e != null) {
if (!TransportActions.isShardNotAvailableException(e)) {
logger.debug("{}: failed to execute [{}]", e, shard != null ? shard.shortSummary() : shardIt.shardId(), request);
}
}
}
@ -246,25 +246,25 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
protected void finishHim() {
try {
listener.onResponse(newResponse(request, shardsResponses, clusterState));
} catch (Throwable e) {
} catch (Exception e) {
listener.onFailure(e);
}
}
void setFailure(ShardIterator shardIt, int shardIndex, Throwable t) {
void setFailure(ShardIterator shardIt, int shardIndex, Exception e) {
// we don't aggregate shard failures on non active shards (but do keep the header counts right)
if (TransportActions.isShardNotAvailableException(t)) {
if (TransportActions.isShardNotAvailableException(e)) {
return;
}
if (!(t instanceof BroadcastShardOperationFailedException)) {
t = new BroadcastShardOperationFailedException(shardIt.shardId(), t);
if (!(e instanceof BroadcastShardOperationFailedException)) {
e = new BroadcastShardOperationFailedException(shardIt.shardId(), e);
}
Object response = shardsResponses.get(shardIndex);
if (response == null) {
// just override it and return
shardsResponses.set(shardIndex, t);
shardsResponses.set(shardIndex, e);
}
if (!(response instanceof Throwable)) {
@ -274,8 +274,8 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
// the failure is already present, try and not override it with an exception that is less meaningless
// for example, getting illegal shard state
if (TransportActions.isReadOverrideException(t)) {
shardsResponses.set(shardIndex, t);
if (TransportActions.isReadOverrideException(e)) {
shardsResponses.set(shardIndex, e);
}
}
}

View File

@ -299,7 +299,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
if (nodeIds.size() == 0) {
try {
onCompletion();
} catch (Throwable e) {
} catch (Exception e) {
listener.onFailure(e);
}
} else {
@ -340,7 +340,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
return ThreadPool.Names.SAME;
}
});
} catch (Throwable e) {
} catch (Exception e) {
onNodeFailure(node, nodeIndex, e);
}
}
@ -380,15 +380,15 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
Response response = null;
try {
response = newResponse(request, responses, unavailableShardExceptions, nodeIds, clusterState);
} catch (Throwable t) {
logger.debug("failed to combine responses from nodes", t);
listener.onFailure(t);
} catch (Exception e) {
logger.debug("failed to combine responses from nodes", e);
listener.onFailure(e);
}
if (response != null) {
try {
listener.onResponse(response);
} catch (Throwable t) {
listener.onFailure(t);
} catch (Exception e) {
listener.onFailure(e);
}
}
}
@ -433,18 +433,19 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
if (logger.isTraceEnabled()) {
logger.trace("[{}] completed operation for shard [{}]", actionName, shardRouting.shortSummary());
}
} catch (Throwable t) {
BroadcastShardOperationFailedException e = new BroadcastShardOperationFailedException(shardRouting.shardId(), "operation " + actionName + " failed", t);
e.setIndex(shardRouting.getIndexName());
e.setShard(shardRouting.shardId());
shardResults[shardIndex] = e;
if (TransportActions.isShardNotAvailableException(t)) {
} catch (Exception e) {
BroadcastShardOperationFailedException failure =
new BroadcastShardOperationFailedException(shardRouting.shardId(), "operation " + actionName + " failed", e);
failure.setIndex(shardRouting.getIndexName());
failure.setShard(shardRouting.shardId());
shardResults[shardIndex] = failure;
if (TransportActions.isShardNotAvailableException(e)) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] failed to execute operation for shard [{}]", t, actionName, shardRouting.shortSummary());
logger.trace("[{}] failed to execute operation for shard [{}]", e, actionName, shardRouting.shortSummary());
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("[{}] failed to execute operation for shard [{}]", t, actionName, shardRouting.shortSummary());
logger.debug("[{}] failed to execute operation for shard [{}]", e, actionName, shardRouting.shortSummary());
}
}
}

View File

@ -152,7 +152,7 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
if (t instanceof Discovery.FailedToCommitClusterStateException
|| (t instanceof NotMasterException)) {
logger.debug("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", t, actionName);

View File

@ -48,9 +48,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Supplier;
/**
*
*/
public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest<NodesRequest>,
NodesResponse extends BaseNodesResponse,
NodeRequest extends BaseNodeRequest,
@ -226,8 +223,8 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
}
});
}
} catch (Throwable t) {
onFailure(idx, nodeId, t);
} catch (Exception e) {
onFailure(idx, nodeId, e);
}
}
}
@ -255,9 +252,9 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
NodesResponse finalResponse;
try {
finalResponse = newResponse(request, responses);
} catch (Throwable t) {
logger.debug("failed to combine responses from nodes", t);
listener.onFailure(t);
} catch (Exception e) {
logger.debug("failed to combine responses from nodes", e);
listener.onFailure(e);
return;
}
listener.onResponse(finalResponse);

View File

@ -159,7 +159,7 @@ public class ReplicationOperation<
}
@Override
public void onFailure(Throwable replicaException) {
public void onFailure(Exception replicaException) {
logger.trace("[{}] failure while performing [{}] on replica {}, request [{}]", replicaException, shard.shardId(), opType,
shard, replicaRequest);
if (ignoreReplicaException(replicaException)) {
@ -180,7 +180,7 @@ public class ReplicationOperation<
});
}
private void onPrimaryDemoted(Throwable demotionFailure) {
private void onPrimaryDemoted(Exception demotionFailure) {
String primaryFail = String.format(Locale.ROOT,
"primary shard [%s] was demoted while failing replica shard",
primary.routingEntry());
@ -266,9 +266,9 @@ public class ReplicationOperation<
}
}
private void finishAsFailed(Throwable throwable) {
private void finishAsFailed(Exception exception) {
if (finished.compareAndSet(false, true)) {
resultListener.onFailure(throwable);
resultListener.onFailure(exception);
}
}
@ -276,7 +276,7 @@ public class ReplicationOperation<
/**
* Should an exception be ignored when the operation is performed on the replica.
*/
public static boolean ignoreReplicaException(Throwable e) {
public static boolean ignoreReplicaException(Exception e) {
if (TransportActions.isShardNotAvailableException(e)) {
return true;
}
@ -288,14 +288,11 @@ public class ReplicationOperation<
return false;
}
public static boolean isConflictException(Throwable e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
public static boolean isConflictException(Throwable t) {
final Throwable cause = ExceptionsHelper.unwrapCause(t);
// on version conflict or document missing, it means
// that a new change has crept into the replica, and it's fine
if (cause instanceof VersionConflictEngineException) {
return true;
}
return false;
return cause instanceof VersionConflictEngineException;
}
@ -313,7 +310,7 @@ public class ReplicationOperation<
/**
* fail the primary, typically due to the fact that the operation has learned the primary has been demoted by the master
*/
void failShard(String message, Throwable throwable);
void failShard(String message, Exception exception);
/**
* Performs the given request on this primary. Yes, this returns as soon as it can with the request for the replicas and calls a
@ -340,20 +337,17 @@ public class ReplicationOperation<
/**
* Fail the specified shard, removing it from the current set of active shards
*
* @param replica shard to fail
* @param primary the primary shard that requested the failure
* @param message a (short) description of the reason
* @param throwable the original exception which caused the ReplicationOperation to request the shard to be failed
* @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
* @param onSuccess a callback to call when the shard has been successfully removed from the active set.
* @param onPrimaryDemoted a callback to call when the shard can not be failed because the current primary has been demoted
* by the master.
* @param onIgnoredFailure a callback to call when failing a shard has failed, but it that failure can be safely ignored and the
* replication operation can finish processing
* Note: this callback should be used in extreme situations, typically node shutdown.
*/
void failShard(ShardRouting replica, ShardRouting primary, String message, Throwable throwable, Runnable onSuccess,
Consumer<Throwable> onPrimaryDemoted, Consumer<Throwable> onIgnoredFailure);
void failShard(ShardRouting replica, ShardRouting primary, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
}
public static class RetryOnPrimaryException extends ElasticsearchException {

View File

@ -175,11 +175,11 @@ public class ReplicationResponse extends ActionResponse {
private ShardId shardId;
private String nodeId;
private Throwable cause;
private Exception cause;
private RestStatus status;
private boolean primary;
public Failure(ShardId shardId, @Nullable String nodeId, Throwable cause, RestStatus status, boolean primary) {
public Failure(ShardId shardId, @Nullable String nodeId, Exception cause, RestStatus status, boolean primary) {
this.shardId = shardId;
this.nodeId = nodeId;
this.cause = cause;
@ -251,7 +251,7 @@ public class ReplicationResponse extends ActionResponse {
public void readFrom(StreamInput in) throws IOException {
shardId = ShardId.readShardId(in);
nodeId = in.readOptionalString();
cause = in.readThrowable();
cause = in.readException();
status = RestStatus.readFrom(in);
primary = in.readBoolean();
}

View File

@ -93,7 +93,7 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
logger.trace("{}: got failure from {}", actionName, shardId);
int totalNumCopies = clusterState.getMetaData().getIndexSafe(shardId.getIndex()).getNumberOfReplicas() + 1;
ShardResponse shardResponse = newShardResponse();

View File

@ -196,7 +196,7 @@ public abstract class TransportReplicationAction<
return TransportRequestOptions.EMPTY;
}
protected boolean retryPrimaryException(Throwable e) {
protected boolean retryPrimaryException(final Throwable e) {
return e.getClass() == ReplicationOperation.RetryOnPrimaryException.class
|| TransportActions.isShardNotAvailableException(e);
}
@ -209,17 +209,18 @@ public abstract class TransportReplicationAction<
public void onResponse(Response result) {
try {
channel.sendResponse(result);
} catch (Throwable e) {
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Throwable e1) {
logger.warn("Failed to send response for {}", e1, actionName);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn("Failed to send response for {}", inner, actionName);
}
}
});
@ -300,25 +301,25 @@ public abstract class TransportReplicationAction<
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
}, primaryShardReference, executeOnReplicas).execute();
}
} catch (Throwable t) {
} catch (Exception e) {
Releasables.closeWhileHandlingException(primaryShardReference); // release shard operation lock before responding to caller
onFailure(t);
onFailure(e);
}
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception e) {
setPhase(replicationTask, "finished");
try {
channel.sendResponse(t);
} catch (IOException e) {
e.addSuppressed(t);
logger.warn("failed to send response", e);
channel.sendResponse(e);
} catch (IOException inner) {
inner.addSuppressed(e);
logger.warn("failed to send response", inner);
}
}
@ -336,7 +337,7 @@ public abstract class TransportReplicationAction<
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
primaryShardReference.close(); // release shard operation lock before responding to caller
setPhase(replicationTask, "finished");
try {
@ -439,16 +440,16 @@ public abstract class TransportReplicationAction<
ReplicaResult replicaResult = shardOperationOnReplica(request);
releasable.close(); // release shard operation lock before responding to caller
replicaResult.respond(new ResponseListener());
} catch (Throwable t) {
} catch (Exception e) {
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
AsyncReplicaAction.this.onFailure(t);
AsyncReplicaAction.this.onFailure(e);
}
}
@Override
public void onFailure(Throwable t) {
if (t instanceof RetryOnReplicaException) {
logger.trace("Retrying operation on replica, action [{}], request [{}]", t, transportReplicaAction, request);
public void onFailure(Exception e) {
if (e instanceof RetryOnReplicaException) {
logger.trace("Retrying operation on replica, action [{}], request [{}]", e, transportReplicaAction, request);
final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext();
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
@ -473,17 +474,17 @@ public abstract class TransportReplicationAction<
}
});
} else {
responseWithFailure(t);
responseWithFailure(e);
}
}
protected void responseWithFailure(Throwable t) {
protected void responseWithFailure(Exception e) {
try {
setPhase(task, "finished");
channel.sendResponse(t);
channel.sendResponse(e);
} catch (IOException responseException) {
responseException.addSuppressed(e);
logger.warn("failed to send error message back to client for action [{}]", responseException, transportReplicaAction);
logger.warn("actual Exception", t);
}
}
@ -513,7 +514,7 @@ public abstract class TransportReplicationAction<
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
responseWithFailure(e);
}
}
@ -544,7 +545,7 @@ public abstract class TransportReplicationAction<
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
finishWithUnexpectedFailure(e);
}
@ -694,14 +695,15 @@ public abstract class TransportReplicationAction<
} else {
finishAsFailed(exp);
}
} catch (Throwable t) {
finishWithUnexpectedFailure(t);
} catch (Exception e) {
e.addSuppressed(exp);
finishWithUnexpectedFailure(e);
}
}
});
}
void retry(Throwable failure) {
void retry(Exception failure) {
assert failure != null;
if (observer.isTimedOut()) {
// we running as a last attempt after a timeout has happened. don't retry
@ -731,7 +733,7 @@ public abstract class TransportReplicationAction<
});
}
void finishAsFailed(Throwable failure) {
void finishAsFailed(Exception failure) {
if (finished.compareAndSet(false, true)) {
setPhase(task, "failed");
logger.trace("operation failed. action [{}], request [{}]", failure, actionName, request);
@ -741,7 +743,7 @@ public abstract class TransportReplicationAction<
}
}
void finishWithUnexpectedFailure(Throwable failure) {
void finishWithUnexpectedFailure(Exception failure) {
logger.warn("unexpected error during the primary phase for action [{}], request [{}]", failure, actionName, request);
if (finished.compareAndSet(false, true)) {
setPhase(task, "failed");
@ -790,7 +792,7 @@ public abstract class TransportReplicationAction<
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
onReferenceAcquired.onFailure(e);
}
};
@ -835,11 +837,11 @@ public abstract class TransportReplicationAction<
}
@Override
public void failShard(String reason, Throwable e) {
public void failShard(String reason, Exception e) {
try {
indexShard.failShard(reason, e);
} catch (Throwable suppressed) {
e.addSuppressed(suppressed);
} catch (Exception inner) {
e.addSuppressed(inner);
}
}
@ -871,10 +873,10 @@ public abstract class TransportReplicationAction<
}
@Override
public void failShard(ShardRouting replica, ShardRouting primary, String message, Throwable throwable,
Runnable onSuccess, Consumer<Throwable> onFailure, Consumer<Throwable> onIgnoredFailure) {
public void failShard(ShardRouting replica, ShardRouting primary, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onFailure, Consumer<Exception> onIgnoredFailure) {
shardStateAction.shardFailed(
replica, primary, message, throwable,
replica, primary, message, exception,
new ShardStateAction.Listener() {
@Override
public void onSuccess() {
@ -882,7 +884,7 @@ public abstract class TransportReplicationAction<
}
@Override
public void onFailure(Throwable shardFailedError) {
public void onFailure(Exception shardFailedError) {
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
onFailure.accept(shardFailedError);
} else {

View File

@ -95,7 +95,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
*/
protected abstract void resolveRequest(ClusterState state, Request request);
protected boolean retryOnFailure(Throwable e) {
protected boolean retryOnFailure(Exception e) {
return false;
}
@ -150,7 +150,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
}
}
shardIt = shards(observer.observedState(), request);
} catch (Throwable e) {
} catch (Exception e) {
listener.onFailure(e);
return;
}
@ -193,11 +193,11 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
@Override
public void handleException(TransportException exp) {
Throwable cause = exp.unwrapCause();
final Throwable cause = exp.unwrapCause();
// if we got disconnected from the node, or the node / shard is not in the right state (being closed)
if (cause instanceof ConnectTransportException || cause instanceof NodeClosedException ||
retryOnFailure(exp)) {
retry(cause);
retry((Exception) cause);
} else {
listener.onFailure(exp);
}
@ -205,10 +205,10 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
});
}
void retry(@Nullable final Throwable failure) {
void retry(@Nullable final Exception failure) {
if (observer.isTimedOut()) {
// we running as a last attempt after a timeout has happened. don't retry
Throwable listenFailure = failure;
Exception listenFailure = failure;
if (listenFailure == null) {
if (shardIt == null) {
listenFailure = new UnavailableShardsException(request.concreteIndex(), -1, "Timeout waiting for [{}], request: {}", request.timeout(), actionName);
@ -249,17 +249,18 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
public void onResponse(Response response) {
try {
channel.sendResponse(response);
} catch (Throwable e) {
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("failed to send response for get", e1);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn("failed to send response for get", inner);
}
}
});

View File

@ -124,7 +124,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
private final ShardsIterator shardIt;
private final InternalRequest internalRequest;
private final DiscoveryNodes nodes;
private volatile Throwable lastFailure;
private volatile Exception lastFailure;
private AsyncSingleAction(Request request, ActionListener<Response> listener) {
this.listener = listener;
@ -185,22 +185,22 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
}
}
private void onFailure(ShardRouting shardRouting, Throwable e) {
private void onFailure(ShardRouting shardRouting, Exception e) {
if (logger.isTraceEnabled() && e != null) {
logger.trace("{}: failed to execute [{}]", e, shardRouting, internalRequest.request());
}
perform(e);
}
private void perform(@Nullable final Throwable currentFailure) {
Throwable lastFailure = this.lastFailure;
private void perform(@Nullable final Exception currentFailure) {
Exception lastFailure = this.lastFailure;
if (lastFailure == null || TransportActions.isReadOverrideException(currentFailure)) {
lastFailure = currentFailure;
this.lastFailure = currentFailure;
}
final ShardRouting shardRouting = shardIt.nextOrNull();
if (shardRouting == null) {
Throwable failure = lastFailure;
Exception failure = lastFailure;
if (failure == null || isShardNotAvailableException(failure)) {
failure = new NoShardAvailableActionException(null, LoggerMessageFormat.format("No shard available for [{}]", internalRequest.request()), failure);
} else {
@ -261,13 +261,13 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
public void onResponse(Response result) {
try {
channel.sendResponse(result);
} catch (Throwable e) {
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {

View File

@ -215,9 +215,9 @@ public abstract class TransportTasksAction<
// nothing to do
try {
listener.onResponse(newResponse(request, responses));
} catch (Throwable t) {
logger.debug("failed to generate empty response", t);
listener.onFailure(t);
} catch (Exception e) {
logger.debug("failed to generate empty response", e);
listener.onFailure(e);
}
} else {
TransportRequestOptions.Builder builder = TransportRequestOptions.builder();
@ -259,8 +259,8 @@ public abstract class TransportTasksAction<
}
});
}
} catch (Throwable t) {
onFailure(idx, nodeId, t);
} catch (Exception e) {
onFailure(idx, nodeId, e);
}
}
}
@ -289,9 +289,9 @@ public abstract class TransportTasksAction<
TasksResponse finalResponse;
try {
finalResponse = newResponse(request, responses);
} catch (Throwable t) {
logger.debug("failed to combine responses from nodes", t);
listener.onFailure(t);
} catch (Exception e) {
logger.debug("failed to combine responses from nodes", e);
listener.onFailure(e);
return;
}
listener.onResponse(finalResponse);

View File

@ -40,13 +40,13 @@ public class MultiTermVectorsResponse extends ActionResponse implements Iterable
private String index;
private String type;
private String id;
private Throwable cause;
private Exception cause;
Failure() {
}
public Failure(String index, String type, String id, Throwable cause) {
public Failure(String index, String type, String id, Exception cause) {
this.index = index;
this.type = type;
this.id = id;
@ -77,7 +77,7 @@ public class MultiTermVectorsResponse extends ActionResponse implements Iterable
/**
* The failure cause.
*/
public Throwable getCause() {
public Exception getCause() {
return this.cause;
}
@ -92,7 +92,7 @@ public class MultiTermVectorsResponse extends ActionResponse implements Iterable
index = in.readString();
type = in.readOptionalString();
id = in.readString();
cause = in.readThrowable();
cause = in.readException();
}
@Override
@ -132,7 +132,7 @@ public class MultiTermVectorsResponse extends ActionResponse implements Iterable
builder.field(Fields._INDEX, failure.getIndex());
builder.field(Fields._TYPE, failure.getType());
builder.field(Fields._ID, failure.getId());
ElasticsearchException.renderThrowable(builder, params, failure.getCause());
ElasticsearchException.renderException(builder, params, failure.getCause());
builder.endObject();
} else {
TermVectorsResponse getResponse = response.getResponse();

View File

@ -107,7 +107,7 @@ public class TransportMultiTermVectorsAction extends HandledTransportAction<Mult
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
// create failures for all relevant requests
for (int i = 0; i < shardRequest.locations.size(); i++) {
TermVectorsRequest termVectorsRequest = shardRequest.requests.get(i);

View File

@ -83,7 +83,7 @@ public class TransportShardMultiTermsVectorAction extends TransportSingleShardAc
try {
TermVectorsResponse termVectorsResponse = TermVectorsService.getTermVectors(indexShard, termVectorsRequest);
response.add(request.locations.get(i), termVectorsResponse);
} catch (Throwable t) {
} catch (Exception t) {
if (TransportActions.isShardNotAvailableException(t)) {
throw (ElasticsearchException) t;
} else {

View File

@ -46,6 +46,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
@ -61,6 +62,8 @@ import org.elasticsearch.transport.TransportService;
import java.util.Collections;
import java.util.Map;
import static org.elasticsearch.ExceptionsHelper.unwrapCause;
/**
*/
public class TransportUpdateAction extends TransportInstanceSingleOperationAction<UpdateRequest, UpdateResponse> {
@ -97,7 +100,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
}
@Override
protected boolean retryOnFailure(Throwable e) {
protected boolean retryOnFailure(Exception e) {
return TransportActions.isShardNotAvailableException(e);
}
@ -125,13 +128,14 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
}
@Override
public void onFailure(Throwable e) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
public void onFailure(Exception e) {
if (unwrapCause(e) instanceof IndexAlreadyExistsException) {
// we have the index, do it
try {
innerExecute(request, listener);
} catch (Throwable e1) {
listener.onFailure(e1);
} catch (Exception inner) {
inner.addSuppressed(e);
listener.onFailure(inner);
}
} else {
listener.onFailure(e);
@ -193,9 +197,9 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
}
@Override
public void onFailure(Throwable e) {
e = ExceptionsHelper.unwrapCause(e);
if (e instanceof VersionConflictEngineException) {
public void onFailure(Exception e) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof VersionConflictEngineException) {
if (retryCount < request.retryOnConflict()) {
logger.trace("Retry attempt [{}] of [{}] on version conflict on [{}][{}][{}]",
retryCount + 1, request.retryOnConflict(), request.index(), request.getShardId(), request.id());
@ -208,7 +212,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
return;
}
}
listener.onFailure(e);
listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause));
}
});
break;
@ -226,9 +230,9 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
}
@Override
public void onFailure(Throwable e) {
e = ExceptionsHelper.unwrapCause(e);
if (e instanceof VersionConflictEngineException) {
public void onFailure(Exception e) {
final Throwable cause = unwrapCause(e);
if (cause instanceof VersionConflictEngineException) {
if (retryCount < request.retryOnConflict()) {
threadPool.executor(executor()).execute(new ActionRunnable<UpdateResponse>(listener) {
@Override
@ -239,7 +243,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
return;
}
}
listener.onFailure(e);
listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause));
}
});
break;
@ -255,9 +259,9 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
}
@Override
public void onFailure(Throwable e) {
e = ExceptionsHelper.unwrapCause(e);
if (e instanceof VersionConflictEngineException) {
public void onFailure(Exception e) {
final Throwable cause = unwrapCause(e);
if (cause instanceof VersionConflictEngineException) {
if (retryCount < request.retryOnConflict()) {
threadPool.executor(executor()).execute(new ActionRunnable<UpdateResponse>(listener) {
@Override
@ -268,7 +272,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
return;
}
}
listener.onFailure(e);
listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause));
}
});
break;

View File

@ -125,7 +125,7 @@ final class Bootstrap {
// force remainder of JNA to be loaded (if available).
try {
JNAKernel32Library.getInstance();
} catch (Throwable ignored) {
} catch (Exception ignored) {
// we've already logged this.
}
@ -216,7 +216,7 @@ final class Bootstrap {
static void init(
final boolean foreground,
final String pidFile,
final Map<String, String> esSettings) throws Throwable {
final Map<String, String> esSettings) throws Exception {
// Set the system property before anything has a chance to trigger its use
initLoggerPrefix();
@ -254,7 +254,7 @@ final class Bootstrap {
if (!foreground) {
closeSysError();
}
} catch (Throwable e) {
} catch (Exception e) {
// disable console logging, so user does not see the exception twice (jvm will show it already)
if (foreground) {
Loggers.disableConsoleLogging();

View File

@ -217,13 +217,13 @@ class JNANatives {
if (ret == 1) {
LOCAL_SECCOMP_ALL = true;
}
} catch (Throwable t) {
} catch (Exception e) {
// this is likely to happen unless the kernel is newish, its a best effort at the moment
// so we log stacktrace at debug for now...
if (logger.isDebugEnabled()) {
logger.debug("unable to install syscall filter", t);
logger.debug("unable to install syscall filter", e);
}
logger.warn("unable to install syscall filter: ", t);
logger.warn("unable to install syscall filter: ", e);
}
}
}

View File

@ -611,7 +611,7 @@ final class Seccomp {
* This is best effort and OS and architecture dependent. It may throw any Throwable.
* @return 0 if we can do this for application threads, 1 for the entire process
*/
static int init(Path tmpFile) throws Throwable {
static int init(Path tmpFile) throws Exception {
if (Constants.LINUX) {
return linuxImpl();
} else if (Constants.MAC_OS_X) {

View File

@ -230,9 +230,9 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
DiscoveryNode node = nodes.get((index) % nodes.size());
try {
callback.doWithNode(node, retryListener);
} catch (Throwable t) {
} catch (Exception e) {
//this exception can't come from the TransportService as it doesn't throw exception at all
listener.onFailure(t);
listener.onFailure(e);
}
}
@ -258,7 +258,7 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
if (ExceptionsHelper.unwrapCause(e) instanceof ConnectTransportException) {
int i = ++this.i;
if (i >= nodes.size()) {
@ -266,9 +266,10 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
} else {
try {
callback.doWithNode(nodes.get((index + i) % nodes.size()), this);
} catch(final Throwable t) {
} catch(final Exception inner) {
inner.addSuppressed(e);
// this exception can't come from the TransportService as it doesn't throw exceptions at all
listener.onFailure(t);
listener.onFailure(inner);
}
}
} else {
@ -335,7 +336,7 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
try {
logger.trace("connecting to node [{}]", node);
transportService.connectToNode(node);
} catch (Throwable e) {
} catch (Exception e) {
it.remove();
logger.debug("failed to connect to discovered node [{}]", e, node);
}
@ -373,7 +374,7 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
// its a listed node, light connect to it...
logger.trace("connecting to listed node (light) [{}]", listedNode);
transportService.connectToNodeLight(listedNode);
} catch (Throwable e) {
} catch (Exception e) {
logger.debug("failed to connect to node [{}], removed from nodes list", e, listedNode);
newFilteredNodes.add(listedNode);
continue;
@ -405,7 +406,7 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
logger.debug("node {} didn't return any discovery info, temporarily using transport discovery node", listedNode);
newNodes.add(listedNode);
}
} catch (Throwable e) {
} catch (Exception e) {
logger.info("failed to get node info for {}, disconnecting...", e, listedNode);
transportService.disconnectFromNode(listedNode);
}
@ -484,7 +485,7 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
latch.countDown();
}
});
} catch (Throwable e) {
} catch (Exception e) {
logger.info("failed to get local cluster state info for {}, disconnecting...", e, listedNode);
transportService.disconnectFromNode(listedNode);
latch.countDown();

View File

@ -36,9 +36,9 @@ public interface AckedClusterStateTaskListener extends ClusterStateTaskListener
* Called once all the nodes have acknowledged the cluster state update request. Must be
* very lightweight execution, since it gets executed on the cluster service thread.
*
* @param t optional error that might have been thrown
* @param e optional error that might have been thrown
*/
void onAllNodesAcked(@Nullable Throwable t);
void onAllNodesAcked(@Nullable Exception e);
/**
* Called once the acknowledgement timeout defined by

View File

@ -58,9 +58,9 @@ public abstract class AckedClusterStateUpdateTask<Response> extends ClusterState
* Called once all the nodes have acknowledged the cluster state update request. Must be
* very lightweight execution, since it gets executed on the cluster service thread.
*
* @param t optional error that might have been thrown
* @param e optional error that might have been thrown
*/
public void onAllNodesAcked(@Nullable Throwable t) {
public void onAllNodesAcked(@Nullable Exception e) {
listener.onResponse(newResponse(true));
}
@ -75,8 +75,8 @@ public abstract class AckedClusterStateUpdateTask<Response> extends ClusterState
}
@Override
public void onFailure(String source, Throwable t) {
listener.onFailure(t);
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
/**

View File

@ -29,7 +29,7 @@ public interface ClusterStateTaskConfig {
/**
* The timeout for this cluster state update task configuration. If
* the cluster state update task isn't processed within this
* timeout, the associated {@link ClusterStateTaskListener#onFailure(String, Throwable)}
* timeout, the associated {@link ClusterStateTaskListener#onFailure(String, Exception)}
* is invoked.
*
* @return the timeout, or null if one is not set

View File

@ -82,13 +82,13 @@ public interface ClusterStateTaskExecutor<T> {
return this;
}
public Builder<T> failure(T task, Throwable t) {
return result(task, TaskResult.failure(t));
public Builder<T> failure(T task, Exception e) {
return result(task, TaskResult.failure(e));
}
public Builder<T> failures(Iterable<T> tasks, Throwable t) {
public Builder<T> failures(Iterable<T> tasks, Exception e) {
for (T task : tasks) {
failure(task, t);
failure(task, e);
}
return this;
}
@ -106,7 +106,7 @@ public interface ClusterStateTaskExecutor<T> {
}
final class TaskResult {
private final Throwable failure;
private final Exception failure;
private static final TaskResult SUCCESS = new TaskResult(null);
@ -114,11 +114,11 @@ public interface ClusterStateTaskExecutor<T> {
return SUCCESS;
}
public static TaskResult failure(Throwable failure) {
public static TaskResult failure(Exception failure) {
return new TaskResult(failure);
}
private TaskResult(Throwable failure) {
private TaskResult(Exception failure) {
this.failure = failure;
}
@ -126,7 +126,7 @@ public interface ClusterStateTaskExecutor<T> {
return this == SUCCESS;
}
public Throwable getFailure() {
public Exception getFailure() {
assert !isSuccess();
return failure;
}
@ -136,7 +136,7 @@ public interface ClusterStateTaskExecutor<T> {
* @param onSuccess handler to invoke on success
* @param onFailure handler to invoke on failure; the throwable passed through will not be null
*/
public void handle(Runnable onSuccess, Consumer<Throwable> onFailure) {
public void handle(Runnable onSuccess, Consumer<Exception> onFailure) {
if (failure == null) {
onSuccess.run();
} else {

View File

@ -25,7 +25,7 @@ public interface ClusterStateTaskListener {
/**
* A callback called when execute fails.
*/
void onFailure(String source, Throwable t);
void onFailure(String source, Exception e);
/**
* called when the task was rejected because the local node is no longer master

View File

@ -55,11 +55,11 @@ public abstract class ClusterStateUpdateTask implements ClusterStateTaskConfig,
/**
* A callback called when execute fails.
*/
public abstract void onFailure(String source, Throwable t);
public abstract void onFailure(String source, Exception e);
/**
* If the cluster state update task wasn't processed by the provided timeout, call
* {@link #onFailure(String, Throwable)}. May return null to indicate no timeout is needed (default).
* {@link ClusterStateTaskListener#onFailure(String, Exception)}. May return null to indicate no timeout is needed (default).
*/
@Nullable
public TimeValue timeout() {

View File

@ -19,7 +19,6 @@
package org.elasticsearch.cluster;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@ -30,7 +29,6 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@ -309,7 +307,7 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
if (e instanceof ReceiveTimeoutTransportException) {
logger.error("NodeStatsAction timed out for ClusterInfoUpdateJob", e);
} else {
@ -339,7 +337,7 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
if (e instanceof ReceiveTimeoutTransportException) {
logger.error("IndicesStatsAction timed out for ClusterInfoUpdateJob", e);
} else {

View File

@ -90,7 +90,7 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
assert current != null : "node " + node + " was removed in event but not in internal nodes";
try {
transportService.disconnectFromNode(node);
} catch (Throwable e) {
} catch (Exception e) {
logger.warn("failed to disconnect to node [{}]", e, node);
}
}
@ -123,8 +123,8 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
class ConnectionChecker extends AbstractRunnable {
@Override
public void onFailure(Throwable t) {
logger.warn("unexpected error while checking for node reconnects", t);
public void onFailure(Exception e) {
logger.warn("unexpected error while checking for node reconnects", e);
}
protected void doRun() {

View File

@ -111,7 +111,7 @@ public class ShardStateAction extends AbstractComponent {
waitForNewMasterAndRetry(actionName, observer, shardRoutingEntry, listener);
} else {
logger.warn("{} unexpected failure while sending request [{}] to [{}] for shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), actionName, masterNode, shardRoutingEntry);
listener.onFailure(exp instanceof RemoteTransportException ? exp.getCause() : exp);
listener.onFailure(exp instanceof RemoteTransportException ? (Exception) (exp.getCause() instanceof Exception ? exp.getCause() : new ElasticsearchException(exp.getCause())) : exp);
}
}
});
@ -131,14 +131,13 @@ public class ShardStateAction extends AbstractComponent {
/**
* Send a shard failed request to the master node to update the
* cluster state.
*
* @param shardRouting the shard to fail
* @param sourceShardRouting the source shard requesting the failure (must be the shard itself, or the primary shard)
* @param message the reason for the failure
* @param failure the underlying cause of the failure
* @param listener callback upon completion of the request
*/
public void shardFailed(final ShardRouting shardRouting, ShardRouting sourceShardRouting, final String message, @Nullable final Throwable failure, Listener listener) {
public void shardFailed(final ShardRouting shardRouting, ShardRouting sourceShardRouting, final String message, @Nullable final Exception failure, Listener listener) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, sourceShardRouting, message, failure);
sendShardAction(SHARD_FAILED_ACTION_NAME, observer, shardRoutingEntry, listener);
@ -190,12 +189,13 @@ public class ShardStateAction extends AbstractComponent {
shardFailedClusterStateTaskExecutor,
new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Throwable t) {
logger.error("{} unexpected failure while failing shard [{}]", t, request.shardRouting.shardId(), request.shardRouting);
public void onFailure(String source, Exception e) {
logger.error("{} unexpected failure while failing shard [{}]", e, request.shardRouting.shardId(), request.shardRouting);
try {
channel.sendResponse(t);
} catch (Throwable channelThrowable) {
logger.warn("{} failed to send failure [{}] while failing shard [{}]", channelThrowable, request.shardRouting.shardId(), t, request.shardRouting);
channel.sendResponse(e);
} catch (Exception channelException) {
channelException.addSuppressed(e);
logger.warn("{} failed to send failure [{}] while failing shard [{}]", channelException, request.shardRouting.shardId(), e, request.shardRouting);
}
}
@ -204,8 +204,8 @@ public class ShardStateAction extends AbstractComponent {
logger.error("{} no longer master while failing shard [{}]", request.shardRouting.shardId(), request.shardRouting);
try {
channel.sendResponse(new NotMasterException(source));
} catch (Throwable channelThrowable) {
logger.warn("{} failed to send no longer master while failing shard [{}]", channelThrowable, request.shardRouting.shardId(), request.shardRouting);
} catch (Exception channelException) {
logger.warn("{} failed to send no longer master while failing shard [{}]", channelException, request.shardRouting.shardId(), request.shardRouting);
}
}
@ -213,8 +213,8 @@ public class ShardStateAction extends AbstractComponent {
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (Throwable channelThrowable) {
logger.warn("{} failed to send response while failing shard [{}]", channelThrowable, request.shardRouting.shardId(), request.shardRouting);
} catch (Exception channelException) {
logger.warn("{} failed to send response while failing shard [{}]", channelException, request.shardRouting.shardId(), request.shardRouting);
}
}
}
@ -259,10 +259,10 @@ public class ShardStateAction extends AbstractComponent {
maybeUpdatedState = ClusterState.builder(currentState).routingResult(result).build();
}
batchResultBuilder.successes(tasksToFail);
} catch (Throwable t) {
} catch (Exception e) {
// failures are communicated back to the requester
// cluster state will not be updated in this case
batchResultBuilder.failures(tasksToFail, t);
batchResultBuilder.failures(tasksToFail, e);
}
partition
@ -379,16 +379,16 @@ public class ShardStateAction extends AbstractComponent {
maybeUpdatedState = ClusterState.builder(currentState).routingResult(result).build();
}
builder.successes(tasks);
} catch (Throwable t) {
builder.failures(tasks, t);
} catch (Exception e) {
builder.failures(tasks, e);
}
return builder.build(maybeUpdatedState);
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
public void onFailure(String source, Exception e) {
logger.error("unexpected failure during [{}]", e, source);
}
}
@ -396,12 +396,12 @@ public class ShardStateAction extends AbstractComponent {
ShardRouting shardRouting;
ShardRouting sourceShardRouting;
String message;
Throwable failure;
Exception failure;
public ShardRoutingEntry() {
}
ShardRoutingEntry(ShardRouting shardRouting, ShardRouting sourceShardRouting, String message, @Nullable Throwable failure) {
ShardRoutingEntry(ShardRouting shardRouting, ShardRouting sourceShardRouting, String message, @Nullable Exception failure) {
this.shardRouting = shardRouting;
this.sourceShardRouting = sourceShardRouting;
this.message = message;
@ -418,7 +418,7 @@ public class ShardStateAction extends AbstractComponent {
shardRouting = new ShardRouting(in);
sourceShardRouting = new ShardRouting(in);
message = in.readString();
failure = in.readThrowable();
failure = in.readException();
}
@Override
@ -461,9 +461,9 @@ public class ShardStateAction extends AbstractComponent {
* Any other exception is communicated to the requester via
* this notification.
*
* @param t the unexpected cause of the failure on the master
* @param e the unexpected cause of the failure on the master
*/
default void onFailure(final Throwable t) {
default void onFailure(final Exception e) {
}
}

View File

@ -84,7 +84,7 @@ public class AliasValidator extends AbstractComponent {
if (Strings.hasLength(alias.filter())) {
try (XContentParser parser = XContentFactory.xContent(alias.filter()).createParser(alias.filter())) {
parser.map();
} catch (Throwable e) {
} catch (Exception e) {
throw new IllegalArgumentException("failed to parse filter for alias [" + alias.name() + "]", e);
}
}
@ -121,7 +121,7 @@ public class AliasValidator extends AbstractComponent {
assert queryShardContext != null;
try (XContentParser parser = XContentFactory.xContent(filter).createParser(filter)) {
validateAliasFilter(parser, queryShardContext);
} catch (Throwable e) {
} catch (Exception e) {
throw new IllegalArgumentException("failed to parse filter for alias [" + alias + "]", e);
}
}
@ -135,7 +135,7 @@ public class AliasValidator extends AbstractComponent {
assert queryShardContext != null;
try (XContentParser parser = XContentFactory.xContent(filter).createParser(filter)) {
validateAliasFilter(parser, queryShardContext);
} catch (Throwable e) {
} catch (Exception e) {
throw new IllegalArgumentException("failed to parse filter for alias [" + alias + "]", e);
}
}

Some files were not shown because too many files have changed in this diff Show More