diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java index bf26f536731..ddb326ce4af 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java @@ -30,15 +30,11 @@ import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction; import org.elasticsearch.action.admin.indices.flush.TransportFlushAction; -import org.elasticsearch.action.admin.indices.flush.TransportIndexFlushAction; -import org.elasticsearch.action.admin.indices.flush.TransportShardFlushAction; import org.elasticsearch.action.admin.indices.gateway.snapshot.TransportGatewaySnapshotAction; import org.elasticsearch.action.admin.indices.gateway.snapshot.TransportIndexGatewaySnapshotAction; import org.elasticsearch.action.admin.indices.gateway.snapshot.TransportShardGatewaySnapshotAction; import org.elasticsearch.action.admin.indices.mapping.create.TransportCreateMappingAction; -import org.elasticsearch.action.admin.indices.refresh.TransportIndexRefreshAction; import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction; -import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction; import org.elasticsearch.action.admin.indices.status.TransportIndicesStatusAction; import org.elasticsearch.action.count.TransportCountAction; import org.elasticsearch.action.delete.TransportDeleteAction; @@ -76,18 +72,11 @@ public class TransportActionModule extends AbstractModule { bind(TransportIndexGatewaySnapshotAction.class).asEagerSingleton(); bind(TransportGatewaySnapshotAction.class).asEagerSingleton(); - bind(TransportShardRefreshAction.class).asEagerSingleton(); - bind(TransportIndexRefreshAction.class).asEagerSingleton(); bind(TransportRefreshAction.class).asEagerSingleton(); - - bind(TransportShardFlushAction.class).asEagerSingleton(); - bind(TransportIndexFlushAction.class).asEagerSingleton(); bind(TransportFlushAction.class).asEagerSingleton(); bind(TransportIndexAction.class).asEagerSingleton(); - bind(TransportGetAction.class).asEagerSingleton(); - bind(TransportDeleteAction.class).asEagerSingleton(); bind(TransportShardDeleteByQueryAction.class).asEagerSingleton(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java index 5c8f7acf0ba..ed710ea0d84 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java @@ -24,6 +24,8 @@ import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; @@ -32,6 +34,8 @@ import org.elasticsearch.util.settings.Settings; import java.util.concurrent.atomic.AtomicReferenceArray; +import static org.elasticsearch.action.Actions.*; + /** * @author kimchy (Shay Banon) */ @@ -53,7 +57,11 @@ public class TransportBroadcastPingAction extends TransportBroadcastOperationAct return new BroadcastPingRequest(); } - @Override protected BroadcastPingResponse newResponse(BroadcastPingRequest broadcastPingRequest, AtomicReferenceArray shardsResponses) { + @Override protected GroupShardsIterator shards(BroadcastPingRequest request, ClusterState clusterState) { + return indicesService.searchShards(clusterState, processIndices(clusterState, request.indices()), request.queryHint()); + } + + @Override protected BroadcastPingResponse newResponse(BroadcastPingRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) { int successfulShards = 0; int failedShards = 0; for (int i = 0; i < shardsResponses.length(); i++) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java index e9e07237630..a05bcd1c22e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java @@ -19,20 +19,26 @@ package org.elasticsearch.action.admin.indices.flush; -import org.elasticsearch.action.support.replication.IndicesReplicationOperationRequest; -import org.elasticsearch.util.TimeValue; +import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest; +import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading; /** * @author kimchy (Shay Banon) */ -public class FlushRequest extends IndicesReplicationOperationRequest { +public class FlushRequest extends BroadcastOperationRequest { + + FlushRequest() { + + } public FlushRequest(String index) { this(new String[]{index}); } public FlushRequest(String... indices) { - this.indices = indices; + super(indices, null); + // we want to do the refresh in parallel on local shards... + operationThreading(BroadcastOperationThreading.THREAD_PER_SHARD); } @Override public FlushRequest listenerThreaded(boolean threadedListener) { @@ -40,12 +46,8 @@ public class FlushRequest extends IndicesReplicationOperationRequest { return this; } - public FlushRequest timeout(TimeValue timeout) { - this.timeout = timeout; + @Override public FlushRequest operationThreading(BroadcastOperationThreading operationThreading) { + super.operationThreading(operationThreading); return this; } - - FlushRequest() { - - } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushResponse.java index 505331d6541..11c4f9cdc4f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushResponse.java @@ -19,47 +19,30 @@ package org.elasticsearch.action.admin.indices.flush; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.util.io.Streamable; +import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; /** * @author kimchy (Shay Banon) */ -public class FlushResponse implements ActionResponse, Streamable { - - private Map indices = new HashMap(); +public class FlushResponse extends BroadcastOperationResponse { FlushResponse() { } - public Map indices() { - return indices; - } - - public IndexFlushResponse index(String index) { - return indices.get(index); + FlushResponse(int successfulShards, int failedShards) { + super(successfulShards, failedShards); } @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { - int size = in.readInt(); - for (int i = 0; i < size; i++) { - IndexFlushResponse indexFlushResponse = new IndexFlushResponse(); - indexFlushResponse.readFrom(in); - indices.put(indexFlushResponse.index(), indexFlushResponse); - } + super.readFrom(in); } @Override public void writeTo(DataOutput out) throws IOException { - out.writeInt(indices.size()); - for (IndexFlushResponse indexFlushResponse : indices.values()) { - indexFlushResponse.writeTo(out); - } + super.writeTo(out); } -} \ No newline at end of file +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/IndexFlushRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/IndexFlushRequest.java deleted file mode 100644 index 6447ae2aef0..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/IndexFlushRequest.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.flush; - -import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest; -import org.elasticsearch.util.TimeValue; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -/** - * @author kimchy (Shay Banon) - */ -public class IndexFlushRequest extends IndexReplicationOperationRequest { - - public IndexFlushRequest(String index) { - this.index = index; - } - - IndexFlushRequest(FlushRequest request, String index) { - this.index = index; - this.timeout = request.timeout(); - } - - IndexFlushRequest() { - } - - public IndexFlushRequest timeout(TimeValue timeout) { - this.timeout = timeout; - return this; - } - - public void readFrom(DataInput in) throws IOException, ClassNotFoundException { - super.readFrom(in); - } - - public void writeTo(DataOutput out) throws IOException { - super.writeTo(out); - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/IndexFlushResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/IndexFlushResponse.java deleted file mode 100644 index a5965ab7f6e..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/IndexFlushResponse.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.flush; - -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.util.io.Streamable; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -/** - * @author kimchy (Shay Banon) - */ -public class IndexFlushResponse implements ActionResponse, Streamable { - - private String index; - - private int successfulShards; - - private int failedShards; - - IndexFlushResponse(String index, int successfulShards, int failedShards) { - this.index = index; - this.successfulShards = successfulShards; - this.failedShards = failedShards; - } - - IndexFlushResponse() { - - } - - public String index() { - return index; - } - - public int successfulShards() { - return successfulShards; - } - - public int failedShards() { - return failedShards; - } - - public int totalShards() { - return successfulShards + failedShards; - } - - @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { - index = in.readUTF(); - successfulShards = in.readInt(); - failedShards = in.readInt(); - } - - @Override public void writeTo(DataOutput out) throws IOException { - out.writeUTF(index); - out.writeInt(successfulShards); - out.writeInt(failedShards); - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java index 0cb8eede57c..b6b700d706b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.admin.indices.flush; -import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; +import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest; import java.io.DataInput; import java.io.DataOutput; @@ -28,34 +28,20 @@ import java.io.IOException; /** * @author kimchy (Shay Banon) */ -public class ShardFlushRequest extends ShardReplicationOperationRequest { - - private int shardId; - - public ShardFlushRequest(IndexFlushRequest indexFlushRequest, int shardId) { - this(indexFlushRequest.index(), shardId); - timeout = indexFlushRequest.timeout(); - } - - public ShardFlushRequest(String index, int shardId) { - this.index = index; - this.shardId = shardId; - } +public class ShardFlushRequest extends BroadcastShardOperationRequest { ShardFlushRequest() { } - public int shardId() { - return this.shardId; + public ShardFlushRequest(String index, int shardId) { + super(index, shardId); } @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { super.readFrom(in); - shardId = in.readInt(); } @Override public void writeTo(DataOutput out) throws IOException { super.writeTo(out); - out.writeInt(shardId); } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushResponse.java index d09e0876733..42e8da294d2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushResponse.java @@ -19,8 +19,7 @@ package org.elasticsearch.action.admin.indices.flush; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.util.io.Streamable; +import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse; import java.io.DataInput; import java.io.DataOutput; @@ -29,15 +28,21 @@ import java.io.IOException; /** * @author kimchy (Shay Banon) */ -public class ShardFlushResponse implements ActionResponse, Streamable { +public class ShardFlushResponse extends BroadcastShardOperationResponse { ShardFlushResponse() { } + public ShardFlushResponse(String index, int shardId) { + super(index, shardId); + } + @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { + super.readFrom(in); } @Override public void writeTo(DataOutput out) throws IOException { + super.writeTo(out); } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java index 561654cc774..9772807b73d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java @@ -20,9 +20,15 @@ package org.elasticsearch.action.admin.indices.flush; import com.google.inject.Inject; +import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.TransportActions; -import org.elasticsearch.action.support.replication.TransportIndicesReplicationOperationAction; +import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.util.settings.Settings; @@ -32,36 +38,91 @@ import java.util.concurrent.atomic.AtomicReferenceArray; /** * @author kimchy (Shay Banon) */ -public class TransportFlushAction extends TransportIndicesReplicationOperationAction { +public class TransportFlushAction extends TransportBroadcastOperationAction { - @Inject public TransportFlushAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, TransportIndexFlushAction indexFlushAction) { - super(settings, transportService, clusterService, threadPool, indexFlushAction); - } - - @Override protected FlushRequest newRequestInstance() { - return new FlushRequest(); - } - - @Override protected FlushResponse newResponseInstance(FlushRequest request, AtomicReferenceArray indexResponses) { - FlushResponse response = new FlushResponse(); - for (int i = 0; i < indexResponses.length(); i++) { - IndexFlushResponse indexFlushResponse = (IndexFlushResponse) indexResponses.get(i); - if (indexFlushResponse != null) { - response.indices().put(indexFlushResponse.index(), indexFlushResponse); - } - } - return response; - } - - @Override protected boolean accumulateExceptions() { - return false; + @Inject public TransportFlushAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, IndicesService indicesService) { + super(settings, threadPool, clusterService, transportService, indicesService); } @Override protected String transportAction() { return TransportActions.Admin.Indices.FLUSH; } - @Override protected IndexFlushRequest newIndexRequestInstance(FlushRequest request, String index) { - return new IndexFlushRequest(request, index); + @Override protected String transportShardAction() { + return "indices/flush/shard"; } + + @Override protected FlushRequest newRequest() { + return new FlushRequest(); + } + + @Override protected FlushResponse newResponse(FlushRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) { + int successfulShards = 0; + int failedShards = 0; + for (int i = 0; i < shardsResponses.length(); i++) { + ShardFlushResponse shardCountResponse = (ShardFlushResponse) shardsResponses.get(i); + if (shardCountResponse == null) { + failedShards++; + } else { + successfulShards++; + } + } + return new FlushResponse(successfulShards, failedShards); + } + + @Override protected ShardFlushRequest newShardRequest() { + return new ShardFlushRequest(); + } + + @Override protected ShardFlushRequest newShardRequest(ShardRouting shard, FlushRequest request) { + return new ShardFlushRequest(shard.index(), shard.id()); + } + + @Override protected ShardFlushResponse newShardResponse() { + return new ShardFlushResponse(); + } + + @Override protected ShardFlushResponse shardOperation(ShardFlushRequest request) throws ElasticSearchException { + IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId()); + indexShard.flush(); + return new ShardFlushResponse(request.index(), request.shardId()); + } + + @Override protected boolean accumulateExceptions() { + return false; + } + + /** + * The refresh request works against *all* shards. + */ + @Override protected GroupShardsIterator shards(FlushRequest request, ClusterState clusterState) { + return clusterState.routingTable().allShardsGrouped(request.indices()); + } + + // @Override protected FlushRequest newRequestInstance() { +// return new FlushRequest(); +// } +// +// @Override protected FlushResponse newResponseInstance(FlushRequest request, AtomicReferenceArray indexResponses) { +// FlushResponse response = new FlushResponse(); +// for (int i = 0; i < indexResponses.length(); i++) { +// IndexFlushResponse indexFlushResponse = (IndexFlushResponse) indexResponses.get(i); +// if (indexFlushResponse != null) { +// response.indices().put(indexFlushResponse.index(), indexFlushResponse); +// } +// } +// return response; +// } +// +// @Override protected boolean accumulateExceptions() { +// return false; +// } +// +// @Override protected String transportAction() { +// return TransportActions.Admin.Indices.FLUSH; +// } +// +// @Override protected IndexFlushRequest newIndexRequestInstance(FlushRequest request, String index) { +// return new IndexFlushRequest(request, index); +// } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportIndexFlushAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportIndexFlushAction.java deleted file mode 100644 index 5470b7c9ec4..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportIndexFlushAction.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.flush; - -import com.google.inject.Inject; -import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.index.Index; -import org.elasticsearch.indices.IndexMissingException; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.util.settings.Settings; - -import java.util.concurrent.atomic.AtomicReferenceArray; - -/** - * @author kimchy (Shay Banon) - */ -public class TransportIndexFlushAction extends TransportIndexReplicationOperationAction { - - private final ClusterService clusterService; - - @Inject public TransportIndexFlushAction(Settings settings, ClusterService clusterService, - TransportService transportService, ThreadPool threadPool, - TransportShardFlushAction shardFlushAction) { - super(settings, transportService, threadPool, shardFlushAction); - this.clusterService = clusterService; - } - - @Override protected IndexFlushRequest newRequestInstance() { - return new IndexFlushRequest(); - } - - @Override protected IndexFlushResponse newResponseInstance(IndexFlushRequest indexFlushReqest, AtomicReferenceArray shardsResponses) { - int successfulShards = 0; - int failedShards = 0; - for (int i = 0; i < shardsResponses.length(); i++) { - if (shardsResponses.get(i) == null) { - failedShards++; - } else { - successfulShards++; - } - } - return new IndexFlushResponse(indexFlushReqest.index(), successfulShards, failedShards); - } - - @Override protected boolean accumulateExceptions() { - return false; - } - - @Override protected String transportAction() { - return "indices/index/flush"; - } - - @Override protected GroupShardsIterator shards(IndexFlushRequest indexFlushRequest) { - IndexRoutingTable indexRouting = clusterService.state().routingTable().index(indexFlushRequest.index()); - if (indexRouting == null) { - throw new IndexMissingException(new Index(indexFlushRequest.index())); - } - return indexRouting.groupByShardsIt(); - } - - @Override protected ShardFlushRequest newShardRequestInstance(IndexFlushRequest indexFlushRequest, int shardId) { - return new ShardFlushRequest(indexFlushRequest, shardId); - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java deleted file mode 100644 index c71e8168e93..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.flush; - -import com.google.inject.Inject; -import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.action.shard.ShardStateAction; -import org.elasticsearch.cluster.routing.ShardsIterator; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.util.settings.Settings; - -/** - * @author kimchy (Shay Banon) - */ -public class TransportShardFlushAction extends TransportShardReplicationOperationAction { - - @Inject public TransportShardFlushAction(Settings settings, TransportService transportService, - ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, - ShardStateAction shardStateAction) { - super(settings, transportService, clusterService, indicesService, threadPool, shardStateAction); - } - - @Override protected ShardFlushRequest newRequestInstance() { - return new ShardFlushRequest(); - } - - @Override protected ShardFlushResponse newResponseInstance() { - return new ShardFlushResponse(); - } - - @Override protected String transportAction() { - return "indices/index/shard/flush"; - } - - @Override protected ShardFlushResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) { - ShardFlushRequest request = shardRequest.request; - indexShard(shardRequest).flush(); - return new ShardFlushResponse(); - } - - @Override protected void shardOperationOnBackup(ShardOperationRequest shardRequest) { - ShardFlushRequest request = shardRequest.request; - indexShard(shardRequest).flush(); - } - - @Override protected ShardsIterator shards(ShardFlushRequest request) { - return clusterService.state().routingTable().index(request.index()).shard(request.shardId()).shardsIt(); - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/IndexRefreshRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/IndexRefreshRequest.java deleted file mode 100644 index 15d3e6c648f..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/IndexRefreshRequest.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.refresh; - -import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest; -import org.elasticsearch.util.TimeValue; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -/** - * @author kimchy (Shay Banon) - */ -public class IndexRefreshRequest extends IndexReplicationOperationRequest { - - private boolean waitForOperations = true; - - public IndexRefreshRequest(String index) { - this.index = index; - } - - IndexRefreshRequest(RefreshRequest request, String index) { - this.index = index; - this.timeout = request.timeout(); - this.waitForOperations = request.waitForOperations(); - } - - IndexRefreshRequest() { - } - - public IndexRefreshRequest timeout(TimeValue timeout) { - this.timeout = timeout; - return this; - } - - public boolean waitForOperations() { - return waitForOperations; - } - - public IndexRefreshRequest waitForOperations(boolean waitForOperations) { - this.waitForOperations = waitForOperations; - return this; - } - - public void readFrom(DataInput in) throws IOException, ClassNotFoundException { - super.readFrom(in); - waitForOperations = in.readBoolean(); - } - - public void writeTo(DataOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(waitForOperations); - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/IndexRefreshResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/IndexRefreshResponse.java deleted file mode 100644 index 0146094cfff..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/IndexRefreshResponse.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.refresh; - -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.util.io.Streamable; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -/** - * @author kimchy (Shay Banon) - */ -public class IndexRefreshResponse implements ActionResponse, Streamable { - - private String index; - - private int successfulShards; - - private int failedShards; - - IndexRefreshResponse(String index, int successfulShards, int failedShards) { - this.index = index; - this.successfulShards = successfulShards; - this.failedShards = failedShards; - } - - IndexRefreshResponse() { - - } - - public String index() { - return index; - } - - public int successfulShards() { - return successfulShards; - } - - public int failedShards() { - return failedShards; - } - - public int totalShards() { - return successfulShards + failedShards; - } - - @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { - index = in.readUTF(); - successfulShards = in.readInt(); - failedShards = in.readInt(); - } - - @Override public void writeTo(DataOutput out) throws IOException { - out.writeUTF(index); - out.writeInt(successfulShards); - out.writeInt(failedShards); - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequest.java index 33689bb7b20..d7ddea871e6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshRequest.java @@ -19,8 +19,8 @@ package org.elasticsearch.action.admin.indices.refresh; -import org.elasticsearch.action.support.replication.IndicesReplicationOperationRequest; -import org.elasticsearch.util.TimeValue; +import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest; +import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading; import java.io.DataInput; import java.io.DataOutput; @@ -29,7 +29,7 @@ import java.io.IOException; /** * @author kimchy (Shay Banon) */ -public class RefreshRequest extends IndicesReplicationOperationRequest { +public class RefreshRequest extends BroadcastOperationRequest { private boolean waitForOperations = true; @@ -38,12 +38,9 @@ public class RefreshRequest extends IndicesReplicationOperationRequest { } public RefreshRequest(String... indices) { - this.indices = indices; - } - - public RefreshRequest timeout(TimeValue timeout) { - this.timeout = timeout; - return this; + super(indices, null); + // we want to do the refresh in parallel on local shards... + operationThreading(BroadcastOperationThreading.THREAD_PER_SHARD); } RefreshRequest() { @@ -55,6 +52,11 @@ public class RefreshRequest extends IndicesReplicationOperationRequest { return this; } + @Override public RefreshRequest operationThreading(BroadcastOperationThreading operationThreading) { + super.operationThreading(operationThreading); + return this; + } + public boolean waitForOperations() { return waitForOperations; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshResponse.java index 080904dfdea..ed22a4b8a43 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/RefreshResponse.java @@ -19,47 +19,30 @@ package org.elasticsearch.action.admin.indices.refresh; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.util.io.Streamable; +import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; /** * @author kimchy (Shay Banon) */ -public class RefreshResponse implements ActionResponse, Streamable { - - private Map indices = new HashMap(); +public class RefreshResponse extends BroadcastOperationResponse { RefreshResponse() { } - public Map indices() { - return indices; - } - - public IndexRefreshResponse index(String index) { - return indices.get(index); + RefreshResponse(int successfulShards, int failedShards) { + super(successfulShards, failedShards); } @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { - int size = in.readInt(); - for (int i = 0; i < size; i++) { - IndexRefreshResponse response = new IndexRefreshResponse(); - response.readFrom(in); - indices.put(response.index(), response); - } + super.readFrom(in); } @Override public void writeTo(DataOutput out) throws IOException { - out.writeInt(indices.size()); - for (IndexRefreshResponse indexRefreshResponse : indices.values()) { - indexRefreshResponse.writeTo(out); - } + super.writeTo(out); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/ShardRefreshRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/ShardRefreshRequest.java index e0e0365a598..2539951f652 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/ShardRefreshRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/ShardRefreshRequest.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.admin.indices.refresh; -import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; +import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest; import java.io.DataInput; import java.io.DataOutput; @@ -28,27 +28,16 @@ import java.io.IOException; /** * @author kimchy (Shay Banon) */ -public class ShardRefreshRequest extends ShardReplicationOperationRequest { +public class ShardRefreshRequest extends BroadcastShardOperationRequest { - private int shardId; private boolean waitForOperations = true; - public ShardRefreshRequest(IndexRefreshRequest request, int shardId) { - this(request.index(), shardId); - timeout = request.timeout(); - waitForOperations = request.waitForOperations(); - } - - public ShardRefreshRequest(String index, int shardId) { - this.index = index; - this.shardId = shardId; - } - ShardRefreshRequest() { } - public int shardId() { - return this.shardId; + public ShardRefreshRequest(String index, int shardId, RefreshRequest request) { + super(index, shardId); + waitForOperations = request.waitForOperations(); } public boolean waitForOperations() { @@ -62,13 +51,11 @@ public class ShardRefreshRequest extends ShardReplicationOperationRequest { @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { super.readFrom(in); - shardId = in.readInt(); waitForOperations = in.readBoolean(); } @Override public void writeTo(DataOutput out) throws IOException { super.writeTo(out); - out.writeInt(shardId); out.writeBoolean(waitForOperations); } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/ShardRefreshResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/ShardRefreshResponse.java index ebe348452f5..f0e1849bbdc 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/ShardRefreshResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/ShardRefreshResponse.java @@ -19,8 +19,7 @@ package org.elasticsearch.action.admin.indices.refresh; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.util.io.Streamable; +import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse; import java.io.DataInput; import java.io.DataOutput; @@ -29,14 +28,20 @@ import java.io.IOException; /** * @author kimchy (Shay Banon) */ -public class ShardRefreshResponse implements ActionResponse, Streamable { +public class ShardRefreshResponse extends BroadcastShardOperationResponse { ShardRefreshResponse() { } + public ShardRefreshResponse(String index, int shardId) { + super(index, shardId); + } + @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { + super.readFrom(in); } @Override public void writeTo(DataOutput out) throws IOException { + super.writeTo(out); } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportIndexRefreshAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportIndexRefreshAction.java deleted file mode 100644 index e3f078e788b..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportIndexRefreshAction.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.refresh; - -import com.google.inject.Inject; -import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.util.settings.Settings; - -import java.util.concurrent.atomic.AtomicReferenceArray; - -/** - * @author kimchy (Shay Banon) - */ -public class TransportIndexRefreshAction extends TransportIndexReplicationOperationAction { - - private final ClusterService clusterService; - - @Inject public TransportIndexRefreshAction(Settings settings, ClusterService clusterService, - TransportService transportService, ThreadPool threadPool, - TransportShardRefreshAction shardRefreshAction) { - super(settings, transportService, threadPool, shardRefreshAction); - this.clusterService = clusterService; - } - - @Override protected IndexRefreshRequest newRequestInstance() { - return new IndexRefreshRequest(); - } - - @Override protected IndexRefreshResponse newResponseInstance(IndexRefreshRequest request, AtomicReferenceArray shardsResponses) { - int successfulShards = 0; - int failedShards = 0; - for (int i = 0; i < shardsResponses.length(); i++) { - if (shardsResponses.get(i) == null) { - failedShards++; - } else { - successfulShards++; - } - } - return new IndexRefreshResponse(request.index(), successfulShards, failedShards); - } - - @Override protected boolean accumulateExceptions() { - return false; - } - - @Override protected String transportAction() { - return "indices/index/refresh"; - } - - @Override protected GroupShardsIterator shards(IndexRefreshRequest request) { - return clusterService.state().routingTable().index(request.index()).groupByShardsIt(); - } - - @Override protected ShardRefreshRequest newShardRequestInstance(IndexRefreshRequest request, int shardId) { - return new ShardRefreshRequest(request, shardId); - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java index 71499f84249..b88c5514a69 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java @@ -20,9 +20,15 @@ package org.elasticsearch.action.admin.indices.refresh; import com.google.inject.Inject; +import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.TransportActions; -import org.elasticsearch.action.support.replication.TransportIndicesReplicationOperationAction; +import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.util.settings.Settings; @@ -32,36 +38,65 @@ import java.util.concurrent.atomic.AtomicReferenceArray; /** * @author kimchy (Shay Banon) */ -public class TransportRefreshAction extends TransportIndicesReplicationOperationAction { +public class TransportRefreshAction extends TransportBroadcastOperationAction { - @Inject public TransportRefreshAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, TransportIndexRefreshAction indexAction) { - super(settings, transportService, clusterService, threadPool, indexAction); - } - - @Override protected RefreshRequest newRequestInstance() { - return new RefreshRequest(); - } - - @Override protected RefreshResponse newResponseInstance(RefreshRequest request, AtomicReferenceArray indexResponses) { - RefreshResponse response = new RefreshResponse(); - for (int i = 0; i < indexResponses.length(); i++) { - IndexRefreshResponse indexResponse = (IndexRefreshResponse) indexResponses.get(i); - if (indexResponse != null) { - response.indices().put(indexResponse.index(), indexResponse); - } - } - return response; - } - - @Override protected boolean accumulateExceptions() { - return false; + @Inject public TransportRefreshAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, + TransportService transportService, IndicesService indicesService) { + super(settings, threadPool, clusterService, transportService, indicesService); } @Override protected String transportAction() { return TransportActions.Admin.Indices.REFRESH; } - @Override protected IndexRefreshRequest newIndexRequestInstance(RefreshRequest request, String index) { - return new IndexRefreshRequest(request, index); + @Override protected String transportShardAction() { + return "indices/refresh/shard"; + } + + @Override protected RefreshRequest newRequest() { + return new RefreshRequest(); + } + + @Override protected RefreshResponse newResponse(RefreshRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) { + int successfulShards = 0; + int failedShards = 0; + for (int i = 0; i < shardsResponses.length(); i++) { + ShardRefreshResponse shardCountResponse = (ShardRefreshResponse) shardsResponses.get(i); + if (shardCountResponse == null) { + failedShards++; + } else { + successfulShards++; + } + } + return new RefreshResponse(successfulShards, failedShards); + } + + @Override protected ShardRefreshRequest newShardRequest() { + return new ShardRefreshRequest(); + } + + @Override protected ShardRefreshRequest newShardRequest(ShardRouting shard, RefreshRequest request) { + return new ShardRefreshRequest(shard.index(), shard.id(), request); + } + + @Override protected ShardRefreshResponse newShardResponse() { + return new ShardRefreshResponse(); + } + + @Override protected ShardRefreshResponse shardOperation(ShardRefreshRequest request) throws ElasticSearchException { + IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId()); + indexShard.refresh(request.waitForOperations()); + return new ShardRefreshResponse(request.index(), request.shardId()); + } + + @Override protected boolean accumulateExceptions() { + return false; + } + + /** + * The refresh request works against *all* shards. + */ + @Override protected GroupShardsIterator shards(RefreshRequest request, ClusterState clusterState) { + return clusterState.routingTable().allShardsGrouped(request.indices()); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java deleted file mode 100644 index 20b15566ef0..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.refresh; - -import com.google.inject.Inject; -import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.action.shard.ShardStateAction; -import org.elasticsearch.cluster.routing.ShardsIterator; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.util.settings.Settings; - -/** - * @author kimchy (Shay Banon) - */ -public class TransportShardRefreshAction extends TransportShardReplicationOperationAction { - - @Inject public TransportShardRefreshAction(Settings settings, TransportService transportService, - ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, - ShardStateAction shardStateAction) { - super(settings, transportService, clusterService, indicesService, threadPool, shardStateAction); - } - - @Override protected ShardRefreshRequest newRequestInstance() { - return new ShardRefreshRequest(); - } - - @Override protected ShardRefreshResponse newResponseInstance() { - return new ShardRefreshResponse(); - } - - @Override protected String transportAction() { - return "indices/index/shard/refresh"; - } - - @Override protected ShardRefreshResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) { - ShardRefreshRequest request = shardRequest.request; - indexShard(shardRequest).refresh(request.waitForOperations()); - return new ShardRefreshResponse(); - } - - @Override protected void shardOperationOnBackup(ShardOperationRequest shardRequest) { - ShardRefreshRequest request = shardRequest.request; - indexShard(shardRequest).refresh(request.waitForOperations()); - } - - @Override protected ShardsIterator shards(ShardRefreshRequest request) { - return clusterService.state().routingTable().index(request.index()).shard(request.shardId()).shardsIt(); - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusRequest.java index cc28e6c6958..e22a5ba8b79 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusRequest.java @@ -19,21 +19,21 @@ package org.elasticsearch.action.admin.indices.status; -import org.elasticsearch.action.support.shards.ShardsOperationRequest; -import org.elasticsearch.action.support.shards.ShardsOperationThreading; +import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest; +import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading; import org.elasticsearch.util.Strings; /** * @author kimchy (Shay Banon) */ -public class IndicesStatusRequest extends ShardsOperationRequest { +public class IndicesStatusRequest extends BroadcastOperationRequest { public IndicesStatusRequest() { this(Strings.EMPTY_ARRAY); } public IndicesStatusRequest(String... indices) { - super(indices); + super(indices, null); } @Override public IndicesStatusRequest listenerThreaded(boolean listenerThreaded) { @@ -41,8 +41,7 @@ public class IndicesStatusRequest extends ShardsOperationRequest { return this; } - @Override public IndicesStatusRequest operationThreading(ShardsOperationThreading operationThreading) { - super.operationThreading(operationThreading); - return this; + @Override public BroadcastOperationRequest operationThreading(BroadcastOperationThreading operationThreading) { + return super.operationThreading(operationThreading); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusResponse.java index 0e7a7397d6b..6feff82e4e7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusResponse.java @@ -20,7 +20,7 @@ package org.elasticsearch.action.admin.indices.status; import com.google.common.collect.ImmutableMap; -import org.elasticsearch.action.support.shards.ShardsOperationResponse; +import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.util.settings.Settings; @@ -38,7 +38,9 @@ import static org.elasticsearch.util.settings.ImmutableSettings.*; /** * @author kimchy (Shay Banon) */ -public class IndicesStatusResponse extends ShardsOperationResponse { +public class IndicesStatusResponse extends BroadcastOperationResponse { + + protected ShardStatus[] shards; private Map indicesSettings = ImmutableMap.of(); @@ -47,8 +49,9 @@ public class IndicesStatusResponse extends ShardsOperationResponse IndicesStatusResponse() { } - IndicesStatusResponse(ShardStatus[] shards, ClusterState clusterState) { - super(shards); + IndicesStatusResponse(ShardStatus[] shards, ClusterState clusterState, int successfulShards, int failedShards) { + super(successfulShards, failedShards); + this.shards = shards; indicesSettings = newHashMap(); for (ShardStatus shard : shards) { if (!indicesSettings.containsKey(shard.shardRouting().index())) { @@ -57,6 +60,10 @@ public class IndicesStatusResponse extends ShardsOperationResponse } } + public ShardStatus[] shards() { + return this.shards; + } + public IndexStatus index(String index) { return indices().get(index); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/ShardStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/ShardStatus.java index 17470aea55c..0961644cd78 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/ShardStatus.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/ShardStatus.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.admin.indices.status; -import org.elasticsearch.action.support.shards.ShardOperationResponse; +import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.util.SizeValue; @@ -28,12 +28,13 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import static org.elasticsearch.cluster.routing.ImmutableShardRouting.*; import static org.elasticsearch.util.SizeValue.*; /** * @author kimchy (Shay Banon) */ -public class ShardStatus extends ShardOperationResponse { +public class ShardStatus extends BroadcastShardOperationResponse { public static class Docs { public static final Docs UNKNOWN = new Docs(); @@ -55,6 +56,8 @@ public class ShardStatus extends ShardOperationResponse { } } + private ShardRouting shardRouting; + IndexShardState state; SizeValue storeSize = SizeValue.UNKNOWN; @@ -71,7 +74,12 @@ public class ShardStatus extends ShardOperationResponse { } ShardStatus(ShardRouting shardRouting) { - super(shardRouting); + super(shardRouting.index(), shardRouting.id()); + this.shardRouting = shardRouting; + } + + public ShardRouting shardRouting() { + return this.shardRouting; } public IndexShardState state() { @@ -106,6 +114,7 @@ public class ShardStatus extends ShardOperationResponse { @Override public void writeTo(DataOutput out) throws IOException { super.writeTo(out); + shardRouting.writeTo(out); out.writeByte(state.id()); storeSize.writeTo(out); estimatedFlushableMemorySize.writeTo(out); @@ -118,6 +127,7 @@ public class ShardStatus extends ShardOperationResponse { @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { super.readFrom(in); + shardRouting = readShardRoutingEntry(in); state = IndexShardState.fromId(in.readByte()); storeSize = readSizeValue(in); estimatedFlushableMemorySize = readSizeValue(in); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java index df924b9d27a..ae6b2c84012 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java @@ -22,10 +22,11 @@ package org.elasticsearch.action.admin.indices.status; import com.google.inject.Inject; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.TransportActions; -import org.elasticsearch.action.support.shards.ShardOperationRequest; -import org.elasticsearch.action.support.shards.TransportShardsOperationActions; +import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest; +import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.InternalIndexShard; @@ -43,10 +44,10 @@ import static com.google.common.collect.Lists.*; /** * @author kimchy (Shay Banon) */ -public class TransportIndicesStatusAction extends TransportShardsOperationActions { +public class TransportIndicesStatusAction extends TransportBroadcastOperationAction { - @Inject public TransportIndicesStatusAction(Settings settings, ClusterService clusterService, TransportService transportService, IndicesService indicesService, ThreadPool threadPool) { - super(settings, clusterService, transportService, indicesService, threadPool); + @Inject public TransportIndicesStatusAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, IndicesService indicesService) { + super(settings, threadPool, clusterService, transportService, indicesService); } @Override protected String transportAction() { @@ -61,6 +62,22 @@ public class TransportIndicesStatusAction extends TransportShardsOperationAction return new IndicesStatusRequest(); } + @Override protected IndicesStatusResponse newResponse(IndicesStatusRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) { + int successfulShards = 0; + int failedShards = 0; + final List shards = newArrayList(); + for (int i = 0; i < shardsResponses.length(); i++) { + Object resp = shardsResponses.get(i); + if (resp instanceof ShardStatus) { + shards.add((ShardStatus) resp); + successfulShards++; + } else { + failedShards++; + } + } + return new IndicesStatusResponse(shards.toArray(new ShardStatus[shards.size()]), clusterState, successfulShards, failedShards); + } + @Override protected IndexShardStatusRequest newShardRequest() { return new IndexShardStatusRequest(); } @@ -73,21 +90,6 @@ public class TransportIndicesStatusAction extends TransportShardsOperationAction return new ShardStatus(); } - @Override protected boolean accumulateExceptions() { - return false; - } - - @Override protected IndicesStatusResponse newResponse(IndicesStatusRequest request, ClusterState clusterState, AtomicReferenceArray shardsResponses) { - final List shards = newArrayList(); - for (int i = 0; i < shardsResponses.length(); i++) { - Object resp = shardsResponses.get(i); - if (resp instanceof ShardStatus) { - shards.add((ShardStatus) resp); - } - } - return new IndicesStatusResponse(shards.toArray(new ShardStatus[shards.size()]), clusterState); - } - @Override protected ShardStatus shardOperation(IndexShardStatusRequest request) throws ElasticSearchException { InternalIndexShard indexShard = (InternalIndexShard) indicesService.indexServiceSafe(request.index()).shard(request.shardId()); ShardStatus shardStatus = new ShardStatus(indexShard.routingEntry()); @@ -112,7 +114,18 @@ public class TransportIndicesStatusAction extends TransportShardsOperationAction return shardStatus; } - public static class IndexShardStatusRequest extends ShardOperationRequest { + @Override protected boolean accumulateExceptions() { + return false; + } + + /** + * Status goes across *all* shards. + */ + @Override protected GroupShardsIterator shards(IndicesStatusRequest request, ClusterState clusterState) { + return clusterState.routingTable().allShardsGrouped(request.indices()); + } + + public static class IndexShardStatusRequest extends BroadcastShardOperationRequest { IndexShardStatusRequest() { } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java index dfecdc4b0d2..977ef82f0d2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java @@ -24,6 +24,8 @@ import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; @@ -33,6 +35,8 @@ import org.elasticsearch.util.settings.Settings; import java.util.concurrent.atomic.AtomicReferenceArray; +import static org.elasticsearch.action.Actions.*; + /** * @author kimchy (Shay Banon) */ @@ -66,7 +70,11 @@ public class TransportCountAction extends TransportBroadcastOperationAction listener; + private final ClusterState clusterState; + private final Nodes nodes; private final GroupShardsIterator shardsIts; @@ -110,9 +113,9 @@ public abstract class TransportBroadcastOperationAction shardIt, Exception e, boolean alreadyThreaded) { + @SuppressWarnings({"unchecked"}) private void onOperation(ShardRouting shard, final Iterator shardIt, Exception e, boolean alreadyThreaded) { if (logger.isDebugEnabled()) { if (e != null) { logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", e); @@ -251,11 +254,11 @@ public abstract class TransportBroadcastOperationAction implements ActionResponse { - - protected ShardResponse[] shards; - - protected ShardsOperationResponse() { - } - - protected ShardsOperationResponse(ShardResponse[] shards) { - this.shards = shards; - } - - public ShardResponse[] shards() { - return this.shards; - } - - @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { - } - - @Override public void writeTo(DataOutput out) throws IOException { - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/shards/ShardsOperationThreading.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/shards/ShardsOperationThreading.java deleted file mode 100644 index 226cc5d70e3..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/shards/ShardsOperationThreading.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.support.shards; - -import org.elasticsearch.ElasticSearchIllegalArgumentException; - -/** - * Controls the operation threading model for shards operation that are performed - * locally on the executing node. - * - * @author kimchy (Shay Banon) - */ -public enum ShardsOperationThreading { - /** - * No threads are used, all the local shards operations will be performed on the calling - * thread. - */ - NO_THREADS((byte) 0), - /** - * The local shards operations will be performed in serial manner on a single forked thread. - */ - SINGLE_THREAD((byte) 1), - /** - * Each local shard operation will execute on its own thread. - */ - THREAD_PER_SHARD((byte) 2); - - private final byte id; - - ShardsOperationThreading(byte id) { - this.id = id; - } - - public byte id() { - return this.id; - } - - public static ShardsOperationThreading fromId(byte id) { - if (id == 0) { - return NO_THREADS; - } - if (id == 1) { - return SINGLE_THREAD; - } - if (id == 2) { - return THREAD_PER_SHARD; - } - throw new ElasticSearchIllegalArgumentException("No type matching id [" + id + "]"); - } - - public static ShardsOperationThreading fromString(String value, ShardsOperationThreading defaultValue) { - if (value == null) { - return defaultValue; - } - return ShardsOperationThreading.valueOf(value.toUpperCase()); - } -} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/shards/TransportShardsOperationActions.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/shards/TransportShardsOperationActions.java deleted file mode 100644 index d1ced46c974..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/shards/TransportShardsOperationActions.java +++ /dev/null @@ -1,291 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.support.shards; - -import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ShardNotActiveException; -import org.elasticsearch.action.ShardOperationFailedException; -import org.elasticsearch.action.support.BaseAction; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.node.Node; -import org.elasticsearch.cluster.node.Nodes; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; -import org.elasticsearch.util.settings.Settings; - -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReferenceArray; - -import static org.elasticsearch.action.Actions.*; - -/** - * @author kimchy (Shay Banon) - */ -public abstract class TransportShardsOperationActions extends BaseAction { - - protected final ClusterService clusterService; - - protected final TransportService transportService; - - protected final IndicesService indicesService; - - protected final ThreadPool threadPool; - - protected TransportShardsOperationActions(Settings settings, ClusterService clusterService, TransportService transportService, - IndicesService indicesService, ThreadPool threadPool) { - super(settings); - this.clusterService = clusterService; - this.transportService = transportService; - this.indicesService = indicesService; - this.threadPool = threadPool; - - transportService.registerHandler(transportAction(), new TransportHandler()); - transportService.registerHandler(transportShardAction(), new ShardTransportHandler()); - } - - @Override protected void doExecute(Request request, ActionListener listener) { - new AsyncBroadcastAction(request, listener).start(); - } - - protected abstract String transportAction(); - - protected abstract String transportShardAction(); - - protected abstract Request newRequest(); - - protected abstract Response newResponse(Request request, ClusterState clusterState, AtomicReferenceArray shardsResponses); - - protected abstract ShardRequest newShardRequest(); - - protected abstract ShardRequest newShardRequest(ShardRouting shard, Request request); - - protected abstract ShardResponse newShardResponse(); - - protected abstract ShardResponse shardOperation(ShardRequest request) throws ElasticSearchException; - - protected abstract boolean accumulateExceptions(); - - private class AsyncBroadcastAction { - - private final Request request; - - private final ActionListener listener; - - private final ClusterState clusterState; - - private final Nodes nodes; - - private final List shards; - - private final AtomicInteger opsCounter = new AtomicInteger(); - - private final AtomicInteger indexCounter = new AtomicInteger(); - - private final AtomicReferenceArray shardsResponses; - - private AsyncBroadcastAction(Request request, ActionListener listener) { - this.request = request; - this.listener = listener; - - clusterState = clusterService.state(); - nodes = clusterState.nodes(); - shards = clusterState.routingTable().allShards(processIndices(clusterState, request.indices())); - - shardsResponses = new AtomicReferenceArray(shards.size()); - } - - public void start() { - // count the local operations, and perform the non local ones - int localOperations = 0; - for (final ShardRouting shard : shards) { - if (shard.active()) { - if (shard.currentNodeId().equals(nodes.localNodeId())) { - localOperations++; - } else { - // do the remote operation here, the localAsync flag is not relevant - performOperation(shard, true); - } - } else { - // as if we have a "problem", so we iterate to the next one and maintain counts - onFailure(shard, new ShardNotActiveException(shard.shardId()), false); - } - } - // we have local operations, perform them now - if (localOperations > 0) { - if (request.operationThreading() == ShardsOperationThreading.SINGLE_THREAD) { - threadPool.execute(new Runnable() { - @Override public void run() { - for (final ShardRouting shard : shards) { - if (shard.active()) { - if (shard.currentNodeId().equals(nodes.localNodeId())) { - performOperation(shard, false); - } - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == ShardsOperationThreading.THREAD_PER_SHARD; - for (final ShardRouting shard : shards) { - if (shard.active()) { - if (shard.currentNodeId().equals(nodes.localNodeId())) { - performOperation(shard, localAsync); - } - } - } - } - } - } - - private void performOperation(final ShardRouting shard, boolean localAsync) { - final ShardRequest shardRequest = newShardRequest(shard, request); - if (shard.currentNodeId().equals(nodes.localNodeId())) { - if (localAsync) { - threadPool.execute(new Runnable() { - @Override public void run() { - try { - onOperation(shard, shardOperation(shardRequest), true); - } catch (Exception e) { - onFailure(shard, e, true); - } - } - }); - } else { - try { - onOperation(shard, shardOperation(shardRequest), false); - } catch (Exception e) { - onFailure(shard, e, false); - } - } - } else { - Node node = nodes.get(shard.currentNodeId()); - transportService.sendRequest(node, transportShardAction(), shardRequest, new BaseTransportResponseHandler() { - @Override public ShardResponse newInstance() { - return newShardResponse(); - } - - @Override public void handleResponse(ShardResponse response) { - onOperation(shard, response, false); - } - - @Override public void handleException(RemoteTransportException exp) { - onFailure(shard, exp, false); - } - - @Override public boolean spawn() { - // we never spawn here, we will spawn if needed in onOperation - return false; - } - }); - } - } - - private void onOperation(ShardRouting shardRouting, ShardResponse shardResponse, boolean alreadyThreaded) { - // need two counters to avoid race conditions - shardsResponses.set(indexCounter.getAndIncrement(), shardResponse); - if (opsCounter.incrementAndGet() == shardsResponses.length()) { - finishHim(alreadyThreaded); - } - } - - private void onFailure(ShardRouting shardRouting, Throwable t, boolean alreadyThreaded) { - int idx = indexCounter.getAndIncrement(); - if (accumulateExceptions()) { - ShardOperationFailedException failedException; - if (t instanceof ShardOperationFailedException) { - failedException = (ShardOperationFailedException) t; - } else { - failedException = new ShardOperationFailedException(shardRouting.shardId(), t); - } - shardsResponses.set(idx, failedException); - } - if (opsCounter.incrementAndGet() == shardsResponses.length()) { - finishHim(alreadyThreaded); - } - } - - private void finishHim(boolean alreadyThreaded) { - // if we need to execute the listener on a thread, and we are not threaded already - // then do it - if (request.listenerThreaded() && !alreadyThreaded) { - threadPool.execute(new Runnable() { - @Override public void run() { - listener.onResponse(newResponse(request, clusterState, shardsResponses)); - } - }); - } else { - listener.onResponse(newResponse(request, clusterState, shardsResponses)); - } - } - } - - private class TransportHandler extends BaseTransportRequestHandler { - - @Override public Request newInstance() { - return newRequest(); - } - - @Override public void messageReceived(Request request, final TransportChannel channel) throws Exception { - // we just send back a response, no need to fork a listener - request.listenerThreaded(false); - // we don't spawn, so if we get a request with no threading, change it to single threaded - if (request.operationThreading() == ShardsOperationThreading.NO_THREADS) { - request.operationThreading(ShardsOperationThreading.SINGLE_THREAD); - } - execute(request, new ActionListener() { - @Override public void onResponse(Response response) { - try { - channel.sendResponse(response); - } catch (Exception e) { - onFailure(e); - } - } - - @Override public void onFailure(Throwable e) { - try { - channel.sendResponse(e); - } catch (Exception e1) { - logger.warn("Failed to send response", e1); - } - } - }); - } - - @Override public boolean spawn() { - return false; - } - } - - private class ShardTransportHandler extends BaseTransportRequestHandler { - - @Override public ShardRequest newInstance() { - return newShardRequest(); - } - - @Override public void messageReceived(ShardRequest request, TransportChannel channel) throws Exception { - channel.sendResponse(shardOperation(request)); - } - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ImmutableShardRouting.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ImmutableShardRouting.java index c64379808ac..2ca9e276e53 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ImmutableShardRouting.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ImmutableShardRouting.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.routing; +import com.google.common.collect.ImmutableList; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.util.io.Streamable; @@ -124,6 +125,10 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou return shardIdentifier; } + @Override public ShardsIterator shardsIt() { + return new PlainShardsIterator(shardId(), ImmutableList.of((ShardRouting) this)); + } + public static ImmutableShardRouting readShardRoutingEntry(DataInput in) throws IOException, ClassNotFoundException { ImmutableShardRouting entry = new ImmutableShardRouting(); entry.readFrom(in); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java index 9a864e981b2..e8ffb4591c0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java @@ -64,6 +64,10 @@ public class IndexRoutingTable implements Iterable { return shards.get(shardId); } + /** + * A group shards iterator where each group ({@link ShardsIterator} + * is an iterator across shard replication group. + */ public GroupShardsIterator groupByShardsIt() { IdentityHashSet set = new IdentityHashSet(); for (IndexShardRoutingTable indexShard : this) { @@ -72,6 +76,23 @@ public class IndexRoutingTable implements Iterable { return new GroupShardsIterator(set); } + /** + * A groups shards iterator where each groups is a single {@link ShardRouting} and a group + * is created for each shard routing. + * + *

This basically means that components that use the {@link GroupShardsIterator} will itearte + * over *all* the shards (all the replicas) within the index. + */ + public GroupShardsIterator groupByAllIt() { + IdentityHashSet set = new IdentityHashSet(); + for (IndexShardRoutingTable indexShard : this) { + for (ShardRouting shardRouting : indexShard) { + set.add(shardRouting.shardsIt()); + } + } + return new GroupShardsIterator(set); + } + public void validate() throws RoutingValidationException { } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index cec71a68760..f30c25d5548 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -70,6 +70,13 @@ public class RoutingTable implements Iterable { return new RoutingNodes(metaData, this); } + /** + * All the shards (replicas) for the provided indices. + * + * @param indices The indices to return all the shards (replicas), can be null or empty array to indicate all indices + * @return All the shards matching the specific index + * @throws IndexMissingException If an index passed does not exists + */ public List allShards(String... indices) throws IndexMissingException { List shards = Lists.newArrayList(); if (indices == null || indices.length == 0) { @@ -89,6 +96,35 @@ public class RoutingTable implements Iterable { return shards; } + /** + * All the shards (replicas) for the provided indices grouped (each group is a single element, consisting + * of the shard). This is handy for components that expect to get group iterators, but still want in some + * cases to iterate over all the shards (and not just one shard in replication group). + * + * @param indices The indices to return all the shards (replicas), can be null or empty array to indicate all indices + * @return All the shards grouped into a single shard element group each + * @throws IndexMissingException If an index passed does not exists + * @see IndexRoutingTable#groupByAllIt() + */ + public GroupShardsIterator allShardsGrouped(String... indices) throws IndexMissingException { + GroupShardsIterator its = new GroupShardsIterator(); + if (indices == null || indices.length == 0) { + indices = indicesRouting.keySet().toArray(new String[indicesRouting.keySet().size()]); + } + for (String index : indices) { + IndexRoutingTable indexRoutingTable = index(index); + if (indexRoutingTable == null) { + throw new IndexMissingException(new Index(index)); + } + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + for (ShardRouting shardRouting : indexShardRoutingTable) { + its.add(shardRouting.shardsIt()); + } + } + } + return its; + } + public static Builder newRoutingTableBuilder() { return new Builder(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java index 700e3bafa97..75d1b20330c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java @@ -63,6 +63,11 @@ public interface ShardRouting extends Streamable, Serializable { String shortSummary(); + /** + * A shard iterator with just this shard in it. + */ + ShardsIterator shardsIt(); + /** * Does not write index name and shard id */ diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/http/action/admin/indices/flush/HttpFlushAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/http/action/admin/indices/flush/HttpFlushAction.java index 19898a1a49d..91a1691d7ef 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/http/action/admin/indices/flush/HttpFlushAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/http/action/admin/indices/flush/HttpFlushAction.java @@ -23,13 +23,11 @@ import com.google.inject.Inject; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushResponse; -import org.elasticsearch.action.admin.indices.flush.IndexFlushResponse; -import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; +import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading; import org.elasticsearch.client.Client; import org.elasticsearch.http.*; import org.elasticsearch.http.action.support.HttpActions; import org.elasticsearch.http.action.support.HttpJsonBuilder; -import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.json.JsonBuilder; import org.elasticsearch.util.settings.Settings; @@ -50,24 +48,27 @@ public class HttpFlushAction extends BaseHttpServerHandler { @Override public void handleRequest(final HttpRequest request, final HttpChannel channel) { FlushRequest flushRequest = new FlushRequest(HttpActions.splitIndices(request.param("index"))); + // we just send back a response, no need to fork a listener flushRequest.listenerThreaded(false); - flushRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), ShardReplicationOperationRequest.DEFAULT_TIMEOUT)); + BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(request.param("operationThreading"), BroadcastOperationThreading.SINGLE_THREAD); + if (operationThreading == BroadcastOperationThreading.NO_THREADS) { + // since we don't spawn, don't allow no_threads, but change it to a single thread + operationThreading = BroadcastOperationThreading.THREAD_PER_SHARD; + } + flushRequest.operationThreading(operationThreading); client.admin().indices().execFlush(flushRequest, new ActionListener() { - @Override public void onResponse(FlushResponse result) { + @Override public void onResponse(FlushResponse response) { try { JsonBuilder builder = HttpJsonBuilder.cached(request); builder.startObject(); builder.field("ok", true); - builder.startObject("indices"); - for (IndexFlushResponse indexFlushResponse : result.indices().values()) { - builder.startObject(indexFlushResponse.index()) - .field("ok", true) - .field("totalShards", indexFlushResponse.totalShards()) - .field("successfulShards", indexFlushResponse.successfulShards()) - .field("failedShards", indexFlushResponse.failedShards()) - .endObject(); - } + + builder.startObject("_shards"); + builder.field("total", response.totalShards()); + builder.field("successful", response.successfulShards()); + builder.field("failed", response.failedShards()); builder.endObject(); + builder.endObject(); channel.sendResponse(new JsonHttpResponse(request, OK, builder)); } catch (Exception e) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/http/action/admin/indices/refresh/HttpRefreshAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/http/action/admin/indices/refresh/HttpRefreshAction.java index 29a3a83eee9..ab939c84255 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/http/action/admin/indices/refresh/HttpRefreshAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/http/action/admin/indices/refresh/HttpRefreshAction.java @@ -21,20 +21,18 @@ package org.elasticsearch.http.action.admin.indices.refresh; import com.google.inject.Inject; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.refresh.IndexRefreshResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading; import org.elasticsearch.client.Client; import org.elasticsearch.http.*; import org.elasticsearch.http.action.support.HttpActions; import org.elasticsearch.http.action.support.HttpJsonBuilder; -import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.json.JsonBuilder; import org.elasticsearch.util.settings.Settings; import java.io.IOException; -import static org.elasticsearch.action.support.replication.ShardReplicationOperationRequest.*; import static org.elasticsearch.http.HttpResponse.Status.*; /** @@ -50,24 +48,27 @@ public class HttpRefreshAction extends BaseHttpServerHandler { @Override public void handleRequest(final HttpRequest request, final HttpChannel channel) { RefreshRequest refreshRequest = new RefreshRequest(HttpActions.splitIndices(request.param("index"))); - refreshRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), DEFAULT_TIMEOUT)); + // we just send back a response, no need to fork a listener refreshRequest.listenerThreaded(false); + BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(request.param("operationThreading"), BroadcastOperationThreading.SINGLE_THREAD); + if (operationThreading == BroadcastOperationThreading.NO_THREADS) { + // since we don't spawn, don't allow no_threads, but change it to a single thread + operationThreading = BroadcastOperationThreading.THREAD_PER_SHARD; + } + refreshRequest.operationThreading(operationThreading); client.admin().indices().execRefresh(refreshRequest, new ActionListener() { - @Override public void onResponse(RefreshResponse result) { + @Override public void onResponse(RefreshResponse response) { try { JsonBuilder builder = HttpJsonBuilder.cached(request); builder.startObject(); builder.field("ok", true); - builder.startObject("indices"); - for (IndexRefreshResponse indexResponse : result.indices().values()) { - builder.startObject(indexResponse.index()) - .field("ok", true) - .field("totalShards", indexResponse.totalShards()) - .field("successfulShards", indexResponse.successfulShards()) - .field("failedShards", indexResponse.failedShards()) - .endObject(); - } + + builder.startObject("_shards"); + builder.field("total", response.totalShards()); + builder.field("successful", response.successfulShards()); + builder.field("failed", response.failedShards()); builder.endObject(); + builder.endObject(); channel.sendResponse(new JsonHttpResponse(request, OK, builder)); } catch (Exception e) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/http/action/admin/indices/status/HttpIndicesStatusAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/http/action/admin/indices/status/HttpIndicesStatusAction.java index f4f7adc65d5..7e306dcc517 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/http/action/admin/indices/status/HttpIndicesStatusAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/http/action/admin/indices/status/HttpIndicesStatusAction.java @@ -22,6 +22,7 @@ package org.elasticsearch.http.action.admin.indices.status; import com.google.inject.Inject; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.status.*; +import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading; import org.elasticsearch.client.Client; import org.elasticsearch.http.*; import org.elasticsearch.http.action.support.HttpJsonBuilder; @@ -47,7 +48,14 @@ public class HttpIndicesStatusAction extends BaseHttpServerHandler { @Override public void handleRequest(final HttpRequest request, final HttpChannel channel) { IndicesStatusRequest indicesStatusRequest = new IndicesStatusRequest(splitIndices(request.param("index"))); + // we just send back a response, no need to fork a listener indicesStatusRequest.listenerThreaded(false); + BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(request.param("operationThreading"), BroadcastOperationThreading.SINGLE_THREAD); + if (operationThreading == BroadcastOperationThreading.NO_THREADS) { + // since we don't spawn, don't allow no_threads, but change it to a single thread + operationThreading = BroadcastOperationThreading.SINGLE_THREAD; + } + indicesStatusRequest.operationThreading(operationThreading); client.admin().indices().execStatus(indicesStatusRequest, new ActionListener() { @Override public void onResponse(IndicesStatusResponse response) { try { @@ -55,6 +63,12 @@ public class HttpIndicesStatusAction extends BaseHttpServerHandler { builder.startObject(); builder.field("ok", true); + builder.startObject("_shards"); + builder.field("total", response.totalShards()); + builder.field("successful", response.successfulShards()); + builder.field("failed", response.failedShards()); + builder.endObject(); + builder.startObject("indices"); for (IndexStatus indexStatus : response.indices().values()) { builder.startObject(indexStatus.index()); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/client/transport/SimpleSingleTransportClientTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/client/transport/SimpleSingleTransportClientTests.java index 1ab0a3d3880..052c0c02406 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/client/transport/SimpleSingleTransportClientTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/client/transport/SimpleSingleTransportClientTests.java @@ -34,13 +34,13 @@ import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.server.internal.InternalServer; import org.elasticsearch.test.integration.AbstractServersTests; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.util.settings.ImmutableSettings; import org.elasticsearch.util.transport.TransportAddress; import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; import static org.elasticsearch.client.Requests.*; import static org.elasticsearch.index.query.json.JsonQueryBuilders.*; +import static org.elasticsearch.util.settings.ImmutableSettings.*; import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; @@ -61,7 +61,7 @@ public class SimpleSingleTransportClientTests extends AbstractServersTests { @Test public void testOnlyWithTransportAddress() throws Exception { startServer("server1"); TransportAddress server1Address = ((InternalServer) server("server1")).injector().getInstance(TransportService.class).boundAddress().publishAddress(); - client = new TransportClient(ImmutableSettings.settingsBuilder().putBoolean("discovery.enabled", false).build()); + client = new TransportClient(settingsBuilder().putBoolean("discovery.enabled", false).build()); client.addTransportAddress(server1Address); testSimpleActions(client); } @@ -70,21 +70,28 @@ public class SimpleSingleTransportClientTests extends AbstractServersTests { public void testWithDiscovery() throws Exception { startServer("server1"); - client = new TransportClient(ImmutableSettings.settingsBuilder().putBoolean("discovery.enabled", true).build()); + client = new TransportClient(settingsBuilder().putBoolean("discovery.enabled", true).build()); // wait a bit so nodes will be discovered Thread.sleep(1000); testSimpleActions(client); } - private void testSimpleActions(Client client) { + private void testSimpleActions(Client client) throws Exception { + logger.info("Creating index test"); + client.admin().indices().create(createIndexRequest("test")).actionGet(); + Thread.sleep(500); + IndexResponse indexResponse = client.index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet(); assertThat(indexResponse.id(), equalTo("1")); assertThat(indexResponse.type(), equalTo("type1")); - RefreshResponse refreshResult = client.admin().indices().refresh(refreshRequest("test")).actionGet(); - assertThat(refreshResult.index("test").successfulShards(), equalTo(5)); - assertThat(refreshResult.index("test").failedShards(), equalTo(0)); + + RefreshResponse refreshResponse = client.admin().indices().refresh(refreshRequest("test")).actionGet(); + assertThat(refreshResponse.successfulShards(), equalTo(5)); + assertThat(refreshResponse.failedShards(), equalTo(5)); // 5 are not active, since we started just one server IndicesStatusResponse indicesStatusResponse = client.admin().indices().status(indicesStatus()).actionGet(); + assertThat(indicesStatusResponse.successfulShards(), equalTo(5)); + assertThat(indicesStatusResponse.failedShards(), equalTo(5)); // 5 are not active, since we started just one server assertThat(indicesStatusResponse.indices().size(), equalTo(1)); assertThat(indicesStatusResponse.index("test").shards().size(), equalTo(5)); // 5 index shards (1 with 1 backup) assertThat(indicesStatusResponse.index("test").docs().numDocs(), equalTo(1)); @@ -117,8 +124,8 @@ public class SimpleSingleTransportClientTests extends AbstractServersTests { client.index(Requests.indexRequest("test").type("type1").id("2").source(source("2", "test"))).actionGet(); FlushResponse flushResult = client.admin().indices().flush(flushRequest("test")).actionGet(); - assertThat(flushResult.index("test").successfulShards(), equalTo(5)); - assertThat(flushResult.index("test").failedShards(), equalTo(0)); + assertThat(flushResult.successfulShards(), equalTo(5)); + assertThat(flushResult.failedShards(), equalTo(5)); // we only start one server client.admin().indices().refresh(refreshRequest("test")).actionGet(); for (int i = 0; i < 5; i++) { diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/document/DocumentActionsTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/document/DocumentActionsTests.java index 005bd9699a6..86ab50ee68c 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/document/DocumentActionsTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/document/DocumentActionsTests.java @@ -51,6 +51,7 @@ public class DocumentActionsTests extends AbstractServersTests { logger.info("Creating index test"); client("server1").admin().indices().create(createIndexRequest("test")).actionGet(); + Thread.sleep(200); logger.info("Indexing [type1/1]"); IndexResponse indexResponse = client("server1").index(indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet(); @@ -58,8 +59,8 @@ public class DocumentActionsTests extends AbstractServersTests { assertThat(indexResponse.type(), equalTo("type1")); logger.info("Refreshing"); RefreshResponse refreshResult = client("server1").admin().indices().refresh(refreshRequest("test")).actionGet(); - assertThat(refreshResult.index("test").successfulShards(), equalTo(5)); - assertThat(refreshResult.index("test").failedShards(), equalTo(0)); + assertThat(refreshResult.successfulShards(), equalTo(10)); + assertThat(refreshResult.failedShards(), equalTo(0)); GetResponse getResult; @@ -97,8 +98,8 @@ public class DocumentActionsTests extends AbstractServersTests { logger.info("Flushing"); FlushResponse flushResult = client("server1").admin().indices().flush(flushRequest("test")).actionGet(); - assertThat(flushResult.index("test").successfulShards(), equalTo(5)); - assertThat(flushResult.index("test").failedShards(), equalTo(0)); + assertThat(flushResult.successfulShards(), equalTo(10)); + assertThat(flushResult.failedShards(), equalTo(0)); logger.info("Refreshing"); client("server1").admin().indices().refresh(refreshRequest("test")).actionGet();