diff --git a/core/src/main/java/org/elasticsearch/action/ActionWriteResponse.java b/core/src/main/java/org/elasticsearch/action/ActionWriteResponse.java index a63f6dcd9fa..f4152ac85e4 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionWriteResponse.java +++ b/core/src/main/java/org/elasticsearch/action/ActionWriteResponse.java @@ -39,7 +39,7 @@ import java.util.Collections; /** * Base class for write action responses. */ -public abstract class ActionWriteResponse extends ActionResponse { +public class ActionWriteResponse extends ActionResponse { public final static ActionWriteResponse.ShardInfo.Failure[] EMPTY = new ActionWriteResponse.ShardInfo.Failure[0]; diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushResponse.java index a158b02611b..c2ac7002645 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushResponse.java @@ -21,10 +21,7 @@ package org.elasticsearch.action.admin.indices.flush; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.broadcast.BroadcastResponse; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import java.io.IOException; import java.util.List; /** @@ -42,13 +39,4 @@ public class FlushResponse extends BroadcastResponse { super(totalShards, successfulShards, failedShards, shardFailures); } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java index 0e38181fa61..10db46c1da0 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java @@ -19,27 +19,27 @@ package org.elasticsearch.action.admin.indices.flush; -import org.elasticsearch.action.support.broadcast.BroadcastShardRequest; +import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.shard.ShardId; import java.io.IOException; -/** - * - */ -class ShardFlushRequest extends BroadcastShardRequest { +public class ShardFlushRequest extends ReplicationRequest { + private FlushRequest request = new FlushRequest(); - ShardFlushRequest() { - } - - ShardFlushRequest(ShardId shardId, FlushRequest request) { - super(shardId, request); + public ShardFlushRequest(FlushRequest request) { + super(request); this.request = request; } + public ShardFlushRequest() { + } + + FlushRequest getRequest() { + return request; + } @Override public void readFrom(StreamInput in) throws IOException { @@ -53,7 +53,5 @@ class ShardFlushRequest extends BroadcastShardRequest { request.writeTo(out); } - FlushRequest getRequest() { - return request; - } + } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushResponse.java deleted file mode 100644 index 6f2cc6a5522..00000000000 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushResponse.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.flush; - -import org.elasticsearch.action.support.broadcast.BroadcastShardResponse; -import org.elasticsearch.index.shard.ShardId; - -/** - * - */ -class ShardFlushResponse extends BroadcastShardResponse { - - ShardFlushResponse() { - - } - - ShardFlushResponse(ShardId shardId) { - super(shardId); - } -} \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java index 323a6cc2382..d43cbb25ab5 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java @@ -19,99 +19,45 @@ package org.elasticsearch.action.admin.indices.flush; +import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.TransportBroadcastAction; +import org.elasticsearch.action.support.replication.TransportBroadcastReplicationAction; import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicReferenceArray; /** * Flush Action. */ -public class TransportFlushAction extends TransportBroadcastAction { - - private final IndicesService indicesService; +public class TransportFlushAction extends TransportBroadcastReplicationAction { @Inject public TransportFlushAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - TransportService transportService, IndicesService indicesService, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, FlushAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, - FlushRequest.class, ShardFlushRequest.class, ThreadPool.Names.FLUSH); - this.indicesService = indicesService; + TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + TransportShardFlushAction replicatedFlushAction) { + super(FlushAction.NAME, FlushRequest.class, settings, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, replicatedFlushAction); } @Override - protected FlushResponse newResponse(FlushRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) { - int successfulShards = 0; - int failedShards = 0; - List shardFailures = null; - for (int i = 0; i < shardsResponses.length(); i++) { - Object shardResponse = shardsResponses.get(i); - if (shardResponse == null) { - // a non active shard, ignore - } else if (shardResponse instanceof BroadcastShardOperationFailedException) { - failedShards++; - if (shardFailures == null) { - shardFailures = new ArrayList<>(); - } - shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); - } else { - successfulShards++; - } - } - return new FlushResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures); + protected ActionWriteResponse newShardResponse() { + return new ActionWriteResponse(); } @Override - protected ShardFlushRequest newShardRequest(int numShards, ShardRouting shard, FlushRequest request) { - return new ShardFlushRequest(shard.shardId(), request); + protected ShardFlushRequest newShardRequest(FlushRequest request, ShardId shardId) { + return new ShardFlushRequest(request).setShardId(shardId).timeout("0ms"); } @Override - protected ShardFlushResponse newShardResponse() { - return new ShardFlushResponse(); - } - - @Override - protected ShardFlushResponse shardOperation(ShardFlushRequest request) { - IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id()); - indexShard.flush(request.getRequest()); - return new ShardFlushResponse(request.shardId()); - } - - /** - * The refresh request works against *all* shards. - */ - @Override - protected GroupShardsIterator shards(ClusterState clusterState, FlushRequest request, String[] concreteIndices) { - return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true, true); - } - - @Override - protected ClusterBlockException checkGlobalBlock(ClusterState state, FlushRequest request) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); - } - - @Override - protected ClusterBlockException checkRequestBlock(ClusterState state, FlushRequest countRequest, String[] concreteIndices) { - return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, concreteIndices); + protected FlushResponse newResponse(int successfulShards, int failedShards, int totalNumCopies, List shardFailures) { + return new FlushResponse(totalNumCopies, successfulShards, failedShards, shardFailures); } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java new file mode 100644 index 00000000000..9b7c0643be2 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -0,0 +1,102 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.flush; + +import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.replication.TransportReplicationAction; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.action.index.MappingUpdatedAction; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +/** + * + */ +public class TransportShardFlushAction extends TransportReplicationAction { + + public static final String NAME = "indices:data/write/flush"; + + @Inject + public TransportShardFlushAction(Settings settings, TransportService transportService, ClusterService clusterService, + IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, + MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, + actionFilters, indexNameExpressionResolver, ShardFlushRequest.class, ShardFlushRequest.class, ThreadPool.Names.FLUSH); + } + + @Override + protected ActionWriteResponse newResponseInstance() { + return new ActionWriteResponse(); + } + + @Override + protected Tuple shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable { + IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id()); + indexShard.flush(shardRequest.request.getRequest()); + logger.trace("{} flush request executed on primary", indexShard.shardId()); + return new Tuple<>(new ActionWriteResponse(), shardRequest.request); + } + + @Override + protected void shardOperationOnReplica(ShardId shardId, ShardFlushRequest request) { + IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id()); + indexShard.flush(request.getRequest()); + logger.trace("{} flush request executed on replica", indexShard.shardId()); + } + + @Override + protected boolean checkWriteConsistency() { + return false; + } + + @Override + protected ShardIterator shards(ClusterState clusterState, InternalRequest request) { + return clusterState.getRoutingTable().indicesRouting().get(request.concreteIndex()).getShards().get(request.request().shardId().getId()).shardsIt(); + } + + @Override + protected ClusterBlockException checkGlobalBlock(ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, InternalRequest request) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, new String[]{request.concreteIndex()}); + } + + @Override + protected boolean shouldExecuteReplication(Settings settings) { + return true; + } +} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshAction.java index 79db06ec3f0..cfc2fb3a09f 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshAction.java @@ -22,8 +22,6 @@ package org.elasticsearch.action.admin.indices.refresh; import org.elasticsearch.action.Action; import org.elasticsearch.client.ElasticsearchClient; -/** - */ public class RefreshAction extends Action { public static final RefreshAction INSTANCE = new RefreshAction(); diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequest.java index 8f871307135..b0cb49c8874 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequest.java @@ -33,7 +33,6 @@ import org.elasticsearch.action.support.broadcast.BroadcastRequest; */ public class RefreshRequest extends BroadcastRequest { - RefreshRequest() { } @@ -48,5 +47,4 @@ public class RefreshRequest extends BroadcastRequest { public RefreshRequest(String... indices) { super(indices); } - } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshResponse.java index 28295fdd0a0..ba3ec31c6a5 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshResponse.java @@ -21,34 +21,18 @@ package org.elasticsearch.action.admin.indices.refresh; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.broadcast.BroadcastResponse; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import java.io.IOException; import java.util.List; /** * The response of a refresh action. - * - * */ public class RefreshResponse extends BroadcastResponse { RefreshResponse() { - } RefreshResponse(int totalShards, int successfulShards, int failedShards, List shardFailures) { super(totalShards, successfulShards, failedShards, shardFailures); } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/ShardRefreshRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/ShardRefreshRequest.java deleted file mode 100644 index 37ea2cc46de..00000000000 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/ShardRefreshRequest.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.refresh; - -import org.elasticsearch.action.support.broadcast.BroadcastShardRequest; -import org.elasticsearch.index.shard.ShardId; - -/** - * - */ -class ShardRefreshRequest extends BroadcastShardRequest { - - ShardRefreshRequest() { - } - - ShardRefreshRequest(ShardId shardId, RefreshRequest request) { - super(shardId, request); - } - -} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/ShardRefreshResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/ShardRefreshResponse.java deleted file mode 100644 index 4de0f5877dd..00000000000 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/ShardRefreshResponse.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.refresh; - -import org.elasticsearch.action.support.broadcast.BroadcastShardResponse; -import org.elasticsearch.index.shard.ShardId; - -/** - * - */ -class ShardRefreshResponse extends BroadcastShardResponse { - - ShardRefreshResponse() { - } - - ShardRefreshResponse(ShardId shardId) { - super(shardId); - } -} \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java index 2eead86e202..6e7e3459b11 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java @@ -19,100 +19,46 @@ package org.elasticsearch.action.admin.indices.refresh; +import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.TransportBroadcastAction; +import org.elasticsearch.action.support.replication.ReplicationRequest; +import org.elasticsearch.action.support.replication.TransportBroadcastReplicationAction; import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicReferenceArray; /** * Refresh action. */ -public class TransportRefreshAction extends TransportBroadcastAction { - - private final IndicesService indicesService; +public class TransportRefreshAction extends TransportBroadcastReplicationAction { @Inject public TransportRefreshAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - TransportService transportService, IndicesService indicesService, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, RefreshAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, - RefreshRequest.class, ShardRefreshRequest.class, ThreadPool.Names.REFRESH); - this.indicesService = indicesService; + TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + TransportShardRefreshAction shardRefreshAction) { + super(RefreshAction.NAME, RefreshRequest.class, settings, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, shardRefreshAction); } @Override - protected RefreshResponse newResponse(RefreshRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) { - int successfulShards = 0; - int failedShards = 0; - List shardFailures = null; - for (int i = 0; i < shardsResponses.length(); i++) { - Object shardResponse = shardsResponses.get(i); - if (shardResponse == null) { - // non active shard, ignore - } else if (shardResponse instanceof BroadcastShardOperationFailedException) { - failedShards++; - if (shardFailures == null) { - shardFailures = new ArrayList<>(); - } - shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); - } else { - successfulShards++; - } - } - return new RefreshResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures); + protected ActionWriteResponse newShardResponse() { + return new ActionWriteResponse(); } @Override - protected ShardRefreshRequest newShardRequest(int numShards, ShardRouting shard, RefreshRequest request) { - return new ShardRefreshRequest(shard.shardId(), request); + protected ReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId) { + return new ReplicationRequest(request).setShardId(shardId).timeout("0ms"); } @Override - protected ShardRefreshResponse newShardResponse() { - return new ShardRefreshResponse(); - } - - @Override - protected ShardRefreshResponse shardOperation(ShardRefreshRequest request) { - IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id()); - indexShard.refresh("api"); - logger.trace("{} refresh request executed", indexShard.shardId()); - return new ShardRefreshResponse(request.shardId()); - } - - /** - * The refresh request works against *all* shards. - */ - @Override - protected GroupShardsIterator shards(ClusterState clusterState, RefreshRequest request, String[] concreteIndices) { - return clusterState.routingTable().allAssignedShardsGrouped(concreteIndices, true, true); - } - - @Override - protected ClusterBlockException checkGlobalBlock(ClusterState state, RefreshRequest request) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); - } - - @Override - protected ClusterBlockException checkRequestBlock(ClusterState state, RefreshRequest countRequest, String[] concreteIndices) { - return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, concreteIndices); + protected RefreshResponse newResponse(int successfulShards, int failedShards, int totalNumCopies, List shardFailures) { + return new RefreshResponse(totalNumCopies, successfulShards, failedShards, shardFailures); } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java new file mode 100644 index 00000000000..2604d67b1b0 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -0,0 +1,103 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.refresh; + +import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.replication.ReplicationRequest; +import org.elasticsearch.action.support.replication.TransportReplicationAction; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.action.index.MappingUpdatedAction; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +/** + * + */ +public class TransportShardRefreshAction extends TransportReplicationAction { + + public static final String NAME = "indices:data/write/refresh"; + + @Inject + public TransportShardRefreshAction(Settings settings, TransportService transportService, ClusterService clusterService, + IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, + MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, + actionFilters, indexNameExpressionResolver, ReplicationRequest.class, ReplicationRequest.class, ThreadPool.Names.REFRESH); + } + + @Override + protected ActionWriteResponse newResponseInstance() { + return new ActionWriteResponse(); + } + + @Override + protected Tuple shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable { + IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id()); + indexShard.refresh("api"); + logger.trace("{} refresh request executed on primary", indexShard.shardId()); + return new Tuple<>(new ActionWriteResponse(), shardRequest.request); + } + + @Override + protected void shardOperationOnReplica(ShardId shardId, ReplicationRequest request) { + IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).shardSafe(shardId.id()); + indexShard.refresh("api"); + logger.trace("{} refresh request executed on replica", indexShard.shardId()); + } + + @Override + protected boolean checkWriteConsistency() { + return false; + } + + @Override + protected ShardIterator shards(ClusterState clusterState, InternalRequest request) { + return clusterState.getRoutingTable().indicesRouting().get(request.concreteIndex()).getShards().get(request.request().shardId().getId()).shardsIt(); + } + + @Override + protected ClusterBlockException checkGlobalBlock(ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, InternalRequest request) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, new String[]{request.concreteIndex()}); + } + + @Override + protected boolean shouldExecuteReplication(Settings settings) { + return true; + } +} diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java index a1eb616b1ee..6bda7b259ee 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.bulk; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; import java.io.IOException; import java.util.ArrayList; @@ -32,8 +33,6 @@ import java.util.List; */ public class BulkShardRequest extends ReplicationRequest { - private int shardId; - private BulkItemRequest[] items; private boolean refresh; @@ -44,7 +43,7 @@ public class BulkShardRequest extends ReplicationRequest { BulkShardRequest(BulkRequest bulkRequest, String index, int shardId, boolean refresh, BulkItemRequest[] items) { super(bulkRequest); this.index = index; - this.shardId = shardId; + this.setShardId(new ShardId(index, shardId)); this.items = items; this.refresh = refresh; } @@ -53,10 +52,6 @@ public class BulkShardRequest extends ReplicationRequest { return this.refresh; } - int shardId() { - return shardId; - } - BulkItemRequest[] items() { return items; } @@ -75,7 +70,6 @@ public class BulkShardRequest extends ReplicationRequest { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeVInt(shardId); out.writeVInt(items.length); for (BulkItemRequest item : items) { if (item != null) { @@ -91,7 +85,6 @@ public class BulkShardRequest extends ReplicationRequest { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - shardId = in.readVInt(); items = new BulkItemRequest[in.readVInt()]; for (int i = 0; i < items.length; i++) { if (in.readBoolean()) { diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 2ca2dfe142a..a9aa3dcb31d 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -109,7 +109,7 @@ public class TransportShardBulkAction extends TransportReplicationAction extends ActionRequest implements IndicesRequest.Replaceable { +public class BroadcastRequest extends ActionRequest implements IndicesRequest.Replaceable { protected String[] indices; private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosed(); - protected BroadcastRequest() { + public BroadcastRequest() { } diff --git a/core/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastResponse.java b/core/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastResponse.java index 560c7ec9869..54d6220fd34 100644 --- a/core/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastResponse.java +++ b/core/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastResponse.java @@ -32,17 +32,17 @@ import static org.elasticsearch.action.support.DefaultShardOperationFailedExcept /** * Base class for all broadcast operation based responses. */ -public abstract class BroadcastResponse extends ActionResponse { +public class BroadcastResponse extends ActionResponse { private static final ShardOperationFailedException[] EMPTY = new ShardOperationFailedException[0]; private int totalShards; private int successfulShards; private int failedShards; private ShardOperationFailedException[] shardFailures = EMPTY; - protected BroadcastResponse() { + public BroadcastResponse() { } - protected BroadcastResponse(int totalShards, int successfulShards, int failedShards, List shardFailures) { + public BroadcastResponse(int totalShards, int successfulShards, int failedShards, List shardFailures) { this.totalShards = totalShards; this.successfulShards = successfulShards; this.failedShards = failedShards; diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index 93907cf0aa6..37244c71efe 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; @@ -37,7 +38,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; /** * */ -public abstract class ReplicationRequest extends ActionRequest implements IndicesRequest { +public class ReplicationRequest extends ActionRequest implements IndicesRequest { public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES); @@ -49,14 +50,14 @@ public abstract class ReplicationRequest extends A private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT; private volatile boolean canHaveDuplicates = false; - protected ReplicationRequest() { + public ReplicationRequest() { } /** * Creates a new request that inherits headers and context from the request provided as argument. */ - protected ReplicationRequest(ActionRequest request) { + public ReplicationRequest(ActionRequest request) { super(request); } @@ -133,6 +134,16 @@ public abstract class ReplicationRequest extends A return this.consistencyLevel; } + /** + * @return the shardId of the shard where this operation should be executed on. + * can be null in case the shardId is determined by a single document (index, type, id) for example for index or delete request. + */ + public + @Nullable + ShardId shardId() { + return internalShardId; + } + /** * Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT} */ @@ -173,4 +184,10 @@ public abstract class ReplicationRequest extends A out.writeString(index); out.writeBoolean(canHaveDuplicates); } + + public T setShardId(ShardId shardId) { + this.internalShardId = shardId; + this.index = shardId.getIndex(); + return (T) this; + } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java new file mode 100644 index 00000000000..42a83630a53 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java @@ -0,0 +1,162 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support.replication; + +import com.carrotsearch.hppc.cursors.IntObjectCursor; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.UnavailableShardsException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.broadcast.BroadcastRequest; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * Base class for requests that should be executed on all shards of an index or several indices. + * This action sends shard requests to all primary shards of the indices and they are then replicated like write requests + */ +public abstract class TransportBroadcastReplicationAction extends HandledTransportAction { + + private final TransportReplicationAction replicatedBroadcastShardAction; + private final ClusterService clusterService; + + public TransportBroadcastReplicationAction(String name, Class request, Settings settings, ThreadPool threadPool, ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportReplicationAction replicatedBroadcastShardAction) { + super(settings, name, threadPool, transportService, actionFilters, indexNameExpressionResolver, request); + this.replicatedBroadcastShardAction = replicatedBroadcastShardAction; + this.clusterService = clusterService; + } + + @Override + protected void doExecute(final Request request, final ActionListener listener) { + final ClusterState clusterState = clusterService.state(); + List shards = shards(request, clusterState); + final CopyOnWriteArrayList shardsResponses = new CopyOnWriteArrayList(); + if (shards.size() == 0) { + finishAndNotifyListener(listener, shardsResponses); + } + final CountDown responsesCountDown = new CountDown(shards.size()); + for (final ShardId shardId : shards) { + ActionListener shardActionListener = new ActionListener() { + @Override + public void onResponse(ShardResponse shardResponse) { + shardsResponses.add(shardResponse); + logger.trace("{}: got response from {}", actionName, shardId); + if (responsesCountDown.countDown()) { + finishAndNotifyListener(listener, shardsResponses); + } + } + + @Override + public void onFailure(Throwable e) { + logger.trace("{}: got failure from {}", actionName, shardId); + int totalNumCopies = clusterState.getMetaData().index(shardId.index().getName()).getNumberOfReplicas() + 1; + ShardResponse shardResponse = newShardResponse(); + ActionWriteResponse.ShardInfo.Failure[] failures; + if (ExceptionsHelper.unwrap(e, UnavailableShardsException.class) != null) { + failures = new ActionWriteResponse.ShardInfo.Failure[0]; + } else { + ActionWriteResponse.ShardInfo.Failure failure = new ActionWriteResponse.ShardInfo.Failure(shardId.index().name(), shardId.id(), null, e, ExceptionsHelper.status(e), true); + failures = new ActionWriteResponse.ShardInfo.Failure[totalNumCopies]; + Arrays.fill(failures, failure); + } + shardResponse.setShardInfo(new ActionWriteResponse.ShardInfo(totalNumCopies, 0, failures)); + shardsResponses.add(shardResponse); + if (responsesCountDown.countDown()) { + finishAndNotifyListener(listener, shardsResponses); + } + } + }; + shardExecute(request, shardId, shardActionListener); + } + } + + protected void shardExecute(Request request, ShardId shardId, ActionListener shardActionListener) { + replicatedBroadcastShardAction.execute(newShardRequest(request, shardId), shardActionListener); + } + + /** + * @return all shard ids the request should run on + */ + protected List shards(Request request, ClusterState clusterState) { + List shardIds = new ArrayList<>(); + String[] concreteIndices = indexNameExpressionResolver.concreteIndices(clusterState, request); + for (String index : concreteIndices) { + IndexMetaData indexMetaData = clusterState.metaData().getIndices().get(index); + if (indexMetaData != null) { + for (IntObjectCursor shardRouting : clusterState.getRoutingTable().indicesRouting().get(index).getShards()) { + shardIds.add(shardRouting.value.shardId()); + } + } + } + return shardIds; + } + + protected abstract ShardResponse newShardResponse(); + + protected abstract ShardRequest newShardRequest(Request request, ShardId shardId); + + private void finishAndNotifyListener(ActionListener listener, CopyOnWriteArrayList shardsResponses) { + logger.trace("{}: got all shard responses", actionName); + int successfulShards = 0; + int failedShards = 0; + int totalNumCopies = 0; + List shardFailures = null; + for (int i = 0; i < shardsResponses.size(); i++) { + ActionWriteResponse shardResponse = shardsResponses.get(i); + if (shardResponse == null) { + // non active shard, ignore + } else { + failedShards += shardResponse.getShardInfo().getFailed(); + successfulShards += shardResponse.getShardInfo().getSuccessful(); + totalNumCopies += shardResponse.getShardInfo().getTotal(); + if (shardFailures == null) { + shardFailures = new ArrayList<>(); + } + for (ActionWriteResponse.ShardInfo.Failure failure : shardResponse.getShardInfo().getFailures()) { + shardFailures.add(new DefaultShardOperationFailedException(new BroadcastShardOperationFailedException(new ShardId(failure.index(), failure.shardId()), failure.getCause()))); + } + } + } + listener.onResponse(newResponse(successfulShards, failedShards, totalNumCopies, shardFailures)); + } + + protected abstract BroadcastResponse newResponse(int successfulShards, int failedShards, int totalNumCopies, List shardFailures); +} diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index f6925009189..608575007f4 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -362,6 +362,7 @@ public abstract class TransportReplicationAction()), new IndexNameExpressionResolver(Settings.EMPTY), null); + } + + @AfterClass + public static void afterClass() { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + threadPool = null; + } + + @Test + public void testNotStartedPrimary() throws InterruptedException, ExecutionException, IOException { + final String index = "test"; + final ShardId shardId = new ShardId(index, 0); + clusterService.setState(state(index, randomBoolean(), + randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED, ShardRoutingState.UNASSIGNED)); + logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); + Future response = (broadcastReplicationAction.execute(new BroadcastRequest().indices(index))); + for (Tuple> shardRequests : broadcastReplicationAction.capturedShardRequests) { + shardRequests.v2().onFailure(new UnavailableShardsException(shardId, "test exception expected")); + } + response.get(); + logger.info("total shards: {}, ", response.get().getTotalShards()); + // we expect no failures here because UnavailableShardsException does not count as failed + assertBroadcastResponse(2, 0, 0, response.get(), null); + } + + @Test + public void testStartedPrimary() throws InterruptedException, ExecutionException, IOException { + final String index = "test"; + clusterService.setState(state(index, randomBoolean(), + ShardRoutingState.STARTED)); + logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); + Future response = (broadcastReplicationAction.execute(new BroadcastRequest().indices(index))); + for (Tuple> shardRequests : broadcastReplicationAction.capturedShardRequests) { + ActionWriteResponse actionWriteResponse = new ActionWriteResponse(); + actionWriteResponse.setShardInfo(new ActionWriteResponse.ShardInfo(1, 1, new ActionWriteResponse.ShardInfo.Failure[0])); + shardRequests.v2().onResponse(actionWriteResponse); + } + logger.info("total shards: {}, ", response.get().getTotalShards()); + assertBroadcastResponse(1, 1, 0, response.get(), null); + } + + @Test + public void testResultCombine() throws InterruptedException, ExecutionException, IOException { + final String index = "test"; + int numShards = randomInt(3); + clusterService.setState(stateWithAssignedPrimariesAndOneReplica(index, numShards)); + logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); + Future response = (broadcastReplicationAction.execute(new BroadcastRequest().indices(index))); + int succeeded = 0; + int failed = 0; + for (Tuple> shardRequests : broadcastReplicationAction.capturedShardRequests) { + if (randomBoolean()) { + ActionWriteResponse.ShardInfo.Failure[] failures = new ActionWriteResponse.ShardInfo.Failure[0]; + int shardsSucceeded = randomInt(1) + 1; + succeeded += shardsSucceeded; + ActionWriteResponse actionWriteResponse = new ActionWriteResponse(); + if (shardsSucceeded == 1 && randomBoolean()) { + //sometimes add failure (no failure means shard unavailable) + failures = new ActionWriteResponse.ShardInfo.Failure[1]; + failures[0] = new ActionWriteResponse.ShardInfo.Failure(index, shardRequests.v1().id(), null, new Exception("pretend shard failed"), RestStatus.GATEWAY_TIMEOUT, false); + failed++; + } + actionWriteResponse.setShardInfo(new ActionWriteResponse.ShardInfo(2, shardsSucceeded, failures)); + shardRequests.v2().onResponse(actionWriteResponse); + } else { + // sometimes fail + failed += 2; + // just add a general exception and see if failed shards will be incremented by 2 + shardRequests.v2().onFailure(new Exception("pretend shard failed")); + } + } + assertBroadcastResponse(2 * numShards, succeeded, failed, response.get(), Exception.class); + } + + @Test + public void testNoShards() throws InterruptedException, ExecutionException, IOException { + clusterService.setState(stateWithNoShard()); + logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); + BroadcastResponse response = executeAndAssertImmediateResponse(broadcastReplicationAction, new BroadcastRequest()); + assertBroadcastResponse(0, 0, 0, response, null); + } + + @Test + public void testShardsList() throws InterruptedException, ExecutionException { + final String index = "test"; + final ShardId shardId = new ShardId(index, 0); + ClusterState clusterState = state(index, randomBoolean(), + randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED, ShardRoutingState.UNASSIGNED); + logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); + List shards = broadcastReplicationAction.shards(new BroadcastRequest().indices(shardId.index().name()), clusterState); + assertThat(shards.size(), equalTo(1)); + assertThat(shards.get(0), equalTo(shardId)); + } + + private class TestBroadcastReplicationAction extends TransportBroadcastReplicationAction { + protected final Set>> capturedShardRequests = ConcurrentCollections.newConcurrentSet(); + + public TestBroadcastReplicationAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportReplicationAction replicatedBroadcastShardAction) { + super("test-broadcast-replication-action", BroadcastRequest.class, settings, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, replicatedBroadcastShardAction); + } + + @Override + protected ActionWriteResponse newShardResponse() { + return new ActionWriteResponse(); + } + + @Override + protected ReplicationRequest newShardRequest(BroadcastRequest request, ShardId shardId) { + return new ReplicationRequest().setShardId(shardId); + } + + @Override + protected BroadcastResponse newResponse(int successfulShards, int failedShards, int totalNumCopies, List shardFailures) { + return new BroadcastResponse(totalNumCopies, successfulShards, failedShards, shardFailures); + } + + @Override + protected void shardExecute(BroadcastRequest request, ShardId shardId, ActionListener shardActionListener) { + capturedShardRequests.add(new Tuple<>(shardId, shardActionListener)); + } + + protected void clearCapturedRequests() { + capturedShardRequests.clear(); + } + } + + public FlushResponse assertImmediateResponse(String index, TransportFlushAction flushAction) throws InterruptedException, ExecutionException { + Date beginDate = new Date(); + FlushResponse flushResponse = flushAction.execute(new FlushRequest(index)).get(); + Date endDate = new Date(); + long maxTime = 500; + assertThat("this should not take longer than " + maxTime + " ms. The request hangs somewhere", endDate.getTime() - beginDate.getTime(), lessThanOrEqualTo(maxTime)); + return flushResponse; + } + + @Test + public void testTimeoutFlush() throws ExecutionException, InterruptedException { + + final String index = "test"; + clusterService.setState(state(index, randomBoolean(), + randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED, ShardRoutingState.UNASSIGNED)); + logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); + TransportShardFlushAction shardFlushAction = new TransportShardFlushAction(Settings.EMPTY, transportService, clusterService, + null, threadPool, null, + null, new ActionFilters(new HashSet()), new IndexNameExpressionResolver(Settings.EMPTY)); + TransportFlushAction flushAction = new TransportFlushAction(Settings.EMPTY, threadPool, clusterService, + transportService, new ActionFilters(new HashSet()), new IndexNameExpressionResolver(Settings.EMPTY), + shardFlushAction); + FlushResponse flushResponse = (FlushResponse) executeAndAssertImmediateResponse(flushAction, new FlushRequest(index)); + logger.info("total shards: {}, ", flushResponse.getTotalShards()); + assertBroadcastResponse(2, 0, 0, flushResponse, UnavailableShardsException.class); + + ClusterBlocks.Builder block = ClusterBlocks.builder() + .addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); + clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block)); + assertFailure("all shards should fail with cluster block", executeAndAssertImmediateResponse(flushAction, new FlushRequest(index)), ClusterBlockException.class); + + block = ClusterBlocks.builder() + .addGlobalBlock(new ClusterBlock(1, "retryable", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); + clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block)); + assertFailure("all shards should fail with cluster block", executeAndAssertImmediateResponse(flushAction, new FlushRequest(index)), ClusterBlockException.class); + + block = ClusterBlocks.builder() + .addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); + clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block)); + assertFailure("all shards should fail with cluster block", executeAndAssertImmediateResponse(flushAction, new FlushRequest(index)), ClusterBlockException.class); + } + + void assertFailure(String msg, BroadcastResponse broadcastResponse, Class klass) throws InterruptedException { + assertThat(broadcastResponse.getSuccessfulShards(), equalTo(0)); + assertThat(broadcastResponse.getTotalShards(), equalTo(broadcastResponse.getFailedShards())); + for (int i = 0; i < broadcastResponse.getFailedShards(); i++) { + assertThat(msg, broadcastResponse.getShardFailures()[i].getCause().getCause(), instanceOf(klass)); + } + } + + @Test + public void testTimeoutRefresh() throws ExecutionException, InterruptedException { + + final String index = "test"; + clusterService.setState(state(index, randomBoolean(), + randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED, ShardRoutingState.UNASSIGNED)); + logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); + TransportShardRefreshAction shardrefreshAction = new TransportShardRefreshAction(Settings.EMPTY, transportService, clusterService, + null, threadPool, null, + null, new ActionFilters(new HashSet()), new IndexNameExpressionResolver(Settings.EMPTY)); + TransportRefreshAction refreshAction = new TransportRefreshAction(Settings.EMPTY, threadPool, clusterService, + transportService, new ActionFilters(new HashSet()), new IndexNameExpressionResolver(Settings.EMPTY), + shardrefreshAction); + RefreshResponse refreshResponse = (RefreshResponse) executeAndAssertImmediateResponse(refreshAction, new RefreshRequest(index)); + assertBroadcastResponse(2, 0, 0, refreshResponse, UnavailableShardsException.class); + + ClusterBlocks.Builder block = ClusterBlocks.builder() + .addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); + clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block)); + assertFailure("all shards should fail with cluster block", executeAndAssertImmediateResponse(refreshAction, new RefreshRequest(index)), ClusterBlockException.class); + + block = ClusterBlocks.builder() + .addGlobalBlock(new ClusterBlock(1, "retryable", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); + clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block)); + assertFailure("all shards should fail with cluster block", executeAndAssertImmediateResponse(refreshAction, new RefreshRequest(index)), ClusterBlockException.class); + + block = ClusterBlocks.builder() + .addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); + clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block)); + assertFailure("all shards should fail with cluster block", executeAndAssertImmediateResponse(refreshAction, new RefreshRequest(index)), ClusterBlockException.class); + } + + public BroadcastResponse executeAndAssertImmediateResponse(TransportBroadcastReplicationAction broadcastAction, BroadcastRequest request) throws InterruptedException, ExecutionException { + return (BroadcastResponse) broadcastAction.execute(request).actionGet("5s"); + } + + private void assertBroadcastResponse(int total, int successful, int failed, BroadcastResponse response, Class exceptionClass) { + assertThat(response.getSuccessfulShards(), equalTo(successful)); + assertThat(response.getTotalShards(), equalTo(total)); + assertThat(response.getFailedShards(), equalTo(failed)); + for (int i = 0; i < failed; i++) { + assertThat(response.getShardFailures()[0].getCause().getCause(), instanceOf(exceptionClass)); + } + } +} diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java b/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java new file mode 100644 index 00000000000..e5143a3ef09 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java @@ -0,0 +1,230 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.elasticsearch.action.support.replication; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.index.shard.ShardId; + +import java.util.HashSet; +import java.util.Set; + +import static org.elasticsearch.cluster.metadata.IndexMetaData.*; +import static org.elasticsearch.test.ESTestCase.randomBoolean; +import static org.elasticsearch.test.ESTestCase.randomFrom; +import static org.elasticsearch.test.ESTestCase.randomIntBetween; + +/** + * Helper methods for generating cluster states + */ +public class ClusterStateCreationUtils { + + + /** + * Creates cluster state with and index that has one shard and #(replicaStates) replicas + * + * @param index name of the index + * @param primaryLocal if primary should coincide with the local node in the cluster state + * @param primaryState state of primary + * @param replicaStates states of the replicas. length of this array determines also the number of replicas + */ + public static ClusterState state(String index, boolean primaryLocal, ShardRoutingState primaryState, ShardRoutingState... replicaStates) { + final int numberOfReplicas = replicaStates.length; + + int numberOfNodes = numberOfReplicas + 1; + if (primaryState == ShardRoutingState.RELOCATING) { + numberOfNodes++; + } + for (ShardRoutingState state : replicaStates) { + if (state == ShardRoutingState.RELOCATING) { + numberOfNodes++; + } + } + numberOfNodes = Math.max(2, numberOfNodes); // we need a non-local master to test shard failures + final ShardId shardId = new ShardId(index, 0); + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + Set unassignedNodes = new HashSet<>(); + for (int i = 0; i < numberOfNodes + 1; i++) { + final DiscoveryNode node = newNode(i); + discoBuilder = discoBuilder.put(node); + unassignedNodes.add(node.id()); + } + discoBuilder.localNodeId(newNode(0).id()); + discoBuilder.masterNodeId(newNode(1).id()); // we need a non-local master to test shard failures + IndexMetaData indexMetaData = IndexMetaData.builder(index).settings(Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) + .put(SETTING_CREATION_DATE, System.currentTimeMillis())).build(); + + RoutingTable.Builder routing = new RoutingTable.Builder(); + routing.addAsNew(indexMetaData); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + + String primaryNode = null; + String relocatingNode = null; + UnassignedInfo unassignedInfo = null; + if (primaryState != ShardRoutingState.UNASSIGNED) { + if (primaryLocal) { + primaryNode = newNode(0).id(); + unassignedNodes.remove(primaryNode); + } else { + primaryNode = selectAndRemove(unassignedNodes); + } + if (primaryState == ShardRoutingState.RELOCATING) { + relocatingNode = selectAndRemove(unassignedNodes); + } + } else { + unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null); + } + indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, 0, primaryNode, relocatingNode, null, true, primaryState, 0, unassignedInfo)); + + for (ShardRoutingState replicaState : replicaStates) { + String replicaNode = null; + relocatingNode = null; + unassignedInfo = null; + if (replicaState != ShardRoutingState.UNASSIGNED) { + assert primaryNode != null : "a replica is assigned but the primary isn't"; + replicaNode = selectAndRemove(unassignedNodes); + if (replicaState == ShardRoutingState.RELOCATING) { + relocatingNode = selectAndRemove(unassignedNodes); + } + } else { + unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null); + } + indexShardRoutingBuilder.addShard( + TestShardRouting.newShardRouting(index, shardId.id(), replicaNode, relocatingNode, null, false, replicaState, 0, unassignedInfo)); + } + + ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); + state.nodes(discoBuilder); + state.metaData(MetaData.builder().put(indexMetaData, false).generateClusterUuidIfNeeded()); + state.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(index).addIndexShard(indexShardRoutingBuilder.build()))); + return state.build(); + } + + /** + * Creates cluster state with several shards and one replica and all shards STARTED. + */ + public static ClusterState stateWithAssignedPrimariesAndOneReplica(String index, int numberOfShards) { + + int numberOfNodes = 2; // we need a non-local master to test shard failures + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + for (int i = 0; i < numberOfNodes + 1; i++) { + final DiscoveryNode node = newNode(i); + discoBuilder = discoBuilder.put(node); + } + discoBuilder.localNodeId(newNode(0).id()); + discoBuilder.masterNodeId(newNode(1).id()); // we need a non-local master to test shard failures + IndexMetaData indexMetaData = IndexMetaData.builder(index).settings(Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 1) + .put(SETTING_CREATION_DATE, System.currentTimeMillis())).build(); + ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); + state.nodes(discoBuilder); + state.metaData(MetaData.builder().put(indexMetaData, false).generateClusterUuidIfNeeded()); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + for (int i = 0; i < numberOfShards; i++) { + RoutingTable.Builder routing = new RoutingTable.Builder(); + routing.addAsNew(indexMetaData); + final ShardId shardId = new ShardId(index, i); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, i, newNode(0).id(), null, null, true, ShardRoutingState.STARTED, 0, null)); + indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, i, newNode(1).id(), null, null, false, ShardRoutingState.STARTED, 0, null)); + indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build()); + } + state.routingTable(RoutingTable.builder().add(indexRoutingTableBuilder)); + return state.build(); + } + + /** + * Creates cluster state with and index that has one shard and as many replicas as numberOfReplicas. + * Primary will be STARTED in cluster state but replicas will be one of UNASSIGNED, INITIALIZING, STARTED or RELOCATING. + * + * @param index name of the index + * @param primaryLocal if primary should coincide with the local node in the cluster state + * @param numberOfReplicas number of replicas + */ + public static ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int numberOfReplicas) { + int assignedReplicas = randomIntBetween(0, numberOfReplicas); + return stateWithStartedPrimary(index, primaryLocal, assignedReplicas, numberOfReplicas - assignedReplicas); + } + + /** + * Creates cluster state with and index that has one shard and as many replicas as numberOfReplicas. + * Primary will be STARTED in cluster state. Some (unassignedReplicas) will be UNASSIGNED and + * some (assignedReplicas) will be one of INITIALIZING, STARTED or RELOCATING. + * + * @param index name of the index + * @param primaryLocal if primary should coincide with the local node in the cluster state + * @param assignedReplicas number of replicas that should have INITIALIZING, STARTED or RELOCATING state + * @param unassignedReplicas number of replicas that should be unassigned + */ + public static ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int assignedReplicas, int unassignedReplicas) { + ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas]; + // no point in randomizing - node assignment later on does it too. + for (int i = 0; i < assignedReplicas; i++) { + replicaStates[i] = randomFrom(ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED, ShardRoutingState.RELOCATING); + } + for (int i = assignedReplicas; i < replicaStates.length; i++) { + replicaStates[i] = ShardRoutingState.UNASSIGNED; + } + return state(index, primaryLocal, randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING), replicaStates); + } + + /** + * Creates a cluster state with no index + */ + public static ClusterState stateWithNoShard() { + int numberOfNodes = 2; + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + Set unassignedNodes = new HashSet<>(); + for (int i = 0; i < numberOfNodes + 1; i++) { + final DiscoveryNode node = newNode(i); + discoBuilder = discoBuilder.put(node); + unassignedNodes.add(node.id()); + } + discoBuilder.localNodeId(newNode(0).id()); + discoBuilder.masterNodeId(newNode(1).id()); + ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); + state.nodes(discoBuilder); + state.metaData(MetaData.builder().generateClusterUuidIfNeeded()); + state.routingTable(RoutingTable.builder()); + return state.build(); + } + + private static DiscoveryNode newNode(int nodeId) { + return new DiscoveryNode("node_" + nodeId, DummyTransportAddress.INSTANCE, Version.CURRENT); + } + + static private String selectAndRemove(Set strings) { + String selection = randomFrom(strings.toArray(new String[strings.size()])); + strings.remove(selection); + return selection; + } +} diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java index d7fb2ddd54d..0b9e254b0ad 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java @@ -77,6 +77,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; +import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary; import static org.elasticsearch.cluster.metadata.IndexMetaData.*; import static org.hamcrest.Matchers.*; @@ -98,6 +100,7 @@ public class ShardReplicationTests extends ESTestCase { threadPool = new ThreadPool("ShardReplicationTests"); } + @Override @Before public void setUp() throws Exception { super.setUp(); @@ -161,103 +164,6 @@ public class ShardReplicationTests extends ESTestCase { assertEquals(1, count.get()); } - ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int numberOfReplicas) { - int assignedReplicas = randomIntBetween(0, numberOfReplicas); - return stateWithStartedPrimary(index, primaryLocal, assignedReplicas, numberOfReplicas - assignedReplicas); - } - - ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int assignedReplicas, int unassignedReplicas) { - ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas]; - // no point in randomizing - node assignment later on does it too. - for (int i = 0; i < assignedReplicas; i++) { - replicaStates[i] = randomFrom(ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED, ShardRoutingState.RELOCATING); - } - for (int i = assignedReplicas; i < replicaStates.length; i++) { - replicaStates[i] = ShardRoutingState.UNASSIGNED; - } - return state(index, primaryLocal, randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING), replicaStates); - } - - ClusterState state(String index, boolean primaryLocal, ShardRoutingState primaryState, ShardRoutingState... replicaStates) { - final int numberOfReplicas = replicaStates.length; - - int numberOfNodes = numberOfReplicas + 1; - if (primaryState == ShardRoutingState.RELOCATING) { - numberOfNodes++; - } - for (ShardRoutingState state : replicaStates) { - if (state == ShardRoutingState.RELOCATING) { - numberOfNodes++; - } - } - numberOfNodes = Math.max(2, numberOfNodes); // we need a non-local master to test shard failures - final ShardId shardId = new ShardId(index, 0); - DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); - Set unassignedNodes = new HashSet<>(); - for (int i = 0; i < numberOfNodes + 1; i++) { - final DiscoveryNode node = newNode(i); - discoBuilder = discoBuilder.put(node); - unassignedNodes.add(node.id()); - } - discoBuilder.localNodeId(newNode(0).id()); - discoBuilder.masterNodeId(newNode(1).id()); // we need a non-local master to test shard failures - IndexMetaData indexMetaData = IndexMetaData.builder(index).settings(Settings.builder() - .put(SETTING_VERSION_CREATED, Version.CURRENT) - .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) - .put(SETTING_CREATION_DATE, System.currentTimeMillis())).build(); - - RoutingTable.Builder routing = new RoutingTable.Builder(); - routing.addAsNew(indexMetaData); - IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); - - String primaryNode = null; - String relocatingNode = null; - UnassignedInfo unassignedInfo = null; - if (primaryState != ShardRoutingState.UNASSIGNED) { - if (primaryLocal) { - primaryNode = newNode(0).id(); - unassignedNodes.remove(primaryNode); - } else { - primaryNode = selectAndRemove(unassignedNodes); - } - if (primaryState == ShardRoutingState.RELOCATING) { - relocatingNode = selectAndRemove(unassignedNodes); - } - } else { - unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null); - } - indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, 0, primaryNode, relocatingNode, null, true, primaryState, 0, unassignedInfo)); - - for (ShardRoutingState replicaState : replicaStates) { - String replicaNode = null; - relocatingNode = null; - unassignedInfo = null; - if (replicaState != ShardRoutingState.UNASSIGNED) { - assert primaryNode != null : "a replica is assigned but the primary isn't"; - replicaNode = selectAndRemove(unassignedNodes); - if (replicaState == ShardRoutingState.RELOCATING) { - relocatingNode = selectAndRemove(unassignedNodes); - } - } else { - unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null); - } - indexShardRoutingBuilder.addShard( - TestShardRouting.newShardRouting(index, shardId.id(), replicaNode, relocatingNode, null, false, replicaState, 0, unassignedInfo)); - } - - ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); - state.nodes(discoBuilder); - state.metaData(MetaData.builder().put(indexMetaData, false).generateClusterUuidIfNeeded()); - state.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(index).addIndexShard(indexShardRoutingBuilder.build()))); - return state.build(); - } - - private String selectAndRemove(Set strings) { - String selection = randomFrom(strings.toArray(new String[strings.size()])); - strings.remove(selection); - return selection; - } - @Test public void testNotStartedPrimary() throws InterruptedException, ExecutionException { final String index = "test"; @@ -527,6 +433,7 @@ public class ShardReplicationTests extends ESTestCase { action = new ActionWithDelay(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool); final TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); Thread t = new Thread() { + @Override public void run() { primaryPhase.run(); } @@ -587,6 +494,7 @@ public class ShardReplicationTests extends ESTestCase { action = new ActionWithDelay(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool); final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); Thread t = new Thread() { + @Override public void run() { try { replicaOperationTransportHandler.messageReceived(new Request(), createTransportChannel()); @@ -746,10 +654,6 @@ public class ShardReplicationTests extends ESTestCase { } } - static DiscoveryNode newNode(int nodeId) { - return new DiscoveryNode("node_" + nodeId, DummyTransportAddress.INSTANCE, Version.CURRENT); - } - /* * Throws exceptions when executed. Used for testing if the counter is correctly decremented in case an operation fails. * */ diff --git a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java index a287bcb4f54..62f9a84d5f9 100644 --- a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java +++ b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java @@ -432,12 +432,12 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase { * state processing when a recover starts and only unblocking it shortly after the node receives * the ShardActiveRequest. */ - static class ReclocationStartEndTracer extends MockTransportService.Tracer { + public static class ReclocationStartEndTracer extends MockTransportService.Tracer { private final ESLogger logger; private final CountDownLatch beginRelocationLatch; private final CountDownLatch receivedShardExistsRequestLatch; - ReclocationStartEndTracer(ESLogger logger, CountDownLatch beginRelocationLatch, CountDownLatch receivedShardExistsRequestLatch) { + public ReclocationStartEndTracer(ESLogger logger, CountDownLatch beginRelocationLatch, CountDownLatch receivedShardExistsRequestLatch) { this.logger = logger; this.beginRelocationLatch = beginRelocationLatch; this.receivedShardExistsRequestLatch = receivedShardExistsRequestLatch; diff --git a/core/src/test/java/org/elasticsearch/test/cluster/TestClusterService.java b/core/src/test/java/org/elasticsearch/test/cluster/TestClusterService.java index 6a55fbd2577..162dff76bf8 100644 --- a/core/src/test/java/org/elasticsearch/test/cluster/TestClusterService.java +++ b/core/src/test/java/org/elasticsearch/test/cluster/TestClusterService.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.OperationRouting; +import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.elasticsearch.cluster.service.PendingClusterTask; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; @@ -55,6 +56,7 @@ public class TestClusterService implements ClusterService { private final Queue onGoingTimeouts = ConcurrentCollections.newQueue(); private final ThreadPool threadPool; private final ESLogger logger = Loggers.getLogger(getClass(), Settings.EMPTY); + private final OperationRouting operationRouting = new OperationRouting(Settings.Builder.EMPTY_SETTINGS, new AwarenessAllocationDecider()); public TestClusterService() { this(ClusterState.builder(new ClusterName("test")).build()); @@ -129,7 +131,7 @@ public class TestClusterService implements ClusterService { @Override public OperationRouting operationRouting() { - return null; + return operationRouting; } @Override diff --git a/core/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/core/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index 8678f20ac12..497322ac7ea 100644 --- a/core/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/core/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -67,6 +67,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.rest.client.http.HttpResponse; +import org.hamcrest.CoreMatchers; import org.hamcrest.Matcher; import org.hamcrest.Matchers; import org.junit.Assert; @@ -126,6 +127,22 @@ public class ElasticsearchAssertions { assertBlocked(builder, null); } + /** + * Checks that all shard requests of a replicated brodcast request failed due to a cluster block + * + * @param replicatedBroadcastResponse the response that should only contain failed shard responses + * + * */ + public static void assertBlocked(BroadcastResponse replicatedBroadcastResponse) { + assertThat("all shard requests should have failed", replicatedBroadcastResponse.getFailedShards(), Matchers.equalTo(replicatedBroadcastResponse.getTotalShards())); + for (ShardOperationFailedException exception : replicatedBroadcastResponse.getShardFailures()) { + ClusterBlockException clusterBlockException = (ClusterBlockException) ExceptionsHelper.unwrap(exception.getCause(), ClusterBlockException.class); + assertNotNull("expected the cause of failure to be a ClusterBlockException but got " + exception.getCause().getMessage(), clusterBlockException); + assertThat(clusterBlockException.blocks().size(), greaterThan(0)); + assertThat(clusterBlockException.status(), CoreMatchers.equalTo(RestStatus.FORBIDDEN)); + } + } + /** * Executes the request and fails if the request has not been blocked by a specific {@link ClusterBlock}. *