Streamline Search / Broadcast (count, suggest, refresh, ...) APIs header

closes #3441
This commit is contained in:
Shay Banon 2013-08-05 12:55:38 +02:00
parent 539ffb9ef5
commit d7922b8554
18 changed files with 59 additions and 80 deletions

View File

@ -78,11 +78,6 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
return new ClearIndicesCacheRequest();
}
@Override
protected boolean ignoreNonActiveExceptions() {
return true;
}
@Override
protected ClearIndicesCacheResponse newResponse(ClearIndicesCacheRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;

View File

@ -71,11 +71,6 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
return new FlushRequest();
}
@Override
protected boolean ignoreNonActiveExceptions() {
return true;
}
@Override
protected FlushResponse newResponse(FlushRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;

View File

@ -70,11 +70,6 @@ public class TransportGatewaySnapshotAction extends TransportBroadcastOperationA
return new GatewaySnapshotRequest();
}
@Override
protected boolean ignoreNonActiveExceptions() {
return true;
}
@Override
protected GatewaySnapshotResponse newResponse(GatewaySnapshotRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;

View File

@ -72,11 +72,6 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
return new OptimizeRequest();
}
@Override
protected boolean ignoreNonActiveExceptions() {
return true;
}
@Override
protected OptimizeResponse newResponse(OptimizeRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;

View File

@ -72,11 +72,6 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
return new RefreshRequest();
}
@Override
protected boolean ignoreNonActiveExceptions() {
return true;
}
@Override
protected RefreshResponse newResponse(RefreshRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;

View File

@ -76,11 +76,6 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastOperationA
return new IndicesSegmentsRequest();
}
@Override
protected boolean ignoreNonActiveExceptions() {
return true;
}
/**
* Segments goes across *all* active shards.
*/

View File

@ -76,11 +76,6 @@ public class TransportIndicesStatsAction extends TransportBroadcastOperationActi
return new IndicesStatsRequest();
}
@Override
protected boolean ignoreNonActiveExceptions() {
return true;
}
/**
* Status goes across *all* shards.
*/

View File

@ -85,11 +85,6 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
return new IndicesStatusRequest();
}
@Override
protected boolean ignoreNonActiveExceptions() {
return true;
}
/**
* Status goes across *all* shards.
*/

View File

