remove replica response from TransportShardReplicationOperation. not needed anymore

This commit is contained in:
Britta Weber 2015-05-04 10:52:20 +02:00
parent afdab84f2d
commit 8b7b2f3cdf
10 changed files with 56 additions and 134 deletions

View File

@ -60,7 +60,6 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.util.Map;
@ -68,7 +67,7 @@ import java.util.Map;
/**
* Performs the index operation.
*/
public class TransportShardBulkAction extends TransportShardReplicationOperationAction<BulkShardRequest, BulkShardRequest, BulkShardResponse, TransportResponse.Empty> {
public class TransportShardBulkAction extends TransportShardReplicationOperationAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
private final static String OP_TYPE_UPDATE = "update";
private final static String OP_TYPE_DELETE = "delete";
@ -119,11 +118,6 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
return new BulkShardResponse();
}
@Override
protected TransportResponse.Empty newReplicaResponseInstance() {
return TransportResponse.Empty.INSTANCE;
}
@Override
protected boolean resolveIndex() {
return false;
@ -534,7 +528,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
@Override
protected TransportResponse.Empty shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
final BulkShardRequest request = shardRequest.request;
for (int i = 0; i < request.items().length; i++) {
@ -586,7 +580,6 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
// ignore
}
}
return newReplicaResponseInstance();
}
private void applyVersion(BulkItemRequest item, long version, VersionType versionType) {

View File

@ -43,13 +43,12 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
/**
* Performs the delete operation.
*/
public class TransportDeleteAction extends TransportShardReplicationOperationAction<DeleteRequest, DeleteRequest, DeleteResponse, TransportResponse.Empty> {
public class TransportDeleteAction extends TransportShardReplicationOperationAction<DeleteRequest, DeleteRequest, DeleteResponse> {
private final AutoCreateIndex autoCreateIndex;
@ -141,11 +140,6 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
return new DeleteResponse();
}
@Override
protected TransportResponse.Empty newReplicaResponseInstance() {
return TransportResponse.Empty.INSTANCE;
}
@Override
protected Tuple<DeleteResponse, DeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
@ -171,7 +165,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
}
@Override
protected TransportResponse.Empty shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
@ -185,7 +179,6 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
// ignore
}
}
return newReplicaResponseInstance();
}
@Override

View File

@ -32,7 +32,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.util.Map;
@ -41,7 +40,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
/**
*/
public class TransportDeleteByQueryAction extends TransportIndicesReplicationOperationAction<DeleteByQueryRequest, DeleteByQueryResponse, IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse, TransportResponse.Empty> {
public class TransportDeleteByQueryAction extends TransportIndicesReplicationOperationAction<DeleteByQueryRequest, DeleteByQueryResponse, IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
private final DestructiveOperations destructiveOperations;

View File

@ -30,14 +30,13 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import java.util.List;
/**
* Internal transport action that broadcasts a delete by query request to all of the shards that belong to an index.
*/
public class TransportIndexDeleteByQueryAction extends TransportIndexReplicationOperationAction<IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse, TransportResponse.Empty> {
public class TransportIndexDeleteByQueryAction extends TransportIndexReplicationOperationAction<IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
private static final String ACTION_NAME = DeleteByQueryAction.NAME + "[index]";

View File

@ -42,13 +42,12 @@ import org.elasticsearch.search.internal.DefaultSearchContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchLocalRequest;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
/**
*
*/
public class TransportShardDeleteByQueryAction extends TransportShardReplicationOperationAction<ShardDeleteByQueryRequest, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse, TransportResponse.Empty> {
public class TransportShardDeleteByQueryAction extends TransportShardReplicationOperationAction<ShardDeleteByQueryRequest, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
public final static String DELETE_BY_QUERY_API = "delete_by_query";
@ -94,11 +93,6 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
return new ShardDeleteByQueryResponse();
}
@Override
protected TransportResponse.Empty newReplicaResponseInstance() {
return TransportResponse.Empty.INSTANCE;
}
@Override
protected boolean resolveIndex() {
return false;
@ -127,7 +121,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
@Override
protected TransportResponse.Empty shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
ShardDeleteByQueryRequest request = shardRequest.request;
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
@ -144,7 +138,6 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
SearchContext.removeCurrent();
}
}
return newReplicaResponseInstance();
}
@Override

