Refactor how actions handle failures, improve the response when non-active shards exist, and default the action logger to DEBUG so exceptions in actions are logged in the server
This commit is contained in:
parent
1a9c5d6b15
commit
2bb31fe740
@@ -1,6 +1,8 @@
 rootLogger: INFO, console, file
 logger:
   jgroups: WARN
+  # log action execution errors for easier debugging
+  action : DEBUG
 
 appender:
   console:
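The action : DEBUG default matters because the refactored transport actions below only report shard-level failures through guarded debug calls. A minimal sketch of that guard, reusing the logger, shard, request, and t names from the hunks that follow:

    // With the "action" logger at DEBUG, this branch runs and the shard
    // failure (including its stack trace) reaches the server log.
    if (logger.isDebugEnabled()) {
        if (t != null) {
            logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t);
        }
    }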
@@ -27,6 +27,10 @@ import org.elasticsearch.index.shard.ShardId;
  */
 public class NoShardAvailableActionException extends IndexShardException {
 
     public NoShardAvailableActionException(ShardId shardId, String msg) {
         super(shardId, msg);
     }
+
+    public NoShardAvailableActionException(ShardId shardId, String msg, Throwable cause) {
+        super(shardId, msg, cause);
+    }
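The added cause-carrying constructor lets a caller preserve the last per-shard failure once every copy has been tried. A hypothetical caller (the request and lastException names here are illustrative, not from this file):

    // Hypothetical use of the new constructor: report that no shard could
    // serve the request without losing the most recent shard-level failure.
    throw new NoShardAvailableActionException(shardId,
            "No shard available for [" + request.type() + "#" + request.id() + "]", lastException);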
@@ -36,8 +36,8 @@ public class BroadcastPingResponse extends BroadcastOperationResponse {
 
     }
 
-    public BroadcastPingResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
-        super(successfulShards, failedShards, shardFailures);
+    public BroadcastPingResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
+        super(totalShards, successfulShards, failedShards, shardFailures);
     }
 
     @Override public void readFrom(StreamInput in) throws IOException {
@@ -83,7 +83,7 @@ public class TransportBroadcastPingAction extends TransportBroadcastOperationAct
                 successfulShards++;
             }
         }
-        return new BroadcastPingResponse(successfulShards, failedShards, shardFailures);
+        return new BroadcastPingResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
     }
 
     @Override protected BroadcastShardPingRequest newShardRequest() {
@@ -58,4 +58,8 @@ public class ShardReplicationPingRequest extends ShardReplicationOperationReques
         super.writeTo(out);
         out.writeVInt(shardId);
     }
+
+    @Override public String toString() {
+        return "[" + index + "][" + shardId + "]";
+    }
 }
@@ -38,8 +38,8 @@ public class ClearIndicesCacheResponse extends BroadcastOperationResponse {
 
     }
 
-    ClearIndicesCacheResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
-        super(successfulShards, failedShards, shardFailures);
+    ClearIndicesCacheResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
+        super(totalShards, successfulShards, failedShards, shardFailures);
     }
 
     @Override public void readFrom(StreamInput in) throws IOException {
@@ -66,6 +66,10 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
         return new ClearIndicesCacheRequest();
     }
 
+    @Override protected boolean ignoreNonActiveExceptions() {
+        return true;
+    }
+
     @Override protected ClearIndicesCacheResponse newResponse(ClearIndicesCacheRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
         int successfulShards = 0;
         int failedShards = 0;

@@ -73,7 +77,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
         for (int i = 0; i < shardsResponses.length(); i++) {
             Object shardResponse = shardsResponses.get(i);
             if (shardResponse == null) {
-                failedShards++;
+                // simply ignore non active shards
             } else if (shardResponse instanceof BroadcastShardOperationFailedException) {
                 failedShards++;
                 if (shardFailures == null) {

@@ -84,7 +88,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
                 successfulShards++;
             }
         }
-        return new ClearIndicesCacheResponse(successfulShards, failedShards, shardFailures);
+        return new ClearIndicesCacheResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
     }
 
     @Override protected ShardClearIndicesCacheRequest newShardRequest() {
@@ -38,8 +38,8 @@ public class FlushResponse extends BroadcastOperationResponse {
 
     }
 
-    FlushResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
-        super(successfulShards, failedShards, shardFailures);
+    FlushResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
+        super(totalShards, successfulShards, failedShards, shardFailures);
     }
 
     @Override public void readFrom(StreamInput in) throws IOException {
@@ -65,6 +65,10 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
         return new FlushRequest();
     }
 
+    @Override protected boolean ignoreNonActiveExceptions() {
+        return true;
+    }
+
     @Override protected FlushResponse newResponse(FlushRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
         int successfulShards = 0;
         int failedShards = 0;

@@ -72,7 +76,7 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
         for (int i = 0; i < shardsResponses.length(); i++) {
             Object shardResponse = shardsResponses.get(i);
             if (shardResponse == null) {
-                failedShards++;
+                // a non active shard, ignore
             } else if (shardResponse instanceof BroadcastShardOperationFailedException) {
                 failedShards++;
                 if (shardFailures == null) {

@@ -83,7 +87,7 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
                 successfulShards++;
             }
         }
-        return new FlushResponse(successfulShards, failedShards, shardFailures);
+        return new FlushResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
     }
 
     @Override protected ShardFlushRequest newShardRequest() {
@@ -58,4 +58,8 @@ class ShardGatewaySnapshotRequest extends ShardReplicationOperationRequest {
         super.writeTo(out);
         out.writeVInt(shardId);
     }
+
+    @Override public String toString() {
+        return "[" + index + "][" + shardId + "]";
+    }
 }
@@ -38,8 +38,8 @@ public class OptimizeResponse extends BroadcastOperationResponse {
 
     }
 
-    OptimizeResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
-        super(successfulShards, failedShards, shardFailures);
+    OptimizeResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
+        super(totalShards, successfulShards, failedShards, shardFailures);
     }
 
     @Override public void readFrom(StreamInput in) throws IOException {
@@ -66,6 +66,10 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
         return new OptimizeRequest();
     }
 
+    @Override protected boolean ignoreNonActiveExceptions() {
+        return true;
+    }
+
     @Override protected OptimizeResponse newResponse(OptimizeRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
         int successfulShards = 0;
         int failedShards = 0;

@@ -73,7 +77,7 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
         for (int i = 0; i < shardsResponses.length(); i++) {
             Object shardResponse = shardsResponses.get(i);
             if (shardResponse == null) {
-                failedShards++;
+                // a non active shard, ignore...
             } else if (shardResponse instanceof BroadcastShardOperationFailedException) {
                 failedShards++;
                 if (shardFailures == null) {

@@ -84,7 +88,7 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
                 successfulShards++;
             }
         }
-        return new OptimizeResponse(successfulShards, failedShards, shardFailures);
+        return new OptimizeResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
     }
 
     @Override protected ShardOptimizeRequest newShardRequest() {
@@ -38,8 +38,8 @@ public class RefreshResponse extends BroadcastOperationResponse {
 
     }
 
-    RefreshResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
-        super(successfulShards, failedShards, shardFailures);
+    RefreshResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
+        super(totalShards, successfulShards, failedShards, shardFailures);
     }
 
     @Override public void readFrom(StreamInput in) throws IOException {
@@ -66,6 +66,10 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
         return new RefreshRequest();
     }
 
+    @Override protected boolean ignoreNonActiveExceptions() {
+        return true;
+    }
+
     @Override protected RefreshResponse newResponse(RefreshRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
         int successfulShards = 0;
         int failedShards = 0;

@@ -73,7 +77,7 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
         for (int i = 0; i < shardsResponses.length(); i++) {
             Object shardResponse = shardsResponses.get(i);
             if (shardResponse == null) {
-                failedShards++;
+                // non active shard, ignore
             } else if (shardResponse instanceof BroadcastShardOperationFailedException) {
                 failedShards++;
                 if (shardFailures == null) {

@@ -84,7 +88,7 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
                 successfulShards++;
             }
         }
-        return new RefreshResponse(successfulShards, failedShards, shardFailures);
+        return new RefreshResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
     }
 
     @Override protected ShardRefreshRequest newShardRequest() {
@@ -37,7 +37,7 @@ import static org.elasticsearch.action.admin.indices.status.ShardStatus.*;
 import static org.elasticsearch.util.settings.ImmutableSettings.*;
 
 /**
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
  */
 public class IndicesStatusResponse extends BroadcastOperationResponse {
 

@@ -50,8 +50,8 @@ public class IndicesStatusResponse extends BroadcastOperationResponse {
     IndicesStatusResponse() {
     }
 
-    IndicesStatusResponse(ShardStatus[] shards, ClusterState clusterState, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
-        super(successfulShards, failedShards, shardFailures);
+    IndicesStatusResponse(ShardStatus[] shards, ClusterState clusterState, int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
+        super(totalShards, successfulShards, failedShards, shardFailures);
         this.shards = shards;
         indicesSettings = newHashMap();
         for (ShardStatus shard : shards) {
@@ -45,7 +45,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
 import static com.google.common.collect.Lists.*;
 
 /**
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
  */
 public class TransportIndicesStatusAction extends TransportBroadcastOperationAction<IndicesStatusRequest, IndicesStatusResponse, TransportIndicesStatusAction.IndexShardStatusRequest, ShardStatus> {
 

@@ -65,6 +65,10 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
         return new IndicesStatusRequest();
     }
 
+    @Override protected boolean ignoreNonActiveExceptions() {
+        return true;
+    }
+
     @Override protected IndicesStatusResponse newResponse(IndicesStatusRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
         int successfulShards = 0;
         int failedShards = 0;

@@ -73,7 +77,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
         for (int i = 0; i < shardsResponses.length(); i++) {
             Object shardResponse = shardsResponses.get(i);
             if (shardResponse == null) {
-                failedShards++;
+                // simply ignore non active shards
             } else if (shardResponse instanceof BroadcastShardOperationFailedException) {
                 failedShards++;
                 if (shardFailures == null) {

@@ -85,7 +89,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
                 successfulShards++;
             }
         }
-        return new IndicesStatusResponse(shards.toArray(new ShardStatus[shards.size()]), clusterState, successfulShards, failedShards, shardFailures);
+        return new IndicesStatusResponse(shards.toArray(new ShardStatus[shards.size()]), clusterState, shardsResponses.length(), successfulShards, failedShards, shardFailures);
     }
 
     @Override protected IndexShardStatusRequest newShardRequest() {
@@ -201,6 +201,6 @@ public class CountRequest extends BroadcastOperationRequest {
     }
 
     @Override public String toString() {
-        return "[" + Arrays.toString(indices) + "][" + Arrays.toString(types) + "], querySource[" + Unicode.fromBytes(querySource) + "]";
+        return "[" + Arrays.toString(indices) + "]" + Arrays.toString(types) + ", querySource[" + Unicode.fromBytes(querySource) + "]";
     }
 }
@@ -40,8 +40,8 @@ public class CountResponse extends BroadcastOperationResponse {
 
     }
 
-    CountResponse(long count, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
-        super(successfulShards, failedShards, shardFailures);
+    CountResponse(long count, int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
+        super(totalShards, successfulShards, failedShards, shardFailures);
         this.count = count;
     }
 
@@ -98,7 +98,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
                 successfulShards++;
             }
         }
-        return new CountResponse(count, successfulShards, failedShards, shardFailures);
+        return new CountResponse(count, shardsResponses.length(), successfulShards, failedShards, shardFailures);
     }
 
     @Override protected ShardCountResponse shardOperation(ShardCountRequest request) throws ElasticSearchException {
@@ -23,10 +23,12 @@ import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
 import org.elasticsearch.util.Nullable;
 import org.elasticsearch.util.Strings;
+import org.elasticsearch.util.Unicode;
 import org.elasticsearch.util.io.stream.StreamInput;
 import org.elasticsearch.util.io.stream.StreamOutput;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import static org.elasticsearch.action.Actions.*;
 

@@ -115,4 +117,8 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
             out.writeUTF(type);
         }
     }
+
+    @Override public String toString() {
+        return "[" + index + "]" + Arrays.toString(types) + ", query [" + Unicode.fromBytes(querySource) + "]";
+    }
 }
@@ -30,6 +30,7 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardsIterator;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.action.SearchServiceListener;
 import org.elasticsearch.search.action.SearchServiceTransportAction;
 import org.elasticsearch.search.controller.SearchPhaseController;

@@ -39,7 +40,6 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.util.settings.Settings;
 
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.elasticsearch.action.search.type.TransportSearchHelper.*;

@@ -106,15 +106,15 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
 
             shardsIts = indicesService.searchShards(clusterState, request.indices(), request.queryHint());
             expectedSuccessfulOps = shardsIts.size();
-            expectedTotalOps = shardsIts.totalSize();
+            expectedTotalOps = shardsIts.totalSizeActive();
         }
 
         public void start() {
             // count the local operations, and perform the non local ones
             int localOperations = 0;
             for (final ShardsIterator shardIt : shardsIts) {
-                final ShardRouting shard = shardIt.next();
-                if (shard.active()) {
+                final ShardRouting shard = shardIt.nextActiveOrNull();
+                if (shard != null) {
                     if (shard.currentNodeId().equals(nodes.localNodeId())) {
                         localOperations++;
                     } else {

@@ -122,7 +122,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
                         performFirstPhase(shardIt.reset());
                     }
                 } else {
-                    // as if we have a "problem", so we iterate to the next one and maintain counts
+                    // really, no shards active in this group
                     onFirstPhaseResult(shard, shardIt, null);
                 }
             }

@@ -132,8 +132,8 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
                 threadPool.execute(new Runnable() {
                     @Override public void run() {
                         for (final ShardsIterator shardIt : shardsIts) {
-                            final ShardRouting shard = shardIt.reset().next();
-                            if (shard.active()) {
+                            final ShardRouting shard = shardIt.reset().nextActiveOrNull();
+                            if (shard != null) {
                                 if (shard.currentNodeId().equals(nodes.localNodeId())) {
                                     performFirstPhase(shardIt.reset());
                                 }

@@ -144,8 +144,8 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
             } else {
                 boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
                 for (final ShardsIterator shardIt : shardsIts) {
-                    final ShardRouting shard = shardIt.reset().next();
-                    if (shard.active()) {
+                    final ShardRouting shard = shardIt.reset().nextActiveOrNull();
+                    if (shard != null) {
                         if (shard.currentNodeId().equals(nodes.localNodeId())) {
                             if (localAsync) {
                                 threadPool.execute(new Runnable() {

@@ -163,10 +163,10 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
             }
         }
 
-        private void performFirstPhase(final Iterator<ShardRouting> shardIt) {
-            final ShardRouting shard = shardIt.next();
-            if (!shard.active()) {
-                // as if we have a "problem", so we iterate to the next one and maintain counts
+        private void performFirstPhase(final ShardsIterator shardIt) {
+            final ShardRouting shard = shardIt.nextActiveOrNull();
+            if (shard == null) {
+                // no more active shards... (we should not really get here, but just for safety)
                 onFirstPhaseResult(shard, shardIt, null);
             } else {
                 Node node = nodes.get(shard.currentNodeId());

@@ -182,13 +182,13 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
             }
         }
 
-        private void onFirstPhaseResult(ShardRouting shard, FirstResult result, Iterator<ShardRouting> shardIt) {
+        private void onFirstPhaseResult(ShardRouting shard, FirstResult result, ShardsIterator shardIt) {
             processFirstPhaseResult(shard, result);
             // increment all the "future" shards to update the total ops since we some may work and some may not...
             // and when that happens, we break on total ops, so we must maintain them
-            while (shardIt.hasNext()) {
+            while (shardIt.hasNextActive()) {
                 totalOps.incrementAndGet();
-                shardIt.next();
+                shardIt.nextActive();
             }
             if (successulOps.incrementAndGet() == expectedSuccessfulOps ||
                     totalOps.incrementAndGet() == expectedTotalOps) {

@@ -200,15 +200,25 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
             }
         }
 
-        private void onFirstPhaseResult(ShardRouting shard, final Iterator<ShardRouting> shardIt, Throwable t) {
-            if (logger.isDebugEnabled()) {
-                if (t != null) {
-                    logger.debug(shard.shortSummary() + ": Failed to search [" + request + "]", t);
-                }
-            }
+        private void onFirstPhaseResult(ShardRouting shard, final ShardsIterator shardIt, Throwable t) {
             if (totalOps.incrementAndGet() == expectedTotalOps) {
-                // no more shards, add a failure
-                shardFailures.add(new ShardSearchFailure(t));
+                // e is null when there is no next active....
+                if (logger.isDebugEnabled()) {
+                    if (t != null) {
+                        if (shard != null) {
+                            logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t);
+                        } else {
+                            logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "]", t);
+                        }
+                    }
+                }
+                if (t == null) {
+                    // no active shards
+                    shardFailures.add(new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id())));
+                } else {
+                    shardFailures.add(new ShardSearchFailure(t));
+                }
                 if (successulOps.get() == 0) {
                     // no successful ops, raise an exception
                     invokeListener(new SearchPhaseExecutionException(firstPhaseName(), "total failure", buildShardFailures()));

@@ -220,11 +230,36 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
                     }
                 }
             } else {
-                if (shardIt.hasNext()) {
+                if (shardIt.hasNextActive()) {
+                    // trace log this exception
+                    if (logger.isTraceEnabled()) {
+                        if (t != null) {
+                            if (shard != null) {
+                                logger.trace(shard.shortSummary() + ": Failed to execute [" + request + "]", t);
+                            } else {
+                                logger.trace(shardIt.shardId() + ": Failed to execute [" + request + "]", t);
+                            }
+                        }
+                    }
                     performFirstPhase(shardIt);
                 } else {
-                    // no more shards, add a failure
-                    shardFailures.add(new ShardSearchFailure(t));
+                    // no more shards active, add a failure
+                    // e is null when there is no next active....
+                    if (logger.isDebugEnabled()) {
+                        if (t != null) {
+                            if (shard != null) {
+                                logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t);
+                            } else {
+                                logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "]", t);
+                            }
+                        }
+                    }
+                    if (t == null) {
+                        // no active shards
+                        shardFailures.add(new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id())));
+                    } else {
+                        shardFailures.add(new ShardSearchFailure(t));
+                    }
                 }
             }
         }
@@ -38,6 +38,8 @@ import static org.elasticsearch.action.support.DefaultShardOperationFailedExcept
  */
 public abstract class BroadcastOperationResponse implements ActionResponse {
 
+    private int totalShards;
+
     private int successfulShards;
 
     private int failedShards;

@@ -47,7 +49,8 @@ public abstract class BroadcastOperationResponse implements ActionResponse {
     protected BroadcastOperationResponse() {
     }
 
-    protected BroadcastOperationResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
+    protected BroadcastOperationResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
+        this.totalShards = totalShards;
         this.successfulShards = successfulShards;
         this.failedShards = failedShards;
         this.shardFailures = shardFailures;

@@ -60,7 +63,7 @@ public abstract class BroadcastOperationResponse implements ActionResponse {
      * The total shards this request ran against.
      */
     public int totalShards() {
-        return successfulShards + failedShards;
+        return totalShards;
     }
 
     /**

@@ -88,6 +91,7 @@ public abstract class BroadcastOperationResponse implements ActionResponse {
     }
 
     @Override public void readFrom(StreamInput in) throws IOException {
+        totalShards = in.readVInt();
         successfulShards = in.readVInt();
         failedShards = in.readVInt();
         int size = in.readVInt();

@@ -100,6 +104,7 @@ public abstract class BroadcastOperationResponse implements ActionResponse {
     }
 
     @Override public void writeTo(StreamOutput out) throws IOException {
+        out.writeVInt(totalShards);
         out.writeVInt(successfulShards);
         out.writeVInt(failedShards);
         out.writeVInt(shardFailures.size());
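Because totalShards is now carried explicitly (and serialized) instead of being derived as successfulShards + failedShards, a response can report shard copies the request never executed on. The new BroadcastActionsTests below asserts exactly this for a 5-shard, 1-replica index running on a single node; condensed:

    // 10 total shard copies exist, but only the 5 active primaries execute;
    // the old derived value would have reported 5 instead of 10.
    FlushResponse flushResponse = client("server1").admin().indices().flush(flushRequest("test")).actionGet();
    assertThat(flushResponse.totalShards(), equalTo(10));     // all copies, incl. unassigned replicas
    assertThat(flushResponse.successfulShards(), equalTo(5)); // only the active primaries ran
    assertThat(flushResponse.failedShards(), equalTo(0));     // non-active copies are not failures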
@@ -26,7 +26,7 @@ import org.elasticsearch.index.shard.ShardId;
 /**
  * An exception indicating that a failure occurred performing an operation on the shard.
  *
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
  */
 public class BroadcastShardOperationFailedException extends IndexShardException implements ElasticSearchWrapperException {
 
@@ -42,7 +42,6 @@ import org.elasticsearch.util.io.stream.Streamable;
 import org.elasticsearch.util.settings.Settings;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReferenceArray;

@@ -96,10 +95,14 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
 
     protected abstract GroupShardsIterator shards(Request request, ClusterState clusterState);
 
-    private boolean accumulateExceptions() {
+    protected boolean accumulateExceptions() {
         return true;
     }
 
+    protected boolean ignoreNonActiveExceptions() {
+        return false;
+    }
+
     class AsyncBroadcastAction {
 
         private final Request request;

@@ -145,8 +148,8 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
             // count the local operations, and perform the non local ones
             int localOperations = 0;
             for (final ShardsIterator shardIt : shardsIts) {
-                final ShardRouting shard = shardIt.next();
-                if (shard.active()) {
+                final ShardRouting shard = shardIt.nextActiveOrNull();
+                if (shard != null) {
                     if (shard.currentNodeId().equals(nodes.localNodeId())) {
                         localOperations++;
                     } else {

@@ -154,8 +157,8 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
                         performOperation(shardIt.reset(), true);
                     }
                 } else {
-                    // as if we have a "problem", so we iterate to the next one and maintain counts
-                    onOperation(shard, shardIt, new BroadcastShardOperationFailedException(shard.shardId(), "Not active"), false);
+                    // really, no shards active in this group
+                    onOperation(shard, shardIt, null, false);
                 }
             }
             // we have local operations, perform them now

@@ -164,8 +167,8 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
                 threadPool.execute(new Runnable() {
                     @Override public void run() {
                         for (final ShardsIterator shardIt : shardsIts) {
-                            final ShardRouting shard = shardIt.reset().next();
-                            if (shard.active()) {
+                            final ShardRouting shard = shardIt.reset().nextActiveOrNull();
+                            if (shard != null) {
                                 if (shard.currentNodeId().equals(nodes.localNodeId())) {
                                     performOperation(shardIt.reset(), false);
                                 }

@@ -176,8 +179,8 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
             } else {
                 boolean localAsync = request.operationThreading() == BroadcastOperationThreading.THREAD_PER_SHARD;
                 for (final ShardsIterator shardIt : shardsIts) {
-                    final ShardRouting shard = shardIt.reset().next();
-                    if (shard.active()) {
+                    final ShardRouting shard = shardIt.reset().nextActiveOrNull();
+                    if (shard != null) {
                         if (shard.currentNodeId().equals(nodes.localNodeId())) {
                             performOperation(shardIt.reset(), localAsync);
                         }

@@ -187,11 +190,11 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
             }
         }
 
-        private void performOperation(final Iterator<ShardRouting> shardIt, boolean localAsync) {
-            final ShardRouting shard = shardIt.next();
-            if (!shard.active()) {
-                // as if we have a "problem", so we iterate to the next one and maintain counts
-                onOperation(shard, shardIt, new BroadcastShardOperationFailedException(shard.shardId(), "Not Active"), false);
+        private void performOperation(final ShardsIterator shardIt, boolean localAsync) {
+            final ShardRouting shard = shardIt.nextActiveOrNull();
+            if (shard == null) {
+                // no more active shards... (we should not really get here, just safety)
+                onOperation(shard, shardIt, null, false);
             } else {
                 final ShardRequest shardRequest = newShardRequest(shard, request);
                 if (shard.currentNodeId().equals(nodes.localNodeId())) {

@@ -223,8 +226,8 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
                         onOperation(shard, response, false);
                     }
 
-                    @Override public void handleException(RemoteTransportException exp) {
-                        onOperation(shard, shardIt, exp, false);
+                    @Override public void handleException(RemoteTransportException e) {
+                        onOperation(shard, shardIt, e, false);
                     }
 
                     @Override public boolean spawn() {

@@ -236,32 +239,54 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
             }
         }
 
-        @SuppressWarnings({"unchecked"}) private void onOperation(ShardRouting shard, ShardResponse response, boolean alreadyThreaded) {
+        @SuppressWarnings({"unchecked"})
+        private void onOperation(ShardRouting shard, ShardResponse response, boolean alreadyThreaded) {
             shardsResponses.set(indexCounter.getAndIncrement(), response);
             if (expectedOps == counterOps.incrementAndGet()) {
                 finishHim(alreadyThreaded);
             }
         }
 
-        @SuppressWarnings({"unchecked"}) private void onOperation(ShardRouting shard, final Iterator<ShardRouting> shardIt, Exception e, boolean alreadyThreaded) {
-            if (logger.isDebugEnabled()) {
-                if (e != null) {
-                    logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", e);
+        @SuppressWarnings({"unchecked"})
+        private void onOperation(ShardRouting shard, final ShardsIterator shardIt, Throwable t, boolean alreadyThreaded) {
+            if (!shardIt.hasNextActive()) {
+                // e is null when there is no next active....
+                if (logger.isDebugEnabled()) {
+                    if (t != null) {
+                        if (shard != null) {
+                            logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t);
+                        } else {
+                            logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "]", t);
+                        }
+                    }
                 }
-            }
-            if (!shardIt.hasNext()) {
-                // no more shards in this partition
+                // no more shards in this group
                 int index = indexCounter.getAndIncrement();
                 if (accumulateExceptions()) {
-                    if (!(e instanceof BroadcastShardOperationFailedException)) {
-                        e = new BroadcastShardOperationFailedException(shard.shardId(), e);
+                    if (t == null) {
+                        if (!ignoreNonActiveExceptions()) {
+                            t = new BroadcastShardOperationFailedException(shardIt.shardId(), "No active shard(s)");
+                        }
+                    } else if (!(t instanceof BroadcastShardOperationFailedException)) {
+                        t = new BroadcastShardOperationFailedException(shardIt.shardId(), t);
                     }
-                    shardsResponses.set(index, e);
+                    shardsResponses.set(index, t);
                 }
                 if (expectedOps == counterOps.incrementAndGet()) {
                     finishHim(alreadyThreaded);
                 }
                 return;
+            } else {
+                // trace log this exception
+                if (logger.isTraceEnabled()) {
+                    if (t != null) {
+                        if (shard != null) {
+                            logger.trace(shard.shortSummary() + ": Failed to execute [" + request + "]", t);
+                        } else {
+                            logger.trace(shardIt.shardId() + ": Failed to execute [" + request + "]", t);
+                        }
+                    }
+                }
+            }
             // we are not threaded here if we got here from the transport
             // or we possibly threaded if we got from a local threaded one,
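A broadcast action opts out of recording failures for shard groups with no active copy by overriding the new hook; this is exactly what the flush, refresh, optimize, clear-cache, and indices-status actions in the hunks above do:

    // When no copy of a shard group is active, onOperation now leaves the
    // response slot null instead of storing a
    // BroadcastShardOperationFailedException("No active shard(s)").
    @Override protected boolean ignoreNonActiveExceptions() {
        return true;
    }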
@@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit;
 import static org.elasticsearch.action.Actions.*;
 
 /**
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
 */
 public abstract class ShardReplicationOperationRequest implements ActionRequest {
 
@@ -330,6 +330,9 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
                 // still in recovery, retry (we know that its not UNASSIGNED OR INITIALIZING since we are checking it in the calling method)
                 retryPrimary(fromDiscoveryListener, shard);
             } catch (Exception e) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", e);
+                }
                 listener.onFailure(new ReplicationShardOperationFailedException(shards.shardId(), e));
             }
         }
@@ -39,10 +39,9 @@ import org.elasticsearch.util.io.stream.Streamable;
 import org.elasticsearch.util.settings.Settings;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 /**
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
 */
 public abstract class TransportSingleOperationAction<Request extends SingleOperationRequest, Response extends ActionResponse> extends BaseAction<Request, Response> {
 

@@ -83,9 +82,7 @@ public abstract class TransportSingleOperationAction<Request extends SingleOpera
 
         private final ActionListener<Response> listener;
 
-        private final ShardsIterator shards;
-
-        private Iterator<ShardRouting> shardsIt;
+        private final ShardsIterator shardsIt;
 
         private final Request request;
 

@@ -102,18 +99,17 @@ public abstract class TransportSingleOperationAction<Request extends SingleOpera
             // update to the concrete shard to use
             request.index(clusterState.metaData().concreteIndex(request.index()));
 
-            this.shards = indicesService.indexServiceSafe(request.index()).operationRouting()
+            this.shardsIt = indicesService.indexServiceSafe(request.index()).operationRouting()
                     .getShards(clusterState, request.type(), request.id());
-            this.shardsIt = shards.iterator();
         }
 
         public void start() {
             performFirst();
         }
 
-        public void onFailure(ShardRouting shardRouting, Exception e) {
-            if (logger.isDebugEnabled()) {
-                logger.debug(shardRouting.shortSummary() + ": Failed to get [" + request.type() + "#" + request.id() + "]", e);
+        private void onFailure(ShardRouting shardRouting, Exception e) {
+            if (logger.isTraceEnabled() && e != null) {
+                logger.trace(shardRouting.shortSummary() + ": Failed to get [" + request.type() + "#" + request.id() + "]", e);
             }
             perform(e);
         }

@@ -122,11 +118,8 @@ public abstract class TransportSingleOperationAction<Request extends SingleOpera
          * First get should try and use a shard that exists on a local node for better performance
          */
         private void performFirst() {
-            while (shardsIt.hasNext()) {
-                final ShardRouting shard = shardsIt.next();
-                if (!shard.active()) {
-                    continue;
-                }
+            while (shardsIt.hasNextActive()) {
+                final ShardRouting shard = shardsIt.nextActive();
                 if (shard.currentNodeId().equals(nodes.localNodeId())) {
                     if (request.operationThreaded()) {
                         threadPool.execute(new Runnable() {

@@ -159,19 +152,16 @@ public abstract class TransportSingleOperationAction<Request extends SingleOpera
                     }
                 }
             }
-            if (!shardsIt.hasNext()) {
+            if (!shardsIt.hasNextActive()) {
                 // no local node get, go remote
-                shardsIt = shards.reset().iterator();
+                shardsIt.reset();
                 perform(null);
             }
         }
 
         private void perform(final Exception lastException) {
-            while (shardsIt.hasNext()) {
-                final ShardRouting shard = shardsIt.next();
-                if (!shard.active()) {
-                    continue;
-                }
+            while (shardsIt.hasNextActive()) {
+                final ShardRouting shard = shardsIt.nextActive();
                 // no need to check for local nodes, we tried them already in performFirstGet
                 if (!shard.currentNodeId().equals(nodes.localNodeId())) {
                     Node node = nodes.get(shard.currentNodeId());

@@ -204,12 +194,20 @@ public abstract class TransportSingleOperationAction<Request extends SingleOpera
                     return;
                 }
             }
-            if (!shardsIt.hasNext()) {
-                final NoShardAvailableActionException failure = new NoShardAvailableActionException(shards.shardId(), "No shard available for [" + request.type() + "#" + request.id() + "]", lastException);
+            if (!shardsIt.hasNextActive()) {
+                Exception failure = lastException;
+                if (failure == null) {
+                    failure = new NoShardAvailableActionException(shardsIt.shardId(), "No shard available for [" + request.type() + "#" + request.id() + "]");
+                } else {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug(shardsIt.shardId() + ": Failed to get [" + request.type() + "#" + request.id() + "]", failure);
+                    }
+                }
                 if (request.listenerThreaded()) {
+                    final Exception fFailure = failure;
                     threadPool.execute(new Runnable() {
                         @Override public void run() {
-                            listener.onFailure(failure);
+                            listener.onFailure(fFailure);
                         }
                     });
                 } else {
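Condensed, the terminal path of perform above only wraps in NoShardAvailableActionException when there is no concrete shard failure to hand back; otherwise the last failure itself is surfaced (and logged at DEBUG). A sketch using the names from the hunk:

    // Failure selection: prefer the real shard-level exception over a
    // generic "no shard available" wrapper.
    Exception failure = lastException;
    if (failure == null) {
        failure = new NoShardAvailableActionException(shardsIt.shardId(),
                "No shard available for [" + request.type() + "#" + request.id() + "]");
    }
    listener.onFailure(failure);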
@@ -54,9 +54,9 @@ public class TermsResponse extends BroadcastOperationResponse implements Iterabl
     TermsResponse() {
     }
 
-    TermsResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures, FieldTermsFreq[] fieldsTermsFreq,
+    TermsResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures, FieldTermsFreq[] fieldsTermsFreq,
                   long numDocs, long maxDoc, long numDeletedDocs) {
-        super(successfulShards, failedShards, shardFailures);
+        super(totalShards, successfulShards, failedShards, shardFailures);
         this.fieldsTermsFreq = fieldsTermsFreq;
         this.numDocs = numDocs;
         this.maxDoc = maxDoc;
@@ -140,7 +140,7 @@ public class TransportTermsAction extends TransportBroadcastOperationAction<Term
             TermFreq[] freqs = entry.getValue().toArray(new TermFreq[entry.getValue().size()]);
             resultFreqs[index++] = new FieldTermsFreq(entry.getKey(), freqs);
         }
-        return new TermsResponse(successfulShards, failedShards, shardFailures, resultFreqs, numDocs, maxDoc, numDeletedDocs);
+        return new TermsResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures, resultFreqs, numDocs, maxDoc, numDeletedDocs);
     }
 
     @Override protected ShardTermsResponse shardOperation(ShardTermsRequest request) throws ElasticSearchException {
@@ -1,105 +0,0 @@
-/*
- * Licensed to Elastic Search and Shay Banon under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Elastic Search licenses this
- * file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.cluster.routing;
-
-import org.elasticsearch.index.shard.ShardId;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * @author kimchy (Shay Banon)
- */
-public class CompoundShardsIterator implements ShardsIterator, Iterator<ShardRouting> {
-
-    private int index = 0;
-
-    private final List<ShardsIterator> iterators;
-
-    private Iterator<ShardRouting> current;
-
-    public CompoundShardsIterator(List<ShardsIterator> iterators) {
-        this.iterators = iterators;
-    }
-
-    @Override public ShardsIterator reset() {
-        for (ShardsIterator it : iterators) {
-            it.reset();
-        }
-        index = 0;
-        current = null;
-        return this;
-    }
-
-    @Override public int size() {
-        int size = 0;
-        for (ShardsIterator it : iterators) {
-            size += it.size();
-        }
-        return size;
-    }
-
-    @Override public boolean hasNext() {
-        if (index == iterators.size()) {
-            return false;
-        }
-        if (current == null) {
-            current = iterators.get(index).iterator();
-        }
-        while (!current.hasNext()) {
-            if (++index == iterators.size()) {
-                return false;
-            }
-            current = iterators.get(index).iterator();
-        }
-        return true;
-    }
-
-    @Override public ShardRouting next() {
-        if (!hasNext()) {
-            throw new NoSuchElementException();
-        }
-        return current.next();
-    }
-
-    @Override public void remove() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override public ShardId shardId() {
-        return currentShardsIterator().shardId();
-    }
-
-    @Override public Iterator<ShardRouting> iterator() {
-        return this;
-    }
-
-    private ShardsIterator currentShardsIterator() throws NoSuchElementException {
-        if (iterators.size() == 0) {
-            throw new NoSuchElementException();
-        }
-        if (index == iterators.size()) {
-            return iterators.get(index - 1);
-        }
-        return iterators.get(index);
-    }
-}
@@ -57,6 +57,14 @@ public class GroupShardsIterator implements Iterable<ShardsIterator> {
         return size;
     }
 
+    public int totalSizeActive() {
+        int size = 0;
+        for (ShardsIterator shard : iterators) {
+            size += shard.sizeActive();
+        }
+        return size;
+    }
+
     public int size() {
         return iterators.size();
     }
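totalSizeActive() is what TransportSearchTypeAction now uses to size its expected-operations counter, so shard groups with no active copy cannot leave a request waiting on operations that will never run. Lifted from the hunk above:

    // Expected totals are now based on active shard copies only.
    expectedSuccessfulOps = shardsIts.size();        // one result per shard group
    expectedTotalOps = shardsIts.totalSizeActive();  // was shardsIts.totalSize()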
@@ -110,7 +110,7 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
      * A groups shards iterator where each groups is a single {@link ShardRouting} and a group
      * is created for each shard routing.
      *
-     * <p>This basically means that components that use the {@link GroupShardsIterator} will itearte
+     * <p>This basically means that components that use the {@link GroupShardsIterator} will iterate
      * over *all* the shards (all the replicas) within the index.
      */
     public GroupShardsIterator groupByAllIt() {
@@ -106,7 +106,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
      * <p>The class can be used from different threads, though not designed to be used concurrently
      * from different threads.
      */
-    private class IndexShardsIterator implements ShardsIterator, Iterator<ShardRouting> {
+    class IndexShardsIterator implements ShardsIterator, Iterator<ShardRouting> {
 
         private final int origIndex;
 

@@ -130,17 +130,47 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
         }
 
         @Override public boolean hasNext() {
-            return counter != size();
+            return counter < size();
         }
 
-        @Override public ShardRouting next() {
+        @Override public ShardRouting next() throws NoSuchElementException {
             if (!hasNext()) {
-                throw new NoSuchElementException();
+                throw new NoSuchElementException("No shard found");
             }
             counter++;
             return shardModulo(index++);
         }
 
+        @Override public boolean hasNextActive() {
+            int counter = this.counter;
+            int index = this.index;
+            while (counter++ < size()) {
+                ShardRouting shardRouting = shardModulo(index++);
+                if (shardRouting.active()) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        @Override public ShardRouting nextActive() throws NoSuchElementException {
+            ShardRouting shardRouting = nextActiveOrNull();
+            if (shardRouting == null) {
+                throw new NoSuchElementException("No active shard found");
+            }
+            return shardRouting;
+        }
+
+        @Override public ShardRouting nextActiveOrNull() throws NoSuchElementException {
+            while (counter++ < size()) {
+                ShardRouting shardRouting = shardModulo(index++);
+                if (shardRouting.active()) {
+                    return shardRouting;
+                }
+            }
+            return null;
+        }
+
         @Override public void remove() {
             throw new UnsupportedOperationException();
         }

@@ -149,6 +179,16 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
             return IndexShardRoutingTable.this.size();
         }
 
+        @Override public int sizeActive() {
+            int shardsActive = 0;
+            for (ShardRouting shardRouting : IndexShardRoutingTable.this.shards()) {
+                if (shardRouting.active()) {
+                    shardsActive++;
+                }
+            }
+            return shardsActive;
+        }
+
         @Override public ShardId shardId() {
             return IndexShardRoutingTable.this.shardId();
         }
@@ -23,9 +23,10 @@ import org.elasticsearch.index.shard.ShardId;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 
 /**
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
 */
 public class PlainShardsIterator implements ShardsIterator {
 

@@ -33,16 +34,15 @@ public class PlainShardsIterator implements ShardsIterator {
 
     private final List<ShardRouting> shards;
 
-    private Iterator<ShardRouting> iterator;
+    private volatile int counter = 0;
 
     public PlainShardsIterator(ShardId shardId, List<ShardRouting> shards) {
         this.shardId = shardId;
         this.shards = shards;
-        this.iterator = shards.iterator();
     }
 
     @Override public ShardsIterator reset() {
-        this.iterator = shards.iterator();
+        this.counter = 0;
         return this;
     }
 

@@ -50,6 +50,16 @@ public class PlainShardsIterator implements ShardsIterator {
         return shards.size();
     }
 
+    @Override public int sizeActive() {
+        int sizeActive = 0;
+        for (ShardRouting shardRouting : shards) {
+            if (shardRouting.active()) {
+                sizeActive++;
+            }
+        }
+        return sizeActive;
+    }
+
     @Override public ShardId shardId() {
         return this.shardId;
     }

@@ -59,11 +69,42 @@ public class PlainShardsIterator implements ShardsIterator {
     }
 
     @Override public boolean hasNext() {
-        return iterator.hasNext();
+        return counter < shards.size();
     }
 
     @Override public ShardRouting next() {
-        return iterator.next();
+        if (!hasNext()) {
+            throw new NoSuchElementException("No shard found");
+        }
+        return shards.get(counter++);
     }
 
+    @Override public boolean hasNextActive() {
+        int counter = this.counter;
+        while (counter < shards.size()) {
+            if (shards.get(counter++).active()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override public ShardRouting nextActive() throws NoSuchElementException {
+        ShardRouting shardRouting = nextActiveOrNull();
+        if (shardRouting == null) {
+            throw new NoSuchElementException("No active shard found");
+        }
+        return shardRouting;
+    }
+
+    @Override public ShardRouting nextActiveOrNull() throws NoSuchElementException {
+        while (counter < shards.size()) {
+            ShardRouting shardRouting = shards.get(counter++);
+            if (shardRouting.active()) {
+                return shardRouting;
+            }
+        }
+        return null;
+    }
+
     @Override public void remove() {
@@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing;
 import org.elasticsearch.index.shard.ShardId;
 
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 /**
  * @author kimchy (Shay Banon)

@@ -35,5 +36,13 @@ public interface ShardsIterator extends Iterable<ShardRouting>, Iterator<ShardRo
 
     int size();
 
+    int sizeActive();
+
     ShardId shardId();
+
+    boolean hasNextActive();
+
+    ShardRouting nextActive() throws NoSuchElementException;
+
+    ShardRouting nextActiveOrNull();
 }
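Taken together, the new interface methods replace the old next()-then-active() dance in every caller. A minimal sketch of the resulting pattern (the shardIt and nodes names are as used in the transport actions above):

    ShardRouting shard = shardIt.nextActiveOrNull();
    if (shard != null) {
        // dispatch to the node holding this active copy
        Node node = nodes.get(shard.currentNodeId());
    } else {
        // no active copy in this group: record a failure for the whole group,
        // or skip it entirely if ignoreNonActiveExceptions() returns true
    }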
@@ -29,7 +29,7 @@ import java.io.Serializable;
 /**
  * The target that the search request was executed on.
  *
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
 */
 public class SearchShardTarget implements Streamable, Serializable {
 
@ -0,0 +1,119 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.test.integration.broadcast;
|
||||
|
||||
import org.elasticsearch.action.ShardOperationFailedException;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
|
||||
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
|
||||
import org.elasticsearch.action.count.CountResponse;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading;
|
||||
import org.elasticsearch.test.integration.AbstractServersTests;
|
||||
import org.elasticsearch.util.Unicode;
|
||||
import org.elasticsearch.util.json.JsonBuilder;
|
||||
import org.testng.annotations.AfterMethod;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.client.Requests.*;
|
||||
import static org.elasticsearch.index.query.json.JsonQueryBuilders.*;
|
||||
import static org.elasticsearch.util.json.JsonBuilder.*;
|
||||
import static org.hamcrest.MatcherAssert.*;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class BroadcastActionsTests extends AbstractServersTests {
|
||||
|
||||
    @AfterMethod public void closeServers() {
        closeAllServers();
    }

    @Test public void testBroadcastOperations() throws IOException {
        startServer("server1");

        client("server1").admin().indices().create(createIndexRequest("test")).actionGet(5000);

        logger.info("Running Cluster Health");
        ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForYellowStatus()).actionGet();
        logger.info("Done Cluster Health, status " + clusterHealth.status());
        assertThat(clusterHealth.timedOut(), equalTo(false));
        assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));

        client("server1").index(indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet();
        FlushResponse flushResponse = client("server1").admin().indices().flush(flushRequest("test")).actionGet();
        assertThat(flushResponse.totalShards(), equalTo(10));
        assertThat(flushResponse.successfulShards(), equalTo(5));
        assertThat(flushResponse.failedShards(), equalTo(0));
        client("server1").index(indexRequest("test").type("type1").id("2").source(source("2", "test"))).actionGet();
        RefreshResponse refreshResponse = client("server1").admin().indices().refresh(refreshRequest("test")).actionGet();
        assertThat(refreshResponse.totalShards(), equalTo(10));
        assertThat(refreshResponse.successfulShards(), equalTo(5));
        assertThat(refreshResponse.failedShards(), equalTo(0));

        logger.info("Count");
        // check count
        for (int i = 0; i < 5; i++) {
            // test successful
            CountResponse countResponse = client("server1").count(countRequest("test").querySource(termQuery("_type", "type1")).operationThreading(BroadcastOperationThreading.NO_THREADS)).actionGet();
            assertThat(countResponse.count(), equalTo(2l));
            assertThat(countResponse.totalShards(), equalTo(5));
            assertThat(countResponse.successfulShards(), equalTo(5));
            assertThat(countResponse.failedShards(), equalTo(0));
        }

        for (int i = 0; i < 5; i++) {
            CountResponse countResponse = client("server1").count(countRequest("test").querySource(termQuery("_type", "type1")).operationThreading(BroadcastOperationThreading.SINGLE_THREAD)).actionGet();
            assertThat(countResponse.count(), equalTo(2l));
            assertThat(countResponse.totalShards(), equalTo(5));
            assertThat(countResponse.successfulShards(), equalTo(5));
            assertThat(countResponse.failedShards(), equalTo(0));
        }

        for (int i = 0; i < 5; i++) {
            CountResponse countResponse = client("server1").count(countRequest("test").querySource(termQuery("_type", "type1")).operationThreading(BroadcastOperationThreading.THREAD_PER_SHARD)).actionGet();
            assertThat(countResponse.count(), equalTo(2l));
            assertThat(countResponse.totalShards(), equalTo(5));
            assertThat(countResponse.successfulShards(), equalTo(5));
            assertThat(countResponse.failedShards(), equalTo(0));
        }

        for (int i = 0; i < 5; i++) {
            // test failure (simply a query that can't be parsed)
            CountResponse countResponse = client("server1").count(countRequest("test").querySource(Unicode.fromStringAsBytes("{ term : { _type : \"type1 } }"))).actionGet();

            assertThat(countResponse.count(), equalTo(0l));
            assertThat(countResponse.totalShards(), equalTo(5));
            assertThat(countResponse.successfulShards(), equalTo(0));
            assertThat(countResponse.failedShards(), equalTo(5));
            for (ShardOperationFailedException exp : countResponse.shardFailures()) {
                assertThat(exp.reason(), containsString("QueryParsingException"));
            }
        }

    }

    private JsonBuilder source(String id, String nameValue) throws IOException {
        return jsonBuilder().startObject().field("id", id).field("name", nameValue).endObject();
    }
}
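Note: the shardFailures() loop above generalizes to every broadcast response touched by this commit (count, flush, refresh, clear cache). A minimal client-side sketch, not part of the commit itself; the response local and the log messages are illustrative. The key point is that successfulShards() plus failedShards() can be smaller than totalShards(), and the difference is the number of shard copies that were not active (for example, unassigned replicas on a single-node cluster):

    // minimal sketch (illustrative only): interpreting a broadcast response
    CountResponse response = client("server1")
            .count(countRequest("test").querySource(termQuery("_type", "type1")))
            .actionGet();
    // copies that neither succeeded nor failed: they simply were not active
    int nonActive = response.totalShards() - response.successfulShards() - response.failedShards();
    logger.info("shards: " + response.successfulShards() + " successful, "
            + response.failedShards() + " failed, " + nonActive + " not active");
    for (ShardOperationFailedException failure : response.shardFailures()) {
        logger.warn("shard failure: " + failure.reason());
    }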
@@ -0,0 +1,7 @@
cluster:
    routing:
        schedule: 100ms
index:
    number_of_shards: 5
    number_of_replicas: 1
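The expected shard counts asserted in the test above follow directly from this configuration: 5 primaries with 1 replica each is 10 shard copies in total, of which only the 5 primaries can be active on a single node (hence the yellow health status). An arithmetic sketch, illustrative only:

    int numberOfShards = 5;    // index.number_of_shards above
    int numberOfReplicas = 1;  // index.number_of_replicas above
    int totalShards = numberOfShards * (1 + numberOfReplicas);  // 10, as asserted
    int activeOnSingleNode = numberOfShards;                    // 5: replicas stay unassigned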
@@ -19,6 +19,10 @@

package org.elasticsearch.test.integration.recovery;

import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.test.integration.AbstractServersTests;
import org.testng.annotations.AfterMethod;
@@ -29,7 +33,7 @@ import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;

/**
 * @author kimchy (Shay Banon)
 * @author kimchy (shay.banon)
 */
public class SimpleRecoveryTests extends AbstractServersTests {

@@ -42,14 +46,30 @@ public class SimpleRecoveryTests extends AbstractServersTests {

        client("server1").admin().indices().create(createIndexRequest("test")).actionGet(5000);

        logger.info("Running Cluster Health");
        ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForYellowStatus()).actionGet();
        logger.info("Done Cluster Health, status " + clusterHealth.status());
        assertThat(clusterHealth.timedOut(), equalTo(false));
        assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));

        client("server1").index(indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet();
        client("server1").admin().indices().flush(flushRequest("test")).actionGet();
        FlushResponse flushResponse = client("server1").admin().indices().flush(flushRequest("test")).actionGet();
        assertThat(flushResponse.totalShards(), equalTo(10));
        assertThat(flushResponse.successfulShards(), equalTo(5));
        assertThat(flushResponse.failedShards(), equalTo(0));
        client("server1").index(indexRequest("test").type("type1").id("2").source(source("2", "test"))).actionGet();
        client("server1").admin().indices().refresh(refreshRequest("test")).actionGet();
        RefreshResponse refreshResponse = client("server1").admin().indices().refresh(refreshRequest("test")).actionGet();
        assertThat(refreshResponse.totalShards(), equalTo(10));
        assertThat(refreshResponse.successfulShards(), equalTo(5));
        assertThat(refreshResponse.failedShards(), equalTo(0));

        startServer("server2");
        // sleep so we recover properly
        Thread.sleep(5000);

        logger.info("Running Cluster Health");
        clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus()).actionGet();
        logger.info("Done Cluster Health, status " + clusterHealth.status());
        assertThat(clusterHealth.timedOut(), equalTo(false));
        assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));

        GetResponse getResult;

@@ -66,7 +86,12 @@ public class SimpleRecoveryTests extends AbstractServersTests {

        // now start another one so we move some primaries
        startServer("server3");
        Thread.sleep(5000);
        Thread.sleep(1000);
        logger.info("Running Cluster Health");
        clusterHealth = client("server1").admin().cluster().health(clusterHealth().waitForGreenStatus()).actionGet();
        logger.info("Done Cluster Health, status " + clusterHealth.status());
        assertThat(clusterHealth.timedOut(), equalTo(false));
        assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));

        for (int i = 0; i < 5; i++) {
            getResult = client("server1").get(getRequest("test").type("type1").id("1")).actionGet(1000);
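An aside on the fixed sleeps above: the cluster health API used throughout these tests can also wait for relocations to settle, which is less timing-sensitive than Thread.sleep. A hedged sketch, combining only calls that already appear elsewhere in this commit:

    // illustrative alternative to Thread.sleep: wait for green and no relocations
    ClusterHealthResponse health = client("server1").admin().cluster()
            .health(clusterHealth("test").waitForGreenStatus().waitForRelocatingShards(0))
            .actionGet();
    assertThat(health.timedOut(), equalTo(false));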
@@ -0,0 +1,124 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.test.integration.search;

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.test.integration.AbstractServersTests;
import org.elasticsearch.util.Unicode;
import org.elasticsearch.util.json.JsonBuilder;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

import java.io.IOException;

import static org.elasticsearch.client.Requests.*;
import static org.elasticsearch.util.json.JsonBuilder.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;

/**
 * @author kimchy (shay.banon)
 */
public class TransportSearchFailuresTests extends AbstractServersTests {

    @AfterMethod public void closeServers() {
        closeAllServers();
    }

    @Test public void testFailedSearchWithWrongQuery() throws Exception {
        logger.info("Start Testing failed search with wrong query");
        startServer("server1");
        client("server1").admin().indices().create(createIndexRequest("test")).actionGet();

        for (int i = 0; i < 100; i++) {
            index(client("server1"), Integer.toString(i), "test", i);
        }
        RefreshResponse refreshResponse = client("server1").admin().indices().refresh(refreshRequest("test")).actionGet();
        assertThat(refreshResponse.totalShards(), equalTo(9));
        assertThat(refreshResponse.successfulShards(), equalTo(3));
        assertThat(refreshResponse.failedShards(), equalTo(0));
        for (int i = 0; i < 5; i++) {
            try {
                SearchResponse searchResponse = client("server1").search(searchRequest("test").source(Unicode.fromStringAsBytes("{ xxx }"))).actionGet();
                assertThat(searchResponse.totalShards(), equalTo(3));
                assertThat(searchResponse.successfulShards(), equalTo(0));
                assertThat(searchResponse.failedShards(), equalTo(3));
                assert false : "search should fail";
            } catch (ElasticSearchException e) {
                assertThat(e.unwrapCause(), instanceOf(SearchPhaseExecutionException.class));
                // all is well
            }
        }

        startServer("server2");
        Thread.sleep(300);

        logger.info("Running Cluster Health");
        ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealth("test").waitForYellowStatus().waitForRelocatingShards(0)).actionGet();
        logger.info("Done Cluster Health, status " + clusterHealth.status());
        assertThat(clusterHealth.timedOut(), equalTo(false));
        assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));

        refreshResponse = client("server1").admin().indices().refresh(refreshRequest("test")).actionGet();
        assertThat(refreshResponse.totalShards(), equalTo(9));
        assertThat(refreshResponse.successfulShards(), equalTo(6));
        assertThat(refreshResponse.failedShards(), equalTo(0));

        for (int i = 0; i < 5; i++) {
            try {
                SearchResponse searchResponse = client("server1").search(searchRequest("test").source(Unicode.fromStringAsBytes("{ xxx }"))).actionGet();
                assertThat(searchResponse.totalShards(), equalTo(3));
                assertThat(searchResponse.successfulShards(), equalTo(0));
                assertThat(searchResponse.failedShards(), equalTo(3));
                assert false : "search should fail";
            } catch (ElasticSearchException e) {
                assertThat(e.unwrapCause(), instanceOf(SearchPhaseExecutionException.class));
                // all is well
            }
        }

        logger.info("Done Testing failed search");
    }

    private void index(Client client, String id, String nameValue, int age) throws IOException {
        client.index(Requests.indexRequest("test").type("type1").id(id).source(source(id, nameValue, age))).actionGet();
    }

    private JsonBuilder source(String id, String nameValue, int age) throws IOException {
        StringBuilder multi = new StringBuilder().append(nameValue);
        for (int i = 0; i < age; i++) {
            multi.append(" ").append(nameValue);
        }
        return binaryJsonBuilder().startObject()
                .field("id", id)
                .field("name", nameValue + id)
                .field("age", age)
                .field("multi", multi.toString())
                .field("_boost", age * 10)
                .endObject();
    }
}
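For non-test callers, the catch blocks above show the contract when every shard fails: actionGet() throws an ElasticSearchException whose unwrapped cause is a SearchPhaseExecutionException. A minimal handling sketch, illustrative only; it assumes unwrapCause() yields a Throwable, as its use with instanceOf in the tests suggests:

    try {
        client.search(searchRequest("test").source(Unicode.fromStringAsBytes("{ xxx }"))).actionGet();
    } catch (ElasticSearchException e) {
        Throwable cause = e.unwrapCause();
        if (cause instanceof SearchPhaseExecutionException) {
            // the query failed on all shards, for example because it could not be parsed
            logger.warn("search failed on every shard: " + cause.getMessage());
        } else {
            throw e;
        }
    }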
@@ -0,0 +1,9 @@
cluster:
    routing:
        schedule: 100ms
index:
    number_of_shards: 3
    number_of_replicas: 2
    routing:
        # Use simple hashing since we want an even distribution and our ids are simple incrementing numbers
        hash.type: simple
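As with the earlier test config, the refresh assertions fall out of this file: 3 primaries with 2 replicas each is 9 shard copies in total, 3 active on one node and 6 once a second node joins (each primary plus one allocated replica). Illustrative arithmetic only:

    int totalShards = 3 * (1 + 2);  // 9, as asserted for refresh
    int activeOnOneNode = 3;        // primaries only
    int activeOnTwoNodes = 3 * 2;   // 6: primaries plus one replica copy each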
@@ -271,7 +271,10 @@ public class TransportTwoServersSearchTests extends AbstractServersTests {
    @Test public void testFailedSearchWithWrongQuery() throws Exception {
        logger.info("Start Testing failed search with wrong query");
        try {
            client.search(searchRequest("test").source(Unicode.fromStringAsBytes("{ xxx }"))).actionGet();
            SearchResponse searchResponse = client.search(searchRequest("test").source(Unicode.fromStringAsBytes("{ xxx }"))).actionGet();
            assertThat(searchResponse.totalShards(), equalTo(3));
            assertThat(searchResponse.successfulShards(), equalTo(0));
            assertThat(searchResponse.failedShards(), equalTo(3));
            assert false : "search should fail";
        } catch (ElasticSearchException e) {
            assertThat(e.unwrapCause(), instanceOf(SearchPhaseExecutionException.class));
@@ -287,6 +290,9 @@ public class TransportTwoServersSearchTests extends AbstractServersTests {
                .from(1000).size(20).explain(true);
        SearchResponse response = client.search(searchRequest("test").searchType(DFS_QUERY_AND_FETCH).source(source)).actionGet();
        assertThat(response.hits().hits().length, equalTo(0));
        assertThat(response.totalShards(), equalTo(3));
        assertThat(response.successfulShards(), equalTo(3));
        assertThat(response.failedShards(), equalTo(0));

        response = client.search(searchRequest("test").searchType(QUERY_THEN_FETCH).source(source)).actionGet();
        assertThat(response.shardFailures().length, equalTo(0));