add failure reason for broadcast operations (refresh, flush, optimize, ....)

This commit is contained in:
kimchy 2010-02-20 18:58:44 +02:00
parent a828106553
commit 2e81730272
33 changed files with 419 additions and 167 deletions

View File

@ -19,22 +19,27 @@
package org.elasticsearch.action;
import org.elasticsearch.ElasticSearchWrapperException;
import org.elasticsearch.index.shard.IndexShardException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.util.io.Streamable;
/**
* An exception indicating that a failure occurred performing an operation on the shard.
*
* @author kimchy (Shay Banon)
*/
public class ShardOperationFailedException extends IndexShardException implements ElasticSearchWrapperException {
public interface ShardOperationFailedException extends Streamable {
public ShardOperationFailedException(ShardId shardId, Throwable cause) {
super(shardId, "", cause);
}
/**
* The index the operation failed on. Might return <tt>null</tt> if it can't be derived.
*/
String index();
public ShardOperationFailedException(ShardId shardId, String msg, Throwable cause) {
super(shardId, msg, cause);
}
/**
* The index the operation failed on. Might return <tt>-1</tt> if it can't be derived.
*/
int shardId();
/**
* The reason of the failure.
*/
String reason();
}

View File

@ -19,11 +19,13 @@
package org.elasticsearch.action.admin.cluster.ping.broadcast;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
/**
* @author kimchy (Shay Banon)
@ -34,8 +36,8 @@ public class BroadcastPingResponse extends BroadcastOperationResponse {
}
public BroadcastPingResponse(int successfulShards, int failedShards) {
super(successfulShards, failedShards);
public BroadcastPingResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(successfulShards, failedShards, shardFailures);
}
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {

View File

@ -21,7 +21,10 @@ package org.elasticsearch.action.admin.cluster.ping.broadcast;
import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
@ -32,8 +35,10 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.settings.Settings;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static com.google.common.collect.Lists.*;
import static org.elasticsearch.action.Actions.*;
/**
@ -64,18 +69,22 @@ public class TransportBroadcastPingAction extends TransportBroadcastOperationAct
@Override protected BroadcastPingResponse newResponse(BroadcastPingRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;
for (int i = 0; i < shardsResponses.length(); i++) {
if (shardsResponses.get(i) == null) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
shardFailures = newArrayList();
}
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else {
successfulShards++;
}
}
return new BroadcastPingResponse(successfulShards, failedShards);
}
@Override protected boolean accumulateExceptions() {
return false;
return new BroadcastPingResponse(successfulShards, failedShards, shardFailures);
}
@Override protected BroadcastShardPingRequest newShardRequest() {

View File

@ -19,11 +19,13 @@
package org.elasticsearch.action.admin.indices.flush;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
/**
* @author kimchy (Shay Banon)
@ -34,8 +36,8 @@ public class FlushResponse extends BroadcastOperationResponse {
}
FlushResponse(int successfulShards, int failedShards) {
super(successfulShards, failedShards);
FlushResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(successfulShards, failedShards, shardFailures);
}
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {

View File

@ -21,7 +21,10 @@ package org.elasticsearch.action.admin.indices.flush;
import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
@ -34,8 +37,11 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.settings.Settings;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static com.google.common.collect.Lists.*;
/**
* @author kimchy (Shay Banon)
*/
@ -60,15 +66,22 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
@Override protected FlushResponse newResponse(FlushRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;
for (int i = 0; i < shardsResponses.length(); i++) {
ShardFlushResponse shardCountResponse = (ShardFlushResponse) shardsResponses.get(i);
if (shardCountResponse == null) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
shardFailures = newArrayList();
}
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else {
successfulShards++;
}
}
return new FlushResponse(successfulShards, failedShards);
return new FlushResponse(successfulShards, failedShards, shardFailures);
}
@Override protected ShardFlushRequest newShardRequest() {
@ -89,41 +102,10 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
return new ShardFlushResponse(request.index(), request.shardId());
}
@Override protected boolean accumulateExceptions() {
return false;
}
/**
* The refresh request works against *all* shards.
*/
@Override protected GroupShardsIterator shards(FlushRequest request, ClusterState clusterState) {
return clusterState.routingTable().allShardsGrouped(request.indices());
}
// @Override protected FlushRequest newRequestInstance() {
// return new FlushRequest();
// }
//
// @Override protected FlushResponse newResponseInstance(FlushRequest request, AtomicReferenceArray indexResponses) {
// FlushResponse response = new FlushResponse();
// for (int i = 0; i < indexResponses.length(); i++) {
// IndexFlushResponse indexFlushResponse = (IndexFlushResponse) indexResponses.get(i);
// if (indexFlushResponse != null) {
// response.indices().put(indexFlushResponse.index(), indexFlushResponse);
// }
// }
// return response;
// }
//
// @Override protected boolean accumulateExceptions() {
// return false;
// }
//
// @Override protected String transportAction() {
// return TransportActions.Admin.Indices.FLUSH;
// }
//
// @Override protected IndexFlushRequest newIndexRequestInstance(FlushRequest request, String index) {
// return new IndexFlushRequest(request, index);
// }
}