@ -130,7 +130,7 @@ public class TransportValidateQueryAction extends TransportBroadcastOperationAct
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
// simply ignore non active shards
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {

View File

@ -130,7 +130,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
// simply ignore non active shards
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {

View File

@ -132,7 +132,7 @@ public class TransportPercolateAction extends TransportBroadcastOperationAction<
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
// simply ignore non active shards
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {

View File

@ -69,15 +69,18 @@ public class SearchResponse extends ActionResponse implements ToXContent {
public RestStatus status() {
if (shardFailures.length == 0) {
if (successfulShards == 0 && totalShards > 0) {
return RestStatus.SERVICE_UNAVAILABLE;
}
return RestStatus.OK;
}
// if total failure, bubble up the status code to the response level
if (successfulShards == 0 && totalShards > 0) {
RestStatus status = shardFailures[0].status();
if (shardFailures.length > 1) {
for (int i = 1; i < shardFailures.length; i++) {
if (shardFailures[i].status().getStatus() >= 500) {
status = shardFailures[i].status();
}
RestStatus status = RestStatus.OK;
for (int i = 0; i < shardFailures.length; i++) {
RestStatus shardStatus = shardFailures[i].status();
if (shardStatus.getStatus() >= status.getStatus()) {
status = shardFailures[i].status();
}
}
return status;
@ -142,7 +145,9 @@ public class SearchResponse extends ActionResponse implements ToXContent {
* The failed number of shards the search was executed on.
*/
public int getFailedShards() {
return totalShards - successfulShards;
// we don't return totalShards - successfulShards, we don't count "no shards available" as a failed shard, just don't
// count it in the successful counter
return shardFailures.length;
}
/**

View File

@ -249,9 +249,8 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
addShardFailure(shardIndex, t);
if (totalOps.incrementAndGet() == expectedTotalOps) {
// e is null when there is no next active....
if (logger.isDebugEnabled()) {
if (t != null) {
if (t != null && !TransportActions.isShardNotAvailableException(t)) {
if (shard != null) {
logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t);
} else {
@ -285,9 +284,8 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
performFirstPhase(shardIndex, shardIt, nextShard);
} else {
// no more shards active, add a failure
// e is null when there is no next active....
if (logger.isDebugEnabled()) {
if (t != null) {
if (t != null && !TransportActions.isShardNotAvailableException(t)) {
if (shard != null) {
logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t);
} else {
@ -320,6 +318,11 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
}
protected final void addShardFailure(final int shardIndex, Throwable t) {
// we don't aggregate shard failures on non active shards (but do keep the header counts right)
if (TransportActions.isShardNotAvailableException(t)) {
return;
}
// lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures)
if (shardFailures == null) {
synchronized (shardFailuresMutex) {

View File

@ -128,7 +128,7 @@ public class TransportSuggestAction extends TransportBroadcastOperationAction<Su
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
// simply ignore non active shards
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {

View File

@ -93,17 +93,6 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
protected abstract GroupShardsIterator shards(ClusterState clusterState, Request request, String[] concreteIndices);
protected boolean accumulateExceptions() {
return true;
}
/**
* Should non active routing shard state be ignore or not.
*/
protected boolean ignoreNonActiveExceptions() {
return false;
}
protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request);
protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request, String[] concreteIndices);
@ -328,11 +317,8 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
}
void setFailure(ShardIterator shardIt, int shardIndex, Throwable t) {
if (!accumulateExceptions()) {
return;
}
if (ignoreNonActiveExceptions() && TransportActions.isShardNotAvailableException(t)) {
// we don't aggregate shard failures on non active shards (but do keep the header counts right)
if (TransportActions.isShardNotAvailableException(t)) {
return;
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index;
import org.elasticsearch.index.shard.IndexShardException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
/**
*
@ -30,4 +31,9 @@ public class IndexShardMissingException extends IndexShardException {
public IndexShardMissingException(ShardId shardId) {
super(shardId, "missing");
}
@Override
public RestStatus status() {
return RestStatus.NOT_FOUND;
}
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.index.shard;
import org.elasticsearch.rest.RestStatus;
/**
*
*/
@ -39,4 +41,9 @@ public class IllegalIndexShardStateException extends IndexShardException {
public IndexShardState currentState() {
return currentState;
}
@Override
public RestStatus status() {
return RestStatus.NOT_FOUND;
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.elasticsearch.rest.RestRequest;
@ -47,22 +48,33 @@ public class RestActions {
return 0;
}
static final class Fields {
static final XContentBuilderString _SHARDS = new XContentBuilderString("_shards");
static final XContentBuilderString TOTAL = new XContentBuilderString("total");
static final XContentBuilderString SUCCESSFUL = new XContentBuilderString("successful");
static final XContentBuilderString FAILED = new XContentBuilderString("failed");
static final XContentBuilderString FAILURES = new XContentBuilderString("failures");
static final XContentBuilderString INDEX = new XContentBuilderString("index");
static final XContentBuilderString SHARD = new XContentBuilderString("shard");
static final XContentBuilderString REASON = new XContentBuilderString("reason");
}
public static void buildBroadcastShardsHeader(XContentBuilder builder, BroadcastOperationResponse response) throws IOException {
builder.startObject("_shards");
builder.field("total", response.getTotalShards());
builder.field("successful", response.getSuccessfulShards());
builder.field("failed", response.getFailedShards());
if (response.getShardFailures()!=null && response.getShardFailures().length>0) {
builder.startArray("failures");
builder.startObject(Fields._SHARDS);
builder.field(Fields.TOTAL, response.getTotalShards());
builder.field(Fields.SUCCESSFUL, response.getSuccessfulShards());
builder.field(Fields.FAILED, response.getFailedShards());
if (response.getShardFailures() != null && response.getShardFailures().length > 0) {
builder.startArray(Fields.FAILURES);
for (ShardOperationFailedException shardFailure : response.getShardFailures()) {
builder.startObject();
if (shardFailure.index() != null) {
builder.field("index", shardFailure.index(), XContentBuilder.FieldCaseConversion.NONE);
builder.field(Fields.INDEX, shardFailure.index(), XContentBuilder.FieldCaseConversion.NONE);
}
if (shardFailure.shardId() != -1) {
builder.field("shard", shardFailure.shardId());
builder.field(Fields.SHARD, shardFailure.shardId());
}
builder.field("reason", shardFailure.reason());
builder.field(Fields.REASON, shardFailure.reason());
builder.endObject();
}
builder.endArray();