sync commit actions

This commit is contained in:
Britta Weber 2015-04-22 14:16:08 +02:00
parent 7bcf7ba218
commit 3633b83b1f
32 changed files with 1217 additions and 156 deletions

View File

@ -60,6 +60,7 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.util.Map;
@ -67,7 +68,7 @@ import java.util.Map;
/**
* Performs the index operation.
*/
public class TransportShardBulkAction extends TransportShardReplicationOperationAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
public class TransportShardBulkAction extends TransportShardReplicationOperationAction<BulkShardRequest, BulkShardRequest, BulkShardResponse, TransportResponse.Empty> {
private final static String OP_TYPE_UPDATE = "update";
private final static String OP_TYPE_DELETE = "delete";
@ -118,6 +119,11 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
return new BulkShardResponse();
}
@Override
protected TransportResponse.Empty newReplicaResponseInstance() {
return TransportResponse.Empty.INSTANCE;
}
@Override
protected boolean resolveIndex() {
return false;
@ -528,7 +534,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
@Override
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
protected TransportResponse.Empty shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
final BulkShardRequest request = shardRequest.request;
for (int i = 0; i < request.items().length; i++) {
@ -580,6 +586,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
// ignore
}
}
return newReplicaResponseInstance();
}
private void applyVersion(BulkItemRequest item, long version, VersionType versionType) {

View File

@ -43,12 +43,13 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
/**
* Performs the delete operation.
*/
public class TransportDeleteAction extends TransportShardReplicationOperationAction<DeleteRequest, DeleteRequest, DeleteResponse> {
public class TransportDeleteAction extends TransportShardReplicationOperationAction<DeleteRequest, DeleteRequest, DeleteResponse, TransportResponse.Empty> {
private final AutoCreateIndex autoCreateIndex;
@ -140,6 +141,11 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
return new DeleteResponse();
}
@Override
protected TransportResponse.Empty newReplicaResponseInstance() {
return TransportResponse.Empty.INSTANCE;
}
@Override
protected Tuple<DeleteResponse, DeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
@ -165,7 +171,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
}
@Override
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
protected TransportResponse.Empty shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
@ -179,6 +185,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
// ignore
}
}
return newReplicaResponseInstance();
}
@Override

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.util.Map;
@ -40,7 +41,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
/**
*/
public class TransportDeleteByQueryAction extends TransportIndicesReplicationOperationAction<DeleteByQueryRequest, DeleteByQueryResponse, IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
public class TransportDeleteByQueryAction extends TransportIndicesReplicationOperationAction<DeleteByQueryRequest, DeleteByQueryResponse, IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse, TransportResponse.Empty> {
private final DestructiveOperations destructiveOperations;

View File

@ -30,13 +30,14 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import java.util.List;
/**
* Internal transport action that broadcasts a delete by query request to all of the shards that belong to an index.
*/
public class TransportIndexDeleteByQueryAction extends TransportIndexReplicationOperationAction<IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
public class TransportIndexDeleteByQueryAction extends TransportIndexReplicationOperationAction<IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse, TransportResponse.Empty> {
private static final String ACTION_NAME = DeleteByQueryAction.NAME + "[index]";

View File

@ -42,12 +42,13 @@ import org.elasticsearch.search.internal.DefaultSearchContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchLocalRequest;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
/**
*
*/
public class TransportShardDeleteByQueryAction extends TransportShardReplicationOperationAction<ShardDeleteByQueryRequest, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
public class TransportShardDeleteByQueryAction extends TransportShardReplicationOperationAction<ShardDeleteByQueryRequest, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse, TransportResponse.Empty> {
public final static String DELETE_BY_QUERY_API = "delete_by_query";
@ -93,6 +94,11 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
return new ShardDeleteByQueryResponse();
}
@Override
protected TransportResponse.Empty newReplicaResponseInstance() {
return TransportResponse.Empty.INSTANCE;
}
@Override
protected boolean resolveIndex() {
return false;
@ -121,7 +127,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
@Override
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
protected TransportResponse.Empty shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
ShardDeleteByQueryRequest request = shardRequest.request;
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
@ -138,6 +144,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
SearchContext.removeCurrent();
}
}
return newReplicaResponseInstance();
}
@Override

View File

