Merge branch 'master' into feature/rank-eval

This commit is contained in:
Christoph Büscher 2016-07-05 11:23:21 +02:00
commit 304e59cb04
1066 changed files with 14379 additions and 7892 deletions

View File

@ -31,7 +31,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.gateway.GatewayAllocator;
@ -102,7 +102,7 @@ public final class Allocators {
}
public static DiscoveryNode newNode(String nodeId, Map<String, String> attributes) {
return new DiscoveryNode("", nodeId, DummyTransportAddress.INSTANCE, attributes, Sets.newHashSet(DiscoveryNode.Role.MASTER,
return new DiscoveryNode("", nodeId, LocalTransportAddress.buildUnique(), attributes, Sets.newHashSet(DiscoveryNode.Role.MASTER,
DiscoveryNode.Role.DATA), Version.CURRENT);
}
}

View File

@ -39,6 +39,27 @@
<module name="EqualsHashCode" />
<!-- Checks that the order of modifiers conforms to the suggestions in the
Java Language specification, sections 8.1.1, 8.3.1 and 8.4.3. It is not that
the standard is perfect, but having a consistent order makes the code more
readable and no other order is compellingly better than the standard.
The correct order is:
public
protected
private
abstract
static
final
transient
volatile
synchronized
native
strictfp
-->
<module name="ModifierOrder" />
<module name="RedundantModifier" />
<!-- We don't use Java's builtin serialization and we suppress all warning
about it. The flip side of that coin is that we shouldn't _try_ to use
it. We can't outright ban it with ForbiddenApis because it complain about

View File

@ -266,7 +266,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]metadata[/\\]MetaDataMappingService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]metadata[/\\]MetaDataUpdateSettingsService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]metadata[/\\]RepositoriesMetaData.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]node[/\\]DiscoveryNodes.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]IndexRoutingTable.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]IndexShardRoutingTable.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]OperationRouting.java" checks="LineLength" />
@ -341,12 +340,9 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]DiscoveryService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]DiscoverySettings.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]local[/\\]LocalDiscovery.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]NodeJoinController.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]ZenDiscovery.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]elect[/\\]ElectMasterService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]fd[/\\]FaultDetection.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]fd[/\\]MasterFaultDetection.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]fd[/\\]NodesFaultDetection.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]membership[/\\]MembershipAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]ping[/\\]ZenPing.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]publish[/\\]PendingClusterStatesQueue.java" checks="LineLength" />
@ -357,7 +353,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]GatewayMetaState.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]GatewayService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]LocalAllocateDangledIndices.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]MetaDataStateFormat.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]PrimaryShardAllocator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]ReplicaShardAllocator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]TransportNodesListGatewayMetaState.java" checks="LineLength" />
@ -626,7 +621,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]percentiles[/\\]tdigest[/\\]AbstractTDigestPercentilesAggregator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]percentiles[/\\]tdigest[/\\]TDigestPercentileRanksAggregator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]percentiles[/\\]tdigest[/\\]TDigestPercentilesAggregator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]scripted[/\\]InternalScriptedMetric.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]scripted[/\\]ScriptedMetricAggregator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]stats[/\\]extended[/\\]ExtendedStatsAggregator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]tophits[/\\]TopHitsAggregator.java" checks="LineLength" />
@ -848,7 +842,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]BlockingClusterStatePublishResponseHandlerTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]DiscoveryWithServiceDisruptionsIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]ZenUnicastDiscoveryIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]NodeJoinControllerTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]ZenDiscoveryUnitTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]publish[/\\]PublishClusterStateActionTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]document[/\\]DocumentActionsIT.java" checks="LineLength" />

View File