View File

@ -49,7 +49,6 @@ import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
/**
@ -62,7 +61,7 @@ import org.elasticsearch.transport.TransportService;
* <li><b>allowIdGeneration</b>: If the id is set not, should it be generated. Defaults to <tt>true</tt>.
* </ul>
*/
public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse, TransportResponse.Empty> {
public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {
private final AutoCreateIndex autoCreateIndex;
@ -157,11 +156,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
return new IndexResponse();
}
@Override
protected TransportResponse.Empty newReplicaResponseInstance() {
return TransportResponse.Empty.INSTANCE;
}
@Override
protected String executor() {
return ThreadPool.Names.INDEX;
@ -245,7 +239,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
@Override
protected TransportResponse.Empty shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
IndexRequest request = shardRequest.request;
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id())
@ -265,10 +259,5 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
// ignore
}
}
return newReplicaResponseInstance();
}
public String getReplicaActionName() {
return IndexAction.NAME + "[r]";
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
@ -59,16 +60,12 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
this.clusterService = clusterService;
this.transportService = transportService;
this.threadPool = threadPool;
this.transportShardAction = actionName + getShardOperationNameSuffix();
this.transportShardAction = actionName + "[s]";
this.executor = executor();
transportService.registerHandler(transportShardAction, new ShardTransportHandler());
}
public static String getShardOperationNameSuffix() {
return "[s]";
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
new AsyncBroadcastAction(request, listener).start();

View File

@ -36,7 +36,6 @@ import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import java.util.ArrayList;
import java.util.Arrays;
@ -49,15 +48,15 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
* It relies on a shard sub-action that gets sent over the transport and executed on each of the shard.
* The index provided with the request is expected to be a concrete index, properly resolved by the callers (parent actions).
*/
public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionWriteResponse, ReplicaResponse extends TransportResponse>
public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionWriteResponse>
extends TransportAction<Request, Response> {
protected final ClusterService clusterService;
protected final TransportShardReplicationOperationAction<ShardRequest, ShardRequest, ShardResponse, ReplicaResponse> shardAction;
protected final TransportShardReplicationOperationAction<ShardRequest, ShardRequest, ShardResponse> shardAction;
protected TransportIndexReplicationOperationAction(Settings settings, String actionName, ClusterService clusterService,
ThreadPool threadPool, TransportShardReplicationOperationAction<ShardRequest, ShardRequest, ShardResponse, ReplicaResponse> shardAction, ActionFilters actionFilters) {
ThreadPool threadPool, TransportShardReplicationOperationAction<ShardRequest, ShardRequest, ShardResponse> shardAction, ActionFilters actionFilters) {
super(settings, actionName, threadPool, actionFilters);
this.clusterService = clusterService;
this.shardAction = shardAction;

View File

@ -32,7 +32,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.util.Map;
@ -43,15 +42,15 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
/**
*/
public abstract class TransportIndicesReplicationOperationAction<Request extends IndicesReplicationOperationRequest, Response extends ActionResponse, IndexRequest extends IndexReplicationOperationRequest, IndexResponse extends ActionResponse,
ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionWriteResponse, ReplicaResponse extends TransportResponse>
ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionWriteResponse>
extends TransportAction<Request, Response> {
protected final ClusterService clusterService;
protected final TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse, ReplicaResponse> indexAction;
protected final TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction;
protected TransportIndicesReplicationOperationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse, ReplicaResponse> indexAction, ActionFilters actionFilters) {
TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction, ActionFilters actionFilters) {
super(settings, actionName, threadPool, actionFilters);
this.clusterService = clusterService;
this.indexAction = indexAction;

View File

@ -59,14 +59,12 @@ import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
*/
public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionWriteResponse, ReplicaResponse extends TransportResponse> extends TransportAction<Request, Response> {
public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionWriteResponse> extends TransportAction<Request, Response> {
protected final TransportService transportService;
protected final ClusterService clusterService;
@ -88,22 +86,18 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
this.indicesService = indicesService;
this.shardStateAction = shardStateAction;
this.transportReplicaAction = actionName + getReplicaOperationNameSuffix();
this.transportReplicaAction = actionName + "[r]";
this.executor = executor();
this.checkWriteConsistency = checkWriteConsistency();
transportService.registerHandler(actionName, new OperationTransportHandler());
transportService.registerHandler(transportReplicaAction, new ReplicaOperationTransportHandler());
transportService.registerHandler(transportReplicaAction, new ReplicaOperationTransportHandler());
this.transportOptions = transportOptions();
this.defaultWriteConsistencyLevel = WriteConsistencyLevel.fromString(settings.get("action.write_consistency", "quorum"));
}
public static String getReplicaOperationNameSuffix() {
return "[r]";
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
new AsyncShardOperationAction(request, listener).start();
@ -115,17 +109,15 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected abstract Response newResponseInstance();
protected abstract ReplicaResponse newReplicaResponseInstance();
protected abstract String executor();
/**
* @return A tuple containing not null values, as first value the result of the primary operation and as second value
* the request to be executed on the replica shards.
* @return A tuple containing not null values, as first value the result of the primary operation and as second value
* the request to be executed on the replica shards.
*/
protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable;
protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable;
protected abstract ReplicaResponse shardOperationOnReplica(ReplicaOperationRequest shardRequest);
protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest);
protected abstract ShardIterator shards(ClusterState clusterState, InternalRequest request) throws ElasticsearchException;
@ -224,7 +216,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
private class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ReplicaOperationRequest> {
class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ReplicaOperationRequest> {
@Override
public ReplicaOperationRequest newInstance() {
@ -244,14 +236,13 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
@Override
public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception {
ReplicaResponse response;
try {
response = shardOperationOnReplica(request);
shardOperationOnReplica(request);
} catch (Throwable t) {
failReplicaIfNeeded(request.shardId.getIndex(), request.shardId.id(), t);
throw t;
}
channel.sendResponse(response);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
@ -269,20 +260,14 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
public ShardId shardId;
public ReplicaRequest request;
private String nodeId;
ReplicaOperationRequest() {
}
public String getNodeId() {
return nodeId;
}
ReplicaOperationRequest(ShardId shardId, ReplicaRequest request, String nodeId) {
ReplicaOperationRequest(ShardId shardId, ReplicaRequest request) {
super(request);
this.shardId = shardId;
this.request = request;
this.nodeId = nodeId;
}
@Override
@ -307,7 +292,6 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
//older nodes will send the concrete index as part of the request
shardId = new ShardId(request.index(), shard);
}
nodeId = in.readString();
}
@Override
@ -315,12 +299,6 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
super.writeTo(out);
shardId.writeTo(out);
request.writeTo(out);
out.writeString(nodeId);
}
public ReplicaOperationRequest setNodeId(String nodeId) {
this.nodeId = nodeId;
return this;
}
}
@ -635,7 +613,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
return;
}
final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), state.replicaRequest(), shard.currentNodeId());
final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), state.replicaRequest());
// If the replicas use shadow replicas, there is no reason to
// perform the action on the replica, so skip it and
@ -645,41 +623,31 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// to wait until they get the new mapping through the cluster
// state, which is why we recommend pre-defined mappings for
// indices using shadow replicas
state.onReplicaSuccess(newReplicaResponseInstance());
state.onReplicaSuccess();
return;
}
if (!nodeId.equals(observer.observedState().nodes().localNodeId())) {
final DiscoveryNode node = observer.observedState().nodes().get(nodeId);
transportService.sendRequest(node, transportReplicaAction, shardRequest,
transportOptions, new TransportResponseHandler<ReplicaResponse>() {
@Override
public ReplicaResponse newInstance() {
return newReplicaResponseInstance();
}
transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty vResponse) {
state.onReplicaSuccess();
}
@Override
public void handleResponse(ReplicaResponse vResponse) {
state.onReplicaSuccess(vResponse);
}
@Override
public void handleException(TransportException exp) {
state.onReplicaFailure(nodeId, exp);
logger.trace("[{}] Transport failure during replica request [{}] ", exp, node, internalRequest.request());
if (!ignoreReplicaException(exp)) {
logger.warn("Failed to perform " + actionName + " on remote replica " + node + shardIt.shardId(), exp);
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + actionName + "] on replica, message [" + ExceptionsHelper.detailedMessage(exp) + "]");
}
}
@Override
public void handleException(TransportException exp) {
state.onReplicaFailure(nodeId, exp);
logger.trace("[{}] Transport failure during replica request [{}] ", exp, node, internalRequest.request());
if (!ignoreReplicaException(exp)) {
logger.warn("Failed to perform " + actionName + " on remote replica " + node + shardIt.shardId(), exp);
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + actionName + "] on replica, message [" + ExceptionsHelper.detailedMessage(exp) + "]");
}
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
});
} else {
if (internalRequest.request().operationThreaded()) {
try {
@ -687,8 +655,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
@Override
protected void doRun() {
try {
ReplicaResponse response = shardOperationOnReplica(shardRequest);
state.onReplicaSuccess(response);
shardOperationOnReplica(shardRequest);
state.onReplicaSuccess();
} catch (Throwable e) {
state.onReplicaFailure(nodeId, e);
failReplicaIfNeeded(shard.index(), shard.id(), e);
@ -712,8 +680,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
} else {
try {
ReplicaResponse response = shardOperationOnReplica(shardRequest);
state.onReplicaSuccess(response);
shardOperationOnReplica(shardRequest);
state.onReplicaSuccess();
} catch (Throwable e) {
failReplicaIfNeeded(shard.index(), shard.id(), e);
state.onReplicaFailure(nodeId, e);
@ -735,11 +703,11 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
final int sizeActive;
final int requiredNumber;
IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shard.index());
IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shard.index());
if (indexRoutingTable != null) {
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shard.getId());
if (shardRoutingTable != null) {
sizeActive = shardRoutingTable.activeShards().size();
sizeActive = shardRoutingTable.activeShards().size();
if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardRoutingTable.getSize() > 2) {
// only for more than 2 in the number of shardIt it makes sense, otherwise its 1 shard with 1 replica, quorum is 1 (which is what it is initialized to)
requiredNumber = (shardRoutingTable.getSize() / 2) + 1;
@ -770,7 +738,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
void retryBecauseUnavailable(ShardId shardId, String message) {
retry(new UnavailableShardsException(shardId, message + " Timeout: [" + internalRequest.request().timeout() + "], request: " + internalRequest.request().toString()));
retry(new UnavailableShardsException(shardId, message + " Timeout: [" + internalRequest.request().timeout() +"], request: " + internalRequest.request().toString()));
}
}
@ -802,8 +770,6 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
private final AtomicBoolean finished = new AtomicBoolean(false);
private final AtomicInteger success = new AtomicInteger(1); // We already wrote into the primary shard
private final ConcurrentMap<String, Throwable> shardReplicaFailures = ConcurrentCollections.newConcurrentMap();
// nocommit the Broadcast operations use AtomicReferencArray, Boaz wants to figure out why, this here is just a hack
private final CopyOnWriteArrayList<ReplicaResponse> replicaResponses = new CopyOnWriteArrayList<>();
private final AtomicInteger pending;
private final int numberOfShardInstances;
@ -834,9 +800,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
finishIfNeeded();
}
public void onReplicaSuccess(ReplicaResponse replicaResponse) {
public void onReplicaSuccess() {
success.incrementAndGet();
replicaResponses.add(replicaResponse);
finishIfNeeded();
}
@ -874,16 +839,12 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
)
);
listener.onResponse(onAllReplicasResponded(finalResponse, replicaResponses));
listener.onResponse(finalResponse);
}
}
}
protected Response onAllReplicasResponded(Response finalResponse, CopyOnWriteArrayList<ReplicaResponse> replicaResponses) {
return finalResponse;
}
/**
* Internal request class that gets built on each node. Holds the original request plus additional info.
*/