View File

@ -19,11 +19,13 @@
package org.elasticsearch.action.admin.indices.optimize;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
/**
* @author kimchy (Shay Banon)
@ -34,8 +36,8 @@ public class OptimizeResponse extends BroadcastOperationResponse {
}
OptimizeResponse(int successfulShards, int failedShards) {
super(successfulShards, failedShards);
OptimizeResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(successfulShards, failedShards, shardFailures);
}
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {

View File

@ -21,7 +21,10 @@ package org.elasticsearch.action.admin.indices.optimize;
import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
@ -34,8 +37,11 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.settings.Settings;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static com.google.common.collect.Lists.*;
/**
* @author kimchy (Shay Banon)
*/
@ -61,15 +67,22 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
@Override protected OptimizeResponse newResponse(OptimizeRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;
for (int i = 0; i < shardsResponses.length(); i++) {
ShardOptimizeResponse shardCountResponse = (ShardOptimizeResponse) shardsResponses.get(i);
if (shardCountResponse == null) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
shardFailures = newArrayList();
}
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else {
successfulShards++;
}
}
return new OptimizeResponse(successfulShards, failedShards);
return new OptimizeResponse(successfulShards, failedShards, shardFailures);
}
@Override protected ShardOptimizeRequest newShardRequest() {
@ -96,10 +109,6 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
return new ShardOptimizeResponse(request.index(), request.shardId());
}
@Override protected boolean accumulateExceptions() {
return false;
}
/**
* The refresh request works against *all* shards.
*/

View File

