API: Allow to control document shard routing, and search shard routing, closes #470.

This commit is contained in:
kimchy 2010-11-02 19:56:18 +02:00
parent 8e2e85f460
commit a62f1f3e0d
44 changed files with 681 additions and 99 deletions

View File

@ -61,7 +61,7 @@ public class TransportBroadcastPingAction extends TransportBroadcastOperationAct
}
@Override protected GroupShardsIterator shards(BroadcastPingRequest request, ClusterState clusterState) {
return clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint());
return clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint(), null);
}
@Override protected BroadcastPingResponse newResponse(BroadcastPingRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {

View File

@ -108,6 +108,7 @@ public class BulkRequest implements ActionRequest {
String index = null;
String type = null;
String id = null;
String routing = null;
String opType = null;
String currentFieldName = null;
@ -121,6 +122,8 @@ public class BulkRequest implements ActionRequest {
type = parser.text();
} else if ("_id".equals(currentFieldName)) {
id = parser.text();
} else if ("_routing".equals(currentFieldName)) {
routing = parser.text();
} else if ("op_type".equals(currentFieldName) || "opType".equals(currentFieldName)) {
opType = parser.text();
}
@ -128,7 +131,7 @@ public class BulkRequest implements ActionRequest {
}
if ("delete".equals(action)) {
add(new DeleteRequest(index, type, id));
add(new DeleteRequest(index, type, id).routing(routing));
} else {
nextMarker = findNextMarker(marker, from, data, length);
if (nextMarker == -1) {
@ -136,15 +139,15 @@ public class BulkRequest implements ActionRequest {
}
if ("index".equals(action)) {
if (opType == null) {
add(new IndexRequest(index, type, id)
add(new IndexRequest(index, type, id).routing(routing)
.source(data, from, nextMarker - from, contentUnsafe));
} else {
add(new IndexRequest(index, type, id)
add(new IndexRequest(index, type, id).routing(routing)
.create("create".equals(opType))
.source(data, from, nextMarker - from, contentUnsafe));
}
} else if ("create".equals(action)) {
add(new IndexRequest(index, type, id)
add(new IndexRequest(index, type, id).routing(routing)
.create(true)
.source(data, from, nextMarker - from, contentUnsafe));
}

View File

@ -157,10 +157,10 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
ShardId shardId = null;
if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request;
shardId = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id()).shardId();
shardId = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.routing()).shardId();
} else if (request instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request;
shardId = clusterService.operationRouting().deleteShards(clusterState, deleteRequest.index(), deleteRequest.type(), deleteRequest.id()).shardId();
shardId = clusterService.operationRouting().deleteShards(clusterState, deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), deleteRequest.routing()).shardId();
}
List<BulkItemRequest> list = requestsByShard.get(shardId);
if (list == null) {

View File

@ -63,6 +63,7 @@ public class CountRequest extends BroadcastOperationRequest {
private float minScore = DEFAULT_MIN_SCORE;
@Nullable protected String queryHint;
@Nullable protected String routing;
private byte[] querySource;
private int querySourceOffset;
@ -264,6 +265,29 @@ public class CountRequest extends BroadcastOperationRequest {
return this;
}
/**
* A comma separated list of routing values to control the shards the search will be executed on.
*/
public String routing() {
return this.routing;
}
/**
* A comma separated list of routing values to control the shards the search will be executed on.
*/
public CountRequest routing(String routing) {
this.routing = routing;
return this;
}
/**
* The routing values to control the shards that the search will be executed on.
*/
public CountRequest routing(String... routings) {
this.routing = Strings.arrayToCommaDelimitedString(routings);
return this;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
minScore = in.readFloat();
@ -271,6 +295,9 @@ public class CountRequest extends BroadcastOperationRequest {
if (in.readBoolean()) {
queryHint = in.readUTF();
}
if (in.readBoolean()) {
routing = in.readUTF();
}
querySourceUnsafe = false;
querySourceOffset = 0;
@ -300,6 +327,12 @@ public class CountRequest extends BroadcastOperationRequest {
out.writeBoolean(true);
out.writeUTF(queryHint);
}
if (routing == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(routing);
}
out.writeVInt(querySourceLength);
out.writeBytes(querySource, querySourceOffset, querySourceLength);

View File

@ -79,7 +79,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
}
@Override protected GroupShardsIterator shards(CountRequest request, ClusterState clusterState) {
return clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint());
return clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint(), request.routing());
}
@Override protected void checkBlock(CountRequest request, ClusterState state) {

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import javax.annotation.Nullable;
import java.io.IOException;
import static org.elasticsearch.action.Actions.*;
@ -48,6 +49,7 @@ public class DeleteRequest extends ShardReplicationOperationRequest {
private String type;
private String id;
@Nullable private String routing;
private boolean refresh;
/**
@ -164,6 +166,23 @@ public class DeleteRequest extends ShardReplicationOperationRequest {
return this;
}
/**
* Controls the shard routing of the request. Using this value to hash the shard
* and not the id.
*/
public DeleteRequest routing(String routing) {
this.routing = routing;
return this;
}
/**
* Controls the shard routing of the delete request. Using this value to hash the shard
* and not the id.
*/
public String routing() {
return this.routing;
}
/**
* Should a refresh be executed post this index operation causing the operation to
* be searchable. Note, heavy indexing should not set this to <tt>true</tt>. Defaults
@ -182,6 +201,9 @@ public class DeleteRequest extends ShardReplicationOperationRequest {
super.readFrom(in);
type = in.readUTF();
id = in.readUTF();
if (in.readBoolean()) {
routing = in.readUTF();
}
refresh = in.readBoolean();
}
@ -189,6 +211,12 @@ public class DeleteRequest extends ShardReplicationOperationRequest {
super.writeTo(out);
out.writeUTF(type);
out.writeUTF(id);
if (routing == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(routing);
}
out.writeBoolean(refresh);
}

View File

@ -102,7 +102,10 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
@Override protected DeleteResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
indexShard(shardRequest).delete(request.type(), request.id());
IndexShard indexShard = indexShard(shardRequest);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id());
delete.refresh(request.refresh());
indexShard.delete(delete);
return new DeleteResponse(request.index(), request.type(), request.id());
}
@ -116,6 +119,6 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
@Override protected ShardsIterator shards(ClusterState clusterState, DeleteRequest request) {
return clusterService.operationRouting()
.deleteShards(clusterService.state(), request.index(), request.type(), request.id());
.deleteShards(clusterService.state(), request.index(), request.type(), request.id(), request.routing());
}
}

View File

@ -38,6 +38,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
@ -67,6 +68,7 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
private String queryParserName;
private String[] types = Strings.EMPTY_ARRAY;
@Nullable private String routing;
/**
* Constructs a new delete by query request to run against the provided indices. No indices means
@ -207,6 +209,29 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
return this.types;
}
/**
* A comma separated list of routing values to control the shards the search will be executed on.
*/
public String routing() {
return this.routing;
}
/**
* A comma separated list of routing values to control the shards the search will be executed on.
*/
public DeleteByQueryRequest routing(String routing) {
this.routing = routing;
return this;
}
/**
* The routing values to control the shards that the search will be executed on.
*/
public DeleteByQueryRequest routing(String... routings) {
this.routing = Strings.arrayToCommaDelimitedString(routings);
return this;
}
/**
* The types of documents the query will run against. Defaults to all types.
*/
@ -264,6 +289,10 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
if (in.readBoolean()) {
queryParserName = in.readUTF();
}
if (in.readBoolean()) {
routing = in.readUTF();
}
}
public void writeTo(StreamOutput out) throws IOException {
@ -278,6 +307,12 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
out.writeBoolean(true);
out.writeUTF(queryParserName);
}
if (routing == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(routing);
}
}
@Override public String toString() {

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import javax.annotation.Nullable;
import java.io.IOException;
import static org.elasticsearch.action.Actions.*;
@ -42,6 +43,7 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
private byte[] querySource;
private String queryParserName;
private String[] types = Strings.EMPTY_ARRAY;
@Nullable private String routing;
IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index) {
this.index = index;
@ -51,6 +53,7 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
this.types = request.types();
this.replicationType = request.replicationType();
this.consistencyLevel = request.consistencyLevel();
this.routing = request.routing();
}
IndexDeleteByQueryRequest() {
@ -81,6 +84,10 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
return queryParserName;
}
String routing() {
return this.routing;
}
String[] types() {
return this.types;
}
@ -109,6 +116,9 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
types[i] = in.readUTF();
}
}
if (in.readBoolean()) {
routing = in.readUTF();
}
}
public void writeTo(StreamOutput out) throws IOException {
@ -125,5 +135,11 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
for (String type : types) {
out.writeUTF(type);
}
if (routing == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(routing);
}
}
}

View File

@ -43,20 +43,18 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
private byte[] querySource;
private String queryParserName;
private String[] types = Strings.EMPTY_ARRAY;
public ShardDeleteByQueryRequest(String index, byte[] querySource, @Nullable String queryParserName, String[] types, int shardId) {
this.index = index;
this.querySource = querySource;
this.queryParserName = queryParserName;
this.types = types;
this.shardId = shardId;
}
@Nullable private String routing;
ShardDeleteByQueryRequest(IndexDeleteByQueryRequest request, int shardId) {
this(request.index(), request.querySource(), request.queryParserName(), request.types(), shardId);
this.index = request.index();
this.querySource = request.querySource();
this.queryParserName = request.queryParserName();
this.types = request.types();
this.shardId = shardId;
replicationType(request.replicationType());
consistencyLevel(request.consistencyLevel());
timeout = request.timeout();
this.routing = request.routing();
}
ShardDeleteByQueryRequest() {
@ -86,6 +84,10 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
return this.types;
}
public String routing() {
return this.routing;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
querySource = new byte[in.readVInt()];
@ -101,6 +103,9 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
types[i] = in.readUTF();
}
}
if (in.readBoolean()) {
routing = in.readUTF();
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
@ -118,6 +123,12 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
for (String type : types) {
out.writeUTF(type);
}
if (routing == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(routing);
}
}
@Override public String toString() {

View File

@ -71,7 +71,7 @@ public class TransportIndexDeleteByQueryAction extends TransportIndexReplication
}
@Override protected GroupShardsIterator shards(IndexDeleteByQueryRequest request) {
return clusterService.operationRouting().deleteByQueryShards(clusterService.state(), request.index());
return clusterService.operationRouting().deleteByQueryShards(clusterService.state(), request.index(), request.routing());
}
@Override protected ShardDeleteByQueryRequest newShardRequestInstance(IndexDeleteByQueryRequest request, int shardId) {

View File

@ -76,7 +76,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
}
@Override protected ShardsIterator shards(ClusterState clusterState, ShardDeleteByQueryRequest request) {
GroupShardsIterator group = clusterService.operationRouting().deleteByQueryShards(clusterService.state(), request.index());
GroupShardsIterator group = clusterService.operationRouting().deleteByQueryShards(clusterService.state(), request.index(), request.routing());
for (ShardsIterator shards : group) {
if (shards.shardId().id() == request.shardId()) {
return shards;

View File

@ -88,6 +88,16 @@ public class GetRequest extends SingleOperationRequest {
return this;
}
/**
* Controls the shard routing of the request. Using this value to hash the shard
* and not the id.
*/
public GetRequest routing(String routing) {
this.routing = routing;
return this;
}
/**
* Explicitly specify the fields that will be returned. By default, the <tt>_source</tt>
* field will be returned.

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
@ -106,6 +107,7 @@ public class IndexRequest extends ShardReplicationOperationRequest {
private String type;
private String id;
@Nullable private String routing;
private byte[] source;
private int sourceOffset;
@ -231,6 +233,23 @@ public class IndexRequest extends ShardReplicationOperationRequest {
return this;
}
/**
* Controls the shard routing of the request. Using this value to hash the shard
* and not the id.
*/
public IndexRequest routing(String routing) {
this.routing = routing;
return this;
}
/**
* Controls the shard routing of the request. Using this value to hash the shard
* and not the id.
*/
public String routing() {
return this.routing;
}
/**
* The source of the document to index.
*/
@ -472,6 +491,9 @@ public class IndexRequest extends ShardReplicationOperationRequest {
if (in.readBoolean()) {
id = in.readUTF();
}
if (in.readBoolean()) {
routing = in.readUTF();
}
sourceUnsafe = false;
sourceOffset = 0;
@ -492,6 +514,12 @@ public class IndexRequest extends ShardReplicationOperationRequest {
out.writeBoolean(true);
out.writeUTF(id);
}
if (routing == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(routing);
}
out.writeVInt(sourceLength);
out.writeBytes(source, sourceOffset, sourceLength);
out.writeByte(opType.id());

View File

@ -129,7 +129,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
@Override protected ShardsIterator shards(ClusterState clusterState, IndexRequest request) {
return clusterService.operationRouting()
.indexShards(clusterService.state(), request.index(), request.type(), request.id());
.indexShards(clusterService.state(), request.index(), request.type(), request.id(), request.routing());
}
@Override protected IndexResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
@ -141,9 +141,9 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
index.refresh(request.refresh());
doc = indexShard.index(index);
} else {
Engine.Create create = indexShard(shardRequest).prepareCreate(request.type(), request.id(), request.source());
Engine.Create create = indexShard.prepareCreate(request.type(), request.id(), request.source());
create.refresh(request.refresh());
doc = indexShard(shardRequest).create(create);
doc = indexShard.create(create);
}
if (doc.mappersAdded()) {
updateMappingOnMaster(request);
@ -152,11 +152,16 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
IndexShard indexShard = indexShard(shardRequest);
IndexRequest request = shardRequest.request;
if (request.opType() == IndexRequest.OpType.INDEX) {
indexShard(shardRequest).index(request.type(), request.id(), request.source());
Engine.Index index = indexShard.prepareIndex(request.type(), request.id(), request.source());
index.refresh(request.refresh());
indexShard.index(index);
} else {
indexShard(shardRequest).create(request.type(), request.id(), request.source());
Engine.Create create = indexShard.prepareCreate(request.type(), request.id(), request.source());
create.refresh(request.refresh());
indexShard.create(create);
}
}

View File

@ -38,6 +38,7 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
@ -68,7 +69,8 @@ public class SearchRequest implements ActionRequest {
private String[] indices;
private String queryHint;
@Nullable private String queryHint;
@Nullable private String routing;
private byte[] source;
private int sourceOffset;
@ -195,6 +197,29 @@ public class SearchRequest implements ActionRequest {
return this;
}
/**
* A comma separated list of routing values to control the shards the search will be executed on.
*/
public String routing() {
return this.routing;
}
/**
* A comma separated list of routing values to control the shards the search will be executed on.
*/
public SearchRequest routing(String routing) {
this.routing = routing;
return this;
}
/**
* The routing values to control the shards that the search will be executed on.
*/
public SearchRequest routing(String... routings) {
this.routing = Strings.arrayToCommaDelimitedString(routings);
return this;
}
/**
* The search type to execute, defaults to {@link SearchType#DEFAULT}.
*/
@ -466,6 +491,9 @@ public class SearchRequest implements ActionRequest {
if (in.readBoolean()) {
queryHint = in.readUTF();
}
if (in.readBoolean()) {
routing = in.readUTF();
}
if (in.readBoolean()) {
scroll = readScroll(in);
@ -518,6 +546,12 @@ public class SearchRequest implements ActionRequest {
out.writeBoolean(true);
out.writeUTF(queryHint);
}
if (routing == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(routing);
}
if (scroll == null) {
out.writeBoolean(false);

View File

@ -110,7 +110,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index);
}
shardsIts = clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint());
shardsIts = clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint(), request.routing());
expectedSuccessfulOps = shardsIts.size();
expectedTotalOps = shardsIts.totalSizeActive();

View File

@ -35,6 +35,7 @@ public abstract class SingleOperationRequest implements ActionRequest {
protected String index;
protected String type;
protected String id;
protected String routing;
private boolean threadedListener = false;
private boolean threadedOperation = true;
@ -79,6 +80,10 @@ public abstract class SingleOperationRequest implements ActionRequest {
return id;
}
public String routing() {
return this.routing;
}
/**
* Should the listener be called on a separate thread if needed.
*/
@ -110,6 +115,9 @@ public abstract class SingleOperationRequest implements ActionRequest {
index = in.readUTF();
type = in.readUTF();
id = in.readUTF();
if (in.readBoolean()) {
routing = in.readUTF();
}
// no need to pass threading over the network, they are always false when coming throw a thread pool
}
@ -117,6 +125,12 @@ public abstract class SingleOperationRequest implements ActionRequest {
out.writeUTF(index);
out.writeUTF(type);
out.writeUTF(id);
if (routing == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(routing);
}
}
}

View File

@ -106,7 +106,7 @@ public abstract class TransportSingleOperationAction<Request extends SingleOpera
checkBlock(request, clusterState);
this.shardsIt = clusterService.operationRouting()
.getShards(clusterState, request.index(), request.type(), request.id());
.getShards(clusterState, request.index(), request.type(), request.id(), request.routing());
}
public void start() {

View File

@ -79,6 +79,22 @@ public class CountRequestBuilder extends BaseRequestBuilder<CountRequest, CountR
return this;
}
/**
* A comma separated list of routing values to control the shards the search will be executed on.
*/
public CountRequestBuilder setRouting(String routing) {
request.routing(routing);
return this;
}
/**
* The routing values to control the shards that the search will be executed on.
*/
public CountRequestBuilder setRouting(String... routing) {
request.routing(routing);
return this;
}
/**
* The query source to execute.
*

View File

@ -64,6 +64,15 @@ public class DeleteRequestBuilder extends BaseRequestBuilder<DeleteRequest, Dele
return this;
}
/**
* Controls the shard routing of the delete request. Using this value to hash the shard
* and not the id.
*/
public DeleteRequestBuilder setRouting(String routing) {
request.routing(routing);
return this;
}
/**
* Should a refresh be executed post this index operation causing the operation to
* be searchable. Note, heavy indexing should not set this to <tt>true</tt>. Defaults

View File

@ -57,6 +57,23 @@ public class DeleteByQueryRequestBuilder extends BaseRequestBuilder<DeleteByQuer
return this;
}
/**
* A comma separated list of routing values to control the shards the action will be executed on.
*/
public DeleteByQueryRequestBuilder setRouting(String routing) {
request.routing(routing);
return this;
}
/**
* The routing values to control the shards that the action will be executed on.
*/
public DeleteByQueryRequestBuilder setRouting(String... routing) {
request.routing(routing);
return this;
}
/**
* The query source to execute.
*
@ -103,7 +120,7 @@ public class DeleteByQueryRequestBuilder extends BaseRequestBuilder<DeleteByQuer
/**
* The query source to execute.
*/
public DeleteByQueryRequestBuilder query(byte[] querySource, int offset, int length, boolean unsafe) {
public DeleteByQueryRequestBuilder setQuery(byte[] querySource, int offset, int length, boolean unsafe) {
request.query(querySource, offset, length, unsafe);
return this;
}

View File

@ -62,6 +62,15 @@ public class GetRequestBuilder extends BaseRequestBuilder<GetRequest, GetRespons
return this;
}
/**
* Controls the shard routing of the request. Using this value to hash the shard
* and not the id.
*/
public GetRequestBuilder setRouting(String routing) {
request.routing(routing);
return this;
}
/**
* Explicitly specify the fields that will be returned. By default, the <tt>_source</tt>
* field will be returned.

View File

@ -69,6 +69,15 @@ public class IndexRequestBuilder extends BaseRequestBuilder<IndexRequest, IndexR
return this;
}
/**
* Controls the shard routing of the request. Using this value to hash the shard
* and not the id.
*/
public IndexRequestBuilder setRouting(String routing) {
request.routing(routing);
return this;
}
/**
* Index the Map as a JSON.
*

View File

@ -134,6 +134,22 @@ public class SearchRequestBuilder extends BaseRequestBuilder<SearchRequest, Sear
return this;
}
/**
* A comma separated list of routing values to control the shards the search will be executed on.
*/
public SearchRequestBuilder setRouting(String routing) {
request.routing(routing);
return this;
}
/**
* The routing values to control the shards that the search will be executed on.
*/
public SearchRequestBuilder setRouting(String... routing) {
request.routing(routing);
return this;
}
/**
* Controls the the search operation threading model.
*/

View File

@ -19,36 +19,20 @@
package org.elasticsearch.cluster.routing;
import org.elasticsearch.common.collect.IdentityHashSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class GroupShardsIterator implements Iterable<ShardsIterator> {
private final Set<ShardsIterator> iterators;
private final Collection<ShardsIterator> iterators;
public GroupShardsIterator() {
this(new IdentityHashSet<ShardsIterator>());
}
public GroupShardsIterator(Set<ShardsIterator> iterators) {
public GroupShardsIterator(Collection<ShardsIterator> iterators) {
this.iterators = iterators;
}
public void add(ShardsIterator shardsIterator) {
iterators.add(shardsIterator);
}
public void add(Iterable<ShardsIterator> shardsIterator) {
for (ShardsIterator it : shardsIterator) {
add(it);
}
}
public int totalSize() {
int size = 0;
for (ShardsIterator shard : iterators) {
@ -69,7 +53,7 @@ public class GroupShardsIterator implements Iterable<ShardsIterator> {
return iterators.size();
}
public Set<ShardsIterator> iterators() {
public Collection<ShardsIterator> iterators() {
return iterators;
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.cluster.routing;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.collect.IdentityHashSet;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.collect.UnmodifiableIterator;
@ -30,10 +29,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.Immutable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import static org.elasticsearch.common.collect.Lists.*;
@ -146,7 +142,8 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
* is an iterator across shard replication group.
*/
public GroupShardsIterator groupByShardsIt() {
IdentityHashSet<ShardsIterator> set = new IdentityHashSet<ShardsIterator>();
// use list here since we need to maintain identity across shards
ArrayList<ShardsIterator> set = new ArrayList<ShardsIterator>();
for (IndexShardRoutingTable indexShard : this) {
set.add(indexShard.shardsIt());
}
@ -161,7 +158,8 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
* over *all* the shards (all the replicas) within the index.
*/
public GroupShardsIterator groupByAllIt() {
IdentityHashSet<ShardsIterator> set = new IdentityHashSet<ShardsIterator>();
// use list here since we need to maintain identity across shards
ArrayList<ShardsIterator> set = new ArrayList<ShardsIterator>();
for (IndexShardRoutingTable indexShard : this) {
for (ShardRouting shardRouting : indexShard) {
set.add(shardRouting.shardsIt());

View File

@ -40,11 +40,11 @@ import static org.elasticsearch.common.collect.Lists.*;
*/
public class IndexShardRoutingTable implements Iterable<ShardRouting> {
private final ShardId shardId;
final ShardId shardId;
private final ImmutableList<ShardRouting> shards;
final ImmutableList<ShardRouting> shards;
private final AtomicInteger counter;
final AtomicInteger counter;
IndexShardRoutingTable(ShardId shardId, ImmutableList<ShardRouting> shards) {
this.shardId = shardId;
@ -279,6 +279,20 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
@Override public ShardId shardId() {
return IndexShardRoutingTable.this.shardId();
}
@Override public boolean equals(Object o) {
if (this == o) return true;
ShardsIterator that = (ShardsIterator) o;
if (shardId != null ? !shardId.equals(that.shardId()) : that.shardId() != null) return false;
return true;
}
@Override public int hashCode() {
return shardId != null ? shardId.hashCode() : 0;
}
}
public static class Builder {

View File

@ -148,4 +148,18 @@ public class PlainShardsIterator implements ShardsIterator {
@Override public void remove() {
throw new UnsupportedOperationException();
}
@Override public boolean equals(Object o) {
if (this == o) return true;
ShardsIterator that = (ShardsIterator) o;
if (shardId != null ? !shardId.equals(that.shardId()) : that.shardId() != null) return false;
return true;
}
@Override public int hashCode() {
return shardId != null ? shardId.hashCode() : 0;
}
}

View File

@ -33,6 +33,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndexMissingException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -143,7 +144,8 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
* @see IndexRoutingTable#groupByAllIt()
*/
public GroupShardsIterator allShardsGrouped(String... indices) throws IndexMissingException {
GroupShardsIterator its = new GroupShardsIterator();
// use list here since we need to maintain identity across shards
ArrayList<ShardsIterator> set = new ArrayList<ShardsIterator>();
if (indices == null || indices.length == 0) {
indices = indicesRouting.keySet().toArray(new String[indicesRouting.keySet().size()]);
}
@ -156,11 +158,11 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
}
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
for (ShardRouting shardRouting : indexShardRoutingTable) {
its.add(shardRouting.shardsIt());
set.add(shardRouting.shardsIt());
}
}
}
return its;
return new GroupShardsIterator(set);
}
/**
@ -174,7 +176,8 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
* @see IndexRoutingTable#groupByAllIt()
*/
public GroupShardsIterator primaryShardsGrouped(String... indices) throws IndexMissingException {
GroupShardsIterator its = new GroupShardsIterator();
// use list here since we need to maintain identity across shards
ArrayList<ShardsIterator> set = new ArrayList<ShardsIterator>();
if (indices == null || indices.length == 0) {
indices = indicesRouting.keySet().toArray(new String[indicesRouting.keySet().size()]);
}
@ -184,10 +187,10 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
throw new IndexMissingException(new Index(index));
}
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
its.add(indexShardRoutingTable.primaryShard().shardsIt());
set.add(indexShardRoutingTable.primaryShard().shardsIt());
}
}
return its;
return new GroupShardsIterator(set);
}
public static Builder builder() {

View File

@ -101,4 +101,8 @@ public interface ShardsIterator extends Iterable<ShardRouting>, Iterator<ShardRo
* @see ShardRouting#assignedToNode()
*/
ShardRouting nextAssignedOrNull();
int hashCode();
boolean equals(Object other);
}

View File

@ -32,13 +32,13 @@ import javax.annotation.Nullable;
*/
public interface OperationRouting {
ShardsIterator indexShards(ClusterState clusterState, String index, String type, String id) throws IndexMissingException, IndexShardMissingException;
ShardsIterator indexShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException;
ShardsIterator deleteShards(ClusterState clusterState, String index, String type, String id) throws IndexMissingException, IndexShardMissingException;
ShardsIterator deleteShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException;
ShardsIterator getShards(ClusterState clusterState, String index, String type, String id) throws IndexMissingException, IndexShardMissingException;
ShardsIterator getShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException;
GroupShardsIterator deleteByQueryShards(ClusterState clusterState, String index) throws IndexMissingException;
GroupShardsIterator deleteByQueryShards(ClusterState clusterState, String index, @Nullable String routing) throws IndexMissingException;
GroupShardsIterator searchShards(ClusterState clusterState, String[] indices, @Nullable String queryHint) throws IndexMissingException;
GroupShardsIterator searchShards(ClusterState clusterState, String[] indices, @Nullable String queryHint, @Nullable String routing) throws IndexMissingException;
}

View File

@ -24,5 +24,7 @@ package org.elasticsearch.cluster.routing.operation.hash;
*/
public interface HashFunction {
int hash(String routing);
int hash(String type, String id);
}

View File

@ -26,6 +26,16 @@ import org.elasticsearch.cluster.routing.operation.hash.HashFunction;
*/
public class DjbHashFunction implements HashFunction {
@Override public int hash(String routing) {
long hash = 5381;
for (int i = 0; i < routing.length(); i++) {
hash = ((hash << 5) + hash) + routing.charAt(i);
}
return (int) hash;
}
@Override public int hash(String type, String id) {
long hash = 5381;

View File

@ -26,6 +26,10 @@ import org.elasticsearch.cluster.routing.operation.hash.HashFunction;
*/
public class SimpleHashFunction implements HashFunction {
@Override public int hash(String routing) {
return routing.hashCode();
}
@Override public int hash(String type, String id) {
return type.hashCode() + 31 * id.hashCode();
}

View File

@ -27,7 +27,6 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.cluster.routing.operation.hash.HashFunction;
import org.elasticsearch.common.collect.IdentityHashSet;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -37,12 +36,17 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexMissingException;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.regex.Pattern;
/**
* @author kimchy (shay.banon)
*/
public class PlainOperationRouting extends AbstractComponent implements OperationRouting {
public final static Pattern routingPattern = Pattern.compile(",");
private final HashFunction hashFunction;
@Inject public PlainOperationRouting(Settings indexSettings, HashFunction hashFunction) {
@ -50,35 +54,79 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
this.hashFunction = hashFunction;
}
@Override public ShardsIterator indexShards(ClusterState clusterState, String index, String type, String id) throws IndexMissingException, IndexShardMissingException {
return shards(clusterState, index, type, id).shardsIt();
@Override public ShardsIterator indexShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException {
return shards(clusterState, index, type, id, routing).shardsIt();
}
@Override public ShardsIterator deleteShards(ClusterState clusterState, String index, String type, String id) throws IndexMissingException, IndexShardMissingException {
return shards(clusterState, index, type, id).shardsIt();
@Override public ShardsIterator deleteShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException {
return shards(clusterState, index, type, id, routing).shardsIt();
}
@Override public ShardsIterator getShards(ClusterState clusterState, String index, String type, String id) throws IndexMissingException, IndexShardMissingException {
return shards(clusterState, index, type, id).shardsRandomIt();
@Override public ShardsIterator getShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException {
return shards(clusterState, index, type, id, routing).shardsRandomIt();
}
@Override public GroupShardsIterator deleteByQueryShards(ClusterState clusterState, String index) throws IndexMissingException {
return indexRoutingTable(clusterState, index).groupByShardsIt();
@Override public GroupShardsIterator deleteByQueryShards(ClusterState clusterState, String index, @Nullable String routing) throws IndexMissingException {
if (routing == null) {
return indexRoutingTable(clusterState, index).groupByShardsIt();
}
String[] routings = routingPattern.split(routing);
if (routing.length() == 0) {
return indexRoutingTable(clusterState, index).groupByShardsIt();
}
// we use set here and not identity set since we might get duplicates
HashSet<ShardsIterator> set = new HashSet<ShardsIterator>();
IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
for (String r : routings) {
int shardId = shardId(clusterState, index, null, null, r);
IndexShardRoutingTable indexShard = indexRouting.shard(shardId);
if (indexShard == null) {
throw new IndexShardMissingException(new ShardId(index, shardId));
}
set.add(indexShard.shardsRandomIt());
}
return new GroupShardsIterator(set);
}
@Override public GroupShardsIterator searchShards(ClusterState clusterState, String[] indices, @Nullable String queryHint) throws IndexMissingException {
@Override public GroupShardsIterator searchShards(ClusterState clusterState, String[] indices, @Nullable String queryHint, @Nullable String routing) throws IndexMissingException {
if (indices == null || indices.length == 0) {
indices = clusterState.metaData().concreteAllIndices();
}
IdentityHashSet<ShardsIterator> set = new IdentityHashSet<ShardsIterator>();
for (String index : indices) {
IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
for (IndexShardRoutingTable indexShard : indexRouting) {
set.add(indexShard.shardsRandomIt());
}
String[] routings = null;
if (routing != null) {
routings = routingPattern.split(routing);
}
if (routings != null && routings.length > 0) {
// we use set here and not list since we might get duplicates
HashSet<ShardsIterator> set = new HashSet<ShardsIterator>();
for (String index : indices) {
IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
for (String r : routings) {
int shardId = shardId(clusterState, index, null, null, r);
IndexShardRoutingTable indexShard = indexRouting.shard(shardId);
if (indexShard == null) {
throw new IndexShardMissingException(new ShardId(index, shardId));
}
// we might get duplicates, but that's ok, they will override one another
set.add(indexShard.shardsRandomIt());
}
}
return new GroupShardsIterator(set);
} else {
// we use list here since we know we are not going to create duplicates
ArrayList<ShardsIterator> set = new ArrayList<ShardsIterator>();
for (String index : indices) {
IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
for (IndexShardRoutingTable indexShard : indexRouting) {
set.add(indexShard.shardsRandomIt());
}
}
return new GroupShardsIterator(set);
}
return new GroupShardsIterator(set);
}
public IndexMetaData indexMetaData(ClusterState clusterState, String index) {
@ -98,8 +146,10 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
}
protected IndexShardRoutingTable shards(ClusterState clusterState, String index, String type, String id) {
int shardId = Math.abs(hash(type, id)) % indexMetaData(clusterState, index).numberOfShards();
// either routing is set, or type/id are set
protected IndexShardRoutingTable shards(ClusterState clusterState, String index, String type, String id, String routing) {
int shardId = shardId(clusterState, index, type, id, routing);
IndexShardRoutingTable indexShard = indexRoutingTable(clusterState, index).shard(shardId);
if (indexShard == null) {
throw new IndexShardMissingException(new ShardId(index, shardId));
@ -107,6 +157,17 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
return indexShard;
}
private int shardId(ClusterState clusterState, String index, String type, @Nullable String id, @Nullable String routing) {
if (routing == null) {
return Math.abs(hash(type, id)) % indexMetaData(clusterState, index).numberOfShards();
}
return Math.abs(hash(routing)) % indexMetaData(clusterState, index).numberOfShards();
}
protected int hash(String routing) {
return hashFunction.hash(routing);
}
protected int hash(String type, String id) {
return hashFunction.hash(type, id);
}

View File

@ -76,6 +76,7 @@ public class RestCountAction extends BaseRestHandler {
}
countRequest.queryParserName(request.param("query_parser_name"));
countRequest.queryHint(request.param("query_hint"));
countRequest.routing(request.param("routing"));
countRequest.minScore(request.paramAsFloat("min_score", DEFAULT_MIN_SCORE));
countRequest.types(splitTypes(request.param("type")));
} catch (Exception e) {

View File

@ -48,6 +48,7 @@ public class RestDeleteAction extends BaseRestHandler {
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
DeleteRequest deleteRequest = new DeleteRequest(request.param("index"), request.param("type"), request.param("id"));
deleteRequest.routing(request.param("routing"));
deleteRequest.timeout(request.paramAsTime("timeout", DeleteRequest.DEFAULT_TIMEOUT));
deleteRequest.refresh(request.paramAsBoolean("refresh", deleteRequest.refresh()));
// we just send a response, no need to fork

View File

@ -57,6 +57,7 @@ public class RestGetAction extends BaseRestHandler {
getRequest.listenerThreaded(false);
// if we have a local operation, execute it on a thread since we don't spawn
getRequest.operationThreaded(true);
getRequest.routing(request.param("routing"));
String sField = request.param("fields");

View File

@ -60,6 +60,7 @@ public class RestIndexAction extends BaseRestHandler {
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"));
indexRequest.routing(request.param("routing"));
indexRequest.source(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe());
indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
indexRequest.refresh(request.paramAsBoolean("refresh", indexRequest.refresh()));

View File

@ -139,6 +139,7 @@ public class RestSearchAction extends BaseRestHandler {
searchRequest.timeout(request.paramAsTime("timeout", null));
searchRequest.types(RestActions.splitTypes(request.param("type")));
searchRequest.queryHint(request.param("query_hint"));
searchRequest.routing(request.param("routing"));
return searchRequest;
}

View File

@ -0,0 +1,187 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.routing;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.xcontent.QueryBuilders;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.elasticsearch.index.query.xcontent.QueryBuilders.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public class SimpleRoutingTests extends AbstractNodesTests {
private Client client;
@BeforeClass public void createNodes() throws Exception {
startNode("node1");
startNode("node2");
client = getClient();
}
@AfterClass public void closeNodes() {
client.close();
closeAllNodes();
}
protected Client getClient() {
return client("node1");
}
@Test public void testSimpleCrudRouting() throws Exception {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test").execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
logger.info("--> indexing with id [1], and routing [0]");
client.prepareIndex("test", "type1", "1").setRouting("0").setSource("field", "value1").setRefresh(true).execute().actionGet();
logger.info("--> verifying get with no routing, should not find anything");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
}
logger.info("--> verifying get with routing, should find");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(true));
}
logger.info("--> deleting with no routing, should not delete anything");
client.prepareDelete("test", "type1", "1").setRefresh(true).execute().actionGet();
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(true));
}
logger.info("--> deleting with routing, should delete");
client.prepareDelete("test", "type1", "1").setRouting("0").setRefresh(true).execute().actionGet();
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(false));
}
logger.info("--> indexing with id [1], and routing [0]");
client.prepareIndex("test", "type1", "1").setRouting("0").setSource("field", "value1").setRefresh(true).execute().actionGet();
logger.info("--> verifying get with no routing, should not find anything");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
}
logger.info("--> verifying get with routing, should find");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(true));
}
logger.info("--> deleting_by_query with 1 as routing, should not delete anything");
client.prepareDeleteByQuery().setQuery(matchAllQuery()).setRouting("1").execute().actionGet();
client.admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(true));
}
logger.info("--> deleting_by_query with , should delete");
client.prepareDeleteByQuery().setQuery(matchAllQuery()).setRouting("0").execute().actionGet();
client.admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(false));
}
}
@Test public void testSimpleSearchRouting() {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test").execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
logger.info("--> indexing with id [1], and routing [0]");
client.prepareIndex("test", "type1", "1").setRouting("0").setSource("field", "value1").setRefresh(true).execute().actionGet();
logger.info("--> verifying get with no routing, should not find anything");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
}
logger.info("--> verifying get with routing, should find");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(true));
}
logger.info("--> search with no routing, should fine one");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(1l));
}
logger.info("--> search with wrong routing, should not find");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch().setRouting("1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(0l));
assertThat(client.prepareCount().setRouting("1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(0l));
}
logger.info("--> search with correct routing, should find");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch().setRouting("0").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(1l));
assertThat(client.prepareCount().setRouting("0").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(1l));
}
logger.info("--> indexing with id [2], and routing [1]");
client.prepareIndex("test", "type1", "2").setRouting("1").setSource("field", "value1").setRefresh(true).execute().actionGet();
logger.info("--> search with no routing, should fine two");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(2l));
assertThat(client.prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(2l));
}
logger.info("--> search with 0 routing, should find one");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch().setRouting("0").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(1l));
assertThat(client.prepareCount().setRouting("0").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(1l));
}
logger.info("--> search with 1 routing, should find one");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch().setRouting("1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(1l));
assertThat(client.prepareCount().setRouting("1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(1l));
}
logger.info("--> search with 0,1 routings , should find two");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch().setRouting("0", "1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(2l));
assertThat(client.prepareCount().setRouting("0", "1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(2l));
}
logger.info("--> search with 0,1,0 routings , should find two");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch().setRouting("0", "1", "0").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(2l));
assertThat(client.prepareCount().setRouting("0", "1", "0").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(2l));
}
}
}

View File

@ -114,7 +114,7 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
.from(0).size(60).explain(true).indexBoost("test", 1.0f).indexBoost("test2", 2.0f);
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
for (ShardRouting shardRouting : shardsIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -182,7 +182,7 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
.from(0).size(60).explain(true).sort("age", SortOrder.ASC);
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
for (ShardRouting shardRouting : shardsIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -276,7 +276,7 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
}
Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = newHashMap();
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
for (ShardRouting shardRouting : shardsIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -328,7 +328,7 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
.facet(FacetBuilders.queryFacet("test1", termQuery("name", "test1")));
Map<SearchShardTarget, QuerySearchResultProvider> queryResults = newHashMap();
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
for (ShardRouting shardRouting : shardsIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));

View File

@ -118,7 +118,7 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode
.from(0).size(60).explain(true);
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
for (ShardRouting shardRouting : shardsIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -185,7 +185,7 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode
.from(0).size(60).explain(true).sort("age", SortOrder.ASC);
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
for (ShardRouting shardRouting : shardsIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -275,7 +275,7 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode
// do this with dfs, since we have uneven distribution of docs between shards
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
for (ShardRouting shardRouting : shardsIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -330,7 +330,7 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode
.facet(queryFacet("test1").query(termQuery("name", "test1")));
Map<SearchShardTarget, QuerySearchResultProvider> queryResults = newHashMap();
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null)) {
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
for (ShardRouting shardRouting : shardsIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));