@ -49,6 +49,7 @@ import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
/**
@ -61,7 +62,7 @@ import org.elasticsearch.transport.TransportService;
* <li><b>allowIdGeneration</b>: If the id is set not, should it be generated. Defaults to <tt>true</tt>.
* </ul>
*/
public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {
public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse, TransportResponse.Empty> {
private final AutoCreateIndex autoCreateIndex;
@ -156,6 +157,11 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
return new IndexResponse();
}
@Override
protected TransportResponse.Empty newReplicaResponseInstance() {
return TransportResponse.Empty.INSTANCE;
}
@Override
protected String executor() {
return ThreadPool.Names.INDEX;
@ -239,7 +245,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
@Override
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
protected TransportResponse.Empty shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
IndexRequest request = shardRequest.request;
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id())
@ -259,5 +265,10 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
// ignore
}
}
return newReplicaResponseInstance();
}
public String getReplicaActionName() {
return IndexAction.NAME + "[r]";
}
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
@ -60,12 +59,16 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
this.clusterService = clusterService;
this.transportService = transportService;
this.threadPool = threadPool;
this.transportShardAction = actionName + "[s]";
this.transportShardAction = actionName + getShardOperationNameSuffix();
this.executor = executor();
transportService.registerHandler(transportShardAction, new ShardTransportHandler());
}
public static String getShardOperationNameSuffix() {
return "[s]";
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
new AsyncBroadcastAction(request, listener).start();

View File

@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import java.util.ArrayList;
import java.util.Arrays;
@ -48,15 +49,15 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
* It relies on a shard sub-action that gets sent over the transport and executed on each of the shard.
* The index provided with the request is expected to be a concrete index, properly resolved by the callers (parent actions).
*/
public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionWriteResponse>
public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionWriteResponse, ReplicaResponse extends TransportResponse>
extends TransportAction<Request, Response> {
protected final ClusterService clusterService;
protected final TransportShardReplicationOperationAction<ShardRequest, ShardRequest, ShardResponse> shardAction;
protected final TransportShardReplicationOperationAction<ShardRequest, ShardRequest, ShardResponse, ReplicaResponse> shardAction;
protected TransportIndexReplicationOperationAction(Settings settings, String actionName, ClusterService clusterService,
ThreadPool threadPool, TransportShardReplicationOperationAction<ShardRequest, ShardRequest, ShardResponse> shardAction, ActionFilters actionFilters) {
ThreadPool threadPool, TransportShardReplicationOperationAction<ShardRequest, ShardRequest, ShardResponse, ReplicaResponse> shardAction, ActionFilters actionFilters) {
super(settings, actionName, threadPool, actionFilters);
this.clusterService = clusterService;
this.shardAction = shardAction;

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.util.Map;
@ -42,15 +43,15 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
/**
*/
public abstract class TransportIndicesReplicationOperationAction<Request extends IndicesReplicationOperationRequest, Response extends ActionResponse, IndexRequest extends IndexReplicationOperationRequest, IndexResponse extends ActionResponse,
ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionWriteResponse>
ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionWriteResponse, ReplicaResponse extends TransportResponse>
extends TransportAction<Request, Response> {
protected final ClusterService clusterService;
protected final TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction;
protected final TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse, ReplicaResponse> indexAction;
protected TransportIndicesReplicationOperationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction, ActionFilters actionFilters) {
TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse, ReplicaResponse> indexAction, ActionFilters actionFilters) {
super(settings, actionName, threadPool, actionFilters);
this.clusterService = clusterService;
this.indexAction = indexAction;

View File

@ -59,12 +59,14 @@ import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
*/
public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionWriteResponse> extends TransportAction<Request, Response> {
public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionWriteResponse, ReplicaResponse extends TransportResponse> extends TransportAction<Request, Response> {
protected final TransportService transportService;
protected final ClusterService clusterService;
@ -86,18 +88,22 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
this.indicesService = indicesService;
this.shardStateAction = shardStateAction;
this.transportReplicaAction = actionName + "[r]";
this.transportReplicaAction = actionName + getReplicaOperationNameSuffix();
this.executor = executor();
this.checkWriteConsistency = checkWriteConsistency();
transportService.registerHandler(actionName, new OperationTransportHandler());
transportService.registerHandler(transportReplicaAction, new ReplicaOperationTransportHandler());
transportService.registerHandler(transportReplicaAction, new ReplicaOperationTransportHandler());
this.transportOptions = transportOptions();
this.defaultWriteConsistencyLevel = WriteConsistencyLevel.fromString(settings.get("action.write_consistency", "quorum"));
}
public static String getReplicaOperationNameSuffix() {
return "[r]";
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
new AsyncShardOperationAction(request, listener).start();
@ -109,15 +115,17 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected abstract Response newResponseInstance();
protected abstract ReplicaResponse newReplicaResponseInstance();
protected abstract String executor();
/**
* @return A tuple containing not null values, as first value the result of the primary operation and as second value
* the request to be executed on the replica shards.
* @return A tuple containing not null values, as first value the result of the primary operation and as second value
* the request to be executed on the replica shards.
*/
protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable;
protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable;
protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest);
protected abstract ReplicaResponse shardOperationOnReplica(ReplicaOperationRequest shardRequest);
protected abstract ShardIterator shards(ClusterState clusterState, InternalRequest request) throws ElasticsearchException;
@ -216,7 +224,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ReplicaOperationRequest> {
private class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ReplicaOperationRequest> {
@Override
public ReplicaOperationRequest newInstance() {
@ -236,13 +244,14 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
@Override
public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception {
ReplicaResponse response;
try {
shardOperationOnReplica(request);
response = shardOperationOnReplica(request);
} catch (Throwable t) {
failReplicaIfNeeded(request.shardId.getIndex(), request.shardId.id(), t);
throw t;
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
channel.sendResponse(response);
}
}
@ -260,14 +269,20 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
public ShardId shardId;
public ReplicaRequest request;
private String nodeId;
ReplicaOperationRequest() {
}
ReplicaOperationRequest(ShardId shardId, ReplicaRequest request) {
public String getNodeId() {
return nodeId;
}
ReplicaOperationRequest(ShardId shardId, ReplicaRequest request, String nodeId) {
super(request);
this.shardId = shardId;
this.request = request;
this.nodeId = nodeId;
}
@Override
@ -292,6 +307,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
//older nodes will send the concrete index as part of the request
shardId = new ShardId(request.index(), shard);
}
nodeId = in.readString();
}
@Override
@ -299,6 +315,12 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
super.writeTo(out);
shardId.writeTo(out);
request.writeTo(out);
out.writeString(nodeId);
}
public ReplicaOperationRequest setNodeId(String nodeId) {
this.nodeId = nodeId;
return this;
}
}
@ -613,7 +635,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
return;
}
final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), state.replicaRequest());
final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), state.replicaRequest(), shard.currentNodeId());
// If the replicas use shadow replicas, there is no reason to
// perform the action on the replica, so skip it and
@ -623,31 +645,41 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// to wait until they get the new mapping through the cluster
// state, which is why we recommend pre-defined mappings for
// indices using shadow replicas
state.onReplicaSuccess();
state.onReplicaSuccess(newReplicaResponseInstance());
return;
}
if (!nodeId.equals(observer.observedState().nodes().localNodeId())) {
final DiscoveryNode node = observer.observedState().nodes().get(nodeId);
transportService.sendRequest(node, transportReplicaAction, shardRequest,
transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty vResponse) {
state.onReplicaSuccess();
}
transportOptions, new TransportResponseHandler<ReplicaResponse>() {
@Override
public ReplicaResponse newInstance() {
return newReplicaResponseInstance();
}
@Override
public void handleException(TransportException exp) {
state.onReplicaFailure(nodeId, exp);
logger.trace("[{}] Transport failure during replica request [{}] ", exp, node, internalRequest.request());
if (!ignoreReplicaException(exp)) {
logger.warn("Failed to perform " + actionName + " on remote replica " + node + shardIt.shardId(), exp);
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + actionName + "] on replica, message [" + ExceptionsHelper.detailedMessage(exp) + "]");
}
}
@Override
public void handleResponse(ReplicaResponse vResponse) {
state.onReplicaSuccess(vResponse);
}
});
@Override
public void handleException(TransportException exp) {
state.onReplicaFailure(nodeId, exp);
logger.trace("[{}] Transport failure during replica request [{}] ", exp, node, internalRequest.request());
if (!ignoreReplicaException(exp)) {
logger.warn("Failed to perform " + actionName + " on remote replica " + node + shardIt.shardId(), exp);
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + actionName + "] on replica, message [" + ExceptionsHelper.detailedMessage(exp) + "]");
}
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
} else {
if (internalRequest.request().operationThreaded()) {
try {
@ -655,8 +687,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
@Override
protected void doRun() {
try {
shardOperationOnReplica(shardRequest);
state.onReplicaSuccess();
ReplicaResponse response = shardOperationOnReplica(shardRequest);
state.onReplicaSuccess(response);
} catch (Throwable e) {
state.onReplicaFailure(nodeId, e);
failReplicaIfNeeded(shard.index(), shard.id(), e);
@ -680,8 +712,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
} else {
try {
shardOperationOnReplica(shardRequest);
state.onReplicaSuccess();
ReplicaResponse response = shardOperationOnReplica(shardRequest);
state.onReplicaSuccess(response);
} catch (Throwable e) {
failReplicaIfNeeded(shard.index(), shard.id(), e);
state.onReplicaFailure(nodeId, e);
@ -703,11 +735,11 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
final int sizeActive;
final int requiredNumber;
IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shard.index());
IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shard.index());
if (indexRoutingTable != null) {
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shard.getId());
if (shardRoutingTable != null) {
sizeActive = shardRoutingTable.activeShards().size();
sizeActive = shardRoutingTable.activeShards().size();
if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardRoutingTable.getSize() > 2) {
// only for more than 2 in the number of shardIt it makes sense, otherwise its 1 shard with 1 replica, quorum is 1 (which is what it is initialized to)
requiredNumber = (shardRoutingTable.getSize() / 2) + 1;
@ -738,7 +770,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
void retryBecauseUnavailable(ShardId shardId, String message) {
retry(new UnavailableShardsException(shardId, message + " Timeout: [" + internalRequest.request().timeout() +"], request: " + internalRequest.request().toString()));
retry(new UnavailableShardsException(shardId, message + " Timeout: [" + internalRequest.request().timeout() + "], request: " + internalRequest.request().toString()));
}
}
@ -770,6 +802,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
private final AtomicBoolean finished = new AtomicBoolean(false);
private final AtomicInteger success = new AtomicInteger(1); // We already wrote into the primary shard
private final ConcurrentMap<String, Throwable> shardReplicaFailures = ConcurrentCollections.newConcurrentMap();
// nocommit the Broadcast operations use AtomicReferencArray, Boaz wants to figure out why, this here is just a hack
private final CopyOnWriteArrayList<ReplicaResponse> replicaResponses = new CopyOnWriteArrayList<>();
private final AtomicInteger pending;
private final int numberOfShardInstances;
@ -800,8 +834,9 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
finishIfNeeded();
}
public void onReplicaSuccess() {
public void onReplicaSuccess(ReplicaResponse replicaResponse) {
success.incrementAndGet();
replicaResponses.add(replicaResponse);
finishIfNeeded();
}
@ -839,12 +874,16 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
)
);
listener.onResponse(finalResponse);
listener.onResponse(onAllReplicasResponded(finalResponse, replicaResponses));
}
}
}
protected Response onAllReplicasResponded(Response finalResponse, CopyOnWriteArrayList<ReplicaResponse> replicaResponses) {
return finalResponse;
}
/**
* Internal request class that gets built on each node. Holds the original request plus additional info.
*/

View File

@ -352,7 +352,7 @@ public interface IndicesAdminClient extends ElasticsearchClient<IndicesAdminClie
* @param listener A listener to be notified with a result
* @see org.elasticsearch.client.Requests#flushRequest(String...)
*/
void flush(FlushRequest request, ActionListener<FlushResponse> listener);
void flush(FlushRequest request, ActionListener <FlushResponse> listener);
/**
* Explicitly flush one or more indices (releasing memory from the node).

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing;
import com.carrotsearch.hppc.IntSet;
import com.google.common.collect.*;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@ -195,7 +196,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
/**
* Return GroupShardsIterator where each assigned shard routing has it's own shard iterator.
*
* @param includeEmpty if true, a shard iterator will be added for non-assigned shards as well
* @param includeEmpty if true, a shard iterator will be added for non-assigned shards as well
* @param includeRelocationTargets if true, an <b>extra</b> shard iterator will be added for relocating shards. The extra
* iterator contains a single ShardRouting pointing at the relocating target
*/
@ -254,6 +255,35 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
return new GroupShardsIterator(set);
}
/**
* All the shard copies for the provided shard id grouped. Each group is a single element, consisting
* either of the primary shard of one replica.
*
* @param shardId the shard id for the copies we want
* @return All the shard copies (primary and replicas) for the shardId
* @throws IndexMissingException If an index passed does not exists
* @see IndexRoutingTable#groupByAllIt()
*/
public GroupShardsIterator allActiveShardCopiesGrouped(ShardId shardId) throws IndexMissingException {
// use list here since we need to maintain identity across shards
ArrayList<ShardIterator> set = new ArrayList<>();
IndexRoutingTable indexRoutingTable = index(shardId.index().name());
if (indexRoutingTable == null) {
throw new IndexMissingException(new Index(shardId.index().name()));
}
IndexShardRoutingTable copiesRoutingTable = indexRoutingTable.shard(shardId.id());
if (copiesRoutingTable != null) {
for (ShardRouting shardRouting : copiesRoutingTable) {
if (shardRouting.active()) {
set.add(shardRouting.shardsIt());
}
}
} else {
throw new ElasticsearchIllegalStateException(shardId + " does not exist");
}
return new GroupShardsIterator(set);
}
public static Builder builder() {
return new Builder();
}

View File