@ -19,11 +19,13 @@
package org.elasticsearch.action.admin.indices.refresh;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
/**
* @author kimchy (Shay Banon)
@ -34,8 +36,8 @@ public class RefreshResponse extends BroadcastOperationResponse {
}
RefreshResponse(int successfulShards, int failedShards) {
super(successfulShards, failedShards);
RefreshResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(successfulShards, failedShards, shardFailures);
}
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {

View File

@ -21,7 +21,10 @@ package org.elasticsearch.action.admin.indices.refresh;
import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
@ -34,8 +37,11 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.settings.Settings;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static com.google.common.collect.Lists.*;
/**
* @author kimchy (Shay Banon)
*/
@ -61,15 +67,22 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
@Override protected RefreshResponse newResponse(RefreshRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;
for (int i = 0; i < shardsResponses.length(); i++) {
ShardRefreshResponse shardCountResponse = (ShardRefreshResponse) shardsResponses.get(i);
if (shardCountResponse == null) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
shardFailures = newArrayList();
}
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else {
successfulShards++;
}
}
return new RefreshResponse(successfulShards, failedShards);
return new RefreshResponse(successfulShards, failedShards, shardFailures);
}
@Override protected ShardRefreshRequest newShardRequest() {
@ -90,10 +103,6 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
return new ShardRefreshResponse(request.index(), request.shardId());
}
@Override protected boolean accumulateExceptions() {
return false;
}
/**
* The refresh request works against *all* shards.
*/

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.indices.status;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.util.settings.Settings;
@ -49,8 +50,8 @@ public class IndicesStatusResponse extends BroadcastOperationResponse {
IndicesStatusResponse() {
}
IndicesStatusResponse(ShardStatus[] shards, ClusterState clusterState, int successfulShards, int failedShards) {
super(successfulShards, failedShards);
IndicesStatusResponse(ShardStatus[] shards, ClusterState clusterState, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(successfulShards, failedShards, shardFailures);
this.shards = shards;
indicesSettings = newHashMap();
for (ShardStatus shard : shards) {

View File

@ -21,7 +21,10 @@ package org.elasticsearch.action.admin.indices.status;
import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
@ -65,17 +68,24 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
@Override protected IndicesStatusResponse newResponse(IndicesStatusRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;
final List<ShardStatus> shards = newArrayList();
for (int i = 0; i < shardsResponses.length(); i++) {
Object resp = shardsResponses.get(i);
if (resp instanceof ShardStatus) {
shards.add((ShardStatus) resp);
successfulShards++;
} else {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
shardFailures = newArrayList();
}
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else {
shards.add((ShardStatus) shardResponse);
successfulShards++;
}
}
return new IndicesStatusResponse(shards.toArray(new ShardStatus[shards.size()]), clusterState, successfulShards, failedShards);
return new IndicesStatusResponse(shards.toArray(new ShardStatus[shards.size()]), clusterState, successfulShards, failedShards, shardFailures);
}
@Override protected IndexShardStatusRequest newShardRequest() {
@ -114,10 +124,6 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
return shardStatus;
}
@Override protected boolean accumulateExceptions() {
return false;
}
/**
* Status goes across *all* shards.
*/

View File

@ -19,11 +19,13 @@
package org.elasticsearch.action.count;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
/**
* @author kimchy (Shay Banon)
@ -36,8 +38,8 @@ public class CountResponse extends BroadcastOperationResponse {
}
public CountResponse(long count, int successfulShards, int failedShards) {
super(successfulShards, failedShards);
public CountResponse(long count, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(successfulShards, failedShards, shardFailures);
this.count = count;
}

View File

@ -21,7 +21,10 @@ package org.elasticsearch.action.count;
import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
@ -33,8 +36,10 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.settings.Settings;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static com.google.common.collect.Lists.*;
import static org.elasticsearch.action.Actions.*;
/**
@ -78,20 +83,23 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
int successfulShards = 0;
int failedShards = 0;
long count = 0;
List<ShardOperationFailedException> shardFailures = null;
for (int i = 0; i < shardsResponses.length(); i++) {
ShardCountResponse shardCountResponse = (ShardCountResponse) shardsResponses.get(i);
if (shardCountResponse == null) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
shardFailures = newArrayList();
}
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else {
count += shardCountResponse.count();
count += ((ShardCountResponse) shardResponse).count();
successfulShards++;
}
}
return new CountResponse(count, successfulShards, failedShards);
}
@Override protected boolean accumulateExceptions() {
return false;
return new CountResponse(count, successfulShards, failedShards, shardFailures);
}
@Override protected ShardCountResponse shardOperation(ShardCountRequest request) throws ElasticSearchException {

View File

@ -20,10 +20,10 @@
package org.elasticsearch.action.search;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.search.SearchException;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.util.Nullable;
import org.elasticsearch.util.io.Streamable;
import java.io.DataInput;
import java.io.DataOutput;
@ -36,7 +36,7 @@ import static org.elasticsearch.search.SearchShardTarget.*;
*
* @author kimchy (shay.banon)
*/
public class ShardSearchFailure implements Streamable {
public class ShardSearchFailure implements ShardOperationFailedException {
public static final ShardSearchFailure[] EMPTY_ARRAY = new ShardSearchFailure[0];
@ -65,6 +65,20 @@ public class ShardSearchFailure implements Streamable {
return this.shardTarget;
}
@Override public String index() {
if (shardTarget != null) {
return shardTarget.index();
}
return null;
}
@Override public int shardId() {
if (shardTarget != null) {
return shardTarget.shardId();
}
return -1;
}
public String reason() {
return this.reason;
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.index.shard.IndexShardException;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import static org.elasticsearch.ExceptionsHelper.*;
/**
* @author kimchy (shay.banon)
*/
public class DefaultShardOperationFailedException implements ShardOperationFailedException {
private String index;
private int shardId;
private String reason;
private DefaultShardOperationFailedException() {
}
public DefaultShardOperationFailedException(IndexShardException e) {
this.index = e.shardId().index().name();
this.shardId = e.shardId().id();
this.reason = detailedMessage(e);
}
public DefaultShardOperationFailedException(String index, int shardId, Throwable t) {
this.index = index;
this.shardId = shardId;
this.reason = detailedMessage(t);
}
@Override public String index() {
return this.index;
}
@Override public int shardId() {
return this.shardId;
}
@Override public String reason() {
return this.reason;
}
public static DefaultShardOperationFailedException readShardOperationFailed(DataInput in) throws IOException, ClassNotFoundException {
DefaultShardOperationFailedException exp = new DefaultShardOperationFailedException();
exp.readFrom(in);
return exp;
}
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
if (in.readBoolean()) {
index = in.readUTF();
}
shardId = in.readInt();
reason = in.readUTF();
}
@Override public void writeTo(DataOutput out) throws IOException {
if (index == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(index);
}
out.writeInt(shardId);
out.writeUTF(reason);
}
}

View File

@ -19,11 +19,16 @@
package org.elasticsearch.action.support.broadcast;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* @author kimchy (Shay Banon)
@ -34,12 +39,18 @@ public abstract class BroadcastOperationResponse implements ActionResponse {
private int failedShards;
private List<ShardOperationFailedException> shardFailures = ImmutableList.of();
protected BroadcastOperationResponse() {
}
protected BroadcastOperationResponse(int successfulShards, int failedShards) {
protected BroadcastOperationResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
this.successfulShards = successfulShards;
this.failedShards = failedShards;
this.shardFailures = shardFailures;
if (shardFailures == null) {
this.shardFailures = ImmutableList.of();
}
}
public int totalShards() {
@ -54,13 +65,28 @@ public abstract class BroadcastOperationResponse implements ActionResponse {
return failedShards;
}
public List<? extends ShardOperationFailedException> shardFailures() {
return shardFailures;
}
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
successfulShards = in.readInt();
failedShards = in.readInt();
int size = in.readInt();
if (size > 0) {
shardFailures = new ArrayList<ShardOperationFailedException>(size);
for (int i = 0; i < size; i++) {
shardFailures.add(DefaultShardOperationFailedException.readShardOperationFailed(in));
}
}
}
@Override public void writeTo(DataOutput out) throws IOException {
out.writeInt(successfulShards);
out.writeInt(failedShards);
out.writeInt(shardFailures.size());
for (ShardOperationFailedException exp : shardFailures) {
exp.writeTo(out);
}
}
}

View File

@ -17,16 +17,28 @@
* under the License.
*/
package org.elasticsearch.action;
package org.elasticsearch.action.support.broadcast;
import org.elasticsearch.ElasticSearchWrapperException;
import org.elasticsearch.index.shard.IndexShardException;
import org.elasticsearch.index.shard.ShardId;
/**
* An exception indicating that a failure occurred performing an operation on the shard.
*
* @author kimchy (Shay Banon)
*/
public class ShardNotActiveException extends ShardOperationFailedException {
public class BroadcastShardOperationFailedException extends IndexShardException implements ElasticSearchWrapperException {
public ShardNotActiveException(ShardId shardId) {
super(shardId, "not active", null);
public BroadcastShardOperationFailedException(ShardId shardId, String msg) {
super(shardId, msg, null);
}
}
public BroadcastShardOperationFailedException(ShardId shardId, Throwable cause) {
super(shardId, "", cause);
}
public BroadcastShardOperationFailedException(ShardId shardId, String msg, Throwable cause) {
super(shardId, msg, cause);
}
}

View File

@ -21,8 +21,6 @@ package org.elasticsearch.action.support.broadcast;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ShardNotActiveException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
@ -85,10 +83,12 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
protected abstract ShardResponse shardOperation(ShardRequest request) throws ElasticSearchException;
protected abstract boolean accumulateExceptions();
protected abstract GroupShardsIterator shards(Request request, ClusterState clusterState);
private boolean accumulateExceptions() {
return true;
}
private class AsyncBroadcastAction {
private final Request request;
@ -140,7 +140,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
}
} else {
// as if we have a "problem", so we iterate to the next one and maintain counts
onOperation(shard, shardIt, new ShardNotActiveException(shard.shardId()), false);
onOperation(shard, shardIt, new BroadcastShardOperationFailedException(shard.shardId(), "Not active"), false);
}
}
// we have local operations, perform them now
@ -176,7 +176,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
final ShardRouting shard = shardIt.next();
if (!shard.active()) {
// as if we have a "problem", so we iterate to the next one and maintain counts
onOperation(shard, shardIt, new ShardNotActiveException(shard.shardId()), false);
onOperation(shard, shardIt, new BroadcastShardOperationFailedException(shard.shardId(), "Not Active"), false);
} else {
final ShardRequest shardRequest = newShardRequest(shard, request);
if (shard.currentNodeId().equals(nodes.localNodeId())) {
@ -238,7 +238,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
// no more shards in this partition
int index = indexCounter.getAndIncrement();
if (accumulateExceptions()) {
shardsResponses.set(index, new ShardOperationFailedException(shard.shardId(), e));
shardsResponses.set(index, new BroadcastShardOperationFailedException(shard.shardId(), e));
}
if (expectedOps == counterOps.incrementAndGet()) {
finishHim(alreadyThreaded);

View File

@ -0,0 +1,44 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.replication;
import org.elasticsearch.ElasticSearchWrapperException;
import org.elasticsearch.index.shard.IndexShardException;
import org.elasticsearch.index.shard.ShardId;
/**
* An exception indicating that a failure occurred performing an operation on the shard.
*
* @author kimchy (Shay Banon)
*/
public class ReplicationShardOperationFailedException extends IndexShardException implements ElasticSearchWrapperException {
public ReplicationShardOperationFailedException(ShardId shardId, String msg) {
super(shardId, msg, null);
}
public ReplicationShardOperationFailedException(ShardId shardId, Throwable cause) {
super(shardId, "", cause);
}
public ReplicationShardOperationFailedException(ShardId shardId, String msg, Throwable cause) {
super(shardId, msg, cause);
}
}

View File

@ -23,7 +23,6 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.PrimaryNotStartedActionException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
@ -217,7 +216,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
try {
shards = shards(request);
} catch (Exception e) {
listener.onFailure(new ShardOperationFailedException(shards.shardId(), e));
listener.onFailure(new ReplicationShardOperationFailedException(shards.shardId(), e));
return true;
}
@ -320,7 +319,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// still in recovery, retry (we know that its not UNASSIGNED OR INITIALIZING since we are checking it in the calling method)
retryPrimary(fromDiscoveryListener, shard);
} catch (Exception e) {
listener.onFailure(new ShardOperationFailedException(shards.shardId(), e));
listener.onFailure(new ReplicationShardOperationFailedException(shards.shardId(), e));
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.terms;
import com.google.common.collect.Iterators;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import java.io.DataInput;
@ -27,6 +28,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.action.terms.FieldTermsFreq.*;
@ -52,9 +54,9 @@ public class TermsResponse extends BroadcastOperationResponse implements Iterabl
TermsResponse() {
}
TermsResponse(int successfulShards, int failedShards, FieldTermsFreq[] fieldsTermsFreq,
TermsResponse(int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures, FieldTermsFreq[] fieldsTermsFreq,
long numDocs, long maxDoc, long numDeletedDocs) {
super(successfulShards, failedShards);
super(successfulShards, failedShards, shardFailures);
this.fieldsTermsFreq = fieldsTermsFreq;
this.numDocs = numDocs;
this.maxDoc = maxDoc;

View File

@ -25,7 +25,10 @@ import org.apache.lucene.index.TermDocs;
import org.apache.lucene.index.TermEnum;
import org.apache.lucene.util.StringHelper;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
@ -44,13 +47,11 @@ import org.elasticsearch.util.gnu.trove.TObjectIntIterator;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.*;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.regex.Pattern;
import static com.google.common.collect.Lists.*;
import static org.elasticsearch.action.Actions.*;
/**
@ -68,16 +69,24 @@ public class TransportTermsAction extends TransportBroadcastOperationAction<Term
long numDocs = 0;
long maxDoc = 0;
long numDeletedDocs = 0;
List<ShardOperationFailedException> shardFailures = null;
ShardTermsResponse aggregator = null;
for (int i = 0; i < shardsResponses.length(); i++) {
ShardTermsResponse shardResponse = (ShardTermsResponse) shardsResponses.get(i);
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
failedShards++;
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
shardFailures = newArrayList();
}
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else {
ShardTermsResponse shardTermsResponse = (ShardTermsResponse) shardResponse;
if (aggregator == null) {
aggregator = shardResponse;
aggregator = shardTermsResponse;
} else {
for (Map.Entry<String, TObjectIntHashMap<String>> entry : shardResponse.fieldsTermsFreqs().entrySet()) {
for (Map.Entry<String, TObjectIntHashMap<String>> entry : shardTermsResponse.fieldsTermsFreqs().entrySet()) {
String fieldName = entry.getKey();
TObjectIntHashMap<String> termsFreqs = aggregator.fieldsTermsFreqs().get(fieldName);
if (termsFreqs == null) {
@ -90,9 +99,9 @@ public class TransportTermsAction extends TransportBroadcastOperationAction<Term
}
}
}
numDocs += shardResponse.numDocs();
maxDoc += shardResponse.maxDoc();
numDeletedDocs += shardResponse.numDeletedDocs();
numDocs += shardTermsResponse.numDocs();
maxDoc += shardTermsResponse.maxDoc();
numDeletedDocs += shardTermsResponse.numDeletedDocs();
successfulShards++;
}
}
@ -122,7 +131,7 @@ public class TransportTermsAction extends TransportBroadcastOperationAction<Term
TermFreq[] freqs = entry.getValue().toArray(new TermFreq[entry.getValue().size()]);
resultFreqs[index++] = new FieldTermsFreq(entry.getKey(), freqs);
}
return new TermsResponse(successfulShards, failedShards, resultFreqs, numDocs, maxDoc, numDeletedDocs);
return new TermsResponse(successfulShards, failedShards, shardFailures, resultFreqs, numDocs, maxDoc, numDeletedDocs);
}
@Override protected ShardTermsResponse shardOperation(ShardTermsRequest request) throws ElasticSearchException {
@ -322,10 +331,6 @@ public class TransportTermsAction extends TransportBroadcastOperationAction<Term
return new ShardTermsResponse();
}
@Override protected boolean accumulateExceptions() {
return false;
}
@Override protected GroupShardsIterator shards(TermsRequest request, ClusterState clusterState) {
return indicesService.searchShards(clusterState, processIndices(clusterState, request.indices()), request.queryHint());
}

View File

@ -33,7 +33,11 @@ public class IndexException extends ElasticSearchException {
}
public IndexException(Index index, String msg, Throwable cause) {
super("Index[" + index.name() + "] " + msg, cause);
this(index, true, msg, cause);
}
protected IndexException(Index index, boolean withSpace, String msg, Throwable cause) {
super("[" + index.name() + "]" + (withSpace ? " " : "") + msg, cause);
this.index = index;
}

View File

@ -33,7 +33,7 @@ public class IndexShardException extends IndexException {
}
public IndexShardException(ShardId shardId, String msg, Throwable cause) {
super(shardId.index(), "Shard[" + shardId.id() + "] " + msg, cause);
super(shardId.index(), false, "[" + shardId.id() + "] " + msg, cause);
this.shardId = shardId;
}

View File

@ -30,6 +30,8 @@ import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import static org.elasticsearch.rest.RestResponse.Status.*;
/**
* @author kimchy (Shay Banon)
*/
@ -94,6 +96,10 @@ public class RestController extends AbstractComponent implements LifecycleCompon
public void dispatchRequest(final RestRequest request, final RestChannel channel) {
final RestHandler handler = getHandler(request);
if (handler == null) {
channel.sendResponse(new StringRestResponse(BAD_REQUEST, "No handler found for uri [" + request.uri() + "] and method [" + request.method() + "]"));
return;
}
try {
handler.handleRequest(request, channel);
} catch (Exception e) {

View File

@ -34,6 +34,7 @@ import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import static org.elasticsearch.rest.RestResponse.Status.*;
import static org.elasticsearch.rest.action.support.RestActions.*;
/**
* @author kimchy (Shay Banon)
@ -56,16 +57,14 @@ public class RestBroadcastPingAction extends BaseRestHandler {
}
broadcastPingRequest.operationThreading(operationThreading);
client.admin().cluster().execPing(broadcastPingRequest, new ActionListener<BroadcastPingResponse>() {
@Override public void onResponse(BroadcastPingResponse result) {
@Override public void onResponse(BroadcastPingResponse response) {
try {
JsonBuilder generator = RestJsonBuilder.cached(request);
generator.startObject()
.field("ok", true)
.field("totalShards", result.totalShards())
.field("successfulShards", result.successfulShards())
.field("failedShards", result.failedShards())
.endObject();
channel.sendResponse(new JsonRestResponse(request, OK, generator));
JsonBuilder builder = RestJsonBuilder.cached(request);
builder.startObject();
builder.field("ok", true);
buildBroadcastShardsHeader(builder, response);
builder.endObject();
channel.sendResponse(new JsonRestResponse(request, OK, builder));
} catch (Exception e) {
onFailure(e);
}

View File

@ -35,6 +35,7 @@ import java.io.IOException;
import static org.elasticsearch.rest.RestRequest.Method.*;
import static org.elasticsearch.rest.RestResponse.Status.*;
import static org.elasticsearch.rest.action.support.RestActions.*;
/**
* @author kimchy (Shay Banon)
@ -79,11 +80,7 @@ public class RestOptimizeAction extends BaseRestHandler {
builder.startObject();
builder.field("ok", true);
builder.startObject("_shards");
builder.field("total", response.totalShards());
builder.field("successful", response.successfulShards());
builder.field("failed", response.failedShards());
builder.endObject();
buildBroadcastShardsHeader(builder, response);
builder.endObject();
channel.sendResponse(new JsonRestResponse(request, OK, builder));

View File

@ -35,6 +35,7 @@ import java.io.IOException;
import static org.elasticsearch.rest.RestRequest.Method.*;
import static org.elasticsearch.rest.RestResponse.Status.*;
import static org.elasticsearch.rest.action.support.RestActions.*;
/**
* @author kimchy (Shay Banon)
@ -64,11 +65,7 @@ public class RestRefreshAction extends BaseRestHandler {
builder.startObject();
builder.field("ok", true);
builder.startObject("_shards");
builder.field("total", response.totalShards());
builder.field("successful", response.successfulShards());
builder.field("failed", response.failedShards());
builder.endObject();
buildBroadcastShardsHeader(builder, response);
builder.endObject();
channel.sendResponse(new JsonRestResponse(request, OK, builder));

View File

@ -64,11 +64,7 @@ public class RestIndicesStatusAction extends BaseRestHandler {
builder.startObject();
builder.field("ok", true);
builder.startObject("_shards");
builder.field("total", response.totalShards());
builder.field("successful", response.successfulShards());
builder.field("failed", response.failedShards());
builder.endObject();
buildBroadcastShardsHeader(builder, response);
builder.startObject("indices");
for (IndexStatus indexStatus : response.indices().values()) {

View File

@ -89,11 +89,7 @@ public class RestCountAction extends BaseRestHandler {
builder.startObject();
builder.field("count", response.count());
builder.startObject("_shards");
builder.field("total", response.totalShards());
builder.field("successful", response.successfulShards());
builder.field("failed", response.failedShards());
builder.endObject();
buildBroadcastShardsHeader(builder, response);
builder.endObject();
channel.sendResponse(new JsonRestResponse(request, OK, builder));

View File

@ -20,11 +20,15 @@
package org.elasticsearch.rest.action.support;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.index.query.json.JsonQueryBuilders;
import org.elasticsearch.index.query.json.QueryStringJsonQueryBuilder;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.util.Strings;
import org.elasticsearch.util.json.JsonBuilder;
import java.io.IOException;
import java.util.regex.Pattern;
/**
@ -43,6 +47,29 @@ public class RestActions {
nodesIdsPattern = Pattern.compile(",");
}
public static void buildBroadcastShardsHeader(JsonBuilder builder, BroadcastOperationResponse response) throws IOException {
builder.startObject("_shards");
builder.field("total", response.totalShards());
builder.field("successful", response.successfulShards());
builder.field("failed", response.failedShards());
if (!response.shardFailures().isEmpty()) {
builder.startArray("failures");
for (ShardOperationFailedException shardFailure : response.shardFailures()) {
builder.startObject();
if (shardFailure.index() != null) {
builder.field("index", shardFailure.index());
}
if (shardFailure.shardId() != -1) {
builder.field("shardId", shardFailure.shardId());
}
builder.field("reason", shardFailure.reason());
builder.endObject();
}
builder.endArray();
}
builder.endObject();
}
public static String parseQuerySource(RestRequest request) {
if (request.hasContent()) {
return request.contentAsString();

View File

@ -31,8 +31,7 @@ public class RestJsonBuilder {
public static JsonBuilder cached(RestRequest request) throws IOException {
JsonBuilder builder = JsonBuilder.jsonBuilder();
String prettyPrint = request.param("pretty");
if (prettyPrint != null && "true".equals(prettyPrint)) {
if (request.paramAsBoolean("pretty", false)) {
builder.prettyPrint();
}
return builder;

View File

@ -113,11 +113,7 @@ public class RestTermsAction extends BaseRestHandler {
JsonBuilder builder = RestJsonBuilder.cached(request);
builder.startObject();
builder.startObject("_shards");
builder.field("total", response.totalShards());
builder.field("successful", response.successfulShards());
builder.field("failed", response.failedShards());
builder.endObject();
buildBroadcastShardsHeader(builder, response);
builder.startObject("docs");
builder.field("numDocs", response.numDocs());