@ -36,7 +36,7 @@ public class NamingConventionsCheckBadClasses {
public void testDummy() {}
}
public static abstract class DummyAbstractTests extends UnitTestCase {
public abstract static class DummyAbstractTests extends UnitTestCase {
}
public interface DummyInterfaceTests {

View File

@ -26,6 +26,8 @@ apply plugin: 'ru.vyarus.animalsniffer'
targetCompatibility = JavaVersion.VERSION_1_7
sourceCompatibility = JavaVersion.VERSION_1_7
group = 'org.elasticsearch.client'
dependencies {
compile "org.apache.httpcomponents:httpclient:${versions.httpclient}"
compile "org.apache.httpcomponents:httpcore:${versions.httpcore}"

View File

@ -28,7 +28,7 @@ import java.net.URI;
*/
final class HttpDeleteWithEntity extends HttpEntityEnclosingRequestBase {
final static String METHOD_NAME = HttpDelete.METHOD_NAME;
static final String METHOD_NAME = HttpDelete.METHOD_NAME;
HttpDeleteWithEntity(final URI uri) {
setURI(uri);

View File

@ -28,7 +28,7 @@ import java.net.URI;
*/
final class HttpGetWithEntity extends HttpEntityEnclosingRequestBase {
final static String METHOD_NAME = HttpGet.METHOD_NAME;
static final String METHOD_NAME = HttpGet.METHOD_NAME;
HttpGetWithEntity(final URI uri) {
setURI(uri);

View File

@ -26,6 +26,8 @@ apply plugin: 'ru.vyarus.animalsniffer'
targetCompatibility = JavaVersion.VERSION_1_7
sourceCompatibility = JavaVersion.VERSION_1_7
group = 'org.elasticsearch.client'
dependencies {
compile "org.elasticsearch.client:rest:${version}"
compile "org.apache.httpcomponents:httpclient:${versions.httpclient}"

View File

@ -26,6 +26,9 @@ apply plugin: 'ru.vyarus.animalsniffer'
targetCompatibility = JavaVersion.VERSION_1_7
sourceCompatibility = JavaVersion.VERSION_1_7
install.enabled = false
uploadArchives.enabled = false
dependencies {
compile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
compile "junit:junit:${versions.junit}"

View File

@ -1110,7 +1110,7 @@ public long ramBytesUsed() {
this.analyzed.copyBytes(analyzed);
}
private final static class SurfaceFormAndPayload implements Comparable<SurfaceFormAndPayload> {
private static final class SurfaceFormAndPayload implements Comparable<SurfaceFormAndPayload> {
BytesRef payload;
long weight;

View File

@ -26,7 +26,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
*/
public class StoreRateLimiting {
public static interface Provider {
public interface Provider {
StoreRateLimiting rateLimiting();
}

View File

@ -100,7 +100,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
}
public ElasticsearchException(StreamInput in) throws IOException {
super(in.readOptionalString(), in.readThrowable());
super(in.readOptionalString(), in.readException());
readStackTrace(this, in);
int numKeys = in.readVInt();
for (int i = 0; i < numKeys; i++) {
@ -162,7 +162,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
* Unwraps the actual cause from the exception for cases when the exception is a
* {@link ElasticsearchWrapperException}.
*
* @see org.elasticsearch.ExceptionsHelper#unwrapCause(Throwable)
* @see ExceptionsHelper#unwrapCause(Throwable)
*/
public Throwable unwrapCause() {
return ExceptionsHelper.unwrapCause(this);
@ -415,7 +415,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
int numSuppressed = in.readVInt();
for (int i = 0; i < numSuppressed; i++) {
throwable.addSuppressed(in.readThrowable());
throwable.addSuppressed(in.readException());
}
return throwable;
}
@ -794,9 +794,9 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
return null;
}
public static void renderThrowable(XContentBuilder builder, Params params, Throwable t) throws IOException {
public static void renderException(XContentBuilder builder, Params params, Exception e) throws IOException {
builder.startObject("error");
final ElasticsearchException[] rootCauses = ElasticsearchException.guessRootCauses(t);
final ElasticsearchException[] rootCauses = ElasticsearchException.guessRootCauses(e);
builder.field("root_cause");
builder.startArray();
for (ElasticsearchException rootCause : rootCauses) {
@ -806,7 +806,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
builder.endObject();
}
builder.endArray();
ElasticsearchException.toXContent(builder, params, t);
ElasticsearchException.toXContent(builder, params, e);
builder.endObject();
}

View File

@ -36,7 +36,7 @@ public class ElasticsearchSecurityException extends ElasticsearchException {
this.status = status ;
}
public ElasticsearchSecurityException(String msg, Throwable cause, Object... args) {
public ElasticsearchSecurityException(String msg, Exception cause, Object... args) {
this(msg, ExceptionsHelper.status(cause), cause, args);
}

View File

@ -37,25 +37,22 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
*
*/
public final class ExceptionsHelper {
private static final ESLogger logger = Loggers.getLogger(ExceptionsHelper.class);
public static RuntimeException convertToRuntime(Throwable t) {
if (t instanceof RuntimeException) {
return (RuntimeException) t;
public static RuntimeException convertToRuntime(Exception e) {
if (e instanceof RuntimeException) {
return (RuntimeException) e;
}
return new ElasticsearchException(t);
return new ElasticsearchException(e);
}
public static ElasticsearchException convertToElastic(Throwable t) {
if (t instanceof ElasticsearchException) {
return (ElasticsearchException) t;
public static ElasticsearchException convertToElastic(Exception e) {
if (e instanceof ElasticsearchException) {
return (ElasticsearchException) e;
}
return new ElasticsearchException(t);
return new ElasticsearchException(e);
}
public static RestStatus status(Throwable t) {
@ -209,7 +206,6 @@ public final class ExceptionsHelper {
return true;
}
/**
* Deduplicate the failures by exception message and index.
*/

View File

@ -32,5 +32,5 @@ public interface ActionListener<Response> {
/**
* A failure caused by an exception at some phase of the task.
*/
void onFailure(Throwable e);
void onFailure(Exception e);
}

View File

@ -22,7 +22,7 @@ package org.elasticsearch.action;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
/**
* Base class for {@link Runnable}s that need to call {@link ActionListener#onFailure(Throwable)} in case an uncaught
* Base class for {@link Runnable}s that need to call {@link ActionListener#onFailure(Exception)} in case an uncaught
* exception or error is thrown while the actual action is run.
*/
public abstract class ActionRunnable<Response> extends AbstractRunnable {
@ -34,11 +34,11 @@ public abstract class ActionRunnable<Response> extends AbstractRunnable {
}
/**
* Calls the action listeners {@link ActionListener#onFailure(Throwable)} method with the given exception.
* Calls the action listeners {@link ActionListener#onFailure(Exception)} method with the given exception.
* This method is invoked for all exception thrown by {@link #doRun()}
*/
@Override
public void onFailure(Throwable t) {
listener.onFailure(t);
public void onFailure(Exception e) {
listener.onFailure(e);
}
}

View File

@ -45,7 +45,7 @@ public class LatchedActionListener<T> implements ActionListener<T> {
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
try {
delegate.onFailure(e);
} finally {

View File

@ -43,15 +43,15 @@ public final class TaskOperationFailure implements Writeable, ToXContent {
private final long taskId;
private final Throwable reason;
private final Exception reason;
private final RestStatus status;
public TaskOperationFailure(String nodeId, long taskId, Throwable t) {
public TaskOperationFailure(String nodeId, long taskId, Exception e) {
this.nodeId = nodeId;
this.taskId = taskId;
this.reason = t;
status = ExceptionsHelper.status(t);
this.reason = e;
status = ExceptionsHelper.status(e);
}
/**
@ -60,7 +60,7 @@ public final class TaskOperationFailure implements Writeable, ToXContent {
public TaskOperationFailure(StreamInput in) throws IOException {
nodeId = in.readString();
taskId = in.readLong();
reason = in.readThrowable();
reason = in.readException();
status = RestStatus.readFrom(in);
}

View File

@ -43,7 +43,7 @@ public class NodeExplanation implements Writeable, ToXContent {
private final String finalExplanation;
public NodeExplanation(final DiscoveryNode node, final Decision nodeDecision, final Float nodeWeight,
final @Nullable IndicesShardStoresResponse.StoreStatus storeStatus,
@Nullable final IndicesShardStoresResponse.StoreStatus storeStatus,
final ClusterAllocationExplanation.FinalDecision finalDecision,
final String finalExplanation,
final ClusterAllocationExplanation.StoreCopy storeCopy) {

View File

@ -145,7 +145,7 @@ public class TransportClusterAllocationExplainAction
// No copies of the data
storeCopy = ClusterAllocationExplanation.StoreCopy.NONE;
} else {
final Throwable storeErr = storeStatus.getStoreException();
final Exception storeErr = storeStatus.getStoreException();
if (storeErr != null) {
if (ExceptionsHelper.unwrapCause(storeErr) instanceof CorruptIndexException) {
storeCopy = ClusterAllocationExplanation.StoreCopy.CORRUPT;
@ -323,7 +323,7 @@ public class TransportClusterAllocationExplainAction
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});

View File

@ -104,9 +104,9 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
listener.onFailure(t);
public void onFailure(String source, Exception e) {
logger.error("unexpected failure during [{}]", e, source);
listener.onFailure(e);
}
@Override

View File

@ -46,8 +46,6 @@ import static java.util.Collections.unmodifiableMap;
* Node information (static, does not change over time).
*/
public class NodeInfo extends BaseNodeResponse {
@Nullable
private Map<String, String> serviceAttributes;
private Version version;
private Build build;
@ -85,14 +83,13 @@ public class NodeInfo extends BaseNodeResponse {
public NodeInfo() {
}
public NodeInfo(Version version, Build build, DiscoveryNode node, @Nullable Map<String, String> serviceAttributes, @Nullable Settings settings,
public NodeInfo(Version version, Build build, DiscoveryNode node, @Nullable Settings settings,
@Nullable OsInfo os, @Nullable ProcessInfo process, @Nullable JvmInfo jvm, @Nullable ThreadPoolInfo threadPool,
@Nullable TransportInfo transport, @Nullable HttpInfo http, @Nullable PluginsAndModules plugins, @Nullable IngestInfo ingest,
@Nullable ByteSizeValue totalIndexingBuffer) {
super(node);
this.version = version;
this.build = build;
this.serviceAttributes = serviceAttributes;
this.settings = settings;
this.os = os;
this.process = process;
@ -127,14 +124,6 @@ public class NodeInfo extends BaseNodeResponse {
return this.build;
}
/**
* The service attributes of the node.
*/
@Nullable
public Map<String, String> getServiceAttributes() {
return this.serviceAttributes;
}
/**
* The settings of the node.
*/
@ -213,14 +202,6 @@ public class NodeInfo extends BaseNodeResponse {
} else {
totalIndexingBuffer = null;
}
if (in.readBoolean()) {
Map<String, String> builder = new HashMap<>();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
builder.put(in.readString(), in.readString());
}
serviceAttributes = unmodifiableMap(builder);
}
if (in.readBoolean()) {
settings = Settings.readSettingsFromStream(in);
}
@ -262,16 +243,6 @@ public class NodeInfo extends BaseNodeResponse {
out.writeBoolean(true);
out.writeLong(totalIndexingBuffer.bytes());
}
if (getServiceAttributes() == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeVInt(serviceAttributes.size());
for (Map.Entry<String, String> entry : serviceAttributes.entrySet()) {
out.writeString(entry.getKey());
out.writeString(entry.getValue());
}
}
if (settings == null) {
out.writeBoolean(false);
} else {

View File

@ -73,12 +73,6 @@ public class NodesInfoResponse extends BaseNodesResponse<NodeInfo> implements To
builder.byteSizeField("total_indexing_buffer", "total_indexing_buffer_in_bytes", nodeInfo.getTotalIndexingBuffer());
}
if (nodeInfo.getServiceAttributes() != null) {
for (Map.Entry<String, String> nodeAttribute : nodeInfo.getServiceAttributes().entrySet()) {
builder.field(nodeAttribute.getKey(), nodeAttribute.getValue());
}
}
builder.startArray("roles");
for (DiscoveryNode.Role role : nodeInfo.getNode().getRoles()) {
builder.value(role.getRoleName());

View File

@ -154,8 +154,8 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
}
@Override
public void onFailure(Throwable t) {
listener.onFailure(t);
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
} else {
@ -179,7 +179,7 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
/*
* We couldn't load the task from the task index. Instead of 404 we should use the snapshot we took after it finished. If
* the error isn't a 404 then we'll just throw it back to the user.
@ -207,13 +207,13 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
public void onResponse(GetResponse getResponse) {
try {
onGetFinishedTaskFromIndex(getResponse, listener);
} catch (Throwable e) {
} catch (Exception e) {
listener.onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
if (ExceptionsHelper.unwrap(e, IndexNotFoundException.class) != null) {
// We haven't yet created the index for the task results so it can't be found.
listener.onFailure(new ResourceNotFoundException("task [{}] isn't running or persisted", e, request.getTaskId()));

View File

@ -76,7 +76,7 @@ public class TransportDeleteRepositoryAction extends TransportMasterNodeAction<D
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});

View File

@ -80,7 +80,7 @@ public class TransportPutRepositoryAction extends TransportMasterNodeAction<PutR
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});

View File

@ -78,7 +78,7 @@ public class TransportVerifyRepositoryAction extends TransportMasterNodeAction<V
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});

View File

@ -102,9 +102,9 @@ public class TransportClusterRerouteAction extends TransportMasterNodeAction<Clu
}
@Override
public void onFailure(String source, Throwable t) {
logger.debug("failed to perform [{}]", t, source);
super.onFailure(source, t);
public void onFailure(String source, Exception e) {
logger.debug("failed to perform [{}]", e, source);
super.onFailure(source, e);
}
@Override

View File

@ -93,11 +93,11 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
public void onAllNodesAcked(@Nullable Exception e) {
if (changed) {
reroute(true);
} else {
super.onAllNodesAcked(t);
super.onAllNodesAcked(e);
}
}
@ -146,10 +146,10 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct
}
@Override
public void onFailure(String source, Throwable t) {
public void onFailure(String source, Exception e) {
//if the reroute fails we only log
logger.debug("failed to perform [{}]", t, source);
listener.onFailure(new ElasticsearchException("reroute after update settings failed", t));
logger.debug("failed to perform [{}]", e, source);
listener.onFailure(new ElasticsearchException("reroute after update settings failed", e));
}
@Override
@ -165,9 +165,9 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct
}
@Override
public void onFailure(String source, Throwable t) {
logger.debug("failed to perform [{}]", t, source);
super.onFailure(source, t);
public void onFailure(String source, Exception e) {
logger.debug("failed to perform [{}]", e, source);
super.onFailure(source, e);
}
@Override

View File

@ -94,10 +94,10 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeAction<Cre
}
@Override
public void onSnapshotFailure(Snapshot snapshot, Throwable t) {
public void onSnapshotFailure(Snapshot snapshot, Exception e) {
if (snapshot.getRepository().equals(request.repository()) &&
snapshot.getSnapshotId().getName().equals(request.snapshot())) {
listener.onFailure(t);
listener.onFailure(e);
snapshotsService.removeListener(this);
}
}
@ -108,8 +108,8 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeAction<Cre
}
@Override
public void onFailure(Throwable t) {
listener.onFailure(t);
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}

View File

@ -72,8 +72,8 @@ public class TransportDeleteSnapshotAction extends TransportMasterNodeAction<Del
}
@Override
public void onFailure(Throwable t) {
listener.onFailure(t);
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}

View File

@ -120,8 +120,8 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
snapshotInfoBuilder.addAll(snapshotsService.snapshots(repository, new ArrayList<>(toResolve), request.ignoreUnavailable()));
}
listener.onResponse(new GetSnapshotsResponse(snapshotInfoBuilder));
} catch (Throwable t) {
listener.onFailure(t);
} catch (Exception e) {
listener.onFailure(e);
}
}

View File

@ -94,7 +94,7 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeAction<Re
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
@ -104,7 +104,7 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeAction<Re
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
listener.onFailure(t);
}
});

View File

@ -125,13 +125,13 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
List<SnapshotsInProgress.Entry> currentSnapshots =
snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots()));
listener.onResponse(buildResponse(request, currentSnapshots, nodeSnapshotStatuses));
} catch (Throwable e) {
} catch (Exception e) {
listener.onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
@ -207,16 +207,15 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
.filter(s -> requestedSnapshotNames.contains(s.getName()))
.collect(Collectors.toMap(SnapshotId::getName, Function.identity()));
for (final String snapshotName : request.snapshots()) {
SnapshotId snapshotId = matchedSnapshotIds.get(snapshotName);
if (snapshotId == null) {
if (currentSnapshotNames.contains(snapshotName)) {
// we've already found this snapshot in the current snapshot entries, so skip over
continue;
} else {
}
SnapshotId snapshotId = matchedSnapshotIds.get(snapshotName);
if (snapshotId == null) {
// neither in the current snapshot entries nor found in the repository
throw new SnapshotMissingException(repositoryName, snapshotName);
}
}
SnapshotInfo snapshotInfo = snapshotsService.snapshot(repositoryName, snapshotId);
List<SnapshotIndexShardStatus> shardStatusBuilder = new ArrayList<>();
if (snapshotInfo.state().completed()) {

View File

@ -119,7 +119,7 @@ public class TransportIndicesAliasesAction extends TransportMasterNodeAction<Ind
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
logger.debug("failed to perform aliases", t);
listener.onFailure(t);
}

View File

@ -107,7 +107,7 @@ public class TransportCloseIndexAction extends TransportMasterNodeAction<CloseIn
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
logger.debug("failed to close indices [{}]", t, (Object)concreteIndices);
listener.onFailure(t);
}

View File

@ -364,7 +364,7 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
throw new ElasticsearchParseException("failed to parse source for create index", e);
}
} else {
settings(new String(source.toBytes(), StandardCharsets.UTF_8));
settings(source.utf8ToString());
}
return this;
}

View File

@ -87,7 +87,7 @@ public class TransportCreateIndexAction extends TransportMasterNodeAction<Create
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
if (t instanceof IndexAlreadyExistsException) {
logger.trace("[{}] failed to create", t, request.index());
} else {

View File

@ -99,7 +99,7 @@ public class TransportDeleteIndexAction extends TransportMasterNodeAction<Delete
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
logger.debug("failed to delete indices [{}]", t, concreteIndices);
listener.onFailure(t);
}

View File

@ -77,7 +77,7 @@ public class TransportGetFieldMappingsAction extends HandledTransportAction<GetF
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
int index = indexCounter.getAndIncrement();
indexResponses.set(index, e);
if (completionCounter.decrementAndGet() == 0) {

View File

@ -130,7 +130,7 @@ public class TransportGetFieldMappingsIndexAction extends TransportSingleShardAc
private static final ToXContent.Params includeDefaultsParams = new ToXContent.Params() {
final static String INCLUDE_DEFAULTS = "include_defaults";
static final String INCLUDE_DEFAULTS = "include_defaults";
@Override
public String param(String key) {

View File

@ -91,7 +91,7 @@ public class TransportPutMappingAction extends TransportMasterNodeAction<PutMapp
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
logger.debug("failed to put mappings on indices [{}], type [{}]", t, concreteIndices, request.type());
listener.onFailure(t);
}

View File

@ -92,7 +92,7 @@ public class TransportOpenIndexAction extends TransportMasterNodeAction<OpenInde
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
logger.debug("failed to open indices [{}]", t, (Object)concreteIndices);
listener.onFailure(t);
}

View File

@ -30,7 +30,7 @@ import java.io.IOException;
* when the index is at least {@link #value} old
*/
public class MaxAgeCondition extends Condition<TimeValue> {
public final static String NAME = "max_age";
public static final String NAME = "max_age";
public MaxAgeCondition(TimeValue value) {
super(NAME);

View File

@ -29,7 +29,7 @@ import java.io.IOException;
* when the index has at least {@link #value} docs
*/
public class MaxDocsCondition extends Condition<Long> {
public final static String NAME = "max_docs";
public static final String NAME = "max_docs";
public MaxDocsCondition(Long value) {
super(NAME);

View File

@ -43,7 +43,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -132,14 +131,14 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
listener.onFailure(t);
}
});
@ -152,7 +151,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
}

View File

@ -91,7 +91,7 @@ public class TransportUpdateSettingsAction extends TransportMasterNodeAction<Upd
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
logger.debug("failed to update settings on indices [{}]", t, (Object)concreteIndices);
listener.onFailure(t);
}

View File

@ -57,7 +57,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
private DiscoveryNode node;
private long legacyVersion;
private String allocationId;
private Throwable storeException;
private Exception storeException;
private AllocationStatus allocationStatus;
/**
@ -116,7 +116,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
private StoreStatus() {
}
public StoreStatus(DiscoveryNode node, long legacyVersion, String allocationId, AllocationStatus allocationStatus, Throwable storeException) {
public StoreStatus(DiscoveryNode node, long legacyVersion, String allocationId, AllocationStatus allocationStatus, Exception storeException) {
this.node = node;
this.legacyVersion = legacyVersion;
this.allocationId = allocationId;
@ -150,7 +150,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
* Exception while trying to open the
* shard index or from when the shard failed
*/
public Throwable getStoreException() {
public Exception getStoreException() {
return storeException;
}
@ -177,7 +177,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
allocationId = in.readOptionalString();
allocationStatus = AllocationStatus.readFrom(in);
if (in.readBoolean()) {
storeException = in.readThrowable();
storeException = in.readException();
}
}

View File

@ -100,7 +100,7 @@ public class TransportShrinkAction extends TransportMasterNodeAction<ShrinkReque
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
if (t instanceof IndexAlreadyExistsException) {
logger.trace("[{}] failed to create shrink index", t, updateRequest.index());
} else {
@ -112,7 +112,7 @@ public class TransportShrinkAction extends TransportMasterNodeAction<ShrinkReque
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});

View File

@ -31,8 +31,8 @@ import java.util.EnumSet;
*/
public class CommonStatsFlags implements Streamable, Cloneable {
public final static CommonStatsFlags ALL = new CommonStatsFlags().all();
public final static CommonStatsFlags NONE = new CommonStatsFlags().clear();
public static final CommonStatsFlags ALL = new CommonStatsFlags().all();
public static final CommonStatsFlags NONE = new CommonStatsFlags().clear();
private EnumSet<Flag> flags = EnumSet.allOf(Flag.class);
private String[] types = null;

View File

@ -72,9 +72,9 @@ public class TransportDeleteIndexTemplateAction extends TransportMasterNodeActio
}
@Override
public void onFailure(Throwable t) {
logger.debug("failed to delete templates [{}]", t, request.name());
listener.onFailure(t);
public void onFailure(Exception e) {
logger.debug("failed to delete templates [{}]", e, request.name());
listener.onFailure(e);
}
});
}

View File

@ -93,9 +93,9 @@ public class TransportPutIndexTemplateAction extends TransportMasterNodeAction<P
}
@Override
public void onFailure(Throwable t) {
logger.debug("failed to put template [{}]", t, request.name());
listener.onFailure(t);
public void onFailure(Exception e) {
logger.debug("failed to put template [{}]", e, request.name());
listener.onFailure(e);
}
});
}

View File

@ -190,13 +190,13 @@ public class TransportUpgradeAction extends TransportBroadcastByNodeAction<Upgra
} else {
updateSettings(upgradeResponse, listener);
}
} catch (Throwable t) {
listener.onFailure(t);
} catch (Exception e) {
listener.onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
};
@ -212,7 +212,7 @@ public class TransportUpgradeAction extends TransportBroadcastByNodeAction<Upgra
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});

View File

@ -78,7 +78,7 @@ public class TransportUpgradeSettingsAction extends TransportMasterNodeAction<Up
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
logger.debug("failed to upgrade minimum compatibility version settings on indices [{}]", t, request.versions().keySet());
listener.onFailure(t);
}

View File

@ -91,12 +91,12 @@ public class BulkItemResponse implements Streamable, StatusToXContent {
private final Throwable cause;
private final RestStatus status;
public Failure(String index, String type, String id, Throwable t) {
public Failure(String index, String type, String id, Throwable cause) {
this.index = index;
this.type = type;
this.id = id;
this.cause = t;
this.status = ExceptionsHelper.status(t);
this.cause = cause;
this.status = ExceptionsHelper.status(cause);
}
/**
@ -106,7 +106,7 @@ public class BulkItemResponse implements Streamable, StatusToXContent {
index = in.readString();
type = in.readString();
id = in.readOptionalString();
cause = in.readThrowable();
cause = in.readException();
status = ExceptionsHelper.status(cause);
}

View File

@ -80,10 +80,10 @@ abstract class BulkRequestHandler {
if (!afterCalled) {
listener.afterBulk(executionId, bulkRequest, e);
}
} catch (Throwable t) {
logger.warn("Failed to execute bulk request {}.", t, executionId);
} catch (Exception e) {
logger.warn("Failed to execute bulk request {}.", e, executionId);
if (!afterCalled) {
listener.afterBulk(executionId, bulkRequest, t);
listener.afterBulk(executionId, bulkRequest, e);
}
}
}
@ -131,7 +131,7 @@ abstract class BulkRequestHandler {
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
try {
listener.afterBulk(executionId, bulkRequest, e);
} finally {
@ -144,9 +144,9 @@ abstract class BulkRequestHandler {
Thread.currentThread().interrupt();
logger.info("Bulk request {} has been cancelled.", e, executionId);
listener.afterBulk(executionId, bulkRequest, e);
} catch (Throwable t) {
logger.warn("Failed to execute bulk request {}.", t, executionId);
listener.afterBulk(executionId, bulkRequest, t);
} catch (Exception e) {
logger.warn("Failed to execute bulk request {}.", e, executionId);
listener.afterBulk(executionId, bulkRequest, e);
} finally {
if (!bulkRequestSetupSuccessful && acquired) { // if we fail on client.bulk() release the semaphore
semaphore.release();

View File

@ -35,7 +35,7 @@ import java.util.Iterator;
*/
public class BulkResponse extends ActionResponse implements Iterable<BulkItemResponse> {
public final static long NO_INGEST_TOOK = -1L;
public static final long NO_INGEST_TOOK = -1L;
private BulkItemResponse[] responses;
private long tookInMillis;

View File

@ -38,7 +38,7 @@ public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> {
public BulkShardRequest() {
}
BulkShardRequest(BulkRequest bulkRequest, ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
super(shardId);
this.items = items;
setRefreshPolicy(refreshPolicy);

View File

@ -130,7 +130,7 @@ public class Retry {
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
try {
listener.onFailure(e);
} finally {
@ -163,8 +163,8 @@ public class Retry {
}
for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
if (bulkItemResponse.isFailed()) {
Throwable cause = bulkItemResponse.getFailure().getCause();
Throwable rootCause = ExceptionsHelper.unwrapCause(cause);
final Throwable cause = bulkItemResponse.getFailure().getCause();
final Throwable rootCause = ExceptionsHelper.unwrapCause(cause);
if (!rootCause.getClass().equals(retryOnThrowable)) {
return false;
}

View File

@ -150,14 +150,14 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
if (counter.decrementAndGet() == 0) {
try {
executeBulk(task, bulkRequest, startTime, listener, responses);
} catch (Throwable t) {
listener.onFailure(t);
} catch (Exception e) {
listener.onFailure(e);
}
}
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
if (!(ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException)) {
// fail all requests involving this index, if create didnt work
for (int i = 0; i < bulkRequest.requests.size(); i++) {
@ -170,8 +170,9 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
if (counter.decrementAndGet() == 0) {
try {
executeBulk(task, bulkRequest, startTime, listener, responses);
} catch (Throwable t) {
listener.onFailure(t);
} catch (Exception inner) {
inner.addSuppressed(e);
listener.onFailure(inner);
}
}
}
@ -195,7 +196,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
return autoCreateIndex.shouldAutoCreate(index, state);
}
private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, ActionRequest request, String index, Throwable e) {
private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, ActionRequest request, String index, Exception e) {
if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request;
if (index.equals(indexRequest.index())) {
@ -344,7 +345,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
final ShardId shardId = entry.getKey();
final List<BulkItemRequest> requests = entry.getValue();
BulkShardRequest bulkShardRequest = new BulkShardRequest(bulkRequest, shardId, bulkRequest.getRefreshPolicy(),
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
requests.toArray(new BulkItemRequest[requests.size()]));
bulkShardRequest.consistencyLevel(bulkRequest.consistencyLevel());
bulkShardRequest.timeout(bulkRequest.timeout());
@ -367,7 +368,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();

View File

@ -71,8 +71,8 @@ import static org.elasticsearch.action.support.replication.ReplicationOperation.
*/
public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardResponse> {
private final static String OP_TYPE_UPDATE = "update";
private final static String OP_TYPE_DELETE = "delete";
private static final String OP_TYPE_UPDATE = "update";
private static final String OP_TYPE_DELETE = "delete";
public static final String ACTION_NAME = BulkAction.NAME + "[s]";
@ -158,7 +158,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
// add the response
IndexResponse indexResponse = result.getResponse();
setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse));
} catch (Throwable e) {
} catch (Exception e) {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(e)) {
// restore updated versions...
@ -181,11 +181,11 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
return location;
}
private <ReplicationRequestT extends ReplicationRequest<ReplicationRequestT>> void logFailure(Throwable e, String operation, ShardId shardId, ReplicationRequest<ReplicationRequestT> request) {
if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
logger.trace("{} failed to execute bulk item ({}) {}", e, shardId, operation, request);
private <ReplicationRequestT extends ReplicationRequest<ReplicationRequestT>> void logFailure(Throwable t, String operation, ShardId shardId, ReplicationRequest<ReplicationRequestT> request) {
if (ExceptionsHelper.status(t) == RestStatus.CONFLICT) {
logger.trace("{} failed to execute bulk item ({}) {}", t, shardId, operation, request);
} else {
logger.debug("{} failed to execute bulk item ({}) {}", e, shardId, operation, request);
logger.debug("{} failed to execute bulk item ({}) {}", t, shardId, operation, request);
}
}
@ -200,7 +200,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
DeleteResponse deleteResponse = writeResult.getResponse();
location = locationToSync(location, writeResult.getLocation());
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE, deleteResponse));
} catch (Throwable e) {
} catch (Exception e) {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(e)) {
// restore updated versions...
@ -232,7 +232,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
UpdateResult updateResult;
try {
updateResult = shardUpdateOperation(metaData, request, updateRequest, indexShard);
} catch (Throwable t) {
} catch (Exception t) {
updateResult = new UpdateResult(null, null, false, t, null);
}
if (updateResult.success()) {
@ -275,43 +275,43 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
// NOTE: Breaking out of the retry_on_conflict loop!
break;
} else if (updateResult.failure()) {
Throwable t = updateResult.error;
Throwable e = updateResult.error;
if (updateResult.retry) {
// updateAttemptCount is 0 based and marks current attempt, if it's equal to retryOnConflict we are going out of the iteration
if (updateAttemptsCount >= updateRequest.retryOnConflict()) {
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE,
new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), t)));
new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), e)));
}
} else {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(t)) {
if (retryPrimaryException(e)) {
// restore updated versions...
for (int j = 0; j < requestIndex; j++) {
applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]);
}
throw (ElasticsearchException) t;
throw (ElasticsearchException) e;
}
// if its a conflict failure, and we already executed the request on a primary (and we execute it
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
// then just use the response we got from the successful execution
if (item.getPrimaryResponse() != null && isConflictException(t)) {
if (item.getPrimaryResponse() != null && isConflictException(e)) {
setResponse(item, item.getPrimaryResponse());
} else if (updateResult.result == null) {
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), t)));
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), e)));
} else {
switch (updateResult.result.operation()) {
case UPSERT:
case INDEX:
IndexRequest indexRequest = updateResult.request();
logFailure(t, "index", request.shardId(), indexRequest);
logFailure(e, "index", request.shardId(), indexRequest);
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE,
new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), t)));
new BulkItemResponse.Failure(request.index(), indexRequest.type(), indexRequest.id(), e)));
break;
case DELETE:
DeleteRequest deleteRequest = updateResult.request();
logFailure(t, "delete", request.shardId(), deleteRequest);
logFailure(e, "delete", request.shardId(), deleteRequest);
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE,
new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), t)));
new BulkItemResponse.Failure(request.index(), deleteRequest.type(), deleteRequest.id(), e)));
break;
}
}
@ -335,7 +335,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
private WriteResult<IndexResponse> shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, IndexMetaData metaData,
IndexShard indexShard, boolean processed) throws Throwable {
IndexShard indexShard, boolean processed) throws Exception {
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
if (!processed) {
@ -406,26 +406,26 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
try {
WriteResult result = shardIndexOperation(bulkShardRequest, indexRequest, metaData, indexShard, false);
return new UpdateResult(translate, indexRequest, result);
} catch (Throwable t) {
t = ExceptionsHelper.unwrapCause(t);
} catch (Exception e) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
boolean retry = false;
if (t instanceof VersionConflictEngineException) {
if (cause instanceof VersionConflictEngineException) {
retry = true;
}
return new UpdateResult(translate, indexRequest, retry, t, null);
return new UpdateResult(translate, indexRequest, retry, cause, null);
}
case DELETE:
DeleteRequest deleteRequest = translate.action();
try {
WriteResult<DeleteResponse> result = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard);
return new UpdateResult(translate, deleteRequest, result);
} catch (Throwable t) {
t = ExceptionsHelper.unwrapCause(t);
} catch (Exception e) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
boolean retry = false;
if (t instanceof VersionConflictEngineException) {
if (cause instanceof VersionConflictEngineException) {
retry = true;
}
return new UpdateResult(translate, deleteRequest, retry, t, null);
return new UpdateResult(translate, deleteRequest, retry, cause, null);
}
case NONE:
UpdateResponse updateResponse = translate.action();
@ -449,7 +449,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
try {
Engine.Index operation = TransportIndexAction.executeIndexRequestOnReplica(indexRequest, indexShard);
location = locationToSync(location, operation.getTranslogLocation());
} catch (Throwable e) {
} catch (Exception e) {
// if its not an ignore replica failure, we need to make sure to bubble up the failure
// so we will fail the shard
if (!ignoreReplicaException(e)) {
@ -462,7 +462,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
Engine.Delete delete = TransportDeleteAction.executeDeleteRequestOnReplica(deleteRequest, indexShard);
indexShard.delete(delete);
location = locationToSync(location, delete.getTranslogLocation());
} catch (Throwable e) {
} catch (Exception e) {
// if its not an ignore replica failure, we need to make sure to bubble up the failure
// so we will fail the shard
if (!ignoreReplicaException(e)) {

View File

@ -77,7 +77,7 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
// we have the index, do it
innerExecute(task, request, listener);

View File

@ -131,7 +131,7 @@ public class TransportExplainAction extends TransportSingleShardAction<ExplainRe
// Advantage is that we're not opening a second searcher to retrieve the _source. Also
// because we are working in the same searcher in engineGetResult we can be sure that a
// doc isn't deleted between the initial get and this call.
GetResult getResult = indexShard.getService().get(result, request.id(), request.type(), request.fields(), request.fetchSourceContext(), false);
GetResult getResult = indexShard.getService().get(result, request.id(), request.type(), request.fields(), request.fetchSourceContext());
return new ExplainResponse(shardId.getIndexName(), request.type(), request.id(), true, explanation, getResult);
} else {
return new ExplainResponse(shardId.getIndexName(), request.type(), request.id(), true, explanation);

View File

@ -615,17 +615,17 @@ public abstract class FieldStats<T> implements Writeable, ToXContent {
}
}
private final static class Fields {
final static String MAX_DOC = new String("max_doc");
final static String DOC_COUNT = new String("doc_count");
final static String DENSITY = new String("density");
final static String SUM_DOC_FREQ = new String("sum_doc_freq");
final static String SUM_TOTAL_TERM_FREQ = new String("sum_total_term_freq");
final static String SEARCHABLE = new String("searchable");
final static String AGGREGATABLE = new String("aggregatable");
final static String MIN_VALUE = new String("min_value");
final static String MIN_VALUE_AS_STRING = new String("min_value_as_string");
final static String MAX_VALUE = new String("max_value");
final static String MAX_VALUE_AS_STRING = new String("max_value_as_string");
private static final class Fields {
static final String MAX_DOC = new String("max_doc");
static final String DOC_COUNT = new String("doc_count");
static final String DENSITY = new String("density");
static final String SUM_DOC_FREQ = new String("sum_doc_freq");
static final String SUM_TOTAL_TERM_FREQ = new String("sum_total_term_freq");
static final String SEARCHABLE = new String("searchable");
static final String AGGREGATABLE = new String("aggregatable");
static final String MIN_VALUE = new String("min_value");
static final String MIN_VALUE_AS_STRING = new String("min_value_as_string");
static final String MAX_VALUE = new String("max_value");
static final String MAX_VALUE_AS_STRING = new String("max_value_as_string");
}
}

View File

@ -39,7 +39,7 @@ import java.util.List;
*/
public class FieldStatsRequest extends BroadcastRequest<FieldStatsRequest> {
public final static String DEFAULT_LEVEL = "cluster";
public static final String DEFAULT_LEVEL = "cluster";
private String[] fields = Strings.EMPTY_ARRAY;
private String level = DEFAULT_LEVEL;

View File

@ -40,17 +40,17 @@ public class MultiGetResponse extends ActionResponse implements Iterable<MultiGe
private String index;
private String type;
private String id;
private Throwable throwable;
private Exception exception;
Failure() {
}
public Failure(String index, String type, String id, Throwable throwable) {
public Failure(String index, String type, String id, Exception exception) {
this.index = index;
this.type = type;
this.id = id;
this.throwable = throwable;
this.exception = exception;
}
/**
@ -78,7 +78,7 @@ public class MultiGetResponse extends ActionResponse implements Iterable<MultiGe
* The failure message.
*/
public String getMessage() {
return throwable != null ? throwable.getMessage() : null;
return exception != null ? exception.getMessage() : null;
}
public static Failure readFailure(StreamInput in) throws IOException {
@ -92,7 +92,7 @@ public class MultiGetResponse extends ActionResponse implements Iterable<MultiGe
index = in.readString();
type = in.readOptionalString();
id = in.readString();
throwable = in.readThrowable();
exception = in.readException();
}
@Override
@ -100,11 +100,11 @@ public class MultiGetResponse extends ActionResponse implements Iterable<MultiGe
out.writeString(index);
out.writeOptionalString(type);
out.writeString(id);
out.writeThrowable(throwable);
out.writeThrowable(exception);
}
public Throwable getFailure() {
return throwable;
public Exception getFailure() {
return exception;
}
}
@ -136,7 +136,7 @@ public class MultiGetResponse extends ActionResponse implements Iterable<MultiGe
builder.field(Fields._INDEX, failure.getIndex());
builder.field(Fields._TYPE, failure.getType());
builder.field(Fields._ID, failure.getId());
ElasticsearchException.renderThrowable(builder, params, failure.getFailure());
ElasticsearchException.renderException(builder, params, failure.getFailure());
builder.endObject();
} else {
GetResponse getResponse = response.getResponse();

View File

@ -105,7 +105,7 @@ public class TransportMultiGetAction extends HandledTransportAction<MultiGetRequ
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
// create failures for all relevant requests
for (int i = 0; i < shardRequest.locations.size(); i++) {
MultiGetRequest.Item item = shardRequest.items.get(i);

View File

@ -88,12 +88,12 @@ public class TransportShardMultiGetAction extends TransportSingleShardAction<Mul
try {
GetResult getResult = indexShard.getService().get(item.type(), item.id(), item.fields(), request.realtime(), item.version(), item.versionType(), item.fetchSourceContext(), request.ignoreErrorsOnGeneratedFields());
response.add(request.locations.get(i), new GetResponse(getResult));
} catch (Throwable t) {
if (TransportActions.isShardNotAvailableException(t)) {
throw (ElasticsearchException) t;
} catch (Exception e) {
if (TransportActions.isShardNotAvailableException(e)) {
throw (ElasticsearchException) e;
} else {
logger.debug("{} failed to execute multi_get for [{}]/[{}]", t, shardId, item.type(), item.id());
response.add(request.locations.get(i), new MultiGetResponse.Failure(request.index(), item.type(), item.id(), t));
logger.debug("{} failed to execute multi_get for [{}]/[{}]", e, shardId, item.type(), item.id());
response.add(request.locations.get(i), new MultiGetResponse.Failure(request.index(), item.type(), item.id(), e));
}
}
}

View File

@ -101,13 +101,14 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
// we have the index, do it
try {
innerExecute(task, request, listener);
} catch (Throwable e1) {
listener.onFailure(e1);
} catch (Exception inner) {
inner.addSuppressed(e);
listener.onFailure(inner);
}
} else {
listener.onFailure(e);

View File

@ -104,13 +104,13 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
void processBulkIndexRequest(Task task, BulkRequest original, String action, ActionFilterChain chain, ActionListener<BulkResponse> listener) {
long ingestStartTimeInNanos = System.nanoTime();
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
executionService.executeBulkRequest(() -> bulkRequestModifier, (indexRequest, throwable) -> {
logger.debug("failed to execute pipeline [{}] for document [{}/{}/{}]", throwable, indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id());
bulkRequestModifier.markCurrentItemAsFailed(throwable);
}, (throwable) -> {
if (throwable != null) {
logger.error("failed to execute pipeline for a bulk request", throwable);
listener.onFailure(throwable);
executionService.executeBulkRequest(() -> bulkRequestModifier, (indexRequest, exception) -> {
logger.debug("failed to execute pipeline [{}] for document [{}/{}/{}]", exception, indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id());
bulkRequestModifier.markCurrentItemAsFailed(exception);
}, (exception) -> {
if (exception != null) {
logger.error("failed to execute pipeline for a bulk request", exception);
listener.onFailure(exception);
} else {
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
@ -132,7 +132,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
return Integer.MAX_VALUE;
}
final static class BulkRequestModifier implements Iterator<ActionRequest<?>> {
static final class BulkRequestModifier implements Iterator<ActionRequest<?>> {
final BulkRequest bulkRequest;
final Set<Integer> failedSlots;
@ -188,7 +188,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
actionListener.onFailure(e);
}
};
@ -197,7 +197,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
}
}
void markCurrentItemAsFailed(Throwable e) {
void markCurrentItemAsFailed(Exception e) {
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(currentSlot);
// We hit a error during preprocessing a request, so we:
// 1) Remember the request item slot from the bulk, so that we're done processing all requests we know what failed
@ -210,7 +210,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
}
final static class IngestBulkResponseListener implements ActionListener<BulkResponse> {
static final class IngestBulkResponseListener implements ActionListener<BulkResponse> {
private final long ingestTookInMillis;
private final int[] originalSlots;
@ -233,7 +233,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
actionListener.onFailure(e);
}
}

View File

@ -90,7 +90,7 @@ public class PutPipelineTransportAction extends TransportMasterNodeAction<PutPip
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
});

View File

@ -49,7 +49,7 @@ public final class SimulateDocumentBaseResult implements SimulateDocumentResult
public SimulateDocumentBaseResult(StreamInput in) throws IOException {
if (in.readBoolean()) {
ingestDocument = null;
failure = in.readThrowable();
failure = in.readException();
} else {
ingestDocument = new WriteableIngestDocument(in);
failure = null;
@ -84,7 +84,7 @@ public final class SimulateDocumentBaseResult implements SimulateDocumentResult
if (failure == null) {
ingestDocument.toXContent(builder, params);
} else {
ElasticsearchException.renderThrowable(builder, params, failure);
ElasticsearchException.renderException(builder, params, failure);
}
builder.endObject();
return builder;

View File

@ -52,7 +52,7 @@ public class SimulateProcessorResult implements Writeable, ToXContent {
public SimulateProcessorResult(StreamInput in) throws IOException {
this.processorTag = in.readString();
if (in.readBoolean()) {
this.failure = in.readThrowable();
this.failure = in.readException();
this.ingestDocument = null;
} else {
this.ingestDocument = new WriteableIngestDocument(in);
@ -96,7 +96,7 @@ public class SimulateProcessorResult implements Writeable, ToXContent {
if (failure == null) {
ingestDocument.toXContent(builder, params);
} else {
ElasticsearchException.renderThrowable(builder, params, failure);
ElasticsearchException.renderException(builder, params, failure);
}
builder.endObject();
return builder;

View File

@ -167,7 +167,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
onFirstPhaseResult(shardIndex, shard, node.getId(), shardIt, t);
}
});
@ -188,7 +188,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
if (xTotalOps == expectedTotalOps) {
try {
innerMoveToSecondPhase();
} catch (Throwable e) {
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("{}: Failed to execute [{}] while moving to second phase", e, shardIt.shardId(), request);
}
@ -201,33 +201,34 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
}
void onFirstPhaseResult(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId,
final ShardIterator shardIt, Throwable t) {
final ShardIterator shardIt, Exception e) {
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId().getIndex(), shardIt.shardId().getId());
addShardFailure(shardIndex, shardTarget, t);
addShardFailure(shardIndex, shardTarget, e);
if (totalOps.incrementAndGet() == expectedTotalOps) {
if (logger.isDebugEnabled()) {
if (t != null && !TransportActions.isShardNotAvailableException(t)) {
logger.debug("{}: Failed to execute [{}]", t, shard != null ? shard.shortSummary() : shardIt.shardId(), request);
if (e != null && !TransportActions.isShardNotAvailableException(e)) {
logger.debug("{}: Failed to execute [{}]", e, shard != null ? shard.shortSummary() : shardIt.shardId(), request);
} else if (logger.isTraceEnabled()) {
logger.trace("{}: Failed to execute [{}]", t, shard, request);
logger.trace("{}: Failed to execute [{}]", e, shard, request);
}
}
final ShardSearchFailure[] shardSearchFailures = buildShardFailures();
if (successfulOps.get() == 0) {
if (logger.isDebugEnabled()) {
logger.debug("All shards failed for phase: [{}]", t, firstPhaseName());
logger.debug("All shards failed for phase: [{}]", e, firstPhaseName());
}
// no successful ops, raise an exception
raiseEarlyFailure(new SearchPhaseExecutionException(firstPhaseName(), "all shards failed", t, shardSearchFailures));
raiseEarlyFailure(new SearchPhaseExecutionException(firstPhaseName(), "all shards failed", e, shardSearchFailures));
} else {
try {
innerMoveToSecondPhase();
} catch (Throwable e) {
raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, shardSearchFailures));
} catch (Exception inner) {
inner.addSuppressed(e);
raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", inner, shardSearchFailures));
}
}
} else {
@ -235,20 +236,21 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
final boolean lastShard = nextShard == null;
// trace log this exception
if (logger.isTraceEnabled()) {
logger.trace("{}: Failed to execute [{}] lastShard [{}]", t, shard != null ? shard.shortSummary() : shardIt.shardId(),
logger.trace("{}: Failed to execute [{}] lastShard [{}]", e, shard != null ? shard.shortSummary() : shardIt.shardId(),
request, lastShard);
}
if (!lastShard) {
try {
performFirstPhase(shardIndex, shardIt, nextShard);
} catch (Throwable t1) {
onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, t1);
} catch (Exception inner) {
inner.addSuppressed(e);
onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, inner);
}
} else {
// no more shards active, add a failure
if (logger.isDebugEnabled() && !logger.isTraceEnabled()) { // do not double log this exception
if (t != null && !TransportActions.isShardNotAvailableException(t)) {
logger.debug("{}: Failed to execute [{}] lastShard [{}]", t,
if (e != null && !TransportActions.isShardNotAvailableException(e)) {
logger.debug("{}: Failed to execute [{}] lastShard [{}]", e,
shard != null ? shard.shortSummary() : shardIt.shardId(), request, lastShard);
}
}
@ -269,9 +271,9 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
return failures;
}
protected final void addShardFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Throwable t) {
protected final void addShardFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Exception e) {
// we don't aggregate shard failures on non active shards (but do keep the header counts right)
if (TransportActions.isShardNotAvailableException(t)) {
if (TransportActions.isShardNotAvailableException(e)) {
return;
}
@ -285,26 +287,27 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
}
ShardSearchFailure failure = shardFailures.get(shardIndex);
if (failure == null) {
shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget));
shardFailures.set(shardIndex, new ShardSearchFailure(e, shardTarget));
} else {
// the failure is already present, try and not override it with an exception that is less meaningless
// for example, getting illegal shard state
if (TransportActions.isReadOverrideException(t)) {
shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget));
if (TransportActions.isReadOverrideException(e)) {
shardFailures.set(shardIndex, new ShardSearchFailure(e, shardTarget));
}
}
}
private void raiseEarlyFailure(Throwable t) {
private void raiseEarlyFailure(Exception e) {
for (AtomicArray.Entry<FirstResult> entry : firstResults.asList()) {
try {
DiscoveryNode node = nodes.get(entry.value.shardTarget().nodeId());
sendReleaseSearchContext(entry.value.id(), node);
} catch (Throwable t1) {
logger.trace("failed to release context", t1);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.trace("failed to release context", inner);
}
}
listener.onFailure(t);
listener.onFailure(e);
}
/**
@ -324,8 +327,8 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
try {
DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId());
sendReleaseSearchContext(entry.value.queryResult().id(), node);
} catch (Throwable t1) {
logger.trace("failed to release context", t1);
} catch (Exception e) {
logger.trace("failed to release context", e);
}
}
}

View File

@ -29,7 +29,6 @@ import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.Arrays;
@ -45,22 +44,22 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
*/
public static class Item implements Streamable {
private SearchResponse response;
private Throwable throwable;
private Exception exception;
Item() {
}
public Item(SearchResponse response, Throwable throwable) {
public Item(SearchResponse response, Exception exception) {
this.response = response;
this.throwable = throwable;
this.exception = exception;
}
/**
* Is it a failed search?
*/
public boolean isFailure() {
return throwable != null;
return exception != null;
}
/**
@ -68,7 +67,7 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
*/
@Nullable
public String getFailureMessage() {
return throwable == null ? null : throwable.getMessage();
return exception == null ? null : exception.getMessage();
}
/**
@ -91,7 +90,7 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
this.response = new SearchResponse();
response.readFrom(in);
} else {
throwable = in.readThrowable();
exception = in.readException();
}
}
@ -102,12 +101,12 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
response.writeTo(out);
} else {
out.writeBoolean(false);
out.writeThrowable(throwable);
out.writeThrowable(exception);
}
}
public Throwable getFailure() {
return throwable;
public Exception getFailure() {
return exception;
}
}
@ -156,7 +155,7 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
for (Item item : items) {
builder.startObject();
if (item.isFailure()) {
ElasticsearchException.renderThrowable(builder, params, item.getFailure());
ElasticsearchException.renderException(builder, params, item.getFailure());
builder.field(Fields.STATUS, ExceptionsHelper.status(item.getFailure()).getStatus());
} else {
item.getResponse().toXContent(builder, params);

View File

@ -89,7 +89,7 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
try {
onSecondPhaseFailure(t, querySearchRequest, shardIndex, dfsResult, counter);
} finally {
@ -102,12 +102,12 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
});
}
void onSecondPhaseFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult,
void onSecondPhaseFailure(Exception e, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult,
AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
logger.debug("[{}] Failed to execute query phase", e, querySearchRequest.id());
}
this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
this.addShardFailure(shardIndex, dfsResult.shardTarget(), e);
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
@ -130,12 +130,12 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
}
@Override
public void onFailure(Throwable t) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", t, buildShardFailures());
public void onFailure(Exception e) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", e, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}
super.onFailure(t);
super.onFailure(e);
}
});

View File

@ -97,7 +97,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
try {
onQueryFailure(t, querySearchRequest, shardIndex, dfsResult, counter);
} finally {
@ -110,12 +110,12 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
});
}
void onQueryFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult,
void onQueryFailure(Exception e, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult,
AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
logger.debug("[{}] Failed to execute query phase", e, querySearchRequest.id());
}
this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
this.addShardFailure(shardIndex, dfsResult.shardTarget(), e);
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
if (successfulOps.get() == 0) {
@ -129,7 +129,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
void executeFetchPhase() {
try {
innerExecuteFetchPhase();
} catch (Throwable e) {
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("query", "", e, buildShardFailures()));
}
}
@ -169,7 +169,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
// the search context might not be cleared on the node where the fetch was executed for example
// because the action was rejected by the thread pool. in this case we need to send a dedicated
// request to clear the search context. by setting docIdsToLoad to null, the context will be cleared
@ -180,12 +180,12 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
});
}
void onFetchFailure(Throwable t, ShardFetchSearchRequest fetchSearchRequest, int shardIndex,
void onFetchFailure(Exception e, ShardFetchSearchRequest fetchSearchRequest, int shardIndex,
SearchShardTarget shardTarget, AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
logger.debug("[{}] Failed to execute fetch phase", e, fetchSearchRequest.id());
}
this.addShardFailure(shardIndex, shardTarget, t);
this.addShardFailure(shardIndex, shardTarget, e);
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
@ -208,9 +208,9 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception e) {
try {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures());
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}

View File

@ -32,9 +32,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
*
*/
public class SearchPhaseExecutionException extends ElasticsearchException {
private final String phaseName;
private final ShardSearchFailure[] shardFailures;
@ -69,7 +66,7 @@ public class SearchPhaseExecutionException extends ElasticsearchException {
}
}
private static final Throwable deduplicateCause(Throwable cause, ShardSearchFailure[] shardFailures) {
private static Throwable deduplicateCause(Throwable cause, ShardSearchFailure[] shardFailures) {
if (shardFailures == null) {
throw new IllegalArgumentException("shardSearchFailures must not be null");
}

View File

@ -73,8 +73,8 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<QueryFetc
}
@Override
public void onFailure(Throwable t) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", t, buildShardFailures());
public void onFailure(Exception e) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}

View File

@ -102,7 +102,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
// the search context might not be cleared on the node where the fetch was executed for example
// because the action was rejected by the thread pool. in this case we need to send a dedicated
// request to clear the search context. by setting docIdsToLoad to null, the context will be cleared
@ -113,12 +113,12 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
});
}
void onFetchFailure(Throwable t, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget,
void onFetchFailure(Exception e, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget,
AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
logger.debug("[{}] Failed to execute fetch phase", e, fetchSearchRequest.id());
}
this.addShardFailure(shardIndex, shardTarget, t);
this.addShardFailure(shardIndex, shardTarget, e);
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
@ -141,9 +141,9 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception e) {
try {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", t, buildShardFailures());
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}

View File

@ -252,8 +252,8 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
/**
* Sets no fields to be loaded, resulting in only id and type to be returned per field.
*/
public SearchRequestBuilder setNoFields() {
sourceBuilder().noFields();
public SearchRequestBuilder setNoStoredFields() {
sourceBuilder().noStoredFields();
return this;
}
@ -289,13 +289,23 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
return this;
}
/**
* Adds a docvalue based field to load and return. The field does not have to be stored,
* but its recommended to use non analyzed or numeric fields.
*
* @param name The field to get from the docvalue
*/
public SearchRequestBuilder addDocValueField(String name) {
sourceBuilder().docValueField(name);
return this;
}
/**
* Adds a field to load and return (note, it must be stored) as part of the search request.
* Adds a stored field to load and return (note, it must be stored) as part of the search request.
* If none are specified, the source of the document will be return.
*/
public SearchRequestBuilder addField(String field) {
sourceBuilder().field(field);
public SearchRequestBuilder addStoredField(String field) {
sourceBuilder().storedField(field);
return this;
}
@ -304,12 +314,15 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
* but its recommended to use non analyzed or numeric fields.
*
* @param name The field to get from the field data cache
* @deprecated Use {@link SearchRequestBuilder#addDocValueField(String)} instead.
*/
@Deprecated
public SearchRequestBuilder addFieldDataField(String name) {
sourceBuilder().fieldDataField(name);
sourceBuilder().docValueField(name);
return this;
}
/**
* Adds a script based field to load and return. The field does not have to be stored,
* but its recommended to use non analyzed or numeric fields.
@ -366,12 +379,24 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
return this;
}
/**
* Sets the stored fields to load and return as part of the search request. If none
* are specified, the source of the document will be returned.
*
* @deprecated Use {@link SearchRequestBuilder#storedFields(String...)} instead.
*/
@Deprecated
public SearchRequestBuilder fields(String... fields) {
sourceBuilder().storedFields(Arrays.asList(fields));
return this;
}
/**
* Sets the fields to load and return as part of the search request. If none
* are specified, the source of the document will be returned.
*/
public SearchRequestBuilder fields(String... fields) {
sourceBuilder().fields(Arrays.asList(fields));
public SearchRequestBuilder storedFields(String... fields) {
sourceBuilder().storedFields(Arrays.asList(fields));
return this;
}

View File

@ -168,7 +168,7 @@ public class SearchResponse extends ActionResponse implements StatusToXContent {
*
* @return The profile results or an empty map
*/
public @Nullable Map<String, ProfileShardResult> getProfileResults() {
@Nullable public Map<String, ProfileShardResult> getProfileResults() {
return internalResponse.profile();
}

View File

@ -138,21 +138,21 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
onPhaseFailure(t, searchId, shardIndex);
}
});
}
private void onPhaseFailure(Throwable t, long searchId, int shardIndex) {
private void onPhaseFailure(Exception e, long searchId, int shardIndex) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
logger.debug("[{}] Failed to execute query phase", e, searchId);
}
addShardFailure(shardIndex, new ShardSearchFailure(t));
addShardFailure(shardIndex, new ShardSearchFailure(e));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
if (successfulOps.get() == 0) {
listener.onFailure(new SearchPhaseExecutionException("query_fetch", "all shards failed", t, buildShardFailures()));
listener.onFailure(new SearchPhaseExecutionException("query_fetch", "all shards failed", e, buildShardFailures()));
} else {
finishHim();
}
@ -162,7 +162,7 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
private void finishHim() {
try {
innerFinishHim();
} catch (Throwable e) {
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
}
}

View File

@ -113,7 +113,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
if (counter.decrementAndGet() == 0) {
try {
executeFetchPhase();
} catch (Throwable e) {
} catch (Exception e) {
listener.onFailure(new SearchPhaseExecutionException("query", "Fetch failed", e, ShardSearchFailure.EMPTY_ARRAY));
return;
}
@ -131,32 +131,33 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
if (counter.decrementAndGet() == 0) {
try {
executeFetchPhase();
} catch (Throwable e) {
} catch (Exception e) {
onFailure(e);
}
}
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
onQueryPhaseFailure(shardIndex, counter, searchId, t);
}
});
}
void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, final long searchId, Throwable t) {
void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, final long searchId, Exception failure) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, searchId);
logger.debug("[{}] Failed to execute query phase", failure, searchId);
}
addShardFailure(shardIndex, new ShardSearchFailure(t));
addShardFailure(shardIndex, new ShardSearchFailure(failure));
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
if (successfulOps.get() == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", t, buildShardFailures()));
listener.onFailure(new SearchPhaseExecutionException("query", "all shards failed", failure, buildShardFailures()));
} else {
try {
executeFetchPhase();
} catch (Throwable e) {
} catch (Exception e) {
e.addSuppressed(failure);
listener.onFailure(new SearchPhaseExecutionException("query", "Fetch failed", e, ShardSearchFailure.EMPTY_ARRAY));
}
}
@ -193,7 +194,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
}
@Override
public void onFailure(Throwable t) {
public void onFailure(Exception t) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to execute fetch phase", t);
}
@ -209,8 +210,8 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
private void finishHim() {
try {
innerFinishHim();
} catch (Throwable e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "inner finish failed", e, buildShardFailures()));
}
}

View File

@ -48,19 +48,19 @@ public class ShardSearchFailure implements ShardOperationFailedException {
}
public ShardSearchFailure(Throwable t) {
this(t, null);
public ShardSearchFailure(Exception e) {
this(e, null);
}
public ShardSearchFailure(Throwable t, @Nullable SearchShardTarget shardTarget) {
Throwable actual = ExceptionsHelper.unwrapCause(t);
public ShardSearchFailure(Exception e, @Nullable SearchShardTarget shardTarget) {
final Throwable actual = ExceptionsHelper.unwrapCause(e);
if (actual != null && actual instanceof SearchException) {
this.shardTarget = ((SearchException) actual).shard();
} else if (shardTarget != null) {
this.shardTarget = shardTarget;
}
status = ExceptionsHelper.status(actual);
this.reason = ExceptionsHelper.detailedMessage(t);
this.reason = ExceptionsHelper.detailedMessage(e);
this.cause = actual;
}
@ -135,7 +135,7 @@ public class ShardSearchFailure implements ShardOperationFailedException {
}
reason = in.readString();
status = RestStatus.readFrom(in);
cause = in.readThrowable();
cause = in.readException();
}
@Override

View File

@ -103,7 +103,7 @@ public class TransportClearScrollAction extends HandledTransportAction<ClearScro
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
onFailedFreedContext(e, node);
}
});
@ -124,7 +124,7 @@ public class TransportClearScrollAction extends HandledTransportAction<ClearScro
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
onFailedFreedContext(e, node);
}
});

View File

@ -30,7 +30,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -119,7 +118,7 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
responses.set(request.responseSlot, new MultiSearchResponse.Item(null, e));
handleResponse();
}
@ -134,7 +133,7 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
});
}
final static class SearchRequestSlot {
static final class SearchRequestSlot {
final SearchRequest request;
final int responseSlot;

View File

@ -74,7 +74,7 @@ public class TransportSearchScrollAction extends HandledTransportAction<SearchSc
throw new IllegalArgumentException("Scroll id type [" + scrollId.getType() + "] unrecognized");
}
action.start();
} catch (Throwable e) {
} catch (Exception e) {
listener.onFailure(e);
}
}

View File

@ -33,7 +33,7 @@ import java.util.List;
*/
public abstract class AbstractListenableActionFuture<T, L> extends AdapterActionFuture<T, L> implements ListenableActionFuture<T> {
private final static ESLogger logger = Loggers.getLogger(AbstractListenableActionFuture.class);
private static final ESLogger logger = Loggers.getLogger(AbstractListenableActionFuture.class);
final ThreadPool threadPool;
volatile Object listeners;
@ -53,7 +53,7 @@ public abstract class AbstractListenableActionFuture<T, L> extends AdapterAction
}
public void internalAddListener(ActionListener<T> listener) {
listener = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener);
listener = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener, false);
boolean executeImmediate = false;
synchronized (this) {
if (executedListeners) {
@ -102,7 +102,7 @@ public abstract class AbstractListenableActionFuture<T, L> extends AdapterAction
// we use a timeout of 0 to by pass assertion forbidding to call actionGet() (blocking) on a network thread.
// here we know we will never block
listener.onResponse(actionGet(0));
} catch (Throwable e) {
} catch (Exception e) {
listener.onFailure(e);
}
}

View File

@ -55,7 +55,7 @@ public interface ActionFilter {
* filter chain. This base class should serve any action filter implementations that doesn't require
* to apply async filtering logic.
*/
public static abstract class Simple extends AbstractComponent implements ActionFilter {
public abstract static class Simple extends AbstractComponent implements ActionFilter {
protected Simple(Settings settings) {
super(settings);

View File

@ -98,7 +98,7 @@ public abstract class AdapterActionFuture<T, L> extends BaseFuture<T> implements
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
setException(e);
}

View File

@ -54,11 +54,11 @@ public class DefaultShardOperationFailedException implements ShardOperationFaile
this.status = e.status();
}
public DefaultShardOperationFailedException(String index, int shardId, Throwable t) {
public DefaultShardOperationFailedException(String index, int shardId, Throwable reason) {
this.index = index;
this.shardId = shardId;
this.reason = t;
status = ExceptionsHelper.status(t);
this.reason = reason;
this.status = ExceptionsHelper.status(reason);
}
@Override
@ -98,7 +98,7 @@ public class DefaultShardOperationFailedException implements ShardOperationFaile
index = in.readString();
}
shardId = in.readVInt();
reason = in.readThrowable();
reason = in.readException();
status = RestStatus.readFrom(in);
}

View File

@ -41,7 +41,7 @@ public abstract class DelegatingActionListener<Instigator extends ActionResponse
}
@Override
public final void onFailure(Throwable e) {
public final void onFailure(Exception e) {
delegatedActionListener.onFailure(e);
}
}

