Implement search shards API

Closes #2726
This commit is contained in:
Igor Motov 2013-03-05 07:24:07 -05:00
parent 1eb24d7efc
commit acff102234
13 changed files with 960 additions and 0 deletions

View File

@ -36,6 +36,8 @@ import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction;
import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
import org.elasticsearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
@ -171,6 +173,7 @@ public class ActionModule extends AbstractModule {
registerAction(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class);
registerAction(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
registerAction(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class);
registerAction(ClusterSearchShardsAction.INSTANCE, TransportClusterSearchShardsAction.class);
registerAction(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
registerAction(IndicesStatusAction.INSTANCE, TransportIndicesStatusAction.class);

View File

@ -0,0 +1,45 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.shards;
import org.elasticsearch.action.admin.cluster.ClusterAction;
import org.elasticsearch.client.ClusterAdminClient;
/**
*/
public class ClusterSearchShardsAction extends ClusterAction<ClusterSearchShardsRequest, ClusterSearchShardsResponse, ClusterSearchShardsRequestBuilder> {
public static final ClusterSearchShardsAction INSTANCE = new ClusterSearchShardsAction();
public static final String NAME = "cluster/shards/search_shards";
private ClusterSearchShardsAction() {
super(NAME);
}
@Override
public ClusterSearchShardsResponse newResponse() {
return new ClusterSearchShardsResponse();
}
@Override
public ClusterSearchShardsRequestBuilder newRequestBuilder(ClusterAdminClient client) {
return new ClusterSearchShardsRequestBuilder(client);
}
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.shards;
import org.elasticsearch.cluster.routing.ImmutableShardRouting;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
/**
*/
public class ClusterSearchShardsGroup implements Streamable, ToXContent {
private String index;
private int shardId;
ShardRouting[] shards;
ClusterSearchShardsGroup() {
}
public ClusterSearchShardsGroup(String index, int shardId, ShardRouting[] shards) {
this.index = index;
this.shardId = shardId;
this.shards = shards;
}
public static ClusterSearchShardsGroup readSearchShardsGroupResponse(StreamInput in) throws IOException {
ClusterSearchShardsGroup response = new ClusterSearchShardsGroup();
response.readFrom(in);
return response;
}
public String getIndex() {
return index;
}
public int getShardId() {
return shardId;
}
public ShardRouting[] getShards() {
return shards;
}
@Override
public void readFrom(StreamInput in) throws IOException {
index = in.readString();
shardId = in.readVInt();
shards = new ShardRouting[in.readVInt()];
for (int i = 0; i < shards.length; i++) {
shards[i] = ImmutableShardRouting.readShardRoutingEntry(in, index, shardId);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
out.writeVInt(shardId);
out.writeVInt(shards.length);
for (ShardRouting shardRouting : shards) {
shardRouting.writeToThin(out);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray();
for (ShardRouting shard : getShards()) {
shard.toXContent(builder, params);
}
builder.endArray();
return builder;
}
}

View File

@ -0,0 +1,188 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.shards;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.IgnoreIndices;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
*/
public class ClusterSearchShardsRequest extends MasterNodeOperationRequest<ClusterSearchShardsRequest> {
private String[] indices;
@Nullable
private String routing;
@Nullable
private String preference;
private boolean local = false;
private String[] types = Strings.EMPTY_ARRAY;
private IgnoreIndices ignoreIndices = IgnoreIndices.DEFAULT;
public ClusterSearchShardsRequest() {
}
public ClusterSearchShardsRequest(String... indices) {
indices(indices);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
/**
* Sets the indices the search will be executed on.
*/
public ClusterSearchShardsRequest indices(String... indices) {
if (indices == null) {
throw new ElasticSearchIllegalArgumentException("indices must not be null");
} else {
for (int i = 0; i < indices.length; i++) {
if (indices[i] == null) {
throw new ElasticSearchIllegalArgumentException("indices[" + i + "] must not be null");
}
}
}
this.indices = indices;
return this;
}
/**
* The indices
*/
public String[] indices() {
return indices;
}
public IgnoreIndices ignoreIndices() {
return ignoreIndices;
}
public ClusterSearchShardsRequest ignoreIndices(IgnoreIndices ignoreIndices) {
this.ignoreIndices = ignoreIndices;
return this;
}
/**
* The document types to execute the search against. Defaults to be executed against
* all types.
*/
public String[] types() {
return types;
}
/**
* The document types to execute the search against. Defaults to be executed against
* all types.
*/
public ClusterSearchShardsRequest types(String... types) {
this.types = types;
return this;
}
/**
* A comma separated list of routing values to control the shards the search will be executed on.
*/
public String routing() {
return this.routing;
}
/**
* A comma separated list of routing values to control the shards the search will be executed on.
*/
public ClusterSearchShardsRequest routing(String routing) {
this.routing = routing;
return this;
}
/**
* The routing values to control the shards that the search will be executed on.
*/
public ClusterSearchShardsRequest routing(String... routings) {
this.routing = Strings.arrayToCommaDelimitedString(routings);
return this;
}
/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or
* a custom value, which guarantees that the same order will be used across different requests.
*/
public ClusterSearchShardsRequest preference(String preference) {
this.preference = preference;
return this;
}
public String preference() {
return this.preference;
}
public ClusterSearchShardsRequest local(boolean local) {
this.local = local;
return this;
}
public boolean local() {
return this.local;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
indices = new String[in.readVInt()];
for (int i = 0; i < indices.length; i++) {
indices[i] = in.readString();
}
routing = in.readOptionalString();
preference = in.readOptionalString();
types = in.readStringArray();
ignoreIndices = IgnoreIndices.fromId(in.readByte());
local = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(indices.length);
for (String index : indices) {
out.writeString(index);
}
out.writeOptionalString(routing);
out.writeOptionalString(preference);
out.writeStringArray(types);
out.writeByte(ignoreIndices.id());
out.writeBoolean(local);
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.shards;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IgnoreIndices;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.internal.InternalClusterAdminClient;
/**
*/
public class ClusterSearchShardsRequestBuilder extends MasterNodeOperationRequestBuilder<ClusterSearchShardsRequest, ClusterSearchShardsResponse, ClusterSearchShardsRequestBuilder> {
public ClusterSearchShardsRequestBuilder(ClusterAdminClient clusterClient) {
super((InternalClusterAdminClient) clusterClient, new ClusterSearchShardsRequest());
}
/**
* Sets the indices the search will be executed on.
*/
public ClusterSearchShardsRequestBuilder setIndices(String... indices) {
request.indices(indices);
return this;
}
/**
* The document types to execute the search against. Defaults to be executed against
* all types.
*/
public ClusterSearchShardsRequestBuilder setTypes(String... types) {
request.types(types);
return this;
}
/**
* A comma separated list of routing values to control the shards the search will be executed on.
*/
public ClusterSearchShardsRequestBuilder setRouting(String routing) {
request.routing(routing);
return this;
}
/**
* The routing values to control the shards that the search will be executed on.
*/
public ClusterSearchShardsRequestBuilder setRouting(String... routing) {
request.routing(routing);
return this;
}
/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or
* a custom value, which guarantees that the same order will be used across different requests.
*/
public ClusterSearchShardsRequestBuilder setPreference(String preference) {
request.preference(preference);
return this;
}
/**
* Specifies what type of requested indices to ignore. For example indices that don't exist.
*/
public ClusterSearchShardsRequestBuilder setIgnoreIndices(IgnoreIndices ignoreIndices) {
request().ignoreIndices(ignoreIndices);
return this;
}
/**
* Specifies if request should be executed on local node rather than on master.
*/
public ClusterSearchShardsRequestBuilder setLocal(boolean local) {
request().local(local);
return this;
}
@Override
protected void doExecute(ActionListener<ClusterSearchShardsResponse> listener) {
((ClusterAdminClient) client).searchShards(request, listener);
}
}

View File

@ -0,0 +1,107 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.shards;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Map;
/**
*/
public class ClusterSearchShardsResponse extends ActionResponse implements ToXContent {
private ClusterSearchShardsGroup[] groups;
private DiscoveryNode[] nodes;
ClusterSearchShardsResponse() {
}
public ClusterSearchShardsGroup[] getGroups() {
return groups;
}
public DiscoveryNode[] getNodes() {
return nodes;
}
public ClusterSearchShardsResponse(ClusterSearchShardsGroup[] groups, DiscoveryNode[] nodes) {
this.groups = groups;
this.nodes = nodes;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
groups = new ClusterSearchShardsGroup[in.readVInt()];
for (int i = 0; i < groups.length; i++) {
groups[i] = ClusterSearchShardsGroup.readSearchShardsGroupResponse(in);
}
nodes = new DiscoveryNode[in.readVInt()];
for (int i = 0; i < nodes.length; i++) {
nodes[i] = DiscoveryNode.readNode(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(groups.length);
for (ClusterSearchShardsGroup response : groups) {
response.writeTo(out);
}
out.writeVInt(nodes.length);
for (DiscoveryNode node : nodes) {
node.writeTo(out);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("nodes");
for (DiscoveryNode node : nodes) {
builder.startObject(node.getId(), XContentBuilder.FieldCaseConversion.NONE);
builder.field("name", node.name());
builder.field("transport_address", node.getAddress());
if (!node.attributes().isEmpty()) {
builder.startObject("attributes");
for (Map.Entry<String, String> attr : node.attributes().entrySet()) {
builder.field(attr.getKey(), attr.getValue());
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
builder.startArray("shards");
for (ClusterSearchShardsGroup group : groups) {
group.toXContent(builder, params);
}
builder.endArray();
return builder;
}
}

View File

@ -0,0 +1,103 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.shards;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Map;
import java.util.Set;
import static com.google.common.collect.Sets.newHashSet;
/**
*/
public class TransportClusterSearchShardsAction extends TransportMasterNodeOperationAction<ClusterSearchShardsRequest, ClusterSearchShardsResponse> {
@Inject
public TransportClusterSearchShardsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) {
super(settings, transportService, clusterService, threadPool);
}
@Override
protected String transportAction() {
return ClusterSearchShardsAction.NAME;
}
@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected boolean localExecute(ClusterSearchShardsRequest request) {
return request.local();
}
@Override
protected ClusterSearchShardsRequest newRequest() {
return new ClusterSearchShardsRequest();
}
@Override
protected ClusterSearchShardsResponse newResponse() {
return new ClusterSearchShardsResponse();
}
@Override
protected ClusterSearchShardsResponse masterOperation(ClusterSearchShardsRequest request, ClusterState state) throws ElasticSearchException {
ClusterState clusterState = clusterService.state();
String[] concreteIndices = clusterState.metaData().concreteIndices(request.indices(), request.ignoreIndices(), true);
Map<String, Set<String>> routingMap = clusterState.metaData().resolveSearchRouting(request.routing(), request.indices());
Set<String> nodeIds = newHashSet();
GroupShardsIterator groupShardsIterator = clusterService.operationRouting().searchShards(clusterState, request.indices(), concreteIndices, routingMap, request.preference());
ShardRouting shard;
ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardsIterator.size()];
int currentGroup = 0;
for (ShardIterator shardIt : groupShardsIterator) {
String index = shardIt.shardId().getIndex();
int shardId = shardIt.shardId().getId();
ShardRouting[] shardRoutings = new ShardRouting[shardIt.size()];
int currentShard = 0;
shardIt.reset();
while ((shard = shardIt.nextOrNull()) != null) {
shardRoutings[currentShard++] = shard;
nodeIds.add(shard.currentNodeId());
}
groupResponses[currentGroup++] = new ClusterSearchShardsGroup(index, shardId, shardRoutings);
}
DiscoveryNode[] nodes = new DiscoveryNode[nodeIds.size()];
int currentNode = 0;
for (String nodeId : nodeIds) {
nodes[currentNode++] = clusterState.getNodes().get(nodeId);
}
return new ClusterSearchShardsResponse(groupResponses, nodes);
}
}

View File

@ -45,6 +45,9 @@ import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequestBuilder;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequestBuilder;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@ -235,4 +238,25 @@ public interface ClusterAdminClient {
* Restarts nodes in the cluster.
*/
NodesRestartRequestBuilder prepareNodesRestart(String... nodesIds);
/**
* Returns list of shards the given search would be executed on.
*/
ActionFuture<ClusterSearchShardsResponse> searchShards(ClusterSearchShardsRequest request);
/**
* Returns list of shards the given search would be executed on.
*/
void searchShards(ClusterSearchShardsRequest request, ActionListener<ClusterSearchShardsResponse> listener);
/**
* Returns list of shards the given search would be executed on.
*/
ClusterSearchShardsRequestBuilder prepareSearchShards();
/**
* Returns list of shards the given search would be executed on.
*/
ClusterSearchShardsRequestBuilder prepareSearchShards(String... indices);
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownRequest
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest;
@ -369,6 +370,20 @@ public class Requests {
return new ClusterHealthRequest(indices);
}
/**
* List all shards for the give search
*/
public static ClusterSearchShardsRequest clusterSearchShardsRequest() {
return new ClusterSearchShardsRequest();
}
/**
* List all shards for the give search
*/
public static ClusterSearchShardsRequest clusterSearchShardsRequest(String... indices) {
return new ClusterSearchShardsRequest(indices);
}
/**
* Creates a nodes info request against all the nodes.
*
@ -436,4 +451,5 @@ public class Requests {
public static NodesRestartRequest nodesRestartRequest(String... nodesIds) {
return new NodesRestartRequest(nodesIds);
}
}

View File

@ -53,6 +53,10 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsActi
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequestBuilder;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequestBuilder;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
@ -203,4 +207,26 @@ public abstract class AbstractClusterAdminClient implements InternalClusterAdmin
public NodesShutdownRequestBuilder prepareNodesShutdown(String... nodesIds) {
return new NodesShutdownRequestBuilder(this).setNodesIds(nodesIds);
}
@Override
public ActionFuture<ClusterSearchShardsResponse> searchShards(final ClusterSearchShardsRequest request) {
return execute(ClusterSearchShardsAction.INSTANCE, request);
}
@Override
public void searchShards(final ClusterSearchShardsRequest request, final ActionListener<ClusterSearchShardsResponse> listener) {
execute(ClusterSearchShardsAction.INSTANCE, request, listener);
}
@Override
public ClusterSearchShardsRequestBuilder prepareSearchShards() {
return new ClusterSearchShardsRequestBuilder(this);
}
@Override
public ClusterSearchShardsRequestBuilder prepareSearchShards(String... indices) {
return new ClusterSearchShardsRequestBuilder(this).setIndices(indices);
}
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.rest.action.admin.cluster.node.stats.RestNodesStatsActi
import org.elasticsearch.rest.action.admin.cluster.reroute.RestClusterRerouteAction;
import org.elasticsearch.rest.action.admin.cluster.settings.RestClusterGetSettingsAction;
import org.elasticsearch.rest.action.admin.cluster.settings.RestClusterUpdateSettingsAction;
import org.elasticsearch.rest.action.admin.cluster.shards.RestClusterSearchShardsAction;
import org.elasticsearch.rest.action.admin.cluster.state.RestClusterStateAction;
import org.elasticsearch.rest.action.admin.indices.alias.RestGetIndicesAliasesAction;
import org.elasticsearch.rest.action.admin.indices.alias.RestIndicesAliasesAction;
@ -108,6 +109,7 @@ public class RestActionModule extends AbstractModule {
bind(RestClusterUpdateSettingsAction.class).asEagerSingleton();
bind(RestClusterGetSettingsAction.class).asEagerSingleton();
bind(RestClusterRerouteAction.class).asEagerSingleton();
bind(RestClusterSearchShardsAction.class).asEagerSingleton();
bind(RestIndicesExistsAction.class).asEagerSingleton();
bind(RestTypesExistsAction.class).asEagerSingleton();

View File

@ -0,0 +1,94 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.cluster.shards;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.IgnoreIndices;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
import java.io.IOException;
import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestRequest.Method.POST;
/**
*/
public class RestClusterSearchShardsAction extends BaseRestHandler {
@Inject
public RestClusterSearchShardsAction(Settings settings, Client client, RestController controller) {
super(settings, client);
controller.registerHandler(GET, "/_search_shards", this);
controller.registerHandler(POST, "/_search_shards", this);
controller.registerHandler(GET, "/{index}/_search_shards", this);
controller.registerHandler(POST, "/{index}/_search_shards", this);
controller.registerHandler(GET, "/{index}/{type}/_search_shards", this);
controller.registerHandler(POST, "/{index}/{type}/_search_shards", this);
}
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
String[] indices = RestActions.splitIndices(request.param("index"));
final ClusterSearchShardsRequest clusterSearchShardsRequest = Requests.clusterSearchShardsRequest(indices);
clusterSearchShardsRequest.local(request.paramAsBoolean("local", clusterSearchShardsRequest.local()));
clusterSearchShardsRequest.listenerThreaded(false);
clusterSearchShardsRequest.types(RestActions.splitTypes(request.param("type")));
clusterSearchShardsRequest.routing(request.param("routing"));
clusterSearchShardsRequest.preference(request.param("preference"));
if (request.hasParam("ignore_indices")) {
clusterSearchShardsRequest.ignoreIndices(IgnoreIndices.fromString(request.param("ignore_indices")));
}
client.admin().cluster().searchShards(clusterSearchShardsRequest, new ActionListener<ClusterSearchShardsResponse>() {
@Override
public void onResponse(ClusterSearchShardsResponse response) {
try {
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
builder.startObject();
builder.field("ok", true);
response.toXContent(builder, request);
builder.endObject();
channel.sendResponse(new XContentRestResponse(request, RestStatus.OK, builder));
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
}
}

View File

@ -0,0 +1,154 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.cluster.shards;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.common.Priority;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
/**
*/
public class ClusterSearchShardsTests extends AbstractNodesTests {
private Client client;
@BeforeClass
public void createNodes() throws Exception {
startNode("node1", settingsBuilder().put("node.tag", "A"));
startNode("node2", settingsBuilder().put("node.tag", "B"));
client = getClient();
}
@AfterClass
public void closeNodes() {
client.close();
closeAllNodes();
}
protected Client getClient() {
return client("node1");
}
@Test
public void testSingleShardAllocation() throws Exception {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test").setSettings(settingsBuilder()
.put("index.number_of_shards", "1").put("index.number_of_replicas", 0).put("index.routing.allocation.include.tag", "A")).execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
ClusterSearchShardsResponse response = client("node1").admin().cluster().prepareSearchShards("test").execute().actionGet();
assertThat(response.getGroups().length, equalTo(1));
assertThat(response.getGroups()[0].getIndex(), equalTo("test"));
assertThat(response.getGroups()[0].getShardId(), equalTo(0));
assertThat(response.getGroups()[0].getShards().length, equalTo(1));
assertThat(response.getNodes().length, equalTo(1));
assertThat(response.getGroups()[0].getShards()[0].currentNodeId(), equalTo(response.getNodes()[0].getId()));
response = client("node2").admin().cluster().prepareSearchShards("test").setRouting("A").execute().actionGet();
assertThat(response.getGroups().length, equalTo(1));
assertThat(response.getGroups()[0].getIndex(), equalTo("test"));
assertThat(response.getGroups()[0].getShardId(), equalTo(0));
assertThat(response.getGroups()[0].getShards().length, equalTo(1));
assertThat(response.getNodes().length, equalTo(1));
assertThat(response.getGroups()[0].getShards()[0].currentNodeId(), equalTo(response.getNodes()[0].getId()));
}
@Test
public void testMultipleShardsSingleNodeAllocation() throws Exception {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test").setSettings(settingsBuilder()
.put("index.number_of_shards", "4").put("index.number_of_replicas", 0).put("index.routing.allocation.include.tag", "A")).execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
ClusterSearchShardsResponse response = client("node1").admin().cluster().prepareSearchShards("test").execute().actionGet();
assertThat(response.getGroups().length, equalTo(4));
assertThat(response.getGroups()[0].getIndex(), equalTo("test"));
assertThat(response.getNodes().length, equalTo(1));
assertThat(response.getGroups()[0].getShards()[0].currentNodeId(), equalTo(response.getNodes()[0].getId()));
response = client("node1").admin().cluster().prepareSearchShards("test").setRouting("ABC").execute().actionGet();
assertThat(response.getGroups().length, equalTo(1));
response = client("node1").admin().cluster().prepareSearchShards("test").setPreference("_shards:2").execute().actionGet();
assertThat(response.getGroups().length, equalTo(1));
assertThat(response.getGroups()[0].getShardId(), equalTo(2));
}
@Test
public void testMultipleIndicesAllocation() throws Exception {
try {
client.admin().indices().prepareDelete("test1").execute().actionGet();
} catch (Exception e) {
// ignore
}
try {
client.admin().indices().prepareDelete("test2").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test1").setSettings(settingsBuilder()
.put("index.number_of_shards", "4").put("index.number_of_replicas", 1)).execute().actionGet();
client.admin().indices().prepareCreate("test2").setSettings(settingsBuilder()
.put("index.number_of_shards", "4").put("index.number_of_replicas", 1)).execute().actionGet();
client.admin().indices().prepareAliases()
.addAliasAction(AliasAction.newAddAliasAction("test1", "routing_alias").routing("ABC"))
.addAliasAction(AliasAction.newAddAliasAction("test2", "routing_alias").routing("EFG"))
.execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
ClusterSearchShardsResponse response = client.admin().cluster().prepareSearchShards("routing_alias").execute().actionGet();
assertThat(response.getGroups().length, equalTo(2));
assertThat(response.getGroups()[0].getShards().length, equalTo(2));
assertThat(response.getGroups()[1].getShards().length, equalTo(2));
boolean seenTest1 = false;
boolean seenTest2 = false;
for (ClusterSearchShardsGroup group : response.getGroups()) {
if (group.getIndex().equals("test1")) {
seenTest1 = true;
assertThat(group.getShards().length, equalTo(2));
} else if (group.getIndex().equals("test2")) {
seenTest2 = true;
assertThat(group.getShards().length, equalTo(2));
} else {
assert false;
}
}
assertThat(seenTest1, equalTo(true));
assertThat(seenTest2, equalTo(true));
assertThat(response.getNodes().length, equalTo(2));
}
}