reafactor how actions handle failures, better response when non active shards exists, also, default logging to have action set to DEBUG so exceptions in actions are logged in the server

This commit is contained in:
kimchy 2010-04-07 01:54:33 +03:00
parent 1a9c5d6b15
commit 2bb31fe740
42 changed files with 628 additions and 239 deletions

View File

@ -1,6 +1,8 @@
rootLogger: INFO, console, file
logger:
jgroups: WARN
# log action execution errors for easier debugging
action : DEBUG
appender:
console:

View File

@ -27,6 +27,10 @@ import org.elasticsearch.index.shard.ShardId;
*/
public class NoShardAvailableActionException extends IndexShardException {
public NoShardAvailableActionException(ShardId shardId, String msg) {
super(shardId, msg);
}
public NoShardAvailableActionException(ShardId shardId, String msg, Throwable cause) {
super(shardId, msg, cause);
}

View File

@ -36,8 +36,8 @@ public class BroadcastPingResponse extends BroadcastOperationResponse {
}
public BroadcastPingResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(successfulShards, failedShards, shardFailures);
public BroadcastPingResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
}
@Override public void readFrom(StreamInput in) throws IOException {

View File

@ -83,7 +83,7 @@ public class TransportBroadcastPingAction extends TransportBroadcastOperationAct
successfulShards++;
}
}
return new BroadcastPingResponse(successfulShards, failedShards, shardFailures);
return new BroadcastPingResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
}
@Override protected BroadcastShardPingRequest newShardRequest() {

View File

@ -58,4 +58,8 @@ public class ShardReplicationPingRequest extends ShardReplicationOperationReques
super.writeTo(out);
out.writeVInt(shardId);
}
@Override public String toString() {
return "[" + index + "][" + shardId + "]";
}
}

View File

@ -38,8 +38,8 @@ public class ClearIndicesCacheResponse extends BroadcastOperationResponse {
}
ClearIndicesCacheResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(successfulShards, failedShards, shardFailures);
ClearIndicesCacheResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
}
@Override public void readFrom(StreamInput in) throws IOException {

View File

@ -66,6 +66,10 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
return new ClearIndicesCacheRequest();
}
@Override protected boolean ignoreNonActiveExceptions() {
return true;
}
@Override protected ClearIndicesCacheResponse newResponse(ClearIndicesCacheRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
@ -73,7 +77,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
// simply ignore non active shards
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
@ -84,7 +88,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
successfulShards++;
}
}
return new ClearIndicesCacheResponse(successfulShards, failedShards, shardFailures);
return new ClearIndicesCacheResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
}
@Override protected ShardClearIndicesCacheRequest newShardRequest() {

View File

@ -38,8 +38,8 @@ public class FlushResponse extends BroadcastOperationResponse {
}
FlushResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(successfulShards, failedShards, shardFailures);
FlushResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
}
@Override public void readFrom(StreamInput in) throws IOException {

View File

@ -65,6 +65,10 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
return new FlushRequest();
}
@Override protected boolean ignoreNonActiveExceptions() {
return true;
}
@Override protected FlushResponse newResponse(FlushRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
@ -72,7 +76,7 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
// a non active shard, ignore
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
@ -83,7 +87,7 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
successfulShards++;
}
}
return new FlushResponse(successfulShards, failedShards, shardFailures);
return new FlushResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
}
@Override protected ShardFlushRequest newShardRequest() {

View File

@ -58,4 +58,8 @@ class ShardGatewaySnapshotRequest extends ShardReplicationOperationRequest {
super.writeTo(out);
out.writeVInt(shardId);
}
@Override public String toString() {
return "[" + index + "][" + shardId + "]";
}
}

View File