View File

@ -65,13 +65,13 @@ public abstract class HandledTransportAction<Request extends ActionRequest<Reque
public void onResponse(Response response) {
try {
channel.sendResponse(response);
} catch (Throwable e) {
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {

View File

@ -21,9 +21,6 @@ package org.elasticsearch.action.support;
import org.elasticsearch.threadpool.ThreadPool;
/**
*
*/
public class PlainListenableActionFuture<T> extends AbstractListenableActionFuture<T, T> {
public PlainListenableActionFuture(ThreadPool threadPool) {
@ -34,4 +31,5 @@ public class PlainListenableActionFuture<T> extends AbstractListenableActionFutu
protected T convert(T response) {
return response;
}
}

View File

@ -64,7 +64,7 @@ public final class ThreadedActionListener<Response> implements ActionListener<Re
if (listener instanceof ThreadedActionListener) {
return listener;
}
return new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener);
return new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener, false);
}
}
@ -72,40 +72,53 @@ public final class ThreadedActionListener<Response> implements ActionListener<Re
private final ThreadPool threadPool;
private final String executor;
private final ActionListener<Response> listener;
private final boolean forceExecution;
public ThreadedActionListener(ESLogger logger, ThreadPool threadPool, String executor, ActionListener<Response> listener) {
public ThreadedActionListener(ESLogger logger, ThreadPool threadPool, String executor, ActionListener<Response> listener,
boolean forceExecution) {
this.logger = logger;
this.threadPool = threadPool;
this.executor = executor;
this.listener = listener;
this.forceExecution = forceExecution;
}
@Override
public void onResponse(final Response response) {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
public boolean isForceExecution() {
return forceExecution;
}
@Override
protected void doRun() throws Exception {
listener.onResponse(response);
}
@Override
public void onFailure(Throwable t) {
listener.onFailure(t);
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
@Override
public void onFailure(final Throwable e) {
public void onFailure(final Exception e) {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
public boolean isForceExecution() {
return forceExecution;
}
@Override
protected void doRun() throws Exception {
listener.onFailure(e);
}
@Override
public void onFailure(Throwable t) {
logger.warn("failed to execute failure callback on [{}], failure [{}]", t, listener, e);
public void onFailure(Exception e) {
logger.warn("failed to execute failure callback on [{}], failure [{}]", e, listener, e);
}
});
}

View File

@ -92,7 +92,7 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
taskManager.unregister(task);
listener.onFailure(e);
}
@ -113,7 +113,7 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
if (task != null) {
taskManager.unregister(task);
}
@ -140,9 +140,9 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
if (filters.length == 0) {
try {
doExecute(task, request, listener);
} catch(Throwable t) {
logger.trace("Error during transport action execution.", t);
listener.onFailure(t);
} catch(Exception e) {
logger.trace("Error during transport action execution.", e);
listener.onFailure(e);
}
} else {
RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);
@ -180,9 +180,9 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
} else {
listener.onFailure(new IllegalStateException("proceed was called too many times"));
}
} catch(Throwable t) {
logger.trace("Error during transport action execution.", t);
listener.onFailure(t);
} catch(Exception e) {
logger.trace("Error during transport action execution.", e);
listener.onFailure(e);
}
}
@ -221,9 +221,9 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
} else {
listener.onFailure(new IllegalStateException("proceed was called too many times"));
}
} catch (Throwable t) {
logger.trace("Error during transport action execution.", t);
listener.onFailure(t);
} catch (Exception e) {
logger.trace("Error during transport action execution.", e);
listener.onFailure(e);
}
}
}
@ -246,7 +246,7 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
@ -269,17 +269,18 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
public void onResponse(Response response) {
try {
taskManager.persistResult(task, response, delegate);
} catch (Throwable e) {
} catch (Exception e) {
delegate.onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
try {
taskManager.persistResult(task, e, delegate);
} catch (Throwable e1) {
delegate.onFailure(e1);
} catch (Exception inner) {
inner.addSuppressed(e);
delegate.onFailure(inner);
}
}
}

