change broadcast support to be able to run on all shards replicas (in parallel) and not just one shard per replica group. Change flush and refresh to use broadcast and not replicaiton. Remove shards transport support since broadcast now does exactly the same, and refactor index status to use broadcast (across all shards).
This commit is contained in:
parent
c31d29b702
commit
7631b1383b
|
@ -30,15 +30,11 @@ import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
|
|||
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
|
||||
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
|
||||
import org.elasticsearch.action.admin.indices.flush.TransportFlushAction;
|
||||
import org.elasticsearch.action.admin.indices.flush.TransportIndexFlushAction;
|
||||
import org.elasticsearch.action.admin.indices.flush.TransportShardFlushAction;
|
||||
import org.elasticsearch.action.admin.indices.gateway.snapshot.TransportGatewaySnapshotAction;
|
||||
import org.elasticsearch.action.admin.indices.gateway.snapshot.TransportIndexGatewaySnapshotAction;
|
||||
import org.elasticsearch.action.admin.indices.gateway.snapshot.TransportShardGatewaySnapshotAction;
|
||||
import org.elasticsearch.action.admin.indices.mapping.create.TransportCreateMappingAction;
|
||||
import org.elasticsearch.action.admin.indices.refresh.TransportIndexRefreshAction;
|
||||
import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction;
|
||||
import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction;
|
||||
import org.elasticsearch.action.admin.indices.status.TransportIndicesStatusAction;
|
||||
import org.elasticsearch.action.count.TransportCountAction;
|
||||
import org.elasticsearch.action.delete.TransportDeleteAction;
|
||||
|
@ -76,18 +72,11 @@ public class TransportActionModule extends AbstractModule {
|
|||
bind(TransportIndexGatewaySnapshotAction.class).asEagerSingleton();
|
||||
bind(TransportGatewaySnapshotAction.class).asEagerSingleton();
|
||||
|
||||
bind(TransportShardRefreshAction.class).asEagerSingleton();
|
||||
bind(TransportIndexRefreshAction.class).asEagerSingleton();
|
||||
bind(TransportRefreshAction.class).asEagerSingleton();
|
||||
|
||||
bind(TransportShardFlushAction.class).asEagerSingleton();
|
||||
bind(TransportIndexFlushAction.class).asEagerSingleton();
|
||||
bind(TransportFlushAction.class).asEagerSingleton();
|
||||
|
||||
bind(TransportIndexAction.class).asEagerSingleton();
|
||||
|
||||
bind(TransportGetAction.class).asEagerSingleton();
|
||||
|
||||
bind(TransportDeleteAction.class).asEagerSingleton();
|
||||
|
||||
bind(TransportShardDeleteByQueryAction.class).asEagerSingleton();
|
||||
|
|
|
@ -24,6 +24,8 @@ import org.elasticsearch.ElasticSearchException;
|
|||
import org.elasticsearch.action.TransportActions;
|
||||
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -32,6 +34,8 @@ import org.elasticsearch.util.settings.Settings;
|
|||
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
|
||||
import static org.elasticsearch.action.Actions.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
|
@ -53,7 +57,11 @@ public class TransportBroadcastPingAction extends TransportBroadcastOperationAct
|
|||
return new BroadcastPingRequest();
|
||||
}
|
||||
|
||||
@Override protected BroadcastPingResponse newResponse(BroadcastPingRequest broadcastPingRequest, AtomicReferenceArray shardsResponses) {
|
||||
@Override protected GroupShardsIterator shards(BroadcastPingRequest request, ClusterState clusterState) {
|
||||
return indicesService.searchShards(clusterState, processIndices(clusterState, request.indices()), request.queryHint());
|
||||
}
|
||||
|
||||
@Override protected BroadcastPingResponse newResponse(BroadcastPingRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
|
||||
int successfulShards = 0;
|
||||
int failedShards = 0;
|
||||
for (int i = 0; i < shardsResponses.length(); i++) {
|
||||
|
|
|
@ -19,20 +19,26 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.flush;
|
||||
|
||||
import org.elasticsearch.action.support.replication.IndicesReplicationOperationRequest;
|
||||
import org.elasticsearch.util.TimeValue;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class FlushRequest extends IndicesReplicationOperationRequest {
|
||||
public class FlushRequest extends BroadcastOperationRequest {
|
||||
|
||||
FlushRequest() {
|
||||
|
||||
}
|
||||
|
||||
public FlushRequest(String index) {
|
||||
this(new String[]{index});
|
||||
}
|
||||
|
||||
public FlushRequest(String... indices) {
|
||||
this.indices = indices;
|
||||
super(indices, null);
|
||||
// we want to do the refresh in parallel on local shards...
|
||||
operationThreading(BroadcastOperationThreading.THREAD_PER_SHARD);
|
||||
}
|
||||
|
||||
@Override public FlushRequest listenerThreaded(boolean threadedListener) {
|
||||
|
@ -40,12 +46,8 @@ public class FlushRequest extends IndicesReplicationOperationRequest {
|
|||
return this;
|
||||
}
|
||||
|
||||
public FlushRequest timeout(TimeValue timeout) {
|
||||
this.timeout = timeout;
|
||||
@Override public FlushRequest operationThreading(BroadcastOperationThreading operationThreading) {
|
||||
super.operationThreading(operationThreading);
|
||||
return this;
|
||||
}
|
||||
|
||||
FlushRequest() {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,47 +19,30 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.flush;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.util.io.Streamable;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class FlushResponse implements ActionResponse, Streamable {
|
||||
|
||||
private Map<String, IndexFlushResponse> indices = new HashMap<String, IndexFlushResponse>();
|
||||
public class FlushResponse extends BroadcastOperationResponse {
|
||||
|
||||
FlushResponse() {
|
||||
|
||||
}
|
||||
|
||||
public Map<String, IndexFlushResponse> indices() {
|
||||
return indices;
|
||||
}
|
||||
|
||||
public IndexFlushResponse index(String index) {
|
||||
return indices.get(index);
|
||||
FlushResponse(int successfulShards, int failedShards) {
|
||||
super(successfulShards, failedShards);
|
||||
}
|
||||
|
||||
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
int size = in.readInt();
|
||||
for (int i = 0; i < size; i++) {
|
||||
IndexFlushResponse indexFlushResponse = new IndexFlushResponse();
|
||||
indexFlushResponse.readFrom(in);
|
||||
indices.put(indexFlushResponse.index(), indexFlushResponse);
|
||||
}
|
||||
super.readFrom(in);
|
||||
}
|
||||
|
||||
@Override public void writeTo(DataOutput out) throws IOException {
|
||||
out.writeInt(indices.size());
|
||||
for (IndexFlushResponse indexFlushResponse : indices.values()) {
|
||||
indexFlushResponse.writeTo(out);
|
||||
}
|
||||
super.writeTo(out);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,58 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.indices.flush;
|
||||
|
||||
import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest;
|
||||
import org.elasticsearch.util.TimeValue;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class IndexFlushRequest extends IndexReplicationOperationRequest {
|
||||
|
||||
public IndexFlushRequest(String index) {
|
||||
this.index = index;
|
||||
}
|
||||
|
||||
IndexFlushRequest(FlushRequest request, String index) {
|
||||
this.index = index;
|
||||
this.timeout = request.timeout();
|
||||
}
|
||||
|
||||
IndexFlushRequest() {
|
||||
}
|
||||
|
||||
public IndexFlushRequest timeout(TimeValue timeout) {
|
||||
this.timeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
super.readFrom(in);
|
||||
}
|
||||
|
||||
public void writeTo(DataOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
}
|
||||
}
|
|
@ -1,77 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.indices.flush;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.util.io.Streamable;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class IndexFlushResponse implements ActionResponse, Streamable {
|
||||
|
||||
private String index;
|
||||
|
||||
private int successfulShards;
|
||||
|
||||
private int failedShards;
|
||||
|
||||
IndexFlushResponse(String index, int successfulShards, int failedShards) {
|
||||
this.index = index;
|
||||
this.successfulShards = successfulShards;
|
||||
this.failedShards = failedShards;
|
||||
}
|
||||
|
||||
IndexFlushResponse() {
|
||||
|
||||
}
|
||||
|
||||
public String index() {
|
||||
return index;
|
||||
}
|
||||
|
||||
public int successfulShards() {
|
||||
return successfulShards;
|
||||
}
|
||||
|
||||
public int failedShards() {
|
||||
return failedShards;
|
||||
}
|
||||
|
||||
public int totalShards() {
|
||||
return successfulShards + failedShards;
|
||||
}
|
||||
|
||||
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
index = in.readUTF();
|
||||
successfulShards = in.readInt();
|
||||
failedShards = in.readInt();
|
||||
}
|
||||
|
||||
@Override public void writeTo(DataOutput out) throws IOException {
|
||||
out.writeUTF(index);
|
||||
out.writeInt(successfulShards);
|
||||
out.writeInt(failedShards);
|
||||
}
|
||||
}
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.flush;
|
||||
|
||||
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
|
@ -28,34 +28,20 @@ import java.io.IOException;
|
|||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class ShardFlushRequest extends ShardReplicationOperationRequest {
|
||||
|
||||
private int shardId;
|
||||
|
||||
public ShardFlushRequest(IndexFlushRequest indexFlushRequest, int shardId) {
|
||||
this(indexFlushRequest.index(), shardId);
|
||||
timeout = indexFlushRequest.timeout();
|
||||
}
|
||||
|
||||
public ShardFlushRequest(String index, int shardId) {
|
||||
this.index = index;
|
||||
this.shardId = shardId;
|
||||
}
|
||||
public class ShardFlushRequest extends BroadcastShardOperationRequest {
|
||||
|
||||
ShardFlushRequest() {
|
||||
}
|
||||
|
||||
public int shardId() {
|
||||
return this.shardId;
|
||||
public ShardFlushRequest(String index, int shardId) {
|
||||
super(index, shardId);
|
||||
}
|
||||
|
||||
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
super.readFrom(in);
|
||||
shardId = in.readInt();
|
||||
}
|
||||
|
||||
@Override public void writeTo(DataOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeInt(shardId);
|
||||
}
|
||||
}
|
|
@ -19,8 +19,7 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.flush;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.util.io.Streamable;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
|
@ -29,15 +28,21 @@ import java.io.IOException;
|
|||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class ShardFlushResponse implements ActionResponse, Streamable {
|
||||
public class ShardFlushResponse extends BroadcastShardOperationResponse {
|
||||
|
||||
ShardFlushResponse() {
|
||||
|
||||
}
|
||||
|
||||
public ShardFlushResponse(String index, int shardId) {
|
||||
super(index, shardId);
|
||||
}
|
||||
|
||||
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
super.readFrom(in);
|
||||
}
|
||||
|
||||
@Override public void writeTo(DataOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
}
|
||||
}
|
|
@ -20,9 +20,15 @@
|
|||
package org.elasticsearch.action.admin.indices.flush;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.action.TransportActions;
|
||||
import org.elasticsearch.action.support.replication.TransportIndicesReplicationOperationAction;
|
||||
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.util.settings.Settings;
|
||||
|
@ -32,36 +38,91 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
|
|||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class TransportFlushAction extends TransportIndicesReplicationOperationAction<FlushRequest, FlushResponse, IndexFlushRequest, IndexFlushResponse, ShardFlushRequest, ShardFlushResponse> {
|
||||
public class TransportFlushAction extends TransportBroadcastOperationAction<FlushRequest, FlushResponse, ShardFlushRequest, ShardFlushResponse> {
|
||||
|
||||
@Inject public TransportFlushAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, TransportIndexFlushAction indexFlushAction) {
|
||||
super(settings, transportService, clusterService, threadPool, indexFlushAction);
|
||||
}
|
||||
|
||||
@Override protected FlushRequest newRequestInstance() {
|
||||
return new FlushRequest();
|
||||
}
|
||||
|
||||
@Override protected FlushResponse newResponseInstance(FlushRequest request, AtomicReferenceArray indexResponses) {
|
||||
FlushResponse response = new FlushResponse();
|
||||
for (int i = 0; i < indexResponses.length(); i++) {
|
||||
IndexFlushResponse indexFlushResponse = (IndexFlushResponse) indexResponses.get(i);
|
||||
if (indexFlushResponse != null) {
|
||||
response.indices().put(indexFlushResponse.index(), indexFlushResponse);
|
||||
}
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override protected boolean accumulateExceptions() {
|
||||
return false;
|
||||
@Inject public TransportFlushAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, IndicesService indicesService) {
|
||||
super(settings, threadPool, clusterService, transportService, indicesService);
|
||||
}
|
||||
|
||||
@Override protected String transportAction() {
|
||||
return TransportActions.Admin.Indices.FLUSH;
|
||||
}
|
||||
|
||||
@Override protected IndexFlushRequest newIndexRequestInstance(FlushRequest request, String index) {
|
||||
return new IndexFlushRequest(request, index);
|
||||
@Override protected String transportShardAction() {
|
||||
return "indices/flush/shard";
|
||||
}
|
||||
|
||||
@Override protected FlushRequest newRequest() {
|
||||
return new FlushRequest();
|
||||
}
|
||||
|
||||
@Override protected FlushResponse newResponse(FlushRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
|
||||
int successfulShards = 0;
|
||||
int failedShards = 0;
|
||||
for (int i = 0; i < shardsResponses.length(); i++) {
|
||||
ShardFlushResponse shardCountResponse = (ShardFlushResponse) shardsResponses.get(i);
|
||||
if (shardCountResponse == null) {
|
||||
failedShards++;
|
||||
} else {
|
||||
successfulShards++;
|
||||
}
|
||||
}
|
||||
return new FlushResponse(successfulShards, failedShards);
|
||||
}
|
||||
|
||||
@Override protected ShardFlushRequest newShardRequest() {
|
||||
return new ShardFlushRequest();
|
||||
}
|
||||
|
||||
@Override protected ShardFlushRequest newShardRequest(ShardRouting shard, FlushRequest request) {
|
||||
return new ShardFlushRequest(shard.index(), shard.id());
|
||||
}
|
||||
|
||||
@Override protected ShardFlushResponse newShardResponse() {
|
||||
return new ShardFlushResponse();
|
||||
}
|
||||
|
||||
@Override protected ShardFlushResponse shardOperation(ShardFlushRequest request) throws ElasticSearchException {
|
||||
IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId());
|
||||
indexShard.flush();
|
||||
return new ShardFlushResponse(request.index(), request.shardId());
|
||||
}
|
||||
|
||||
@Override protected boolean accumulateExceptions() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* The refresh request works against *all* shards.
|
||||
*/
|
||||
@Override protected GroupShardsIterator shards(FlushRequest request, ClusterState clusterState) {
|
||||
return clusterState.routingTable().allShardsGrouped(request.indices());
|
||||
}
|
||||
|
||||
// @Override protected FlushRequest newRequestInstance() {
|
||||
// return new FlushRequest();
|
||||
// }
|
||||
//
|
||||
// @Override protected FlushResponse newResponseInstance(FlushRequest request, AtomicReferenceArray indexResponses) {
|
||||
// FlushResponse response = new FlushResponse();
|
||||
// for (int i = 0; i < indexResponses.length(); i++) {
|
||||
// IndexFlushResponse indexFlushResponse = (IndexFlushResponse) indexResponses.get(i);
|
||||
// if (indexFlushResponse != null) {
|
||||
// response.indices().put(indexFlushResponse.index(), indexFlushResponse);
|
||||
// }
|
||||
// }
|
||||
// return response;
|
||||
// }
|
||||
//
|
||||
// @Override protected boolean accumulateExceptions() {
|
||||
// return false;
|
||||
// }
|
||||
//
|
||||
// @Override protected String transportAction() {
|
||||
// return TransportActions.Admin.Indices.FLUSH;
|
||||
// }
|
||||
//
|
||||
// @Override protected IndexFlushRequest newIndexRequestInstance(FlushRequest request, String index) {
|
||||
// return new IndexFlushRequest(request, index);
|
||||
// }
|
||||
}
|
|
@ -1,85 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.indices.flush;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.indices.IndexMissingException;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.util.settings.Settings;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class TransportIndexFlushAction extends TransportIndexReplicationOperationAction<IndexFlushRequest, IndexFlushResponse, ShardFlushRequest, ShardFlushResponse> {
|
||||
|
||||
private final ClusterService clusterService;
|
||||
|
||||
@Inject public TransportIndexFlushAction(Settings settings, ClusterService clusterService,
|
||||
TransportService transportService, ThreadPool threadPool,
|
||||
TransportShardFlushAction shardFlushAction) {
|
||||
super(settings, transportService, threadPool, shardFlushAction);
|
||||
this.clusterService = clusterService;
|
||||
}
|
||||
|
||||
@Override protected IndexFlushRequest newRequestInstance() {
|
||||
return new IndexFlushRequest();
|
||||
}
|
||||
|
||||
@Override protected IndexFlushResponse newResponseInstance(IndexFlushRequest indexFlushReqest, AtomicReferenceArray shardsResponses) {
|
||||
int successfulShards = 0;
|
||||
int failedShards = 0;
|
||||
for (int i = 0; i < shardsResponses.length(); i++) {
|
||||
if (shardsResponses.get(i) == null) {
|
||||
failedShards++;
|
||||
} else {
|
||||
successfulShards++;
|
||||
}
|
||||
}
|
||||
return new IndexFlushResponse(indexFlushReqest.index(), successfulShards, failedShards);
|
||||
}
|
||||
|
||||
@Override protected boolean accumulateExceptions() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override protected String transportAction() {
|
||||
return "indices/index/flush";
|
||||
}
|
||||
|
||||
@Override protected GroupShardsIterator shards(IndexFlushRequest indexFlushRequest) {
|
||||
IndexRoutingTable indexRouting = clusterService.state().routingTable().index(indexFlushRequest.index());
|
||||
if (indexRouting == null) {
|
||||
throw new IndexMissingException(new Index(indexFlushRequest.index()));
|
||||
}
|
||||
return indexRouting.groupByShardsIt();
|
||||
}
|
||||
|
||||
@Override protected ShardFlushRequest newShardRequestInstance(IndexFlushRequest indexFlushRequest, int shardId) {
|
||||
return new ShardFlushRequest(indexFlushRequest, shardId);
|
||||
}
|
||||
}
|
|
@ -1,69 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.indices.flush;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.util.settings.Settings;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class TransportShardFlushAction extends TransportShardReplicationOperationAction<ShardFlushRequest, ShardFlushResponse> {
|
||||
|
||||
@Inject public TransportShardFlushAction(Settings settings, TransportService transportService,
|
||||
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
|
||||
ShardStateAction shardStateAction) {
|
||||
super(settings, transportService, clusterService, indicesService, threadPool, shardStateAction);
|
||||
}
|
||||
|
||||
@Override protected ShardFlushRequest newRequestInstance() {
|
||||
return new ShardFlushRequest();
|
||||
}
|
||||
|
||||
@Override protected ShardFlushResponse newResponseInstance() {
|
||||
return new ShardFlushResponse();
|
||||
}
|
||||
|
||||
@Override protected String transportAction() {
|
||||
return "indices/index/shard/flush";
|
||||
}
|
||||
|
||||
@Override protected ShardFlushResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
|
||||
ShardFlushRequest request = shardRequest.request;
|
||||
indexShard(shardRequest).flush();
|
||||
return new ShardFlushResponse();
|
||||
}
|
||||
|
||||
@Override protected void shardOperationOnBackup(ShardOperationRequest shardRequest) {
|
||||
ShardFlushRequest request = shardRequest.request;
|
||||
indexShard(shardRequest).flush();
|
||||
}
|
||||
|
||||
@Override protected ShardsIterator shards(ShardFlushRequest request) {
|
||||
return clusterService.state().routingTable().index(request.index()).shard(request.shardId()).shardsIt();
|
||||
}
|
||||
}
|
|
@ -1,72 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.indices.refresh;
|
||||
|
||||
import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest;
|
||||
import org.elasticsearch.util.TimeValue;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class IndexRefreshRequest extends IndexReplicationOperationRequest {
|
||||
|
||||
private boolean waitForOperations = true;
|
||||
|
||||
public IndexRefreshRequest(String index) {
|
||||
this.index = index;
|
||||
}
|
||||
|
||||
IndexRefreshRequest(RefreshRequest request, String index) {
|
||||
this.index = index;
|
||||
this.timeout = request.timeout();
|
||||
this.waitForOperations = request.waitForOperations();
|
||||
}
|
||||
|
||||
IndexRefreshRequest() {
|
||||
}
|
||||
|
||||
public IndexRefreshRequest timeout(TimeValue timeout) {
|
||||
this.timeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean waitForOperations() {
|
||||
return waitForOperations;
|
||||
}
|
||||
|
||||
public IndexRefreshRequest waitForOperations(boolean waitForOperations) {
|
||||
this.waitForOperations = waitForOperations;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
super.readFrom(in);
|
||||
waitForOperations = in.readBoolean();
|
||||
}
|
||||
|
||||
public void writeTo(DataOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeBoolean(waitForOperations);
|
||||
}
|
||||
}
|
|
@ -1,77 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.indices.refresh;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.util.io.Streamable;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class IndexRefreshResponse implements ActionResponse, Streamable {
|
||||
|
||||
private String index;
|
||||
|
||||
private int successfulShards;
|
||||
|
||||
private int failedShards;
|
||||
|
||||
IndexRefreshResponse(String index, int successfulShards, int failedShards) {
|
||||
this.index = index;
|
||||
this.successfulShards = successfulShards;
|
||||
this.failedShards = failedShards;
|
||||
}
|
||||
|
||||
IndexRefreshResponse() {
|
||||
|
||||
}
|
||||
|
||||
public String index() {
|
||||
return index;
|
||||
}
|
||||
|
||||
public int successfulShards() {
|
||||
return successfulShards;
|
||||
}
|
||||
|
||||
public int failedShards() {
|
||||
return failedShards;
|
||||
}
|
||||
|
||||
public int totalShards() {
|
||||
return successfulShards + failedShards;
|
||||
}
|
||||
|
||||
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
index = in.readUTF();
|
||||
successfulShards = in.readInt();
|
||||
failedShards = in.readInt();
|
||||
}
|
||||
|
||||
@Override public void writeTo(DataOutput out) throws IOException {
|
||||
out.writeUTF(index);
|
||||
out.writeInt(successfulShards);
|
||||
out.writeInt(failedShards);
|
||||
}
|
||||
}
|
|
@ -19,8 +19,8 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.refresh;
|
||||
|
||||
import org.elasticsearch.action.support.replication.IndicesReplicationOperationRequest;
|
||||
import org.elasticsearch.util.TimeValue;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
|
@ -29,7 +29,7 @@ import java.io.IOException;
|
|||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class RefreshRequest extends IndicesReplicationOperationRequest {
|
||||
public class RefreshRequest extends BroadcastOperationRequest {
|
||||
|
||||
private boolean waitForOperations = true;
|
||||
|
||||
|
@ -38,12 +38,9 @@ public class RefreshRequest extends IndicesReplicationOperationRequest {
|
|||
}
|
||||
|
||||
public RefreshRequest(String... indices) {
|
||||
this.indices = indices;
|
||||
}
|
||||
|
||||
public RefreshRequest timeout(TimeValue timeout) {
|
||||
this.timeout = timeout;
|
||||
return this;
|
||||
super(indices, null);
|
||||
// we want to do the refresh in parallel on local shards...
|
||||
operationThreading(BroadcastOperationThreading.THREAD_PER_SHARD);
|
||||
}
|
||||
|
||||
RefreshRequest() {
|
||||
|
@ -55,6 +52,11 @@ public class RefreshRequest extends IndicesReplicationOperationRequest {
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override public RefreshRequest operationThreading(BroadcastOperationThreading operationThreading) {
|
||||
super.operationThreading(operationThreading);
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean waitForOperations() {
|
||||
return waitForOperations;
|
||||
}
|
||||
|
|
|
@ -19,47 +19,30 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.refresh;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.util.io.Streamable;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class RefreshResponse implements ActionResponse, Streamable {
|
||||
|
||||
private Map<String, IndexRefreshResponse> indices = new HashMap<String, IndexRefreshResponse>();
|
||||
public class RefreshResponse extends BroadcastOperationResponse {
|
||||
|
||||
RefreshResponse() {
|
||||
|
||||
}
|
||||
|
||||
public Map<String, IndexRefreshResponse> indices() {
|
||||
return indices;
|
||||
}
|
||||
|
||||
public IndexRefreshResponse index(String index) {
|
||||
return indices.get(index);
|
||||
RefreshResponse(int successfulShards, int failedShards) {
|
||||
super(successfulShards, failedShards);
|
||||
}
|
||||
|
||||
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
int size = in.readInt();
|
||||
for (int i = 0; i < size; i++) {
|
||||
IndexRefreshResponse response = new IndexRefreshResponse();
|
||||
response.readFrom(in);
|
||||
indices.put(response.index(), response);
|
||||
}
|
||||
super.readFrom(in);
|
||||
}
|
||||
|
||||
@Override public void writeTo(DataOutput out) throws IOException {
|
||||
out.writeInt(indices.size());
|
||||
for (IndexRefreshResponse indexRefreshResponse : indices.values()) {
|
||||
indexRefreshResponse.writeTo(out);
|
||||
}
|
||||
super.writeTo(out);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.refresh;
|
||||
|
||||
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
|
@ -28,27 +28,16 @@ import java.io.IOException;
|
|||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class ShardRefreshRequest extends ShardReplicationOperationRequest {
|
||||
public class ShardRefreshRequest extends BroadcastShardOperationRequest {
|
||||
|
||||
private int shardId;
|
||||
private boolean waitForOperations = true;
|
||||
|
||||
public ShardRefreshRequest(IndexRefreshRequest request, int shardId) {
|
||||
this(request.index(), shardId);
|
||||
timeout = request.timeout();
|
||||
waitForOperations = request.waitForOperations();
|
||||
}
|
||||
|
||||
public ShardRefreshRequest(String index, int shardId) {
|
||||
this.index = index;
|
||||
this.shardId = shardId;
|
||||
}
|
||||
|
||||
ShardRefreshRequest() {
|
||||
}
|
||||
|
||||
public int shardId() {
|
||||
return this.shardId;
|
||||
public ShardRefreshRequest(String index, int shardId, RefreshRequest request) {
|
||||
super(index, shardId);
|
||||
waitForOperations = request.waitForOperations();
|
||||
}
|
||||
|
||||
public boolean waitForOperations() {
|
||||
|
@ -62,13 +51,11 @@ public class ShardRefreshRequest extends ShardReplicationOperationRequest {
|
|||
|
||||
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
super.readFrom(in);
|
||||
shardId = in.readInt();
|
||||
waitForOperations = in.readBoolean();
|
||||
}
|
||||
|
||||
@Override public void writeTo(DataOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeInt(shardId);
|
||||
out.writeBoolean(waitForOperations);
|
||||
}
|
||||
}
|
|
@ -19,8 +19,7 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.refresh;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.util.io.Streamable;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
|
@ -29,14 +28,20 @@ import java.io.IOException;
|
|||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class ShardRefreshResponse implements ActionResponse, Streamable {
|
||||
public class ShardRefreshResponse extends BroadcastShardOperationResponse {
|
||||
|
||||
ShardRefreshResponse() {
|
||||
}
|
||||
|
||||
public ShardRefreshResponse(String index, int shardId) {
|
||||
super(index, shardId);
|
||||
}
|
||||
|
||||
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
super.readFrom(in);
|
||||
}
|
||||
|
||||
@Override public void writeTo(DataOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
}
|
||||
}
|
|
@ -1,78 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.indices.refresh;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.util.settings.Settings;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class TransportIndexRefreshAction extends TransportIndexReplicationOperationAction<IndexRefreshRequest, IndexRefreshResponse, ShardRefreshRequest, ShardRefreshResponse> {
|
||||
|
||||
private final ClusterService clusterService;
|
||||
|
||||
@Inject public TransportIndexRefreshAction(Settings settings, ClusterService clusterService,
|
||||
TransportService transportService, ThreadPool threadPool,
|
||||
TransportShardRefreshAction shardRefreshAction) {
|
||||
super(settings, transportService, threadPool, shardRefreshAction);
|
||||
this.clusterService = clusterService;
|
||||
}
|
||||
|
||||
@Override protected IndexRefreshRequest newRequestInstance() {
|
||||
return new IndexRefreshRequest();
|
||||
}
|
||||
|
||||
@Override protected IndexRefreshResponse newResponseInstance(IndexRefreshRequest request, AtomicReferenceArray shardsResponses) {
|
||||
int successfulShards = 0;
|
||||
int failedShards = 0;
|
||||
for (int i = 0; i < shardsResponses.length(); i++) {
|
||||
if (shardsResponses.get(i) == null) {
|
||||
failedShards++;
|
||||
} else {
|
||||
successfulShards++;
|
||||
}
|
||||
}
|
||||
return new IndexRefreshResponse(request.index(), successfulShards, failedShards);
|
||||
}
|
||||
|
||||
@Override protected boolean accumulateExceptions() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override protected String transportAction() {
|
||||
return "indices/index/refresh";
|
||||
}
|
||||
|
||||
@Override protected GroupShardsIterator shards(IndexRefreshRequest request) {
|
||||
return clusterService.state().routingTable().index(request.index()).groupByShardsIt();
|
||||
}
|
||||
|
||||
@Override protected ShardRefreshRequest newShardRequestInstance(IndexRefreshRequest request, int shardId) {
|
||||
return new ShardRefreshRequest(request, shardId);
|
||||
}
|
||||
}
|
|
@ -20,9 +20,15 @@
|
|||
package org.elasticsearch.action.admin.indices.refresh;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.action.TransportActions;
|
||||
import org.elasticsearch.action.support.replication.TransportIndicesReplicationOperationAction;
|
||||
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.util.settings.Settings;
|
||||
|
@ -32,36 +38,65 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
|
|||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class TransportRefreshAction extends TransportIndicesReplicationOperationAction<RefreshRequest, RefreshResponse, IndexRefreshRequest, IndexRefreshResponse, ShardRefreshRequest, ShardRefreshResponse> {
|
||||
public class TransportRefreshAction extends TransportBroadcastOperationAction<RefreshRequest, RefreshResponse, ShardRefreshRequest, ShardRefreshResponse> {
|
||||
|
||||
@Inject public TransportRefreshAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, TransportIndexRefreshAction indexAction) {
|
||||
super(settings, transportService, clusterService, threadPool, indexAction);
|
||||
}
|
||||
|
||||
@Override protected RefreshRequest newRequestInstance() {
|
||||
return new RefreshRequest();
|
||||
}
|
||||
|
||||
@Override protected RefreshResponse newResponseInstance(RefreshRequest request, AtomicReferenceArray indexResponses) {
|
||||
RefreshResponse response = new RefreshResponse();
|
||||
for (int i = 0; i < indexResponses.length(); i++) {
|
||||
IndexRefreshResponse indexResponse = (IndexRefreshResponse) indexResponses.get(i);
|
||||
if (indexResponse != null) {
|
||||
response.indices().put(indexResponse.index(), indexResponse);
|
||||
}
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override protected boolean accumulateExceptions() {
|
||||
return false;
|
||||
@Inject public TransportRefreshAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
|
||||
TransportService transportService, IndicesService indicesService) {
|
||||
super(settings, threadPool, clusterService, transportService, indicesService);
|
||||
}
|
||||
|
||||
@Override protected String transportAction() {
|
||||
return TransportActions.Admin.Indices.REFRESH;
|
||||
}
|
||||
|
||||
@Override protected IndexRefreshRequest newIndexRequestInstance(RefreshRequest request, String index) {
|
||||
return new IndexRefreshRequest(request, index);
|
||||
@Override protected String transportShardAction() {
|
||||
return "indices/refresh/shard";
|
||||
}
|
||||
|
||||
@Override protected RefreshRequest newRequest() {
|
||||
return new RefreshRequest();
|
||||
}
|
||||
|
||||
@Override protected RefreshResponse newResponse(RefreshRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
|
||||
int successfulShards = 0;
|
||||
int failedShards = 0;
|
||||
for (int i = 0; i < shardsResponses.length(); i++) {
|
||||
ShardRefreshResponse shardCountResponse = (ShardRefreshResponse) shardsResponses.get(i);
|
||||
if (shardCountResponse == null) {
|
||||
failedShards++;
|
||||
} else {
|
||||
successfulShards++;
|
||||
}
|
||||
}
|
||||
return new RefreshResponse(successfulShards, failedShards);
|
||||
}
|
||||
|
||||
@Override protected ShardRefreshRequest newShardRequest() {
|
||||
return new ShardRefreshRequest();
|
||||
}
|
||||
|
||||
@Override protected ShardRefreshRequest newShardRequest(ShardRouting shard, RefreshRequest request) {
|
||||
return new ShardRefreshRequest(shard.index(), shard.id(), request);
|
||||
}
|
||||
|
||||
@Override protected ShardRefreshResponse newShardResponse() {
|
||||
return new ShardRefreshResponse();
|
||||
}
|
||||
|
||||
@Override protected ShardRefreshResponse shardOperation(ShardRefreshRequest request) throws ElasticSearchException {
|
||||
IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId());
|
||||
indexShard.refresh(request.waitForOperations());
|
||||
return new ShardRefreshResponse(request.index(), request.shardId());
|
||||
}
|
||||
|
||||
@Override protected boolean accumulateExceptions() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* The refresh request works against *all* shards.
|
||||
*/
|
||||
@Override protected GroupShardsIterator shards(RefreshRequest request, ClusterState clusterState) {
|
||||
return clusterState.routingTable().allShardsGrouped(request.indices());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,69 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.indices.refresh;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.util.settings.Settings;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class TransportShardRefreshAction extends TransportShardReplicationOperationAction<ShardRefreshRequest, ShardRefreshResponse> {
|
||||
|
||||
@Inject public TransportShardRefreshAction(Settings settings, TransportService transportService,
|
||||
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
|
||||
ShardStateAction shardStateAction) {
|
||||
super(settings, transportService, clusterService, indicesService, threadPool, shardStateAction);
|
||||
}
|
||||
|
||||
@Override protected ShardRefreshRequest newRequestInstance() {
|
||||
return new ShardRefreshRequest();
|
||||
}
|
||||
|
||||
@Override protected ShardRefreshResponse newResponseInstance() {
|
||||
return new ShardRefreshResponse();
|
||||
}
|
||||
|
||||
@Override protected String transportAction() {
|
||||
return "indices/index/shard/refresh";
|
||||
}
|
||||
|
||||
@Override protected ShardRefreshResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
|
||||
ShardRefreshRequest request = shardRequest.request;
|
||||
indexShard(shardRequest).refresh(request.waitForOperations());
|
||||
return new ShardRefreshResponse();
|
||||
}
|
||||
|
||||
@Override protected void shardOperationOnBackup(ShardOperationRequest shardRequest) {
|
||||
ShardRefreshRequest request = shardRequest.request;
|
||||
indexShard(shardRequest).refresh(request.waitForOperations());
|
||||
}
|
||||
|
||||
@Override protected ShardsIterator shards(ShardRefreshRequest request) {
|
||||
return clusterService.state().routingTable().index(request.index()).shard(request.shardId()).shardsIt();
|
||||
}
|
||||
}
|
|
@ -19,21 +19,21 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.status;
|
||||
|
||||
import org.elasticsearch.action.support.shards.ShardsOperationRequest;
|
||||
import org.elasticsearch.action.support.shards.ShardsOperationThreading;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading;
|
||||
import org.elasticsearch.util.Strings;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class IndicesStatusRequest extends ShardsOperationRequest {
|
||||
public class IndicesStatusRequest extends BroadcastOperationRequest {
|
||||
|
||||
public IndicesStatusRequest() {
|
||||
this(Strings.EMPTY_ARRAY);
|
||||
}
|
||||
|
||||
public IndicesStatusRequest(String... indices) {
|
||||
super(indices);
|
||||
super(indices, null);
|
||||
}
|
||||
|
||||
@Override public IndicesStatusRequest listenerThreaded(boolean listenerThreaded) {
|
||||
|
@ -41,8 +41,7 @@ public class IndicesStatusRequest extends ShardsOperationRequest {
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override public IndicesStatusRequest operationThreading(ShardsOperationThreading operationThreading) {
|
||||
super.operationThreading(operationThreading);
|
||||
return this;
|
||||
@Override public BroadcastOperationRequest operationThreading(BroadcastOperationThreading operationThreading) {
|
||||
return super.operationThreading(operationThreading);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
package org.elasticsearch.action.admin.indices.status;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.action.support.shards.ShardsOperationResponse;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.util.settings.Settings;
|
||||
|
||||
|
@ -38,7 +38,9 @@ import static org.elasticsearch.util.settings.ImmutableSettings.*;
|
|||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class IndicesStatusResponse extends ShardsOperationResponse<ShardStatus> {
|
||||
public class IndicesStatusResponse extends BroadcastOperationResponse {
|
||||
|
||||
protected ShardStatus[] shards;
|
||||
|
||||
private Map<String, Settings> indicesSettings = ImmutableMap.of();
|
||||
|
||||
|
@ -47,8 +49,9 @@ public class IndicesStatusResponse extends ShardsOperationResponse<ShardStatus>
|
|||
IndicesStatusResponse() {
|
||||
}
|
||||
|
||||
IndicesStatusResponse(ShardStatus[] shards, ClusterState clusterState) {
|
||||
super(shards);
|
||||
IndicesStatusResponse(ShardStatus[] shards, ClusterState clusterState, int successfulShards, int failedShards) {
|
||||
super(successfulShards, failedShards);
|
||||
this.shards = shards;
|
||||
indicesSettings = newHashMap();
|
||||
for (ShardStatus shard : shards) {
|
||||
if (!indicesSettings.containsKey(shard.shardRouting().index())) {
|
||||
|
@ -57,6 +60,10 @@ public class IndicesStatusResponse extends ShardsOperationResponse<ShardStatus>
|
|||
}
|
||||
}
|
||||
|
||||
public ShardStatus[] shards() {
|
||||
return this.shards;
|
||||
}
|
||||
|
||||
public IndexStatus index(String index) {
|
||||
return indices().get(index);
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.status;
|
||||
|
||||
import org.elasticsearch.action.support.shards.ShardOperationResponse;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.index.shard.IndexShardState;
|
||||
import org.elasticsearch.util.SizeValue;
|
||||
|
@ -28,12 +28,13 @@ import java.io.DataInput;
|
|||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.cluster.routing.ImmutableShardRouting.*;
|
||||
import static org.elasticsearch.util.SizeValue.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class ShardStatus extends ShardOperationResponse {
|
||||
public class ShardStatus extends BroadcastShardOperationResponse {
|
||||
|
||||
public static class Docs {
|
||||
public static final Docs UNKNOWN = new Docs();
|
||||
|
@ -55,6 +56,8 @@ public class ShardStatus extends ShardOperationResponse {
|
|||
}
|
||||
}
|
||||
|
||||
private ShardRouting shardRouting;
|
||||
|
||||
IndexShardState state;
|
||||
|
||||
SizeValue storeSize = SizeValue.UNKNOWN;
|
||||
|
@ -71,7 +74,12 @@ public class ShardStatus extends ShardOperationResponse {
|
|||
}
|
||||
|
||||
ShardStatus(ShardRouting shardRouting) {
|
||||
super(shardRouting);
|
||||
super(shardRouting.index(), shardRouting.id());
|
||||
this.shardRouting = shardRouting;
|
||||
}
|
||||
|
||||
public ShardRouting shardRouting() {
|
||||
return this.shardRouting;
|
||||
}
|
||||
|
||||
public IndexShardState state() {
|
||||
|
@ -106,6 +114,7 @@ public class ShardStatus extends ShardOperationResponse {
|
|||
|
||||
@Override public void writeTo(DataOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
shardRouting.writeTo(out);
|
||||
out.writeByte(state.id());
|
||||
storeSize.writeTo(out);
|
||||
estimatedFlushableMemorySize.writeTo(out);
|
||||
|
@ -118,6 +127,7 @@ public class ShardStatus extends ShardOperationResponse {
|
|||
|
||||
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
super.readFrom(in);
|
||||
shardRouting = readShardRoutingEntry(in);
|
||||
state = IndexShardState.fromId(in.readByte());
|
||||
storeSize = readSizeValue(in);
|
||||
estimatedFlushableMemorySize = readSizeValue(in);
|
||||
|
|
|
@ -22,10 +22,11 @@ package org.elasticsearch.action.admin.indices.status;
|
|||
import com.google.inject.Inject;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.action.TransportActions;
|
||||
import org.elasticsearch.action.support.shards.ShardOperationRequest;
|
||||
import org.elasticsearch.action.support.shards.TransportShardsOperationActions;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
|
||||
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.shard.InternalIndexShard;
|
||||
|
@ -43,10 +44,10 @@ import static com.google.common.collect.Lists.*;
|
|||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class TransportIndicesStatusAction extends TransportShardsOperationActions<IndicesStatusRequest, IndicesStatusResponse, TransportIndicesStatusAction.IndexShardStatusRequest, ShardStatus> {
|
||||
public class TransportIndicesStatusAction extends TransportBroadcastOperationAction<IndicesStatusRequest, IndicesStatusResponse, TransportIndicesStatusAction.IndexShardStatusRequest, ShardStatus> {
|
||||
|
||||
@Inject public TransportIndicesStatusAction(Settings settings, ClusterService clusterService, TransportService transportService, IndicesService indicesService, ThreadPool threadPool) {
|
||||
super(settings, clusterService, transportService, indicesService, threadPool);
|
||||
@Inject public TransportIndicesStatusAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, IndicesService indicesService) {
|
||||
super(settings, threadPool, clusterService, transportService, indicesService);
|
||||
}
|
||||
|
||||
@Override protected String transportAction() {
|
||||
|
@ -61,6 +62,22 @@ public class TransportIndicesStatusAction extends TransportShardsOperationAction
|
|||
return new IndicesStatusRequest();
|
||||
}
|
||||
|
||||
@Override protected IndicesStatusResponse newResponse(IndicesStatusRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
|
||||
int successfulShards = 0;
|
||||
int failedShards = 0;
|
||||
final List<ShardStatus> shards = newArrayList();
|
||||
for (int i = 0; i < shardsResponses.length(); i++) {
|
||||
Object resp = shardsResponses.get(i);
|
||||
if (resp instanceof ShardStatus) {
|
||||
shards.add((ShardStatus) resp);
|
||||
successfulShards++;
|
||||
} else {
|
||||
failedShards++;
|
||||
}
|
||||
}
|
||||
return new IndicesStatusResponse(shards.toArray(new ShardStatus[shards.size()]), clusterState, successfulShards, failedShards);
|
||||
}
|
||||
|
||||
@Override protected IndexShardStatusRequest newShardRequest() {
|
||||
return new IndexShardStatusRequest();
|
||||
}
|
||||
|
@ -73,21 +90,6 @@ public class TransportIndicesStatusAction extends TransportShardsOperationAction
|
|||
return new ShardStatus();
|
||||
}
|
||||
|
||||
@Override protected boolean accumulateExceptions() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override protected IndicesStatusResponse newResponse(IndicesStatusRequest request, ClusterState clusterState, AtomicReferenceArray<Object> shardsResponses) {
|
||||
final List<ShardStatus> shards = newArrayList();
|
||||
for (int i = 0; i < shardsResponses.length(); i++) {
|
||||
Object resp = shardsResponses.get(i);
|
||||
if (resp instanceof ShardStatus) {
|
||||
shards.add((ShardStatus) resp);
|
||||
}
|
||||
}
|
||||
return new IndicesStatusResponse(shards.toArray(new ShardStatus[shards.size()]), clusterState);
|
||||
}
|
||||
|
||||
@Override protected ShardStatus shardOperation(IndexShardStatusRequest request) throws ElasticSearchException {
|
||||
InternalIndexShard indexShard = (InternalIndexShard) indicesService.indexServiceSafe(request.index()).shard(request.shardId());
|
||||
ShardStatus shardStatus = new ShardStatus(indexShard.routingEntry());
|
||||
|
@ -112,7 +114,18 @@ public class TransportIndicesStatusAction extends TransportShardsOperationAction
|
|||
return shardStatus;
|
||||
}
|
||||
|
||||
public static class IndexShardStatusRequest extends ShardOperationRequest {
|
||||
@Override protected boolean accumulateExceptions() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Status goes across *all* shards.
|
||||
*/
|
||||
@Override protected GroupShardsIterator shards(IndicesStatusRequest request, ClusterState clusterState) {
|
||||
return clusterState.routingTable().allShardsGrouped(request.indices());
|
||||
}
|
||||
|
||||
public static class IndexShardStatusRequest extends BroadcastShardOperationRequest {
|
||||
|
||||
IndexShardStatusRequest() {
|
||||
}
|
||||
|
|
|
@ -24,6 +24,8 @@ import org.elasticsearch.ElasticSearchException;
|
|||
import org.elasticsearch.action.TransportActions;
|
||||
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
|
@ -33,6 +35,8 @@ import org.elasticsearch.util.settings.Settings;
|
|||
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
|
||||
import static org.elasticsearch.action.Actions.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
|
@ -66,7 +70,11 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
|
|||
return new ShardCountResponse();
|
||||
}
|
||||
|
||||
@Override protected CountResponse newResponse(CountRequest request, AtomicReferenceArray shardsResponses) {
|
||||
@Override protected GroupShardsIterator shards(CountRequest request, ClusterState clusterState) {
|
||||
return indicesService.searchShards(clusterState, processIndices(clusterState, request.indices()), request.queryHint());
|
||||
}
|
||||
|
||||
@Override protected CountResponse newResponse(CountRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
|
||||
int successfulShards = 0;
|
||||
int failedShards = 0;
|
||||
long count = 0;
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.action.support.broadcast;
|
|||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ShardNotActiveException;
|
||||
import org.elasticsearch.action.ShardOperationFailedException;
|
||||
import org.elasticsearch.action.support.BaseAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
|
@ -39,8 +40,6 @@ import java.util.Iterator;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
|
||||
import static org.elasticsearch.action.Actions.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
|
@ -76,7 +75,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
|
|||
|
||||
protected abstract Request newRequest();
|
||||
|
||||
protected abstract Response newResponse(Request request, AtomicReferenceArray shardsResponses);
|
||||
protected abstract Response newResponse(Request request, AtomicReferenceArray shardsResponses, ClusterState clusterState);
|
||||
|
||||
protected abstract ShardRequest newShardRequest();
|
||||
|
||||
|
@ -88,12 +87,16 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
|
|||
|
||||
protected abstract boolean accumulateExceptions();
|
||||
|
||||
protected abstract GroupShardsIterator shards(Request request, ClusterState clusterState);
|
||||
|
||||
private class AsyncBroadcastAction {
|
||||
|
||||
private final Request request;
|
||||
|
||||
private final ActionListener<Response> listener;
|
||||
|
||||
private final ClusterState clusterState;
|
||||
|
||||
private final Nodes nodes;
|
||||
|
||||
private final GroupShardsIterator shardsIts;
|
||||
|
@ -110,9 +113,9 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
|
|||
this.request = request;
|
||||
this.listener = listener;
|
||||
|
||||
ClusterState clusterState = clusterService.state();
|
||||
clusterState = clusterService.state();
|
||||
nodes = clusterState.nodes();
|
||||
shardsIts = indicesService.searchShards(clusterState, processIndices(clusterState, request.indices()), request.queryHint());
|
||||
shardsIts = shards(request, clusterState);
|
||||
expectedOps = shardsIts.size();
|
||||
|
||||
|
||||
|
@ -133,7 +136,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
|
|||
}
|
||||
} else {
|
||||
// as if we have a "problem", so we iterate to the next one and maintain counts
|
||||
onOperation(shard, shardIt, null, false);
|
||||
onOperation(shard, shardIt, new ShardNotActiveException(shard.shardId()), false);
|
||||
}
|
||||
}
|
||||
// we have local operations, perform them now
|
||||
|
@ -169,7 +172,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
|
|||
final ShardRouting shard = shardIt.next();
|
||||
if (!shard.active()) {
|
||||
// as if we have a "problem", so we iterate to the next one and maintain counts
|
||||
onOperation(shard, shardIt, null, false);
|
||||
onOperation(shard, shardIt, new ShardNotActiveException(shard.shardId()), false);
|
||||
} else {
|
||||
final ShardRequest shardRequest = newShardRequest(shard, request);
|
||||
if (shard.currentNodeId().equals(nodes.localNodeId())) {
|
||||
|
@ -214,14 +217,14 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
|
|||
}
|
||||
}
|
||||
|
||||
private void onOperation(ShardRouting shard, ShardResponse response, boolean alreadyThreaded) {
|
||||
@SuppressWarnings({"unchecked"}) private void onOperation(ShardRouting shard, ShardResponse response, boolean alreadyThreaded) {
|
||||
shardsResponses.set(indexCounter.getAndIncrement(), response);
|
||||
if (expectedOps == counterOps.incrementAndGet()) {
|
||||
finishHim(alreadyThreaded);
|
||||
}
|
||||
}
|
||||
|
||||
private void onOperation(ShardRouting shard, final Iterator<ShardRouting> shardIt, Exception e, boolean alreadyThreaded) {
|
||||
@SuppressWarnings({"unchecked"}) private void onOperation(ShardRouting shard, final Iterator<ShardRouting> shardIt, Exception e, boolean alreadyThreaded) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
if (e != null) {
|
||||
logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", e);
|
||||
|
@ -251,11 +254,11 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
|
|||
if (request.listenerThreaded() && !alreadyThreaded) {
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
listener.onResponse(newResponse(request, shardsResponses));
|
||||
listener.onResponse(newResponse(request, shardsResponses, clusterState));
|
||||
}
|
||||
});
|
||||
} else {
|
||||
listener.onResponse(newResponse(request, shardsResponses));
|
||||
listener.onResponse(newResponse(request, shardsResponses, clusterState));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,62 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support.shards;
|
||||
|
||||
import org.elasticsearch.util.io.Streamable;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class ShardOperationRequest implements Streamable {
|
||||
|
||||
private String index;
|
||||
|
||||
private int shardId;
|
||||
|
||||
protected ShardOperationRequest() {
|
||||
}
|
||||
|
||||
protected ShardOperationRequest(String index, int shardId) {
|
||||
this.index = index;
|
||||
this.shardId = shardId;
|
||||
}
|
||||
|
||||
public String index() {
|
||||
return this.index;
|
||||
}
|
||||
|
||||
public int shardId() {
|
||||
return this.shardId;
|
||||
}
|
||||
|
||||
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
index = in.readUTF();
|
||||
shardId = in.readInt();
|
||||
}
|
||||
|
||||
@Override public void writeTo(DataOutput out) throws IOException {
|
||||
out.writeUTF(index);
|
||||
out.writeInt(shardId);
|
||||
}
|
||||
}
|
|
@ -1,57 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support.shards;
|
||||
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.util.io.Streamable;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.cluster.routing.ImmutableShardRouting.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class ShardOperationResponse implements Streamable {
|
||||
|
||||
private ShardRouting shardRouting;
|
||||
|
||||
protected ShardOperationResponse() {
|
||||
|
||||
}
|
||||
|
||||
protected ShardOperationResponse(ShardRouting shardRouting) {
|
||||
this.shardRouting = shardRouting;
|
||||
}
|
||||
|
||||
public ShardRouting shardRouting() {
|
||||
return shardRouting;
|
||||
}
|
||||
|
||||
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
shardRouting = readShardRoutingEntry(in);
|
||||
}
|
||||
|
||||
@Override public void writeTo(DataOutput out) throws IOException {
|
||||
shardRouting.writeTo(out);
|
||||
}
|
||||
}
|
|
@ -1,88 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support.shards;
|
||||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.util.Strings;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public class ShardsOperationRequest implements ActionRequest {
|
||||
|
||||
private String[] indices;
|
||||
|
||||
private boolean listenerThreaded = false;
|
||||
private ShardsOperationThreading operationThreading = ShardsOperationThreading.SINGLE_THREAD;
|
||||
|
||||
protected ShardsOperationRequest() {
|
||||
}
|
||||
|
||||
protected ShardsOperationRequest(String... indices) {
|
||||
this.indices = indices == null ? Strings.EMPTY_ARRAY : indices;
|
||||
}
|
||||
|
||||
public String[] indices() {
|
||||
return this.indices;
|
||||
}
|
||||
|
||||
@Override public ActionRequestValidationException validate() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override public boolean listenerThreaded() {
|
||||
return this.listenerThreaded;
|
||||
}
|
||||
|
||||
@Override public ShardsOperationRequest listenerThreaded(boolean listenerThreaded) {
|
||||
this.listenerThreaded = listenerThreaded;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ShardsOperationThreading operationThreading() {
|
||||
return operationThreading;
|
||||
}
|
||||
|
||||
public ShardsOperationRequest operationThreading(ShardsOperationThreading operationThreading) {
|
||||
this.operationThreading = operationThreading;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override public void writeTo(DataOutput out) throws IOException {
|
||||
out.writeInt(indices.length);
|
||||
for (String index : indices) {
|
||||
out.writeUTF(index);
|
||||
}
|
||||
out.writeByte(operationThreading.id());
|
||||
}
|
||||
|
||||
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
indices = new String[in.readInt()];
|
||||
for (int i = 0; i < indices.length; i++) {
|
||||
indices[i] = in.readUTF();
|
||||
}
|
||||
operationThreading = ShardsOperationThreading.fromId(in.readByte());
|
||||
}
|
||||
}
|
|
@ -1,51 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support.shards;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public abstract class ShardsOperationResponse<ShardResponse extends ShardOperationResponse> implements ActionResponse {
|
||||
|
||||
protected ShardResponse[] shards;
|
||||
|
||||
protected ShardsOperationResponse() {
|
||||
}
|
||||
|
||||
protected ShardsOperationResponse(ShardResponse[] shards) {
|
||||
this.shards = shards;
|
||||
}
|
||||
|
||||
public ShardResponse[] shards() {
|
||||
return this.shards;
|
||||
}
|
||||
|
||||
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
|
||||
}
|
||||
|
||||
@Override public void writeTo(DataOutput out) throws IOException {
|
||||
}
|
||||
}
|
|
@ -1,74 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support.shards;
|
||||
|
||||
import org.elasticsearch.ElasticSearchIllegalArgumentException;
|
||||
|
||||
/**
|
||||
* Controls the operation threading model for shards operation that are performed
|
||||
* locally on the executing node.
|
||||
*
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public enum ShardsOperationThreading {
|
||||
/**
|
||||
* No threads are used, all the local shards operations will be performed on the calling
|
||||
* thread.
|
||||
*/
|
||||
NO_THREADS((byte) 0),
|
||||
/**
|
||||
* The local shards operations will be performed in serial manner on a single forked thread.
|
||||
*/
|
||||
SINGLE_THREAD((byte) 1),
|
||||
/**
|
||||
* Each local shard operation will execute on its own thread.
|
||||
*/
|
||||
THREAD_PER_SHARD((byte) 2);
|
||||
|
||||
private final byte id;
|
||||
|
||||
ShardsOperationThreading(byte id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public byte id() {
|
||||
return this.id;
|
||||
}
|
||||
|
||||
public static ShardsOperationThreading fromId(byte id) {
|
||||
if (id == 0) {
|
||||
return NO_THREADS;
|
||||
}
|
||||
if (id == 1) {
|
||||
return SINGLE_THREAD;
|
||||
}
|
||||
if (id == 2) {
|
||||
return THREAD_PER_SHARD;
|
||||
}
|
||||
throw new ElasticSearchIllegalArgumentException("No type matching id [" + id + "]");
|
||||
}
|
||||
|
||||
public static ShardsOperationThreading fromString(String value, ShardsOperationThreading defaultValue) {
|
||||
if (value == null) {
|
||||
return defaultValue;
|
||||
}
|
||||
return ShardsOperationThreading.valueOf(value.toUpperCase());
|
||||
}
|
||||
}
|
|
@ -1,291 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support.shards;
|
||||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ShardNotActiveException;
|
||||
import org.elasticsearch.action.ShardOperationFailedException;
|
||||
import org.elasticsearch.action.support.BaseAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.Node;
|
||||
import org.elasticsearch.cluster.node.Nodes;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.*;
|
||||
import org.elasticsearch.util.settings.Settings;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
|
||||
import static org.elasticsearch.action.Actions.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
*/
|
||||
public abstract class TransportShardsOperationActions<Request extends ShardsOperationRequest, Response extends ShardsOperationResponse, ShardRequest extends ShardOperationRequest, ShardResponse extends ShardOperationResponse> extends BaseAction<Request, Response> {
|
||||
|
||||
protected final ClusterService clusterService;
|
||||
|
||||
protected final TransportService transportService;
|
||||
|
||||
protected final IndicesService indicesService;
|
||||
|
||||
protected final ThreadPool threadPool;
|
||||
|
||||
protected TransportShardsOperationActions(Settings settings, ClusterService clusterService, TransportService transportService,
|
||||
IndicesService indicesService, ThreadPool threadPool) {
|
||||
super(settings);
|
||||
this.clusterService = clusterService;
|
||||
this.transportService = transportService;
|
||||
this.indicesService = indicesService;
|
||||
this.threadPool = threadPool;
|
||||
|
||||
transportService.registerHandler(transportAction(), new TransportHandler());
|
||||
transportService.registerHandler(transportShardAction(), new ShardTransportHandler());
|
||||
}
|
||||
|
||||
@Override protected void doExecute(Request request, ActionListener<Response> listener) {
|
||||
new AsyncBroadcastAction(request, listener).start();
|
||||
}
|
||||
|
||||
protected abstract String transportAction();
|
||||
|
||||
protected abstract String transportShardAction();
|
||||
|
||||
protected abstract Request newRequest();
|
||||
|
||||
protected abstract Response newResponse(Request request, ClusterState clusterState, AtomicReferenceArray<Object> shardsResponses);
|
||||
|
||||
protected abstract ShardRequest newShardRequest();
|
||||
|
||||
protected abstract ShardRequest newShardRequest(ShardRouting shard, Request request);
|
||||
|
||||
protected abstract ShardResponse newShardResponse();
|
||||
|
||||
protected abstract ShardResponse shardOperation(ShardRequest request) throws ElasticSearchException;
|
||||
|
||||
protected abstract boolean accumulateExceptions();
|
||||
|
||||
private class AsyncBroadcastAction {
|
||||
|
||||
private final Request request;
|
||||
|
||||
private final ActionListener<Response> listener;
|
||||
|
||||
private final ClusterState clusterState;
|
||||
|
||||
private final Nodes nodes;
|
||||
|
||||
private final List<ShardRouting> shards;
|
||||
|
||||
private final AtomicInteger opsCounter = new AtomicInteger();
|
||||
|
||||
private final AtomicInteger indexCounter = new AtomicInteger();
|
||||
|
||||
private final AtomicReferenceArray<Object> shardsResponses;
|
||||
|
||||
private AsyncBroadcastAction(Request request, ActionListener<Response> listener) {
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
|
||||
clusterState = clusterService.state();
|
||||
nodes = clusterState.nodes();
|
||||
shards = clusterState.routingTable().allShards(processIndices(clusterState, request.indices()));
|
||||
|
||||
shardsResponses = new AtomicReferenceArray<Object>(shards.size());
|
||||
}
|
||||
|
||||
public void start() {
|
||||
// count the local operations, and perform the non local ones
|
||||
int localOperations = 0;
|
||||
for (final ShardRouting shard : shards) {
|
||||
if (shard.active()) {
|
||||
if (shard.currentNodeId().equals(nodes.localNodeId())) {
|
||||
localOperations++;
|
||||
} else {
|
||||
// do the remote operation here, the localAsync flag is not relevant
|
||||
performOperation(shard, true);
|
||||
}
|
||||
} else {
|
||||
// as if we have a "problem", so we iterate to the next one and maintain counts
|
||||
onFailure(shard, new ShardNotActiveException(shard.shardId()), false);
|
||||
}
|
||||
}
|
||||
// we have local operations, perform them now
|
||||
if (localOperations > 0) {
|
||||
if (request.operationThreading() == ShardsOperationThreading.SINGLE_THREAD) {
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
for (final ShardRouting shard : shards) {
|
||||
if (shard.active()) {
|
||||
if (shard.currentNodeId().equals(nodes.localNodeId())) {
|
||||
performOperation(shard, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
boolean localAsync = request.operationThreading() == ShardsOperationThreading.THREAD_PER_SHARD;
|
||||
for (final ShardRouting shard : shards) {
|
||||
if (shard.active()) {
|
||||
if (shard.currentNodeId().equals(nodes.localNodeId())) {
|
||||
performOperation(shard, localAsync);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void performOperation(final ShardRouting shard, boolean localAsync) {
|
||||
final ShardRequest shardRequest = newShardRequest(shard, request);
|
||||
if (shard.currentNodeId().equals(nodes.localNodeId())) {
|
||||
if (localAsync) {
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
try {
|
||||
onOperation(shard, shardOperation(shardRequest), true);
|
||||
} catch (Exception e) {
|
||||
onFailure(shard, e, true);
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
try {
|
||||
onOperation(shard, shardOperation(shardRequest), false);
|
||||
} catch (Exception e) {
|
||||
onFailure(shard, e, false);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Node node = nodes.get(shard.currentNodeId());
|
||||
transportService.sendRequest(node, transportShardAction(), shardRequest, new BaseTransportResponseHandler<ShardResponse>() {
|
||||
@Override public ShardResponse newInstance() {
|
||||
return newShardResponse();
|
||||
}
|
||||
|
||||
@Override public void handleResponse(ShardResponse response) {
|
||||
onOperation(shard, response, false);
|
||||
}
|
||||
|
||||
@Override public void handleException(RemoteTransportException exp) {
|
||||
onFailure(shard, exp, false);
|
||||
}
|
||||
|
||||
@Override public boolean spawn() {
|
||||
// we never spawn here, we will spawn if needed in onOperation
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void onOperation(ShardRouting shardRouting, ShardResponse shardResponse, boolean alreadyThreaded) {
|
||||
// need two counters to avoid race conditions
|
||||
shardsResponses.set(indexCounter.getAndIncrement(), shardResponse);
|
||||
if (opsCounter.incrementAndGet() == shardsResponses.length()) {
|
||||
finishHim(alreadyThreaded);
|
||||
}
|
||||
}
|
||||
|
||||
private void onFailure(ShardRouting shardRouting, Throwable t, boolean alreadyThreaded) {
|
||||
int idx = indexCounter.getAndIncrement();
|
||||
if (accumulateExceptions()) {
|
||||
ShardOperationFailedException failedException;
|
||||
if (t instanceof ShardOperationFailedException) {
|
||||
failedException = (ShardOperationFailedException) t;
|
||||
} else {
|
||||
failedException = new ShardOperationFailedException(shardRouting.shardId(), t);
|
||||
}
|
||||
shardsResponses.set(idx, failedException);
|
||||
}
|
||||
if (opsCounter.incrementAndGet() == shardsResponses.length()) {
|
||||
finishHim(alreadyThreaded);
|
||||
}
|
||||
}
|
||||
|
||||
private void finishHim(boolean alreadyThreaded) {
|
||||
// if we need to execute the listener on a thread, and we are not threaded already
|
||||
// then do it
|
||||
if (request.listenerThreaded() && !alreadyThreaded) {
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
listener.onResponse(newResponse(request, clusterState, shardsResponses));
|
||||
}
|
||||
});
|
||||
} else {
|
||||
listener.onResponse(newResponse(request, clusterState, shardsResponses));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class TransportHandler extends BaseTransportRequestHandler<Request> {
|
||||
|
||||
@Override public Request newInstance() {
|
||||
return newRequest();
|
||||
}
|
||||
|
||||
@Override public void messageReceived(Request request, final TransportChannel channel) throws Exception {
|
||||
// we just send back a response, no need to fork a listener
|
||||
request.listenerThreaded(false);
|
||||
// we don't spawn, so if we get a request with no threading, change it to single threaded
|
||||
if (request.operationThreading() == ShardsOperationThreading.NO_THREADS) {
|
||||
request.operationThreading(ShardsOperationThreading.SINGLE_THREAD);
|
||||
}
|
||||
execute(request, new ActionListener<Response>() {
|
||||
@Override public void onResponse(Response response) {
|
||||
try {
|
||||
channel.sendResponse(response);
|
||||
} catch (Exception e) {
|
||||
onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void onFailure(Throwable e) {
|
||||
try {
|
||||
channel.sendResponse(e);
|
||||
} catch (Exception e1) {
|
||||
logger.warn("Failed to send response", e1);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override public boolean spawn() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private class ShardTransportHandler extends BaseTransportRequestHandler<ShardRequest> {
|
||||
|
||||
@Override public ShardRequest newInstance() {
|
||||
return newShardRequest();
|
||||
}
|
||||
|
||||
@Override public void messageReceived(ShardRequest request, TransportChannel channel) throws Exception {
|
||||
channel.sendResponse(shardOperation(request));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.cluster.routing;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.util.io.Streamable;
|
||||
|
||||
|
@ -124,6 +125,10 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou
|
|||
return shardIdentifier;
|
||||
}
|
||||
|
||||
@Override public ShardsIterator shardsIt() {
|
||||
return new PlainShardsIterator(shardId(), ImmutableList.of((ShardRouting) this));
|
||||
}
|
||||
|
||||
public static ImmutableShardRouting readShardRoutingEntry(DataInput in) throws IOException, ClassNotFoundException {
|
||||
ImmutableShardRouting entry = new ImmutableShardRouting();
|
||||
entry.readFrom(in);
|
||||
|
|
|
@ -64,6 +64,10 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
|
|||
return shards.get(shardId);
|
||||
}
|
||||
|
||||
/**
|
||||
* A group shards iterator where each group ({@link ShardsIterator}
|
||||
* is an iterator across shard replication group.
|
||||
*/
|
||||
public GroupShardsIterator groupByShardsIt() {
|
||||
IdentityHashSet<ShardsIterator> set = new IdentityHashSet<ShardsIterator>();
|
||||
for (IndexShardRoutingTable indexShard : this) {
|
||||
|
@ -72,6 +76,23 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
|
|||
return new GroupShardsIterator(set);
|
||||
}
|
||||
|
||||
/**
|
||||
* A groups shards iterator where each groups is a single {@link ShardRouting} and a group
|
||||
* is created for each shard routing.
|
||||
*
|
||||
* <p>This basically means that components that use the {@link GroupShardsIterator} will itearte
|
||||
* over *all* the shards (all the replicas) within the index.
|
||||
*/
|
||||
public GroupShardsIterator groupByAllIt() {
|
||||
IdentityHashSet<ShardsIterator> set = new IdentityHashSet<ShardsIterator>();
|
||||
for (IndexShardRoutingTable indexShard : this) {
|
||||
for (ShardRouting shardRouting : indexShard) {
|
||||
set.add(shardRouting.shardsIt());
|
||||
}
|
||||
}
|
||||
return new GroupShardsIterator(set);
|
||||
}
|
||||
|
||||
public void validate() throws RoutingValidationException {
|
||||
}
|
||||
|
||||
|
|
|
@ -70,6 +70,13 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
|
|||
return new RoutingNodes(metaData, this);
|
||||
}
|
||||
|
||||
/**
|
||||
* All the shards (replicas) for the provided indices.
|
||||
*
|
||||
* @param indices The indices to return all the shards (replicas), can be <tt>null</tt> or empty array to indicate all indices
|
||||
* @return All the shards matching the specific index
|
||||
* @throws IndexMissingException If an index passed does not exists
|
||||
*/
|
||||
public List<ShardRouting> allShards(String... indices) throws IndexMissingException {
|
||||
List<ShardRouting> shards = Lists.newArrayList();
|
||||
if (indices == null || indices.length == 0) {
|
||||
|
@ -89,6 +96,35 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
|
|||
return shards;
|
||||
}
|
||||
|
||||
/**
|
||||
* All the shards (replicas) for the provided indices grouped (each group is a single element, consisting
|
||||
* of the shard). This is handy for components that expect to get group iterators, but still want in some
|
||||
* cases to iterate over all the shards (and not just one shard in replication group).
|
||||
*
|
||||
* @param indices The indices to return all the shards (replicas), can be <tt>null</tt> or empty array to indicate all indices
|
||||
* @return All the shards grouped into a single shard element group each
|
||||
* @throws IndexMissingException If an index passed does not exists
|
||||
* @see IndexRoutingTable#groupByAllIt()
|
||||
*/
|
||||
public GroupShardsIterator allShardsGrouped(String... indices) throws IndexMissingException {
|
||||
GroupShardsIterator its = new GroupShardsIterator();
|
||||
if (indices == null || indices.length == 0) {
|
||||
indices = indicesRouting.keySet().toArray(new String[indicesRouting.keySet().size()]);
|
||||
}
|
||||
for (String index : indices) {
|
||||
IndexRoutingTable indexRoutingTable = index(index);
|
||||
if (indexRoutingTable == null) {
|
||||
throw new IndexMissingException(new Index(index));
|
||||
}
|
||||
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
|
||||
for (ShardRouting shardRouting : indexShardRoutingTable) {
|
||||
its.add(shardRouting.shardsIt());
|
||||
}
|
||||
}
|
||||
}
|
||||
return its;
|
||||
}
|
||||
|
||||
public static Builder newRoutingTableBuilder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
|
|
@ -63,6 +63,11 @@ public interface ShardRouting extends Streamable, Serializable {
|
|||
|
||||
String shortSummary();
|
||||
|
||||
/**
|
||||
* A shard iterator with just this shard in it.
|
||||
*/
|
||||
ShardsIterator shardsIt();
|
||||
|
||||
/**
|
||||
* Does not write index name and shard id
|
||||
*/
|
||||
|
|
|
@ -23,13 +23,11 @@ import com.google.inject.Inject;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
|
||||
import org.elasticsearch.action.admin.indices.flush.IndexFlushResponse;
|
||||
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.http.*;
|
||||
import org.elasticsearch.http.action.support.HttpActions;
|
||||
import org.elasticsearch.http.action.support.HttpJsonBuilder;
|
||||
import org.elasticsearch.util.TimeValue;
|
||||
import org.elasticsearch.util.json.JsonBuilder;
|
||||
import org.elasticsearch.util.settings.Settings;
|
||||
|
||||
|
@ -50,24 +48,27 @@ public class HttpFlushAction extends BaseHttpServerHandler {
|
|||
|
||||
@Override public void handleRequest(final HttpRequest request, final HttpChannel channel) {
|
||||
FlushRequest flushRequest = new FlushRequest(HttpActions.splitIndices(request.param("index")));
|
||||
// we just send back a response, no need to fork a listener
|
||||
flushRequest.listenerThreaded(false);
|
||||
flushRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), ShardReplicationOperationRequest.DEFAULT_TIMEOUT));
|
||||
BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(request.param("operationThreading"), BroadcastOperationThreading.SINGLE_THREAD);
|
||||
if (operationThreading == BroadcastOperationThreading.NO_THREADS) {
|
||||
// since we don't spawn, don't allow no_threads, but change it to a single thread
|
||||
operationThreading = BroadcastOperationThreading.THREAD_PER_SHARD;
|
||||
}
|
||||
flushRequest.operationThreading(operationThreading);
|
||||
client.admin().indices().execFlush(flushRequest, new ActionListener<FlushResponse>() {
|
||||
@Override public void onResponse(FlushResponse result) {
|
||||
@Override public void onResponse(FlushResponse response) {
|
||||
try {
|
||||
JsonBuilder builder = HttpJsonBuilder.cached(request);
|
||||
builder.startObject();
|
||||
builder.field("ok", true);
|
||||
builder.startObject("indices");
|
||||
for (IndexFlushResponse indexFlushResponse : result.indices().values()) {
|
||||
builder.startObject(indexFlushResponse.index())
|
||||
.field("ok", true)
|
||||
.field("totalShards", indexFlushResponse.totalShards())
|
||||
.field("successfulShards", indexFlushResponse.successfulShards())
|
||||
.field("failedShards", indexFlushResponse.failedShards())
|
||||
.endObject();
|
||||
}
|
||||
|
||||
builder.startObject("_shards");
|
||||
builder.field("total", response.totalShards());
|
||||
builder.field("successful", response.successfulShards());
|
||||
builder.field("failed", response.failedShards());
|
||||
builder.endObject();
|
||||
|
||||
builder.endObject();
|
||||
channel.sendResponse(new JsonHttpResponse(request, OK, builder));
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -21,20 +21,18 @@ package org.elasticsearch.http.action.admin.indices.refresh;
|
|||
|
||||
import com.google.inject.Inject;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.refresh.IndexRefreshResponse;
|
||||
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
||||
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.http.*;
|
||||
import org.elasticsearch.http.action.support.HttpActions;
|
||||
import org.elasticsearch.http.action.support.HttpJsonBuilder;
|
||||
import org.elasticsearch.util.TimeValue;
|
||||
import org.elasticsearch.util.json.JsonBuilder;
|
||||
import org.elasticsearch.util.settings.Settings;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.action.support.replication.ShardReplicationOperationRequest.*;
|
||||
import static org.elasticsearch.http.HttpResponse.Status.*;
|
||||
|
||||
/**
|
||||
|
@ -50,24 +48,27 @@ public class HttpRefreshAction extends BaseHttpServerHandler {
|
|||
|
||||
@Override public void handleRequest(final HttpRequest request, final HttpChannel channel) {
|
||||
RefreshRequest refreshRequest = new RefreshRequest(HttpActions.splitIndices(request.param("index")));
|
||||
refreshRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), DEFAULT_TIMEOUT));
|
||||
// we just send back a response, no need to fork a listener
|
||||
refreshRequest.listenerThreaded(false);
|
||||
BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(request.param("operationThreading"), BroadcastOperationThreading.SINGLE_THREAD);
|
||||
if (operationThreading == BroadcastOperationThreading.NO_THREADS) {
|
||||
// since we don't spawn, don't allow no_threads, but change it to a single thread
|
||||
operationThreading = BroadcastOperationThreading.THREAD_PER_SHARD;
|
||||
}
|
||||
refreshRequest.operationThreading(operationThreading);
|
||||
client.admin().indices().execRefresh(refreshRequest, new ActionListener<RefreshResponse>() {
|
||||
@Override public void onResponse(RefreshResponse result) {
|
||||
@Override public void onResponse(RefreshResponse response) {
|
||||
try {
|
||||
JsonBuilder builder = HttpJsonBuilder.cached(request);
|
||||
builder.startObject();
|
||||
builder.field("ok", true);
|
||||
builder.startObject("indices");
|
||||
for (IndexRefreshResponse indexResponse : result.indices().values()) {
|
||||
builder.startObject(indexResponse.index())
|
||||
.field("ok", true)
|
||||
.field("totalShards", indexResponse.totalShards())
|
||||
.field("successfulShards", indexResponse.successfulShards())
|
||||
.field("failedShards", indexResponse.failedShards())
|
||||
.endObject();
|
||||
}
|
||||
|
||||
builder.startObject("_shards");
|
||||
builder.field("total", response.totalShards());
|
||||
builder.field("successful", response.successfulShards());
|
||||
builder.field("failed", response.failedShards());
|
||||
builder.endObject();
|
||||
|
||||
builder.endObject();
|
||||
channel.sendResponse(new JsonHttpResponse(request, OK, builder));
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.http.action.admin.indices.status;
|
|||
import com.google.inject.Inject;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.status.*;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.http.*;
|
||||
import org.elasticsearch.http.action.support.HttpJsonBuilder;
|
||||
|
@ -47,7 +48,14 @@ public class HttpIndicesStatusAction extends BaseHttpServerHandler {
|
|||
|
||||
@Override public void handleRequest(final HttpRequest request, final HttpChannel channel) {
|
||||
IndicesStatusRequest indicesStatusRequest = new IndicesStatusRequest(splitIndices(request.param("index")));
|
||||
// we just send back a response, no need to fork a listener
|
||||
indicesStatusRequest.listenerThreaded(false);
|
||||
BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(request.param("operationThreading"), BroadcastOperationThreading.SINGLE_THREAD);
|
||||
if (operationThreading == BroadcastOperationThreading.NO_THREADS) {
|
||||
// since we don't spawn, don't allow no_threads, but change it to a single thread
|
||||
operationThreading = BroadcastOperationThreading.SINGLE_THREAD;
|
||||
}
|
||||
indicesStatusRequest.operationThreading(operationThreading);
|
||||
client.admin().indices().execStatus(indicesStatusRequest, new ActionListener<IndicesStatusResponse>() {
|
||||
@Override public void onResponse(IndicesStatusResponse response) {
|
||||
try {
|
||||
|
@ -55,6 +63,12 @@ public class HttpIndicesStatusAction extends BaseHttpServerHandler {
|
|||
builder.startObject();
|
||||
builder.field("ok", true);
|
||||
|
||||
builder.startObject("_shards");
|
||||
builder.field("total", response.totalShards());
|
||||
builder.field("successful", response.successfulShards());
|
||||
builder.field("failed", response.failedShards());
|
||||
builder.endObject();
|
||||
|
||||
builder.startObject("indices");
|
||||
for (IndexStatus indexStatus : response.indices().values()) {
|
||||
builder.startObject(indexStatus.index());
|
||||
|
|
|
@ -34,13 +34,13 @@ import org.elasticsearch.client.transport.TransportClient;
|
|||
import org.elasticsearch.server.internal.InternalServer;
|
||||
import org.elasticsearch.test.integration.AbstractServersTests;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.util.settings.ImmutableSettings;
|
||||
import org.elasticsearch.util.transport.TransportAddress;
|
||||
import org.testng.annotations.AfterMethod;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import static org.elasticsearch.client.Requests.*;
|
||||
import static org.elasticsearch.index.query.json.JsonQueryBuilders.*;
|
||||
import static org.elasticsearch.util.settings.ImmutableSettings.*;
|
||||
import static org.hamcrest.MatcherAssert.*;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
|
@ -61,7 +61,7 @@ public class SimpleSingleTransportClientTests extends AbstractServersTests {
|
|||
@Test public void testOnlyWithTransportAddress() throws Exception {
|
||||
startServer("server1");
|
||||
TransportAddress server1Address = ((InternalServer) server("server1")).injector().getInstance(TransportService.class).boundAddress().publishAddress();
|
||||
client = new TransportClient(ImmutableSettings.settingsBuilder().putBoolean("discovery.enabled", false).build());
|
||||
client = new TransportClient(settingsBuilder().putBoolean("discovery.enabled", false).build());
|
||||
client.addTransportAddress(server1Address);
|
||||
testSimpleActions(client);
|
||||
}
|
||||
|
@ -70,21 +70,28 @@ public class SimpleSingleTransportClientTests extends AbstractServersTests {
|
|||
|
||||
public void testWithDiscovery() throws Exception {
|
||||
startServer("server1");
|
||||
client = new TransportClient(ImmutableSettings.settingsBuilder().putBoolean("discovery.enabled", true).build());
|
||||
client = new TransportClient(settingsBuilder().putBoolean("discovery.enabled", true).build());
|
||||
// wait a bit so nodes will be discovered
|
||||
Thread.sleep(1000);
|
||||
testSimpleActions(client);
|
||||
}
|
||||
|
||||
private void testSimpleActions(Client client) {
|
||||
private void testSimpleActions(Client client) throws Exception {
|
||||
logger.info("Creating index test");
|
||||
client.admin().indices().create(createIndexRequest("test")).actionGet();
|
||||
Thread.sleep(500);
|
||||
|
||||
IndexResponse indexResponse = client.index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet();
|
||||
assertThat(indexResponse.id(), equalTo("1"));
|
||||
assertThat(indexResponse.type(), equalTo("type1"));
|
||||
RefreshResponse refreshResult = client.admin().indices().refresh(refreshRequest("test")).actionGet();
|
||||
assertThat(refreshResult.index("test").successfulShards(), equalTo(5));
|
||||
assertThat(refreshResult.index("test").failedShards(), equalTo(0));
|
||||
|
||||
RefreshResponse refreshResponse = client.admin().indices().refresh(refreshRequest("test")).actionGet();
|
||||
assertThat(refreshResponse.successfulShards(), equalTo(5));
|
||||
assertThat(refreshResponse.failedShards(), equalTo(5)); // 5 are not active, since we started just one server
|
||||
|
||||
IndicesStatusResponse indicesStatusResponse = client.admin().indices().status(indicesStatus()).actionGet();
|
||||
assertThat(indicesStatusResponse.successfulShards(), equalTo(5));
|
||||
assertThat(indicesStatusResponse.failedShards(), equalTo(5)); // 5 are not active, since we started just one server
|
||||
assertThat(indicesStatusResponse.indices().size(), equalTo(1));
|
||||
assertThat(indicesStatusResponse.index("test").shards().size(), equalTo(5)); // 5 index shards (1 with 1 backup)
|
||||
assertThat(indicesStatusResponse.index("test").docs().numDocs(), equalTo(1));
|
||||
|
@ -117,8 +124,8 @@ public class SimpleSingleTransportClientTests extends AbstractServersTests {
|
|||
client.index(Requests.indexRequest("test").type("type1").id("2").source(source("2", "test"))).actionGet();
|
||||
|
||||
FlushResponse flushResult = client.admin().indices().flush(flushRequest("test")).actionGet();
|
||||
assertThat(flushResult.index("test").successfulShards(), equalTo(5));
|
||||
assertThat(flushResult.index("test").failedShards(), equalTo(0));
|
||||
assertThat(flushResult.successfulShards(), equalTo(5));
|
||||
assertThat(flushResult.failedShards(), equalTo(5)); // we only start one server
|
||||
client.admin().indices().refresh(refreshRequest("test")).actionGet();
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
|
|
|
@ -51,6 +51,7 @@ public class DocumentActionsTests extends AbstractServersTests {
|
|||
|
||||
logger.info("Creating index test");
|
||||
client("server1").admin().indices().create(createIndexRequest("test")).actionGet();
|
||||
Thread.sleep(200);
|
||||
|
||||
logger.info("Indexing [type1/1]");
|
||||
IndexResponse indexResponse = client("server1").index(indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet();
|
||||
|
@ -58,8 +59,8 @@ public class DocumentActionsTests extends AbstractServersTests {
|
|||
assertThat(indexResponse.type(), equalTo("type1"));
|
||||
logger.info("Refreshing");
|
||||
RefreshResponse refreshResult = client("server1").admin().indices().refresh(refreshRequest("test")).actionGet();
|
||||
assertThat(refreshResult.index("test").successfulShards(), equalTo(5));
|
||||
assertThat(refreshResult.index("test").failedShards(), equalTo(0));
|
||||
assertThat(refreshResult.successfulShards(), equalTo(10));
|
||||
assertThat(refreshResult.failedShards(), equalTo(0));
|
||||
|
||||
GetResponse getResult;
|
||||
|
||||
|
@ -97,8 +98,8 @@ public class DocumentActionsTests extends AbstractServersTests {
|
|||
|
||||
logger.info("Flushing");
|
||||
FlushResponse flushResult = client("server1").admin().indices().flush(flushRequest("test")).actionGet();
|
||||
assertThat(flushResult.index("test").successfulShards(), equalTo(5));
|
||||
assertThat(flushResult.index("test").failedShards(), equalTo(0));
|
||||
assertThat(flushResult.successfulShards(), equalTo(10));
|
||||
assertThat(flushResult.failedShards(), equalTo(0));
|
||||
logger.info("Refreshing");
|
||||
client("server1").admin().indices().refresh(refreshRequest("test")).actionGet();
|
||||
|
||||
|
|
Loading…
Reference in New Issue