@ -38,8 +38,8 @@ public class OptimizeResponse extends BroadcastOperationResponse {
}
OptimizeResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(successfulShards, failedShards, shardFailures);
OptimizeResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
}
@Override public void readFrom(StreamInput in) throws IOException {

View File

@ -66,6 +66,10 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
return new OptimizeRequest();
}
@Override protected boolean ignoreNonActiveExceptions() {
return true;
}
@Override protected OptimizeResponse newResponse(OptimizeRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
@ -73,7 +77,7 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
// a non active shard, ignore...
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
@ -84,7 +88,7 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
successfulShards++;
}
}
return new OptimizeResponse(successfulShards, failedShards, shardFailures);
return new OptimizeResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
}
@Override protected ShardOptimizeRequest newShardRequest() {

View File

@ -38,8 +38,8 @@ public class RefreshResponse extends BroadcastOperationResponse {
}
RefreshResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(successfulShards, failedShards, shardFailures);
RefreshResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
}
@Override public void readFrom(StreamInput in) throws IOException {

View File

@ -66,6 +66,10 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
return new RefreshRequest();
}
@Override protected boolean ignoreNonActiveExceptions() {
return true;
}
@Override protected RefreshResponse newResponse(RefreshRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
@ -73,7 +77,7 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
// non active shard, ignore
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
@ -84,7 +88,7 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
successfulShards++;
}
}
return new RefreshResponse(successfulShards, failedShards, shardFailures);
return new RefreshResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
}
@Override protected ShardRefreshRequest newShardRequest() {

View File

@ -37,7 +37,7 @@ import static org.elasticsearch.action.admin.indices.status.ShardStatus.*;
import static org.elasticsearch.util.settings.ImmutableSettings.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class IndicesStatusResponse extends BroadcastOperationResponse {
@ -50,8 +50,8 @@ public class IndicesStatusResponse extends BroadcastOperationResponse {
IndicesStatusResponse() {
}
IndicesStatusResponse(ShardStatus[] shards, ClusterState clusterState, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(successfulShards, failedShards, shardFailures);
IndicesStatusResponse(ShardStatus[] shards, ClusterState clusterState, int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.shards = shards;
indicesSettings = newHashMap();
for (ShardStatus shard : shards) {

View File

@ -45,7 +45,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
import static com.google.common.collect.Lists.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class TransportIndicesStatusAction extends TransportBroadcastOperationAction<IndicesStatusRequest, IndicesStatusResponse, TransportIndicesStatusAction.IndexShardStatusRequest, ShardStatus> {
@ -65,6 +65,10 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
return new IndicesStatusRequest();
}
@Override protected boolean ignoreNonActiveExceptions() {
return true;
}
@Override protected IndicesStatusResponse newResponse(IndicesStatusRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
@ -73,7 +77,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
// simply ignore non active shards
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
@ -85,7 +89,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
successfulShards++;
}
}
return new IndicesStatusResponse(shards.toArray(new ShardStatus[shards.size()]), clusterState, successfulShards, failedShards, shardFailures);
return new IndicesStatusResponse(shards.toArray(new ShardStatus[shards.size()]), clusterState, shardsResponses.length(), successfulShards, failedShards, shardFailures);
}
@Override protected IndexShardStatusRequest newShardRequest() {

View File

@ -201,6 +201,6 @@ public class CountRequest extends BroadcastOperationRequest {
}
@Override public String toString() {
return "[" + Arrays.toString(indices) + "][" + Arrays.toString(types) + "], querySource[" + Unicode.fromBytes(querySource) + "]";
return "[" + Arrays.toString(indices) + "]" + Arrays.toString(types) + ", querySource[" + Unicode.fromBytes(querySource) + "]";
}
}

View File

@ -40,8 +40,8 @@ public class CountResponse extends BroadcastOperationResponse {
}
CountResponse(long count, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(successfulShards, failedShards, shardFailures);
CountResponse(long count, int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.count = count;
}

View File

@ -98,7 +98,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
successfulShards++;
}
}
return new CountResponse(count, successfulShards, failedShards, shardFailures);
return new CountResponse(count, shardsResponses.length(), successfulShards, failedShards, shardFailures);
}
@Override protected ShardCountResponse shardOperation(ShardCountRequest request) throws ElasticSearchException {

View File

@ -23,10 +23,12 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.util.Nullable;
import org.elasticsearch.util.Strings;
import org.elasticsearch.util.Unicode;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import java.io.IOException;
import java.util.Arrays;
import static org.elasticsearch.action.Actions.*;
@ -115,4 +117,8 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
out.writeUTF(type);
}
}
@Override public String toString() {
return "[" + index + "]" + Arrays.toString(types) + ", query [" + Unicode.fromBytes(querySource) + "]";
}
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
@ -39,7 +40,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.settings.Settings;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.type.TransportSearchHelper.*;
@ -106,15 +106,15 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
shardsIts = indicesService.searchShards(clusterState, request.indices(), request.queryHint());
expectedSuccessfulOps = shardsIts.size();
expectedTotalOps = shardsIts.totalSize();
expectedTotalOps = shardsIts.totalSizeActive();
}
public void start() {
// count the local operations, and perform the non local ones
int localOperations = 0;
for (final ShardsIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.next();
if (shard.active()) {
final ShardRouting shard = shardIt.nextActiveOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
localOperations++;
} else {
@ -122,7 +122,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
performFirstPhase(shardIt.reset());
}
} else {
// as if we have a "problem", so we iterate to the next one and maintain counts
// really, no shards active in this group
onFirstPhaseResult(shard, shardIt, null);
}
}
@ -132,8 +132,8 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
threadPool.execute(new Runnable() {
@Override public void run() {
for (final ShardsIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.reset().next();
if (shard.active()) {
final ShardRouting shard = shardIt.reset().nextActiveOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
performFirstPhase(shardIt.reset());
}
@ -144,8 +144,8 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (final ShardsIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.reset().next();
if (shard.active()) {
final ShardRouting shard = shardIt.reset().nextActiveOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
if (localAsync) {
threadPool.execute(new Runnable() {
@ -163,10 +163,10 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
}
}
private void performFirstPhase(final Iterator<ShardRouting> shardIt) {
final ShardRouting shard = shardIt.next();
if (!shard.active()) {
// as if we have a "problem", so we iterate to the next one and maintain counts
private void performFirstPhase(final ShardsIterator shardIt) {
final ShardRouting shard = shardIt.nextActiveOrNull();
if (shard == null) {
// no more active shards... (we should not really get here, but just for safety)
onFirstPhaseResult(shard, shardIt, null);
} else {
Node node = nodes.get(shard.currentNodeId());
@ -182,13 +182,13 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
}
}
private void onFirstPhaseResult(ShardRouting shard, FirstResult result, Iterator<ShardRouting> shardIt) {
private void onFirstPhaseResult(ShardRouting shard, FirstResult result, ShardsIterator shardIt) {
processFirstPhaseResult(shard, result);
// increment all the "future" shards to update the total ops since we some may work and some may not...
// and when that happens, we break on total ops, so we must maintain them
while (shardIt.hasNext()) {
while (shardIt.hasNextActive()) {
totalOps.incrementAndGet();
shardIt.next();
shardIt.nextActive();
}
if (successulOps.incrementAndGet() == expectedSuccessfulOps ||
totalOps.incrementAndGet() == expectedTotalOps) {
@ -200,15 +200,25 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
}
}
private void onFirstPhaseResult(ShardRouting shard, final Iterator<ShardRouting> shardIt, Throwable t) {
if (logger.isDebugEnabled()) {
if (t != null) {
logger.debug(shard.shortSummary() + ": Failed to search [" + request + "]", t);
}
}
private void onFirstPhaseResult(ShardRouting shard, final ShardsIterator shardIt, Throwable t) {
if (totalOps.incrementAndGet() == expectedTotalOps) {
// e is null when there is no next active....
if (logger.isDebugEnabled()) {
if (t != null) {
if (shard != null) {
logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t);
} else {
logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "]", t);
}
}
}
// no more shards, add a failure
shardFailures.add(new ShardSearchFailure(t));
if (t == null) {
// no active shards
shardFailures.add(new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id())));
} else {
shardFailures.add(new ShardSearchFailure(t));
}
if (successulOps.get() == 0) {
// no successful ops, raise an exception
invokeListener(new SearchPhaseExecutionException(firstPhaseName(), "total failure", buildShardFailures()));
@ -220,11 +230,36 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
}
}
} else {
if (shardIt.hasNext()) {
if (shardIt.hasNextActive()) {
// trace log this exception
if (logger.isTraceEnabled()) {
if (t != null) {
if (shard != null) {
logger.trace(shard.shortSummary() + ": Failed to execute [" + request + "]", t);
} else {
logger.trace(shardIt.shardId() + ": Failed to execute [" + request + "]", t);
}
}
}
performFirstPhase(shardIt);
} else {
// no more shards, add a failure
shardFailures.add(new ShardSearchFailure(t));
// no more shards active, add a failure
// e is null when there is no next active....
if (logger.isDebugEnabled()) {
if (t != null) {
if (shard != null) {
logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t);
} else {
logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "]", t);
}
}
}
if (t == null) {
// no active shards
shardFailures.add(new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id())));
} else {
shardFailures.add(new ShardSearchFailure(t));
}
}
}
}

View File

@ -38,6 +38,8 @@ import static org.elasticsearch.action.support.DefaultShardOperationFailedExcept
*/
public abstract class BroadcastOperationResponse implements ActionResponse {
private int totalShards;
private int successfulShards;
private int failedShards;
@ -47,7 +49,8 @@ public abstract class BroadcastOperationResponse implements ActionResponse {
protected BroadcastOperationResponse() {
}
protected BroadcastOperationResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
protected BroadcastOperationResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
this.totalShards = totalShards;
this.successfulShards = successfulShards;
this.failedShards = failedShards;
this.shardFailures = shardFailures;
@ -60,7 +63,7 @@ public abstract class BroadcastOperationResponse implements ActionResponse {
* The total shards this request ran against.
*/
public int totalShards() {
return successfulShards + failedShards;
return totalShards;
}
/**
@ -88,6 +91,7 @@ public abstract class BroadcastOperationResponse implements ActionResponse {
}
@Override public void readFrom(StreamInput in) throws IOException {
totalShards = in.readVInt();
successfulShards = in.readVInt();
failedShards = in.readVInt();
int size = in.readVInt();
@ -100,6 +104,7 @@ public abstract class BroadcastOperationResponse implements ActionResponse {
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(totalShards);
out.writeVInt(successfulShards);
out.writeVInt(failedShards);
out.writeVInt(shardFailures.size());

View File

@ -26,7 +26,7 @@ import org.elasticsearch.index.shard.ShardId;
/**
* An exception indicating that a failure occurred performing an operation on the shard.
*
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class BroadcastShardOperationFailedException extends IndexShardException implements ElasticSearchWrapperException {

View File

@ -42,7 +42,6 @@ import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
@ -96,10 +95,14 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
protected abstract GroupShardsIterator shards(Request request, ClusterState clusterState);
private boolean accumulateExceptions() {
protected boolean accumulateExceptions() {
return true;
}
protected boolean ignoreNonActiveExceptions() {
return false;
}
class AsyncBroadcastAction {
private final Request request;
@ -145,8 +148,8 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
// count the local operations, and perform the non local ones
int localOperations = 0;
for (final ShardsIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.next();
if (shard.active()) {
final ShardRouting shard = shardIt.nextActiveOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
localOperations++;
} else {
@ -154,8 +157,8 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
performOperation(shardIt.reset(), true);
}
} else {
// as if we have a "problem", so we iterate to the next one and maintain counts
onOperation(shard, shardIt, new BroadcastShardOperationFailedException(shard.shardId(), "Not active"), false);
// really, no shards active in this group
onOperation(shard, shardIt, null, false);
}
}
// we have local operations, perform them now
@ -164,8 +167,8 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
threadPool.execute(new Runnable() {
@Override public void run() {
for (final ShardsIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.reset().next();
if (shard.active()) {
final ShardRouting shard = shardIt.reset().nextActiveOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
performOperation(shardIt.reset(), false);
}
@ -176,8 +179,8 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
} else {
boolean localAsync = request.operationThreading() == BroadcastOperationThreading.THREAD_PER_SHARD;
for (final ShardsIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.reset().next();
if (shard.active()) {
final ShardRouting shard = shardIt.reset().nextActiveOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
performOperation(shardIt.reset(), localAsync);
}
@ -187,11 +190,11 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
}
}
private void performOperation(final Iterator<ShardRouting> shardIt, boolean localAsync) {
final ShardRouting shard = shardIt.next();
if (!shard.active()) {
// as if we have a "problem", so we iterate to the next one and maintain counts
onOperation(shard, shardIt, new BroadcastShardOperationFailedException(shard.shardId(), "Not Active"), false);
private void performOperation(final ShardsIterator shardIt, boolean localAsync) {
final ShardRouting shard = shardIt.nextActiveOrNull();
if (shard == null) {
// no more active shards... (we should not really get here, just safety)
onOperation(shard, shardIt, null, false);
} else {
final ShardRequest shardRequest = newShardRequest(shard, request);
if (shard.currentNodeId().equals(nodes.localNodeId())) {
@ -223,8 +226,8 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
onOperation(shard, response, false);
}
@Override public void handleException(RemoteTransportException exp) {
onOperation(shard, shardIt, exp, false);
@Override public void handleException(RemoteTransportException e) {
onOperation(shard, shardIt, e, false);
}
@Override public boolean spawn() {
@ -236,32 +239,54 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
}
}
@SuppressWarnings({"unchecked"}) private void onOperation(ShardRouting shard, ShardResponse response, boolean alreadyThreaded) {
@SuppressWarnings({"unchecked"})
private void onOperation(ShardRouting shard, ShardResponse response, boolean alreadyThreaded) {
shardsResponses.set(indexCounter.getAndIncrement(), response);
if (expectedOps == counterOps.incrementAndGet()) {
finishHim(alreadyThreaded);
}
}
@SuppressWarnings({"unchecked"}) private void onOperation(ShardRouting shard, final Iterator<ShardRouting> shardIt, Exception e, boolean alreadyThreaded) {
if (logger.isDebugEnabled()) {
if (e != null) {
logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", e);
@SuppressWarnings({"unchecked"})
private void onOperation(ShardRouting shard, final ShardsIterator shardIt, Throwable t, boolean alreadyThreaded) {
if (!shardIt.hasNextActive()) {
// e is null when there is no next active....
if (logger.isDebugEnabled()) {
if (t != null) {
if (shard != null) {
logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t);
} else {
logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "]", t);
}
}
}
}
if (!shardIt.hasNext()) {
// no more shards in this partition
// no more shards in this group
int index = indexCounter.getAndIncrement();
if (accumulateExceptions()) {
if (!(e instanceof BroadcastShardOperationFailedException)) {
e = new BroadcastShardOperationFailedException(shard.shardId(), e);
if (t == null) {
if (!ignoreNonActiveExceptions()) {
t = new BroadcastShardOperationFailedException(shardIt.shardId(), "No active shard(s)");
}
} else if (!(t instanceof BroadcastShardOperationFailedException)) {
t = new BroadcastShardOperationFailedException(shardIt.shardId(), t);
}
shardsResponses.set(index, e);
shardsResponses.set(index, t);
}
if (expectedOps == counterOps.incrementAndGet()) {
finishHim(alreadyThreaded);
}
return;
} else {
// trace log this exception
if (logger.isTraceEnabled()) {
if (t != null) {
if (shard != null) {
logger.trace(shard.shortSummary() + ": Failed to execute [" + request + "]", t);
} else {
logger.trace(shardIt.shardId() + ": Failed to execute [" + request + "]", t);
}
}
}
}
// we are not threaded here if we got here from the transport
// or we possibly threaded if we got from a local threaded one,

View File

@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.action.Actions.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public abstract class ShardReplicationOperationRequest implements ActionRequest {

View File

@ -330,6 +330,9 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// still in recovery, retry (we know that its not UNASSIGNED OR INITIALIZING since we are checking it in the calling method)
retryPrimary(fromDiscoveryListener, shard);
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", e);
}
listener.onFailure(new ReplicationShardOperationFailedException(shards.shardId(), e));
}
}

View File

@ -39,10 +39,9 @@ import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import java.util.Iterator;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public abstract class TransportSingleOperationAction<Request extends SingleOperationRequest, Response extends ActionResponse> extends BaseAction<Request, Response> {
@ -83,9 +82,7 @@ public abstract class TransportSingleOperationAction<Request extends SingleOpera
private final ActionListener<Response> listener;
private final ShardsIterator shards;
private Iterator<ShardRouting> shardsIt;
private final ShardsIterator shardsIt;
private final Request request;
@ -102,18 +99,17 @@ public abstract class TransportSingleOperationAction<Request extends SingleOpera
// update to the concrete shard to use
request.index(clusterState.metaData().concreteIndex(request.index()));
this.shards = indicesService.indexServiceSafe(request.index()).operationRouting()
this.shardsIt = indicesService.indexServiceSafe(request.index()).operationRouting()
.getShards(clusterState, request.type(), request.id());
this.shardsIt = shards.iterator();
}
public void start() {
performFirst();
}
public void onFailure(ShardRouting shardRouting, Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(shardRouting.shortSummary() + ": Failed to get [" + request.type() + "#" + request.id() + "]", e);
private void onFailure(ShardRouting shardRouting, Exception e) {
if (logger.isTraceEnabled() && e != null) {
logger.trace(shardRouting.shortSummary() + ": Failed to get [" + request.type() + "#" + request.id() + "]", e);
}
perform(e);
}
@ -122,11 +118,8 @@ public abstract class TransportSingleOperationAction<Request extends SingleOpera
* First get should try and use a shard that exists on a local node for better performance
*/
private void performFirst() {
while (shardsIt.hasNext()) {
final ShardRouting shard = shardsIt.next();
if (!shard.active()) {
continue;
}
while (shardsIt.hasNextActive()) {
final ShardRouting shard = shardsIt.nextActive();
if (shard.currentNodeId().equals(nodes.localNodeId())) {
if (request.operationThreaded()) {
threadPool.execute(new Runnable() {
@ -159,19 +152,16 @@ public abstract class TransportSingleOperationAction<Request extends SingleOpera
}
}
}
if (!shardsIt.hasNext()) {
if (!shardsIt.hasNextActive()) {
// no local node get, go remote
shardsIt = shards.reset().iterator();
shardsIt.reset();
perform(null);
}
}
private void perform(final Exception lastException) {
while (shardsIt.hasNext()) {
final ShardRouting shard = shardsIt.next();
if (!shard.active()) {
continue;
}
while (shardsIt.hasNextActive()) {
final ShardRouting shard = shardsIt.nextActive();
// no need to check for local nodes, we tried them already in performFirstGet
if (!shard.currentNodeId().equals(nodes.localNodeId())) {
Node node = nodes.get(shard.currentNodeId());
@ -204,12 +194,20 @@ public abstract class TransportSingleOperationAction<Request extends SingleOpera
return;
}
}
if (!shardsIt.hasNext()) {
final NoShardAvailableActionException failure = new NoShardAvailableActionException(shards.shardId(), "No shard available for [" + request.type() + "#" + request.id() + "]", lastException);
if (!shardsIt.hasNextActive()) {
Exception failure = lastException;
if (failure == null) {
failure = new NoShardAvailableActionException(shardsIt.shardId(), "No shard available for [" + request.type() + "#" + request.id() + "]");
} else {
if (logger.isDebugEnabled()) {
logger.debug(shardsIt.shardId() + ": Failed to get [" + request.type() + "#" + request.id() + "]", failure);
}
}
if (request.listenerThreaded()) {
final Exception fFailure = failure;
threadPool.execute(new Runnable() {
@Override public void run() {
listener.onFailure(failure);
listener.onFailure(fFailure);
}
});
} else {

View File

@ -54,9 +54,9 @@ public class TermsResponse extends BroadcastOperationResponse implements Iterabl
TermsResponse() {
}
TermsResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures, FieldTermsFreq[] fieldsTermsFreq,
TermsResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures, FieldTermsFreq[] fieldsTermsFreq,
long numDocs, long maxDoc, long numDeletedDocs) {
super(successfulShards, failedShards, shardFailures);
super(totalShards, successfulShards, failedShards, shardFailures);
this.fieldsTermsFreq = fieldsTermsFreq;
this.numDocs = numDocs;
this.maxDoc = maxDoc;

View File

@ -140,7 +140,7 @@ public class TransportTermsAction extends TransportBroadcastOperationAction<Term
TermFreq[] freqs = entry.getValue().toArray(new TermFreq[entry.getValue().size()]);
resultFreqs[index++] = new FieldTermsFreq(entry.getKey(), freqs);
}
return new TermsResponse(successfulShards, failedShards, shardFailures, resultFreqs, numDocs, maxDoc, numDeletedDocs);
return new TermsResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures, resultFreqs, numDocs, maxDoc, numDeletedDocs);
}
@Override protected ShardTermsResponse shardOperation(ShardTermsRequest request) throws ElasticSearchException {

View File

@ -1,105 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing;
import org.elasticsearch.index.shard.ShardId;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
/**
* @author kimchy (Shay Banon)
*/
public class CompoundShardsIterator implements ShardsIterator, Iterator<ShardRouting> {
private int index = 0;
private final List<ShardsIterator> iterators;
private Iterator<ShardRouting> current;
public CompoundShardsIterator(List<ShardsIterator> iterators) {
this.iterators = iterators;
}
@Override public ShardsIterator reset() {
for (ShardsIterator it : iterators) {
it.reset();
}
index = 0;
current = null;
return this;
}
@Override public int size() {
int size = 0;
for (ShardsIterator it : iterators) {
size += it.size();
}
return size;
}
@Override public boolean hasNext() {
if (index == iterators.size()) {
return false;
}
if (current == null) {
current = iterators.get(index).iterator();
}
while (!current.hasNext()) {
if (++index == iterators.size()) {
return false;
}
current = iterators.get(index).iterator();
}
return true;
}
@Override public ShardRouting next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return current.next();
}
@Override public void remove() {
throw new UnsupportedOperationException();
}
@Override public ShardId shardId() {
return currentShardsIterator().shardId();
}
@Override public Iterator<ShardRouting> iterator() {
return this;
}
private ShardsIterator currentShardsIterator() throws NoSuchElementException {
if (iterators.size() == 0) {
throw new NoSuchElementException();
}
if (index == iterators.size()) {
return iterators.get(index - 1);
}
return iterators.get(index);
}
}

View File

@ -57,6 +57,14 @@ public class GroupShardsIterator implements Iterable<ShardsIterator> {
return size;
}
public int totalSizeActive() {
int size = 0;
for (ShardsIterator shard : iterators) {
size += shard.sizeActive();
}
return size;
}
public int size() {
return iterators.size();
}

View File

@ -110,7 +110,7 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
* A groups shards iterator where each groups is a single {@link ShardRouting} and a group
* is created for each shard routing.
*
* <p>This basically means that components that use the {@link GroupShardsIterator} will itearte
* <p>This basically means that components that use the {@link GroupShardsIterator} will iterate
* over *all* the shards (all the replicas) within the index.
*/
public GroupShardsIterator groupByAllIt() {

View File

@ -106,7 +106,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
* <p>The class can be used from different threads, though not designed to be used concurrently
* from different threads.
*/
private class IndexShardsIterator implements ShardsIterator, Iterator<ShardRouting> {
class IndexShardsIterator implements ShardsIterator, Iterator<ShardRouting> {
private final int origIndex;
@ -130,17 +130,47 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
}
@Override public boolean hasNext() {
return counter != size();
return counter < size();
}
@Override public ShardRouting next() {
@Override public ShardRouting next() throws NoSuchElementException {
if (!hasNext()) {
throw new NoSuchElementException();
throw new NoSuchElementException("No shard found");
}
counter++;
return shardModulo(index++);
}
@Override public boolean hasNextActive() {
int counter = this.counter;
int index = this.index;
while (counter++ < size()) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.active()) {
return true;
}
}
return false;
}
@Override public ShardRouting nextActive() throws NoSuchElementException {
ShardRouting shardRouting = nextActiveOrNull();
if (shardRouting == null) {
throw new NoSuchElementException("No active shard found");
}
return shardRouting;
}
@Override public ShardRouting nextActiveOrNull() throws NoSuchElementException {
while (counter++ < size()) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.active()) {
return shardRouting;
}
}
return null;
}
@Override public void remove() {
throw new UnsupportedOperationException();
}
@ -149,6 +179,16 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return IndexShardRoutingTable.this.size();
}
@Override public int sizeActive() {
int shardsActive = 0;
for (ShardRouting shardRouting : IndexShardRoutingTable.this.shards()) {
if (shardRouting.active()) {
shardsActive++;
}
}
return shardsActive;
}
@Override public ShardId shardId() {
return IndexShardRoutingTable.this.shardId();
}

View File

@ -23,9 +23,10 @@ import org.elasticsearch.index.shard.ShardId;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class PlainShardsIterator implements ShardsIterator {
@ -33,16 +34,15 @@ public class PlainShardsIterator implements ShardsIterator {
private final List<ShardRouting> shards;
private Iterator<ShardRouting> iterator;
private volatile int counter = 0;
public PlainShardsIterator(ShardId shardId, List<ShardRouting> shards) {
this.shardId = shardId;
this.shards = shards;
this.iterator = shards.iterator();
}
@Override public ShardsIterator reset() {
this.iterator = shards.iterator();
this.counter = 0;
return this;
}
@ -50,6 +50,16 @@ public class PlainShardsIterator implements ShardsIterator {
return shards.size();
}
@Override public int sizeActive() {
int sizeActive = 0;
for (ShardRouting shardRouting : shards) {
if (shardRouting.active()) {
sizeActive++;
}
}
return sizeActive;
}
@Override public ShardId shardId() {
return this.shardId;
}
@ -59,11 +69,42 @@ public class PlainShardsIterator implements ShardsIterator {
}
@Override public boolean hasNext() {
return iterator.hasNext();
return counter < shards.size();
}
@Override public ShardRouting next() {
return iterator.next();
if (!hasNext()) {
throw new NoSuchElementException("No shard found");
}
return shards.get(counter++);
}
@Override public boolean hasNextActive() {
int counter = this.counter;
while (counter < shards.size()) {
if (shards.get(counter++).active()) {
return true;
}
}
return false;
}
@Override public ShardRouting nextActive() throws NoSuchElementException {
ShardRouting shardRouting = nextActiveOrNull();
if (shardRouting == null) {
throw new NoSuchElementException("No active shard found");
}
return shardRouting;
}
@Override public ShardRouting nextActiveOrNull() throws NoSuchElementException {
while (counter < shards.size()) {
ShardRouting shardRouting = shards.get(counter++);
if (shardRouting.active()) {
return shardRouting;
}
}
return null;
}
@Override public void remove() {

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing;
import org.elasticsearch.index.shard.ShardId;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
* @author kimchy (Shay Banon)
@ -35,5 +36,13 @@ public interface ShardsIterator extends Iterable<ShardRouting>, Iterator<ShardRo
int size();
int sizeActive();
ShardId shardId();
boolean hasNextActive();
ShardRouting nextActive() throws NoSuchElementException;
ShardRouting nextActiveOrNull();
}

View File

@ -29,7 +29,7 @@ import java.io.Serializable;
/**
* The target that the search request was executed on.
*
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class SearchShardTarget implements Streamable, Serializable {

View File

@ -0,0 +1,119 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.broadcast;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading;
import org.elasticsearch.test.integration.AbstractServersTests;
import org.elasticsearch.util.Unicode;
import org.elasticsearch.util.json.JsonBuilder;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
import java.io.IOException;
import static org.elasticsearch.client.Requests.*;
import static org.elasticsearch.index.query.json.JsonQueryBuilders.*;
import static org.elasticsearch.util.json.JsonBuilder.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public class BroadcastActionsTests extends AbstractServersTests {
@AfterMethod public void closeServers() {
closeAllServers();
}
@Test public void testBroadcastOperations() throws IOException {
startServer("server1");
client("server1").admin().indices().create(createIndexRequest("test")).actionGet(5000);
logger.info("Running Cluster Health");
ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForYellowStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));
client("server1").index(indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet();
FlushResponse flushResponse = client("server1").admin().indices().flush(flushRequest("test")).actionGet();
assertThat(flushResponse.totalShards(), equalTo(10));
assertThat(flushResponse.successfulShards(), equalTo(5));
assertThat(flushResponse.failedShards(), equalTo(0));
client("server1").index(indexRequest("test").type("type1").id("2").source(source("2", "test"))).actionGet();
RefreshResponse refreshResponse = client("server1").admin().indices().refresh(refreshRequest("test")).actionGet();
assertThat(refreshResponse.totalShards(), equalTo(10));
assertThat(refreshResponse.successfulShards(), equalTo(5));
assertThat(refreshResponse.failedShards(), equalTo(0));
logger.info("Count");
// check count
for (int i = 0; i < 5; i++) {
// test successful
CountResponse countResponse = client("server1").count(countRequest("test").querySource(termQuery("_type", "type1")).operationThreading(BroadcastOperationThreading.NO_THREADS)).actionGet();
assertThat(countResponse.count(), equalTo(2l));
assertThat(countResponse.totalShards(), equalTo(5));
assertThat(countResponse.successfulShards(), equalTo(5));
assertThat(countResponse.failedShards(), equalTo(0));
}
for (int i = 0; i < 5; i++) {
CountResponse countResponse = client("server1").count(countRequest("test").querySource(termQuery("_type", "type1")).operationThreading(BroadcastOperationThreading.SINGLE_THREAD)).actionGet();
assertThat(countResponse.count(), equalTo(2l));
assertThat(countResponse.totalShards(), equalTo(5));
assertThat(countResponse.successfulShards(), equalTo(5));
assertThat(countResponse.failedShards(), equalTo(0));
}
for (int i = 0; i < 5; i++) {
CountResponse countResponse = client("server1").count(countRequest("test").querySource(termQuery("_type", "type1")).operationThreading(BroadcastOperationThreading.THREAD_PER_SHARD)).actionGet();
assertThat(countResponse.count(), equalTo(2l));
assertThat(countResponse.totalShards(), equalTo(5));
assertThat(countResponse.successfulShards(), equalTo(5));
assertThat(countResponse.failedShards(), equalTo(0));
}
for (int i = 0; i < 5; i++) {
// test failed (simply query that can't be parsed)
CountResponse countResponse = client("server1").count(countRequest("test").querySource(Unicode.fromStringAsBytes("{ term : { _type : \"type1 } }"))).actionGet();
assertThat(countResponse.count(), equalTo(0l));
assertThat(countResponse.totalShards(), equalTo(5));
assertThat(countResponse.successfulShards(), equalTo(0));
assertThat(countResponse.failedShards(), equalTo(5));
for (ShardOperationFailedException exp : countResponse.shardFailures()) {
assertThat(exp.reason(), containsString("QueryParsingException"));
}
}
}
private JsonBuilder source(String id, String nameValue) throws IOException {
return jsonBuilder().startObject().field("id", id).field("name", nameValue).endObject();
}
}

View File

@ -0,0 +1,7 @@
cluster:
routing:
schedule: 100ms
index:
number_of_shards: 5
number_of_replicas: 1

View File

@ -19,6 +19,10 @@
package org.elasticsearch.test.integration.recovery;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.test.integration.AbstractServersTests;
import org.testng.annotations.AfterMethod;
@ -29,7 +33,7 @@ import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class SimpleRecoveryTests extends AbstractServersTests {
@ -42,14 +46,30 @@ public class SimpleRecoveryTests extends AbstractServersTests {
client("server1").admin().indices().create(createIndexRequest("test")).actionGet(5000);
logger.info("Running Cluster Health");
ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForYellowStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));
client("server1").index(indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet();
client("server1").admin().indices().flush(flushRequest("test")).actionGet();
FlushResponse flushResponse = client("server1").admin().indices().flush(flushRequest("test")).actionGet();
assertThat(flushResponse.totalShards(), equalTo(10));
assertThat(flushResponse.successfulShards(), equalTo(5));
assertThat(flushResponse.failedShards(), equalTo(0));
client("server1").index(indexRequest("test").type("type1").id("2").source(source("2", "test"))).actionGet();
client("server1").admin().indices().refresh(refreshRequest("test")).actionGet();
RefreshResponse refreshResponse = client("server1").admin().indices().refresh(refreshRequest("test")).actionGet();
assertThat(refreshResponse.totalShards(), equalTo(10));
assertThat(refreshResponse.successfulShards(), equalTo(5));
assertThat(refreshResponse.failedShards(), equalTo(0));
startServer("server2");
// sleep so we recover properly
Thread.sleep(5000);
logger.info("Running Cluster Health");
clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
GetResponse getResult;
@ -66,7 +86,12 @@ public class SimpleRecoveryTests extends AbstractServersTests {
// now start another one so we move some primaries
startServer("server3");
Thread.sleep(5000);
Thread.sleep(1000);
logger.info("Running Cluster Health");
clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
for (int i = 0; i < 5; i++) {
getResult = client("server1").get(getRequest("test").type("type1").id("1")).actionGet(1000);

View File

@ -0,0 +1,124 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.search;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.test.integration.AbstractServersTests;
import org.elasticsearch.util.Unicode;
import org.elasticsearch.util.json.JsonBuilder;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
import java.io.IOException;
import static org.elasticsearch.client.Requests.*;
import static org.elasticsearch.util.json.JsonBuilder.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public class TransportSearchFailuresTests extends AbstractServersTests {
@AfterMethod public void closeServers() {
closeAllServers();
}
@Test public void testFailedSearchWithWrongQuery() throws Exception {
logger.info("Start Testing failed search with wrong query");
startServer("server1");
client("server1").admin().indices().create(createIndexRequest("test")).actionGet();
for (int i = 0; i < 100; i++) {
index(client("server1"), Integer.toString(i), "test", i);
}
RefreshResponse refreshResponse = client("server1").admin().indices().refresh(refreshRequest("test")).actionGet();
assertThat(refreshResponse.totalShards(), equalTo(9));
assertThat(refreshResponse.successfulShards(), equalTo(3));
assertThat(refreshResponse.failedShards(), equalTo(0));
for (int i = 0; i < 5; i++) {
try {
SearchResponse searchResponse = client("server1").search(searchRequest("test").source(Unicode.fromStringAsBytes("{ xxx }"))).actionGet();
assertThat(searchResponse.totalShards(), equalTo(3));
assertThat(searchResponse.successfulShards(), equalTo(0));
assertThat(searchResponse.failedShards(), equalTo(3));
assert false : "search should fail";
} catch (ElasticSearchException e) {
assertThat(e.unwrapCause(), instanceOf(SearchPhaseExecutionException.class));
// all is well
}
}
startServer("server2");
Thread.sleep(300);
logger.info("Running Cluster Health");
ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealth("test").waitForYellowStatus().waitForRelocatingShards(0)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));
refreshResponse = client("server1").admin().indices().refresh(refreshRequest("test")).actionGet();
assertThat(refreshResponse.totalShards(), equalTo(9));
assertThat(refreshResponse.successfulShards(), equalTo(6));
assertThat(refreshResponse.failedShards(), equalTo(0));
for (int i = 0; i < 5; i++) {
try {
SearchResponse searchResponse = client("server1").search(searchRequest("test").source(Unicode.fromStringAsBytes("{ xxx }"))).actionGet();
assertThat(searchResponse.totalShards(), equalTo(3));
assertThat(searchResponse.successfulShards(), equalTo(0));
assertThat(searchResponse.failedShards(), equalTo(3));
assert false : "search should fail";
} catch (ElasticSearchException e) {
assertThat(e.unwrapCause(), instanceOf(SearchPhaseExecutionException.class));
// all is well
}
}
logger.info("Done Testing failed search");
}
private void index(Client client, String id, String nameValue, int age) throws IOException {
client.index(Requests.indexRequest("test").type("type1").id(id).source(source(id, nameValue, age))).actionGet();
}
private JsonBuilder source(String id, String nameValue, int age) throws IOException {
StringBuilder multi = new StringBuilder().append(nameValue);
for (int i = 0; i < age; i++) {
multi.append(" ").append(nameValue);
}
return binaryJsonBuilder().startObject()
.field("id", id)
.field("name", nameValue + id)
.field("age", age)
.field("multi", multi.toString())
.field("_boost", age * 10)
.endObject();
}
}

View File

@ -0,0 +1,9 @@
cluster:
routing:
schedule: 100ms
index:
number_of_shards: 3
number_of_replicas: 2
routing :
# Use simple hashing since we want even distribution and our ids are simple incremented number based
hash.type : simple

View File

@ -271,7 +271,10 @@ public class TransportTwoServersSearchTests extends AbstractServersTests {
@Test public void testFailedSearchWithWrongQuery() throws Exception {
logger.info("Start Testing failed search with wrong query");
try {
client.search(searchRequest("test").source(Unicode.fromStringAsBytes("{ xxx }"))).actionGet();
SearchResponse searchResponse = client.search(searchRequest("test").source(Unicode.fromStringAsBytes("{ xxx }"))).actionGet();
assertThat(searchResponse.totalShards(), equalTo(3));
assertThat(searchResponse.successfulShards(), equalTo(0));
assertThat(searchResponse.failedShards(), equalTo(3));
assert false : "search should fail";
} catch (ElasticSearchException e) {
assertThat(e.unwrapCause(), instanceOf(SearchPhaseExecutionException.class));
@ -287,6 +290,9 @@ public class TransportTwoServersSearchTests extends AbstractServersTests {
.from(1000).size(20).explain(true);
SearchResponse response = client.search(searchRequest("test").searchType(DFS_QUERY_AND_FETCH).source(source)).actionGet();
assertThat(response.hits().hits().length, equalTo(0));
assertThat(response.totalShards(), equalTo(3));
assertThat(response.successfulShards(), equalTo(3));
assertThat(response.failedShards(), equalTo(0));
response = client.search(searchRequest("test").searchType(QUERY_THEN_FETCH).source(source)).actionGet();
assertThat(response.shardFailures().length, equalTo(0));