View File

@ -27,30 +27,23 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.ShardNotFoundException;
/**
*/
public class TransportActions {
public static boolean isShardNotAvailableException(Throwable t) {
Throwable actual = ExceptionsHelper.unwrapCause(t);
if (actual instanceof ShardNotFoundException ||
public static boolean isShardNotAvailableException(final Throwable e) {
final Throwable actual = ExceptionsHelper.unwrapCause(e);
return (actual instanceof ShardNotFoundException ||
actual instanceof IndexNotFoundException ||
actual instanceof IllegalIndexShardStateException ||
actual instanceof NoShardAvailableActionException ||
actual instanceof UnavailableShardsException ||
actual instanceof AlreadyClosedException) {
return true;
}
return false;
actual instanceof AlreadyClosedException);
}
/**
* If a failure is already present, should this failure override it or not for read operations.
*/
public static boolean isReadOverrideException(Throwable t) {
if (isShardNotAvailableException(t)) {
return false;
}
return true;
public static boolean isReadOverrideException(Exception e) {
return !isShardNotAvailableException(e);
}
}

View File

@ -36,5 +36,5 @@ public interface WriteResponse {
* {@link RefreshPolicy#IMMEDIATE} should always mark this as true. Requests that set it to {@link RefreshPolicy#WAIT_UNTIL} will only
* set this to true if they run out of refresh listener slots (see {@link IndexSettings#MAX_REFRESH_LISTENERS_PER_SHARD}).
*/
public abstract void setForcedRefresh(boolean forcedRefresh);
void setForcedRefresh(boolean forcedRefresh);
}

View File

@ -144,7 +144,7 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
// no shards
try {
listener.onResponse(newResponse(request, new AtomicReferenceArray(0), clusterState));
} catch (Throwable e) {
} catch (Exception e) {
listener.onFailure(e);
}
return;
@ -199,7 +199,7 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
}
});
}
} catch (Throwable e) {
} catch (Exception e) {
onOperation(shard, shardIt, shardIndex, e);
}
}
@ -215,25 +215,25 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
}
@SuppressWarnings({"unchecked"})
void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int shardIndex, Throwable t) {
void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int shardIndex, Exception e) {
// we set the shard failure always, even if its the first in the replication group, and the next one
// will work (it will just override it...)
setFailure(shardIt, shardIndex, t);
setFailure(shardIt, shardIndex, e);
ShardRouting nextShard = shardIt.nextOrNull();
if (nextShard != null) {
if (t != null) {
if (e != null) {
if (logger.isTraceEnabled()) {
if (!TransportActions.isShardNotAvailableException(t)) {
logger.trace("{}: failed to execute [{}]", t, shard != null ? shard.shortSummary() : shardIt.shardId(), request);
if (!TransportActions.isShardNotAvailableException(e)) {
logger.trace("{}: failed to execute [{}]", e, shard != null ? shard.shortSummary() : shardIt.shardId(), request);
}
}
}
performOperation(shardIt, nextShard, shardIndex);
} else {
if (logger.isDebugEnabled()) {
if (t != null) {
if (!TransportActions.isShardNotAvailableException(t)) {
logger.debug("{}: failed to execute [{}]", t, shard != null ? shard.shortSummary() : shardIt.shardId(), request);
if (e != null) {
if (!TransportActions.isShardNotAvailableException(e)) {
logger.debug("{}: failed to execute [{}]", e, shard != null ? shard.shortSummary() : shardIt.shardId(), request);
}
}
}
@ -246,25 +246,25 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
protected void finishHim() {
try {
listener.onResponse(newResponse(request, shardsResponses, clusterState));
} catch (Throwable e) {
} catch (Exception e) {
listener.onFailure(e);
}
}
void setFailure(ShardIterator shardIt, int shardIndex, Throwable t) {
void setFailure(ShardIterator shardIt, int shardIndex, Exception e) {
// we don't aggregate shard failures on non active shards (but do keep the header counts right)
if (TransportActions.isShardNotAvailableException(t)) {
if (TransportActions.isShardNotAvailableException(e)) {
return;
}
if (!(t instanceof BroadcastShardOperationFailedException)) {
t = new BroadcastShardOperationFailedException(shardIt.shardId(), t);
if (!(e instanceof BroadcastShardOperationFailedException)) {
e = new BroadcastShardOperationFailedException(shardIt.shardId(), e);
}
Object response = shardsResponses.get(shardIndex);
if (response == null) {
// just override it and return
shardsResponses.set(shardIndex, t);
shardsResponses.set(shardIndex, e);
}
if (!(response instanceof Throwable)) {
@ -274,8 +274,8 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
// the failure is already present, try and not override it with an exception that is less meaningless
// for example, getting illegal shard state
if (TransportActions.isReadOverrideException(t)) {
shardsResponses.set(shardIndex, t);
if (TransportActions.isReadOverrideException(e)) {
shardsResponses.set(shardIndex, e);
}
}
}

Some files were not shown because too many files have changed in this diff Show More