@ -419,39 +419,48 @@ public abstract class StreamInput extends InputStream {
public int[] readIntArray() throws IOException {
int length = readVInt();
int[] values = new int[length];
for(int i=0; i<length; i++) {
for (int i = 0; i < length; i++) {
values[i] = readInt();
}
return values;
}
public long[] readLongArray() throws IOException {
int length = readVInt();
long[] values = new long[length];
for(int i=0; i<length; i++) {
for (int i = 0; i < length; i++) {
values[i] = readLong();
}
return values;
}
public float[] readFloatArray() throws IOException {
int length = readVInt();
float[] values = new float[length];
for(int i=0; i<length; i++) {
for (int i = 0; i < length; i++) {
values[i] = readFloat();
}
return values;
}
public double[] readDoubleArray() throws IOException {
int length = readVInt();
double[] values = new double[length];
for(int i=0; i<length; i++) {
for (int i = 0; i < length; i++) {
values[i] = readDouble();
}
return values;
}
public byte[] readByteArray() throws IOException {
int length = readVInt();
byte[] values = new byte[length];
for (int i = 0; i < length; i++) {
values[i] = readByte();
}
return values;
}
/**
* Serializes a potential null value.
*/

View File

@ -95,6 +95,16 @@ public abstract class StreamOutput extends OutputStream {
*/
public abstract void writeBytes(byte[] b, int offset, int length) throws IOException;
/**
* Writes an array of bytes.
*
* @param b the bytes to write
*/
public void writeByteArray(byte[] b) throws IOException {
writeVInt(b.length);
writeBytes(b, 0, b.length);
}
/**
* Writes the bytes reference, including a length header.
*/

View File

@ -599,7 +599,7 @@ public class InternalEngine extends Engine {
@Override
public boolean syncFlushIfNoPendingChanges(String syncId, byte[] expectedCommitId) throws EngineException {
// best effort attempt before we aquire locks
// best effort attempt before we acquire locks
ensureOpen();
if (indexWriter.hasUncommittedChanges()) {
logger.trace("can't sync commit [{}]. have pending changes", syncId);
@ -627,6 +627,7 @@ public class InternalEngine extends Engine {
indexWriter.setCommitData(commitData);
commitIndexWriter(indexWriter);
logger.debug("successfully sync committed. sync id [{}].", syncId);
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
return true;
} catch (IOException ex) {
maybeFailEngine("sync commit", ex);

View File

@ -687,17 +687,22 @@ public class IndexShard extends AbstractIndexShardComponent {
return engine().syncFlushIfNoPendingChanges(syncId, expectedCommitId);
}
public void flush(FlushRequest request) throws ElasticsearchException {
public byte[] flush(FlushRequest request) throws ElasticsearchException {
boolean waitIfOngoing = request.waitIfOngoing();
boolean force = request.force();
if (logger.isTraceEnabled()) {
logger.trace("flush with {}", request);
}
// we allows flush while recovering, since we allow for operations to happen
// while recovering, and we want to keep the translog at bay (up to deletes, which
// we don't gc).
verifyStartedOrRecovering();
if (logger.isTraceEnabled()) {
logger.trace("flush with {}", request);
}
long time = System.nanoTime();
engine().flush(request.force(), request.waitIfOngoing());
byte[] commitId = engine().flush(force, waitIfOngoing);
flushMetric.inc(System.nanoTime() - time);
return commitId;
}
public void optimize(OptimizeRequest optimize) throws ElasticsearchException {

View File

@ -38,6 +38,7 @@ import org.elasticsearch.indices.recovery.RecoverySource;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.indices.syncedflush.SyncedFlushService;
import org.elasticsearch.indices.ttl.IndicesTTLService;
/**

View File

@ -1,39 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
public class SyncedFlushService extends AbstractComponent {
private final IndicesService indicesService;
@Inject
public SyncedFlushService(Settings settings, IndicesService indicesService) {
super(settings);
this.indicesService = indicesService;
}
public boolean attemptSyncedFlush(ShardId shardId) {
throw new UnsupportedOperationException("not so fast");
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.Arrays;
/**
*/
public class PreSyncedFlushRequest extends BroadcastOperationRequest<PreSyncedFlushRequest> {
private ShardId shardId;
PreSyncedFlushRequest() {
}
public PreSyncedFlushRequest(ShardId shardId) {
super(Arrays.asList(shardId.getIndex()).toArray(new String[0]));
this.shardId = shardId;
}
@Override
public String toString() {
return "PreSyncedFlushRequest{" +
"shardId=" + shardId +
'}';
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.shardId = ShardId.readShardId(in);
}
public ShardId shardId() {
return shardId;
}
public void shardId(ShardId shardId) {
this.shardId = shardId;
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* A response to pre synced flush action.
*/
public class PreSyncedFlushResponse extends BroadcastOperationResponse {
Map<String, byte[]> commitIds = new HashMap<>();
PreSyncedFlushResponse() {
}
public PreSyncedFlushResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures, AtomicReferenceArray shardsResponses) {
super(totalShards, successfulShards, failedShards, shardFailures);
for (int i = 0; i < shardsResponses.length(); i++) {
PreSyncedShardFlushResponse preSyncedShardFlushResponse = (PreSyncedShardFlushResponse) shardsResponses.get(i);
commitIds.put(preSyncedShardFlushResponse.shardRouting().currentNodeId(), preSyncedShardFlushResponse.id());
}
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int numCommitIds = in.readVInt();
for (int i = 0; i < numCommitIds; i++) {
commitIds.put(in.readString(), in.readByteArray());
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(commitIds.size());
for (Map.Entry<String, byte[]> entry : commitIds.entrySet()) {
out.writeString(entry.getKey());
out.writeByteArray(entry.getValue());
}
}
public Map<String, byte[]> commitIds() {
return commitIds;
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
import org.elasticsearch.cluster.routing.ImmutableShardRouting;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
*
*/
class PreSyncedShardFlushRequest extends BroadcastShardOperationRequest {
private ShardRouting shardRouting;
// we need our own request because it has to include the shard routing
private PreSyncedFlushRequest request = new PreSyncedFlushRequest();
PreSyncedShardFlushRequest() {
}
PreSyncedShardFlushRequest(ShardRouting shardRouting, PreSyncedFlushRequest request) {
super(shardRouting.shardId(), request);
this.request = request;
this.shardRouting = shardRouting;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
request.readFrom(in);
shardRouting = ImmutableShardRouting.readShardRoutingEntry(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
request.writeTo(out);
shardRouting.writeTo(out);
}
PreSyncedFlushRequest getRequest() {
return request;
}
public ShardRouting shardRouting() {
return shardRouting;
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
import org.elasticsearch.cluster.routing.ImmutableShardRouting;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
*
*/
class PreSyncedShardFlushResponse extends BroadcastShardOperationResponse {
byte[] id;
private ShardRouting shardRouting;
PreSyncedShardFlushResponse() {
}
PreSyncedShardFlushResponse(byte[] id, ShardRouting shardRouting) {
super(shardRouting.shardId());
this.id = id;
this.shardRouting = shardRouting;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readByteArray();
shardRouting = ImmutableShardRouting.readShardRoutingEntry(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByteArray(id);
shardRouting.writeTo(out);
}
byte[] id() {
return id;
}
public ShardRouting shardRouting() {
return shardRouting;
}
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
public class SyncedFlushReplicaResponse extends TransportResponse {
boolean succeeded = true;
private String index;
private int shardId;
private String nodeId;
private String reason;
void setResult(boolean succeeded, String index, int shardId, String nodeId, String reason) {
this.succeeded = succeeded;
this.index = index;
this.shardId = shardId;
this.nodeId = nodeId;
this.reason = reason;
}
public String getIndex() {
return index;
}
public int getShardId() {
return shardId;
}
public String getNodeId() {
return nodeId;
}
public String getReason() {
return reason;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
succeeded = in.readBoolean();
this.index = in.readString();
this.shardId = in.readInt();
this.nodeId = in.readString();
this.reason = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(succeeded);
out.writeString(index);
out.writeInt(shardId);
out.writeString(nodeId);
out.writeString(reason);
}
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
*/
public class SyncedFlushRequest extends ShardReplicationOperationRequest<SyncedFlushRequest> {
private String syncId;
private Map<String, byte[]> commitIds;
private ShardId shardId;
public SyncedFlushRequest() {
}
public SyncedFlushRequest(ShardId shardId, String syncId, Map<String, byte[]> commitIds) {
this.commitIds = commitIds;
this.shardId = shardId;
this.syncId = syncId;
this.index(shardId.index().getName());
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
commitIds = new HashMap<>();
int numCommitIds = in.readVInt();
for (int i = 0; i < numCommitIds; i++) {
commitIds.put(in.readString(), in.readByteArray());
}
syncId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
out.writeVInt(commitIds.size());
for (Map.Entry<String, byte[]> entry : commitIds.entrySet()) {
out.writeString(entry.getKey());
out.writeByteArray(entry.getValue());
}
out.writeString(syncId);
}
@Override
public String toString() {
return "write sync commit {" + shardId + "}";
}
public ShardId shardId() {
return shardId;
}
public String syncId() {
return syncId;
}
public Map<String, byte[]> commitIds() {
return commitIds;
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
*/
public class SyncedFlushResponse extends ActionWriteResponse {
private boolean succes;
String syncCommitId;
public SyncedFlushResponse() {
}
public SyncedFlushResponse(boolean success, String syncCommitId) {
this.succes = success;
this.syncCommitId = syncCommitId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.succes = in.readBoolean();
syncCommitId = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(succes);
out.writeOptionalString(syncCommitId);
}
public boolean success() {
return succes;
}
public String getSyncId() {
return syncCommitId;
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import java.util.concurrent.ExecutionException;
public class SyncedFlushService extends AbstractComponent {
private final TransportPreSyncedFlushAction transportPreSyncedFlushAction;
private final TransportSyncedFlushAction transportSyncedFlushAction;
@Inject
public SyncedFlushService(Settings settings, TransportPreSyncedFlushAction transportPreSyncedFlushAction, TransportSyncedFlushAction transportSyncedFlushAction) {
super(settings);
this.transportPreSyncedFlushAction = transportPreSyncedFlushAction;
this.transportSyncedFlushAction = transportSyncedFlushAction;
}
public SyncedFlushResponse attemptSyncedFlush(ShardId shardId) throws ExecutionException, InterruptedException {
PreSyncedFlushResponse preSyncedFlushResponse = transportPreSyncedFlushAction.execute(new PreSyncedFlushRequest(shardId)).get();
// exit if this did not work
String syncId = Strings.base64UUID();
SyncedFlushResponse syncedFlushResponse = transportSyncedFlushAction.execute(new SyncedFlushRequest(shardId, syncId, preSyncedFlushResponse.commitIds())).get();
return syncedFlushResponse;
}
}

View File

@ -0,0 +1,135 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static com.google.common.collect.Lists.newArrayList;
/**
* Sync Commit Action.
*/
public class TransportPreSyncedFlushAction extends TransportBroadcastOperationAction<PreSyncedFlushRequest, PreSyncedFlushResponse, PreSyncedShardFlushRequest, PreSyncedShardFlushResponse> {
private final IndicesService indicesService;
public static final String NAME = "indices:admin/presyncedflush";
@Inject
public TransportPreSyncedFlushAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, IndicesService indicesService, ActionFilters actionFilters) {
super(settings, NAME, threadPool, clusterService, transportService, actionFilters);
this.indicesService = indicesService;
}
@Override
protected String executor() {
return ThreadPool.Names.FLUSH;
}
@Override
protected PreSyncedFlushRequest newRequestInstance() {
return new PreSyncedFlushRequest();
}
@Override
protected PreSyncedFlushResponse newResponse(PreSyncedFlushRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
// a non active shard, ignore
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
shardFailures = newArrayList();
}
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else {
successfulShards++;
}
}
return new PreSyncedFlushResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures, shardsResponses);
}
@Override
protected PreSyncedShardFlushRequest newShardRequest() {
return new PreSyncedShardFlushRequest();
}
@Override
protected PreSyncedShardFlushRequest newShardRequest(int numShards, ShardRouting shard, PreSyncedFlushRequest request) {
return new PreSyncedShardFlushRequest(shard, request);
}
@Override
protected PreSyncedShardFlushResponse newShardResponse() {
return new PreSyncedShardFlushResponse();
}
@Override
protected PreSyncedShardFlushResponse shardOperation(PreSyncedShardFlushRequest request) throws ElasticsearchException {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id());
FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true);
byte[] id = indexShard.flush(flushRequest);
return new PreSyncedShardFlushResponse(id, request.shardRouting());
}
/**
* The sync commit request works against one primary and all of its copies.
*/
@Override
protected GroupShardsIterator shards(ClusterState clusterState, PreSyncedFlushRequest request, String[] concreteIndices) {
return clusterState.routingTable().allActiveShardCopiesGrouped(request.shardId());
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, PreSyncedFlushRequest request) {
return null;
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, PreSyncedFlushRequest countRequest, String[] concreteIndices) {
return null;
}
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
/**
*/
public class TransportSyncedFlushAction extends TransportShardReplicationOperationAction<SyncedFlushRequest, SyncedFlushRequest, SyncedFlushResponse, SyncedFlushReplicaResponse> {
public static final String NAME = "indices:admin/syncedflush";
@Inject
public TransportSyncedFlushAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters);
}
@Override
protected boolean checkWriteConsistency() {
return true;
}
@Override
protected boolean resolveIndex() {
return false;
}
@Override
protected SyncedFlushRequest newRequestInstance() {
return new SyncedFlushRequest();
}
@Override
protected SyncedFlushRequest newReplicaRequestInstance() {
return new SyncedFlushRequest();
}
@Override
protected SyncedFlushResponse newResponseInstance() {
return new SyncedFlushResponse();
}
@Override
protected SyncedFlushReplicaResponse newReplicaResponseInstance() {
return new SyncedFlushReplicaResponse();
}
@Override
protected String executor() {
return ThreadPool.Names.FLUSH;
}
@Override
protected ShardIterator shards(ClusterState clusterState, InternalRequest request) {
// get all shards for id
return clusterService.state().routingTable().index(request.concreteIndex()).shard(request.request().shardId().id()).shardsIt();
}
@Override
protected Tuple<SyncedFlushResponse, SyncedFlushRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
byte[] commitId = shardRequest.request.commitIds().get(clusterService.localNode().getId());
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
SyncedFlushResponse syncedFlushResponse = new SyncedFlushResponse(indexShard.syncFlushIfNoPendingChanges(shardRequest.request.syncId(), commitId), shardRequest.request.syncId());
if (syncedFlushResponse.success() == false) {
throw new ElasticsearchIllegalStateException("could not sync commit on primary");
}
return new Tuple<>(syncedFlushResponse, shardRequest.request);
}
@Override
protected SyncedFlushReplicaResponse shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
byte[] commitId = shardRequest.request.commitIds().get(clusterService.localNode().getId());
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
SyncedFlushReplicaResponse syncedFlushReplicaResponse = new SyncedFlushReplicaResponse();
boolean success = indexShard.syncFlushIfNoPendingChanges(shardRequest.request.syncId(), commitId);
String message = success ? "synced flush succeeded" : "synced flush failed";
syncedFlushReplicaResponse.setResult(success, shardRequest.request.index(), shardRequest.shardId.id(), shardRequest.getNodeId(), message);
return syncedFlushReplicaResponse;
}
protected SyncedFlushResponse onAllReplicasResponded(SyncedFlushResponse finalResponse, CopyOnWriteArrayList<SyncedFlushReplicaResponse> replicaResponses) {
List<ActionWriteResponse.ShardInfo.Failure> additionalFailures = new ArrayList<>();
for (SyncedFlushReplicaResponse replicaResponse : replicaResponses) {
if (replicaResponse.succeeded == false) {
additionalFailures.add(new ActionWriteResponse.ShardInfo.Failure(replicaResponse.getIndex(), replicaResponse.getShardId(), replicaResponse.getNodeId(), replicaResponse.getReason(), RestStatus.CONFLICT, false));
}
}
additionalFailures.addAll(Arrays.asList(finalResponse.getShardInfo().getFailures()));
finalResponse.setShardInfo(new ActionWriteResponse.ShardInfo(finalResponse.getShardInfo().getTotal(), finalResponse.getShardInfo().getTotal() - additionalFailures.size(), additionalFailures.toArray(new ActionWriteResponse.ShardInfo.Failure[additionalFailures.size()])));
return finalResponse;
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -29,18 +30,18 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.Matchers.nullValue;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
public class RoutingTableTest extends ElasticsearchAllocationTestCase {
public class RoutingTableTest extends ElasticsearchAllocationTestCase {
private static final String TEST_INDEX_1 = "test1";
private static final String TEST_INDEX_2 = "test2";
@ -72,9 +73,9 @@ public class RoutingTableTest extends ElasticsearchAllocationTestCase {
.build();
this.testRoutingTable = new RoutingTable.Builder()
.add(new IndexRoutingTable.Builder(TEST_INDEX_1).initializeAsNew(metaData.index(TEST_INDEX_1)).build())
.add(new IndexRoutingTable.Builder(TEST_INDEX_2).initializeAsNew(metaData.index(TEST_INDEX_2)).build())
.build();
.add(new IndexRoutingTable.Builder(TEST_INDEX_1).initializeAsNew(metaData.index(TEST_INDEX_1)).build())
.add(new IndexRoutingTable.Builder(TEST_INDEX_2).initializeAsNew(metaData.index(TEST_INDEX_2)).build())
.build();
this.clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(testRoutingTable).build();
}
@ -82,10 +83,10 @@ public class RoutingTableTest extends ElasticsearchAllocationTestCase {
* puts primary shard routings into initializing state
*/
private void initPrimaries() {
logger.info("adding " + (this.numberOfReplicas + 1) + " nodes and performing rerouting");
logger.info("adding " + (this.numberOfReplicas + 1) + " nodes and performing rerouting");
Builder discoBuilder = DiscoveryNodes.builder();
for (int i=0; i<this.numberOfReplicas+1;i++) {
discoBuilder = discoBuilder.put(newNode("node"+i));
for (int i = 0; i < this.numberOfReplicas + 1; i++) {
discoBuilder = discoBuilder.put(newNode("node" + i));
}
this.clusterState = ClusterState.builder(clusterState).nodes(discoBuilder).build();
RoutingAllocation.Result rerouteResult = ALLOCATION_SERVICE.reroute(clusterState);
@ -104,9 +105,9 @@ public class RoutingTableTest extends ElasticsearchAllocationTestCase {
private IndexMetaData.Builder createIndexMetaData(String indexName) {
return new IndexMetaData.Builder(indexName)
.settings(DEFAULT_SETTINGS)
.numberOfReplicas(this.numberOfReplicas)
.numberOfShards(this.numberOfShards);
.settings(DEFAULT_SETTINGS)
.numberOfReplicas(this.numberOfReplicas)
.numberOfShards(this.numberOfShards);
}
@Test
@ -173,25 +174,25 @@ public class RoutingTableTest extends ElasticsearchAllocationTestCase {
assertThat(this.emptyRoutingTable.activePrimaryShardsGrouped(new String[0], true).size(), is(0));
assertThat(this.emptyRoutingTable.activePrimaryShardsGrouped(new String[0], false).size(), is(0));
assertThat(this.testRoutingTable.activePrimaryShardsGrouped(new String[] {TEST_INDEX_1}, false).size(), is(0));
assertThat(this.testRoutingTable.activePrimaryShardsGrouped(new String[] {TEST_INDEX_1}, true).size(), is(this.numberOfShards));
assertThat(this.testRoutingTable.activePrimaryShardsGrouped(new String[]{TEST_INDEX_1}, false).size(), is(0));
assertThat(this.testRoutingTable.activePrimaryShardsGrouped(new String[]{TEST_INDEX_1}, true).size(), is(this.numberOfShards));
initPrimaries();
assertThat(this.testRoutingTable.activePrimaryShardsGrouped(new String[] {TEST_INDEX_1}, false).size(), is(0));
assertThat(this.testRoutingTable.activePrimaryShardsGrouped(new String[] {TEST_INDEX_1}, true).size(), is(this.numberOfShards));
assertThat(this.testRoutingTable.activePrimaryShardsGrouped(new String[]{TEST_INDEX_1}, false).size(), is(0));
assertThat(this.testRoutingTable.activePrimaryShardsGrouped(new String[]{TEST_INDEX_1}, true).size(), is(this.numberOfShards));
startInitializingShards(TEST_INDEX_1);
assertThat(this.testRoutingTable.activePrimaryShardsGrouped(new String[] {TEST_INDEX_1}, false).size(), is(this.numberOfShards));
assertThat(this.testRoutingTable.activePrimaryShardsGrouped(new String[] {TEST_INDEX_1, TEST_INDEX_2}, false).size(), is(this.numberOfShards));
assertThat(this.testRoutingTable.activePrimaryShardsGrouped(new String[] {TEST_INDEX_1}, true).size(), is(this.numberOfShards));
assertThat(this.testRoutingTable.activePrimaryShardsGrouped(new String[]{TEST_INDEX_1}, false).size(), is(this.numberOfShards));
assertThat(this.testRoutingTable.activePrimaryShardsGrouped(new String[]{TEST_INDEX_1, TEST_INDEX_2}, false).size(), is(this.numberOfShards));
assertThat(this.testRoutingTable.activePrimaryShardsGrouped(new String[]{TEST_INDEX_1}, true).size(), is(this.numberOfShards));
startInitializingShards(TEST_INDEX_2);
assertThat(this.testRoutingTable.activePrimaryShardsGrouped(new String[] {TEST_INDEX_2}, false).size(), is(this.numberOfShards));
assertThat(this.testRoutingTable.activePrimaryShardsGrouped(new String[] {TEST_INDEX_1, TEST_INDEX_2}, false).size(), is(2 * this.numberOfShards));
assertThat(this.testRoutingTable.activePrimaryShardsGrouped(new String[] {TEST_INDEX_1, TEST_INDEX_2}, true).size(), is(2 * this.numberOfShards));
assertThat(this.testRoutingTable.activePrimaryShardsGrouped(new String[]{TEST_INDEX_2}, false).size(), is(this.numberOfShards));
assertThat(this.testRoutingTable.activePrimaryShardsGrouped(new String[]{TEST_INDEX_1, TEST_INDEX_2}, false).size(), is(2 * this.numberOfShards));
assertThat(this.testRoutingTable.activePrimaryShardsGrouped(new String[]{TEST_INDEX_1, TEST_INDEX_2}, true).size(), is(2 * this.numberOfShards));
try {
this.testRoutingTable.activePrimaryShardsGrouped(new String[] {TEST_INDEX_1, "not_exists"}, true);
this.testRoutingTable.activePrimaryShardsGrouped(new String[]{TEST_INDEX_1, "not_exists"}, true);
fail("Calling with non-existing index name should raise IndexMissingException");
} catch (IndexMissingException e) {
// expected
@ -203,25 +204,25 @@ public class RoutingTableTest extends ElasticsearchAllocationTestCase {
assertThat(this.emptyRoutingTable.allActiveShardsGrouped(new String[0], true).size(), is(0));
assertThat(this.emptyRoutingTable.allActiveShardsGrouped(new String[0], false).size(), is(0));
assertThat(this.testRoutingTable.allActiveShardsGrouped(new String[] {TEST_INDEX_1}, false).size(), is(0));
assertThat(this.testRoutingTable.allActiveShardsGrouped(new String[] {TEST_INDEX_1}, true).size(), is(this.shardsPerIndex));
assertThat(this.testRoutingTable.allActiveShardsGrouped(new String[]{TEST_INDEX_1}, false).size(), is(0));
assertThat(this.testRoutingTable.allActiveShardsGrouped(new String[]{TEST_INDEX_1}, true).size(), is(this.shardsPerIndex));
initPrimaries();
assertThat(this.testRoutingTable.allActiveShardsGrouped(new String[] {TEST_INDEX_1}, false).size(), is(0));
assertThat(this.testRoutingTable.allActiveShardsGrouped(new String[] {TEST_INDEX_1}, true).size(), is(this.shardsPerIndex));
assertThat(this.testRoutingTable.allActiveShardsGrouped(new String[]{TEST_INDEX_1}, false).size(), is(0));
assertThat(this.testRoutingTable.allActiveShardsGrouped(new String[]{TEST_INDEX_1}, true).size(), is(this.shardsPerIndex));
startInitializingShards(TEST_INDEX_1);
assertThat(this.testRoutingTable.allActiveShardsGrouped(new String[] {TEST_INDEX_1}, false).size(), is(this.numberOfShards));
assertThat(this.testRoutingTable.allActiveShardsGrouped(new String[] {TEST_INDEX_1, TEST_INDEX_2}, false).size(), is(this.numberOfShards));
assertThat(this.testRoutingTable.allActiveShardsGrouped(new String[] {TEST_INDEX_1}, true).size(), is(this.shardsPerIndex));
assertThat(this.testRoutingTable.allActiveShardsGrouped(new String[]{TEST_INDEX_1}, false).size(), is(this.numberOfShards));
assertThat(this.testRoutingTable.allActiveShardsGrouped(new String[]{TEST_INDEX_1, TEST_INDEX_2}, false).size(), is(this.numberOfShards));
assertThat(this.testRoutingTable.allActiveShardsGrouped(new String[]{TEST_INDEX_1}, true).size(), is(this.shardsPerIndex));
startInitializingShards(TEST_INDEX_2);
assertThat(this.testRoutingTable.allActiveShardsGrouped(new String[] {TEST_INDEX_2}, false).size(), is(this.numberOfShards));
assertThat(this.testRoutingTable.allActiveShardsGrouped(new String[] {TEST_INDEX_1, TEST_INDEX_2}, false).size(), is(2 * this.numberOfShards));
assertThat(this.testRoutingTable.allActiveShardsGrouped(new String[] {TEST_INDEX_1, TEST_INDEX_2}, true).size(), is(this.totalNumberOfShards));
assertThat(this.testRoutingTable.allActiveShardsGrouped(new String[]{TEST_INDEX_2}, false).size(), is(this.numberOfShards));
assertThat(this.testRoutingTable.allActiveShardsGrouped(new String[]{TEST_INDEX_1, TEST_INDEX_2}, false).size(), is(2 * this.numberOfShards));
assertThat(this.testRoutingTable.allActiveShardsGrouped(new String[]{TEST_INDEX_1, TEST_INDEX_2}, true).size(), is(this.totalNumberOfShards));
try {
this.testRoutingTable.allActiveShardsGrouped(new String[] {TEST_INDEX_1, "not_exists"}, true);
this.testRoutingTable.allActiveShardsGrouped(new String[]{TEST_INDEX_1, "not_exists"}, true);
} catch (IndexMissingException e) {
fail("Calling with non-existing index should be ignored at the moment");
}
@ -229,20 +230,36 @@ public class RoutingTableTest extends ElasticsearchAllocationTestCase {
@Test
public void testAllAssignedShardsGrouped() {
assertThat(this.testRoutingTable.allAssignedShardsGrouped(new String[] {TEST_INDEX_1}, false).size(), is(0));
assertThat(this.testRoutingTable.allAssignedShardsGrouped(new String[] {TEST_INDEX_1}, true).size(), is(this.shardsPerIndex));
assertThat(this.testRoutingTable.allAssignedShardsGrouped(new String[]{TEST_INDEX_1}, false).size(), is(0));
assertThat(this.testRoutingTable.allAssignedShardsGrouped(new String[]{TEST_INDEX_1}, true).size(), is(this.shardsPerIndex));
initPrimaries();
assertThat(this.testRoutingTable.allAssignedShardsGrouped(new String[] {TEST_INDEX_1}, false).size(), is(this.numberOfShards));
assertThat(this.testRoutingTable.allAssignedShardsGrouped(new String[] {TEST_INDEX_1}, true).size(), is(this.shardsPerIndex));
assertThat(this.testRoutingTable.allAssignedShardsGrouped(new String[]{TEST_INDEX_1}, false).size(), is(this.numberOfShards));
assertThat(this.testRoutingTable.allAssignedShardsGrouped(new String[]{TEST_INDEX_1}, true).size(), is(this.shardsPerIndex));
assertThat(this.testRoutingTable.allAssignedShardsGrouped(new String[] {TEST_INDEX_1, TEST_INDEX_2}, false).size(), is(2 * this.numberOfShards));
assertThat(this.testRoutingTable.allAssignedShardsGrouped(new String[] {TEST_INDEX_1, TEST_INDEX_2}, true).size(), is(this.totalNumberOfShards));
assertThat(this.testRoutingTable.allAssignedShardsGrouped(new String[]{TEST_INDEX_1, TEST_INDEX_2}, false).size(), is(2 * this.numberOfShards));
assertThat(this.testRoutingTable.allAssignedShardsGrouped(new String[]{TEST_INDEX_1, TEST_INDEX_2}, true).size(), is(this.totalNumberOfShards));
try {
this.testRoutingTable.allAssignedShardsGrouped(new String[] {TEST_INDEX_1, "not_exists"}, false);
this.testRoutingTable.allAssignedShardsGrouped(new String[]{TEST_INDEX_1, "not_exists"}, false);
} catch (IndexMissingException e) {
fail("Calling with non-existing index should be ignored at the moment");
}
}
@Test
public void testAllActiveShardCopiesGrouped() {
assertThat(this.testRoutingTable.allActiveShardCopiesGrouped(new ShardId(TEST_INDEX_1, randomInt(numberOfShards - 1))).size(), is(0));
try {
this.testRoutingTable.allActiveShardCopiesGrouped(new ShardId(TEST_INDEX_1, numberOfShards)).size();
fail();
} catch (ElasticsearchIllegalStateException e) {
}
initPrimaries();
assertThat(this.testRoutingTable.allActiveShardCopiesGrouped(new ShardId(TEST_INDEX_1, randomInt(numberOfShards - 1))).size(), is(0));
startInitializingShards(TEST_INDEX_1);
assertThat(this.testRoutingTable.allActiveShardCopiesGrouped(new ShardId(TEST_INDEX_1, randomInt(numberOfShards - 1))).size(), is(1));
startInitializingShards(TEST_INDEX_1);
assertThat(this.testRoutingTable.allActiveShardCopiesGrouped(new ShardId(TEST_INDEX_1, randomInt(numberOfShards - 1))).size(), is(numberOfReplicas + 1));
}
}

View File

@ -701,6 +701,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
commitID = engine.flush();
assertTrue("should succeed to flush commit with right id and no pending doc", engine.syncFlushIfNoPendingChanges(syncId, commitID));
assertThat(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncId));
assertThat(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncId));
}
@Test

View File

@ -21,14 +21,21 @@ package org.elasticsearch.indices;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.syncedflush.SyncedFlushResponse;
import org.elasticsearch.indices.syncedflush.SyncedFlushService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
@ -72,14 +79,26 @@ public class FlushTest extends ElasticsearchIntegrationTest {
}
}
public void testSyncedFlush() {
public void testSyncedFlush() throws ExecutionException, InterruptedException, IOException {
internalCluster().ensureAtLeastNumDataNodes(2);
prepareCreate("test").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).get();
prepareCreate("test").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).get();
ensureGreen();
IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
for (ShardStats shardStats : indexStats.getShards()) {
assertNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}
ClusterStateResponse state = client().admin().cluster().prepareState().get();
String nodeId = state.getState().getRoutingTable().index("test").shard(0).getShards().get(0).currentNodeId();
String nodeName = state.getState().getNodes().get(nodeId).name();
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
indicesService.indexServiceSafe("test").shardInjectorSafe(0).getInstance(SyncedFlushService.class).attemptSyncedFlush(new ShardId("test", 0));
SyncedFlushResponse syncedFlushResponse = internalCluster().getInstance(SyncedFlushService.class).attemptSyncedFlush(new ShardId("test", 0));
assertTrue(syncedFlushResponse.success());
indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
assertThat(indexStats.getShards().length, equalTo(client().admin().indices().prepareGetIndex().get().getSettings().get("test").getAsInt(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, -1) + 1));
for (ShardStats shardStats : indexStats.getShards()) {
assertThat(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncedFlushResponse.getSyncId()));
}
}
}

View File

@ -0,0 +1,93 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.syncedflush;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.cluster.routing.ImmutableShardRouting;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReferenceArray;
public class SynceFlushStreamablesTests extends ElasticsearchTestCase {
@Test
public void streamWriteSyncResponse() throws InterruptedException, IOException {
ShardId shardId = new ShardId("test", 0);
Map<String, byte[]> commitIds = new HashMap<>();
final String nodeId = "node_id";
commitIds.put(nodeId, generateRandomId(randomInt(100)));
SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(shardId, randomAsciiOfLength(5), commitIds);
BytesStreamOutput out = new BytesStreamOutput();
syncedFlushRequest.writeTo(out);
out.close();
StreamInput in = new BytesStreamInput(out.bytes());
SyncedFlushRequest request = new SyncedFlushRequest();
request.readFrom(in);
assertArrayEquals(request.commitIds().get(nodeId), syncedFlushRequest.commitIds().get(nodeId));
}
@Test
public void streamSyncResponse() throws InterruptedException, IOException {
ShardRouting shardRouting = new ImmutableShardRouting("test", 0, "test_node",
"other_test_node", randomBoolean(), ShardRoutingState.STARTED, randomInt());
AtomicReferenceArray atomicReferenceArray = new AtomicReferenceArray(1);
atomicReferenceArray.set(0, new PreSyncedShardFlushResponse(generateRandomId(randomInt(100)), shardRouting));
PreSyncedFlushResponse preSyncedFlushResponse = new PreSyncedFlushResponse(randomInt(), randomInt(), randomInt(), new ArrayList<ShardOperationFailedException>(), atomicReferenceArray);
BytesStreamOutput out = new BytesStreamOutput();
preSyncedFlushResponse.writeTo(out);
out.close();
StreamInput in = new BytesStreamInput(out.bytes());
PreSyncedFlushResponse request = new PreSyncedFlushResponse();
request.readFrom(in);
assertArrayEquals(request.commitIds().get(shardRouting), preSyncedFlushResponse.commitIds().get(shardRouting));
}
@Test
public void streamShardSyncResponse() throws InterruptedException, IOException {
ShardRouting shardRouting = new ImmutableShardRouting("test", 0, "test_node",
"other_test_node", randomBoolean(), ShardRoutingState.STARTED, randomInt());
PreSyncedShardFlushResponse preSyncedShardFlushResponse = new PreSyncedShardFlushResponse(generateRandomId(randomInt(100)), shardRouting);
BytesStreamOutput out = new BytesStreamOutput();
preSyncedShardFlushResponse.writeTo(out);
out.close();
StreamInput in = new BytesStreamInput(out.bytes());
PreSyncedShardFlushResponse request = new PreSyncedShardFlushResponse();
request.readFrom(in);
assertArrayEquals(request.id(), preSyncedShardFlushResponse.id());
}
byte[] generateRandomId(int length) {
byte[] id = new byte[length];
for (int i = 0; i < length; i++) {
id[i] = randomByte();
}
return id;
}
}