improve effort into figuring out the shard associated with a search failure
This commit is contained in:
parent
d26b165af3
commit
80fa91d873
|
@ -55,10 +55,16 @@ public class ShardSearchFailure implements ShardOperationFailedException {
|
||||||
}
|
}
|
||||||
|
|
||||||
public ShardSearchFailure(Throwable t) {
|
public ShardSearchFailure(Throwable t) {
|
||||||
|
this(t, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ShardSearchFailure(Throwable t, @Nullable SearchShardTarget shardTarget) {
|
||||||
this.failure = t;
|
this.failure = t;
|
||||||
Throwable actual = ExceptionsHelper.unwrapCause(t);
|
Throwable actual = ExceptionsHelper.unwrapCause(t);
|
||||||
if (actual != null && actual instanceof SearchException) {
|
if (actual != null && actual instanceof SearchException) {
|
||||||
this.shardTarget = ((SearchException) actual).shard();
|
this.shardTarget = ((SearchException) actual).shard();
|
||||||
|
} else if (shardTarget != null) {
|
||||||
|
this.shardTarget = shardTarget;
|
||||||
}
|
}
|
||||||
if (actual != null && actual instanceof ElasticSearchException) {
|
if (actual != null && actual instanceof ElasticSearchException) {
|
||||||
status = ((ElasticSearchException) actual).status();
|
status = ((ElasticSearchException) actual).status();
|
||||||
|
|
|
@ -147,7 +147,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
|
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
|
||||||
}
|
}
|
||||||
AsyncAction.this.addShardFailure(shardIndex, t);
|
AsyncAction.this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
|
||||||
successulOps.decrementAndGet();
|
successulOps.decrementAndGet();
|
||||||
if (counter.decrementAndGet() == 0) {
|
if (counter.decrementAndGet() == 0) {
|
||||||
finishHim();
|
finishHim();
|
||||||
|
|
|
@ -156,7 +156,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
|
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
|
||||||
}
|
}
|
||||||
AsyncAction.this.addShardFailure(shardIndex, t);
|
AsyncAction.this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
|
||||||
successulOps.decrementAndGet();
|
successulOps.decrementAndGet();
|
||||||
if (counter.decrementAndGet() == 0) {
|
if (counter.decrementAndGet() == 0) {
|
||||||
executeFetchPhase();
|
executeFetchPhase();
|
||||||
|
@ -249,7 +249,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
|
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
|
||||||
}
|
}
|
||||||
AsyncAction.this.addShardFailure(shardIndex, t);
|
AsyncAction.this.addShardFailure(shardIndex, shardTarget, t);
|
||||||
successulOps.decrementAndGet();
|
successulOps.decrementAndGet();
|
||||||
if (counter.decrementAndGet() == 0) {
|
if (counter.decrementAndGet() == 0) {
|
||||||
finishHim();
|
finishHim();
|
||||||
|
|
|
@ -158,7 +158,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
|
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
|
||||||
}
|
}
|
||||||
AsyncAction.this.addShardFailure(shardIndex, t);
|
AsyncAction.this.addShardFailure(shardIndex, shardTarget, t);
|
||||||
successulOps.decrementAndGet();
|
successulOps.decrementAndGet();
|
||||||
if (counter.decrementAndGet() == 0) {
|
if (counter.decrementAndGet() == 0) {
|
||||||
finishHim();
|
finishHim();
|
||||||
|
|
|
@ -144,7 +144,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// really, no shards active in this group
|
// really, no shards active in this group
|
||||||
onFirstPhaseResult(shardIndex, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
|
onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// we have local operations, perform them now
|
// we have local operations, perform them now
|
||||||
|
@ -202,11 +202,11 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
|
||||||
void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) {
|
void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) {
|
||||||
if (shard == null) {
|
if (shard == null) {
|
||||||
// no more active shards... (we should not really get here, but just for safety)
|
// no more active shards... (we should not really get here, but just for safety)
|
||||||
onFirstPhaseResult(shardIndex, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
|
onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
|
||||||
} else {
|
} else {
|
||||||
DiscoveryNode node = nodes.get(shard.currentNodeId());
|
final DiscoveryNode node = nodes.get(shard.currentNodeId());
|
||||||
if (node == null) {
|
if (node == null) {
|
||||||
onFirstPhaseResult(shardIndex, shard, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
|
onFirstPhaseResult(shardIndex, shard, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
|
||||||
} else {
|
} else {
|
||||||
String[] filteringAliases = clusterState.metaData().filteringAliases(shard.index(), request.indices());
|
String[] filteringAliases = clusterState.metaData().filteringAliases(shard.index(), request.indices());
|
||||||
sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime), new SearchServiceListener<FirstResult>() {
|
sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime), new SearchServiceListener<FirstResult>() {
|
||||||
|
@ -217,7 +217,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
onFirstPhaseResult(shardIndex, shard, shardIt, t);
|
onFirstPhaseResult(shardIndex, shard, node.id(), shardIt, t);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -243,10 +243,11 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void onFirstPhaseResult(final int shardIndex, @Nullable ShardRouting shard, final ShardIterator shardIt, Throwable t) {
|
void onFirstPhaseResult(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId, final ShardIterator shardIt, Throwable t) {
|
||||||
// we always add the shard failure for a specific shard instance
|
// we always add the shard failure for a specific shard instance
|
||||||
// we do make sure to clean it on a successful response from a shard
|
// we do make sure to clean it on a successful response from a shard
|
||||||
addShardFailure(shardIndex, t);
|
SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId().getIndex(), shardIt.shardId().getId());
|
||||||
|
addShardFailure(shardIndex, shardTarget, t);
|
||||||
|
|
||||||
if (totalOps.incrementAndGet() == expectedTotalOps) {
|
if (totalOps.incrementAndGet() == expectedTotalOps) {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
|
@ -317,7 +318,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
|
||||||
return failures;
|
return failures;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected final void addShardFailure(final int shardIndex, Throwable t) {
|
protected final void addShardFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Throwable t) {
|
||||||
// we don't aggregate shard failures on non active shards (but do keep the header counts right)
|
// we don't aggregate shard failures on non active shards (but do keep the header counts right)
|
||||||
if (TransportActions.isShardNotAvailableException(t)) {
|
if (TransportActions.isShardNotAvailableException(t)) {
|
||||||
return;
|
return;
|
||||||
|
@ -333,12 +334,12 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
|
||||||
}
|
}
|
||||||
ShardSearchFailure failure = shardFailures.get(shardIndex);
|
ShardSearchFailure failure = shardFailures.get(shardIndex);
|
||||||
if (failure == null) {
|
if (failure == null) {
|
||||||
shardFailures.set(shardIndex, new ShardSearchFailure(t));
|
shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget));
|
||||||
} else {
|
} else {
|
||||||
// the failure is already present, try and not override it with an exception that is less meaningless
|
// the failure is already present, try and not override it with an exception that is less meaningless
|
||||||
// for example, getting illegal shard state
|
// for example, getting illegal shard state
|
||||||
if (TransportActions.isReadOverrideException(t)) {
|
if (TransportActions.isReadOverrideException(t)) {
|
||||||
shardFailures.set(shardIndex, new ShardSearchFailure(t));
|
shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue