Add ability to specify routing information for aliases

This commit is contained in:
Igor Motov 2011-06-09 16:03:20 -04:00 committed by kimchy
parent 35b4cabed8
commit b35dfd3aa7
29 changed files with 1166 additions and 81 deletions

View File

@ -65,7 +65,7 @@ public class TransportBroadcastPingAction extends TransportBroadcastOperationAct
}
@Override protected GroupShardsIterator shards(BroadcastPingRequest request, String[] concreteIndices, ClusterState clusterState) {
return clusterService.operationRouting().searchShards(clusterState, concreteIndices, request.queryHint(), null, null);
return clusterService.operationRouting().searchShards(clusterState, request.indices(), concreteIndices, request.queryHint(), null, null);
}
@Override protected BroadcastPingResponse newResponse(BroadcastPingRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
@ -61,7 +62,7 @@ public class TransportReplicationPingAction extends TransportIndicesReplicationO
return TransportActions.Admin.Cluster.Ping.REPLICATION;
}
@Override protected IndexReplicationPingRequest newIndexRequestInstance(ReplicationPingRequest request, String index) {
@Override protected IndexReplicationPingRequest newIndexRequestInstance(ReplicationPingRequest request, String index, Set<String> routing) {
return new IndexReplicationPingRequest(request, index);
}
}

View File

@ -130,6 +130,11 @@ public class IndicesAliasesRequest extends MasterNodeOperationRequest {
return this;
}
public IndicesAliasesRequest addAliasAction(AliasAction action) {
aliasActions.add(action);
return this;
}
List<AliasAction> aliasActions() {
return this.aliasActions;
}

View File

@ -138,6 +138,7 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
for (ActionRequest request : bulkRequest.requests) {
if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request;
indexRequest.routing(clusterState.metaData().resolveIndexRouting(indexRequest.routing(), indexRequest.index()));
indexRequest.index(clusterState.metaData().concreteIndex(indexRequest.index()));
if (allowIdGeneration) {
if (indexRequest.id() == null) {

View File

@ -38,6 +38,8 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static org.elasticsearch.common.collect.Lists.*;
@ -84,7 +86,8 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
}
@Override protected GroupShardsIterator shards(CountRequest request, String[] concreteIndices, ClusterState clusterState) {
return clusterService.operationRouting().searchShards(clusterState, concreteIndices, request.queryHint(), request.routing(), null);
Map<String, Set<String>> routingMap = clusterState.metaData().resolveSearchRouting(request.routing(), request.indices());
return clusterService.operationRouting().searchShards(clusterState, request.indices(), concreteIndices, request.queryHint(), routingMap, null);
}
@Override protected void checkBlock(CountRequest request, String[] concreteIndices, ClusterState state) {

View File

@ -94,6 +94,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
private void innerExecute(final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
ClusterState clusterState = clusterService.state();
request.routing(clusterState.metaData().resolveIndexRouting(request.routing(), request.index()));
request.index(clusterState.metaData().concreteIndex(request.index())); // we need to get the concrete index here...
if (clusterState.metaData().hasIndex(request.index())) {
// check if routing is required, if so, do a broadcast delete

View File

@ -196,7 +196,7 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
/**
* A comma separated list of routing values to control the shards the search will be executed on.
*/
public String routing() {
@Override public String routing() {
return this.routing;
}

View File

@ -26,10 +26,12 @@ import org.elasticsearch.common.Required;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.trove.set.hash.THashSet;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import java.io.IOException;
import java.util.Set;
import static org.elasticsearch.action.Actions.*;
@ -42,17 +44,17 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
private byte[] querySource;
private String[] types = Strings.EMPTY_ARRAY;
@Nullable private String routing;
@Nullable private Set<String> routing;
@Nullable private String[] filteringAliases;
IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index, @Nullable String[] filteringAliases) {
IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index, @Nullable Set<String> routing, @Nullable String[] filteringAliases) {
this.index = index;
this.timeout = request.timeout();
this.querySource = request.querySource();
this.types = request.types();
this.replicationType = request.replicationType();
this.consistencyLevel = request.consistencyLevel();
this.routing = request.routing();
this.routing = routing;
this.filteringAliases = filteringAliases;
}
@ -80,7 +82,7 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
return this;
}
String routing() {
Set<String> routing() {
return this.routing;
}
@ -108,8 +110,12 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
types[i] = in.readUTF();
}
}
if (in.readBoolean()) {
routing = in.readUTF();
int routingSize = in.readVInt();
if (routingSize > 0) {
routing = new THashSet<String>(routingSize);
for (int i = 0; i < routingSize; i++) {
routing.add(in.readUTF());
}
}
int aliasesSize = in.readVInt();
if (aliasesSize > 0) {
@ -128,11 +134,13 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
for (String type : types) {
out.writeUTF(type);
}
if (routing == null) {
out.writeBoolean(false);
if (routing != null) {
out.writeVInt(routing.size());
for (String r : routing) {
out.writeUTF(r);
}
} else {
out.writeBoolean(true);
out.writeUTF(routing);
out.writeVInt(0);
}
if (filteringAliases != null) {
out.writeVInt(filteringAliases.length);

View File

@ -26,9 +26,11 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.trove.set.hash.THashSet;
import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import static org.elasticsearch.action.Actions.*;
@ -42,7 +44,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
private int shardId;
private byte[] querySource;
private String[] types = Strings.EMPTY_ARRAY;
@Nullable private String routing;
@Nullable private Set<String> routing;
@Nullable private String[] filteringAliases;
ShardDeleteByQueryRequest(IndexDeleteByQueryRequest request, int shardId) {
@ -80,7 +82,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
return this.types;
}
public String routing() {
public Set<String> routing() {
return this.routing;
}
@ -100,8 +102,12 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
types[i] = in.readUTF();
}
}
if (in.readBoolean()) {
routing = in.readUTF();
int routingSize = in.readVInt();
if (routingSize > 0) {
routing = new THashSet<String>(routingSize);
for (int i = 0; i < routingSize; i++) {
routing.add(in.readUTF());
}
}
int aliasesSize = in.readVInt();
if (aliasesSize > 0) {
@ -121,11 +127,13 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
for (String type : types) {
out.writeUTF(type);
}
if (routing == null) {
out.writeBoolean(false);
if (routing != null) {
out.writeVInt(routing.size());
for (String r : routing) {
out.writeUTF(r);
}
} else {
out.writeBoolean(true);
out.writeUTF(routing);
out.writeVInt(0);
}
if (filteringAliases != null) {
out.writeVInt(filteringAliases.length);

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
@ -70,8 +71,8 @@ public class TransportDeleteByQueryAction extends TransportIndicesReplicationOpe
}
}
@Override protected IndexDeleteByQueryRequest newIndexRequestInstance(DeleteByQueryRequest request, String index) {
@Override protected IndexDeleteByQueryRequest newIndexRequestInstance(DeleteByQueryRequest request, String index, Set<String> routing) {
String[] filteringAliases = clusterService.state().metaData().filteringAliases(index, request.indices());
return new IndexDeleteByQueryRequest(request, index, filteringAliases);
return new IndexDeleteByQueryRequest(request, index, routing, filteringAliases);
}
}

View File

@ -120,6 +120,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
private void innerExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
MetaData metaData = clusterService.state().metaData();
request.routing(metaData.resolveIndexRouting(request.routing(), request.index()));
request.index(metaData.concreteIndex(request.index()));
if (metaData.hasIndex(request.index())) {
MappingMetaData mappingMd = metaData.index(request.index()).mapping(request.type());

View File

@ -34,6 +34,9 @@ import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.action.search.SearchType.*;
/**
@ -85,7 +88,8 @@ public class TransportSearchAction extends BaseAction<SearchRequest, SearchRespo
try {
ClusterState clusterState = clusterService.state();
String[] concreteIndices = clusterState.metaData().concreteIndices(searchRequest.indices());
GroupShardsIterator groupIt = clusterService.operationRouting().searchShards(clusterState, concreteIndices, searchRequest.queryHint(), searchRequest.routing(), searchRequest.preference());
Map<String, Set<String>> routingMap = clusterState.metaData().resolveSearchRouting(searchRequest.routing(), searchRequest.indices());
GroupShardsIterator groupIt = clusterService.operationRouting().searchShards(clusterState, searchRequest.indices(), concreteIndices, searchRequest.queryHint(), routingMap, searchRequest.preference());
if (groupIt.size() == 1) {
// if we only have one group, then we always want Q_A_F, no need for DFS, and no need to do THEN since we hit one shard
searchRequest.searchType(QUERY_AND_FETCH);

View File

@ -46,6 +46,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.search.type.TransportSearchHelper.*;
@ -110,7 +111,9 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index);
}
shardsIts = clusterService.operationRouting().searchShards(clusterState, concreteIndices, request.queryHint(), request.routing(), request.preference());
Map<String, Set<String>> routingMap = clusterState.metaData().resolveSearchRouting(request.routing(), request.indices());
shardsIts = clusterService.operationRouting().searchShards(clusterState, request.indices(), concreteIndices, request.queryHint(), routingMap, request.preference());
expectedSuccessfulOps = shardsIts.size();
// we need to add 1 for non active partition, since we count it in the total!
expectedTotalOps = shardsIts.totalSizeActiveWith1ForEmpty();

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.support.replication;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
@ -38,6 +39,8 @@ public class IndicesReplicationOperationRequest implements ActionRequest {
protected String[] indices;
private boolean threadedListener = false;
@Nullable private String routing;
protected ReplicationType replicationType = ReplicationType.DEFAULT;
protected WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
@ -82,6 +85,10 @@ public class IndicesReplicationOperationRequest implements ActionRequest {
return this.consistencyLevel;
}
public String routing() {
return null;
}
@Override public void readFrom(StreamInput in) throws IOException {
replicationType = ReplicationType.fromId(in.readByte());
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());

View File

@ -59,7 +59,7 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
@Override protected void doExecute(final Request request, final ActionListener<Response> listener) {
ClusterState clusterState = clusterService.state();
// upate to concrete index
// update to concrete index
request.index(clusterState.metaData().concreteIndex(request.index()));
checkBlock(request, clusterState);

View File

@ -31,6 +31,8 @@ import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
@ -71,8 +73,14 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
final AtomicInteger completionCounter = new AtomicInteger(concreteIndices.length);
final AtomicReferenceArray<Object> indexResponses = new AtomicReferenceArray<Object>(concreteIndices.length);
Map<String, Set<String>> routingMap = clusterState.metaData().resolveSearchRouting(request.routing(), request.indices());
for (final String index : concreteIndices) {
IndexRequest indexRequest = newIndexRequestInstance(request, index);
Set<String> routing = null;
if (routingMap != null) {
routing = routingMap.get(index);
}
IndexRequest indexRequest = newIndexRequestInstance(request, index, routing);
// no threading needed, all is done on the index replication one
indexRequest.listenerThreaded(false);
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
@ -103,7 +111,7 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
protected abstract String transportAction();
protected abstract IndexRequest newIndexRequestInstance(Request request, String index);
protected abstract IndexRequest newIndexRequestInstance(Request request, String index, Set<String> routing);
protected abstract boolean accumulateExceptions();

View File

@ -38,6 +38,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.Set;
/**
* @author kimchy (shay.banon)
@ -103,13 +104,15 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
nodes = clusterState.nodes();
// update to the concrete shard to use
// update to the concrete shard and find routing to use
String alias = request.index();
request.index(clusterState.metaData().concreteIndex(request.index()));
String effectiveRouting = clusterState.metaData().resolveIndexRouting(request.routing(), alias);
checkBlock(request, clusterState);
this.shardIt = clusterService.operationRouting()
.getShards(clusterState, request.index(), request.type(), request.id(), request.routing(), request.preference());
.getShards(clusterState, request.index(), request.type(), request.id(), effectiveRouting, request.preference());
}
public void start() {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.action.admin.indices.support.BaseIndicesRequestBuilder;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.FilterBuilder;
@ -85,6 +86,16 @@ public class IndicesAliasesRequestBuilder extends BaseIndicesRequestBuilder<Indi
return this;
}
/**
* Adds an alias action to the request.
*
* @param aliasAction The alias Action
*/
public IndicesAliasesRequestBuilder addAliasAction(AliasAction aliasAction) {
request.addAliasAction(aliasAction);
return this;
}
/**
* Removes an alias to the index.
*

View File

@ -19,12 +19,20 @@
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.ElasticSearchGenerationException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.FilterBuilder;
import java.io.IOException;
import java.util.Map;
/**
* @author kimchy (shay.banon)
@ -62,14 +70,20 @@ public class AliasAction implements Streamable {
private String alias;
private String filter;
@Nullable private String filter;
@Nullable private String indexRouting;
@Nullable private String searchRouting;
private AliasAction() {
}
public AliasAction(Type actionType, String index, String alias) {
this(actionType, index, alias, "");
this.actionType = actionType;
this.index = index;
this.alias = alias;
}
public AliasAction(Type actionType, String index, String alias, String filter) {
@ -95,6 +109,66 @@ public class AliasAction implements Streamable {
return filter;
}
public AliasAction filter(String filter) {
this.filter = filter;
return this;
}
public AliasAction filter(Map<String, Object> filter) {
if (filter == null || filter.isEmpty()) {
this.filter = null;
return this;
}
try {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
builder.map(filter);
this.filter = builder.string();
return this;
} catch (IOException e) {
throw new ElasticSearchGenerationException("Failed to generate [" + filter + "]", e);
}
}
public AliasAction filter(FilterBuilder filterBuilder) {
if (filterBuilder == null) {
this.filter = null;
return this;
}
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
filterBuilder.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.close();
this.filter = builder.string();
return this;
} catch (IOException e) {
throw new ElasticSearchGenerationException("Failed to build json for alias request", e);
}
}
public AliasAction routing(String routing) {
this.indexRouting = routing;
this.searchRouting = routing;
return this;
}
public String indexRouting() {
return indexRouting;
}
public AliasAction indexRouting(String indexRouting) {
this.indexRouting = indexRouting;
return this;
}
public String searchRouting() {
return searchRouting;
}
public AliasAction searchRouting(String searchRouting) {
this.searchRouting = searchRouting;
return this;
}
public static AliasAction readAliasAction(StreamInput in) throws IOException {
AliasAction aliasAction = new AliasAction();
aliasAction.readFrom(in);
@ -105,13 +179,47 @@ public class AliasAction implements Streamable {
actionType = Type.fromValue(in.readByte());
index = in.readUTF();
alias = in.readUTF();
filter = in.readUTF();
if (in.readBoolean()) {
filter = in.readUTF();
}
if (in.readBoolean()) {
indexRouting = in.readUTF();
}
if (in.readBoolean()) {
searchRouting = in.readUTF();
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeByte(actionType.value());
out.writeUTF(index);
out.writeUTF(alias);
out.writeUTF(filter);
if (filter == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(filter);
}
if (indexRouting == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(indexRouting);
}
if (searchRouting == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(searchRouting);
}
}
public static AliasAction newAddAliasAction(String index, String alias) {
return new AliasAction(Type.ADD, index, alias);
}
public static AliasAction newRemoveAliasAction(String index, String alias) {
return new AliasAction(Type.REMOVE, index, alias);
}
}

View File

@ -40,9 +40,15 @@ public class AliasMetaData {
private final CompressedString filter;
private AliasMetaData(String alias, CompressedString filter) {
private String indexRouting;
private String searchRouting;
private AliasMetaData(String alias, CompressedString filter, String indexRouting, String searchRouting) {
this.alias = alias;
this.filter = filter;
this.indexRouting = indexRouting;
this.searchRouting = searchRouting;
}
public String alias() {
@ -61,6 +67,22 @@ public class AliasMetaData {
return filter();
}
public String getSearchRouting() {
return searchRouting();
}
public String searchRouting() {
return searchRouting;
}
public String getIndexRouting() {
return indexRouting();
}
public String indexRouting() {
return indexRouting;
}
public static Builder newAliasMetaDataBuilder(String alias) {
return new Builder(alias);
}
@ -71,6 +93,11 @@ public class AliasMetaData {
private CompressedString filter;
private String indexRouting;
private String searchRouting;
public Builder(String alias) {
this.alias = alias;
}
@ -78,6 +105,8 @@ public class AliasMetaData {
public Builder(AliasMetaData aliasMetaData) {
this(aliasMetaData.alias());
filter = aliasMetaData.filter();
indexRouting = aliasMetaData.indexRouting();
searchRouting = aliasMetaData.searchRouting();
}
public String alias() {
@ -123,8 +152,24 @@ public class AliasMetaData {
}
}
public Builder routing(String routing) {
this.indexRouting = routing;
this.searchRouting = routing;
return this;
}
public Builder indexRouting(String indexRouting) {
this.indexRouting = indexRouting;
return this;
}
public Builder searchRouting(String searchRouting) {
this.searchRouting = searchRouting;
return this;
}
public AliasMetaData build() {
return new AliasMetaData(alias, filter);
return new AliasMetaData(alias, filter, indexRouting, searchRouting);
}
public static void toXContent(AliasMetaData aliasMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException {
@ -137,6 +182,12 @@ public class AliasMetaData {
parser.close();
builder.field("filter", filter);
}
if (aliasMetaData.indexRouting() != null) {
builder.field("index_routing", aliasMetaData.indexRouting());
}
if (aliasMetaData.searchRouting() != null) {
builder.field("search_routing", aliasMetaData.searchRouting());
}
builder.endObject();
}
@ -158,6 +209,14 @@ public class AliasMetaData {
Map<String, Object> filter = parser.mapOrdered();
builder.filter(filter);
}
} else if (token == XContentParser.Token.VALUE_STRING) {
if ("routing".equals(currentFieldName)) {
builder.routing(parser.text());
} else if ("index_routing".equals(currentFieldName) || "indexRouting".equals(currentFieldName)) {
builder.indexRouting(parser.text());
} else if ("search_routing".equals(currentFieldName) || "searchRouting".equals(currentFieldName)) {
builder.searchRouting(parser.text());
}
}
}
return builder.build();
@ -171,6 +230,19 @@ public class AliasMetaData {
} else {
out.writeBoolean(false);
}
if (aliasMetaData.indexRouting() != null) {
out.writeBoolean(true);
out.writeUTF(aliasMetaData.indexRouting());
} else {
out.writeBoolean(false);
}
if (aliasMetaData.searchRouting() != null) {
out.writeBoolean(true);
out.writeUTF(aliasMetaData.searchRouting());
} else {
out.writeBoolean(false);
}
}
public static AliasMetaData readFrom(StreamInput in) throws IOException {
@ -179,7 +251,15 @@ public class AliasMetaData {
if (in.readBoolean()) {
filter = CompressedString.readCompressedString(in);
}
return new AliasMetaData(alias, filter);
String indexRouting = null;
if (in.readBoolean()) {
indexRouting = in.readUTF();
}
String searchRouting = null;
if (in.readBoolean()) {
searchRouting = in.readUTF();
}
return new AliasMetaData(alias, filter, indexRouting, searchRouting);
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.ImmutableSet;
@ -30,6 +31,7 @@ import org.elasticsearch.common.collect.UnmodifiableIterator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.trove.set.hash.THashSet;
import org.elasticsearch.common.util.concurrent.Immutable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -44,9 +46,11 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import static org.elasticsearch.common.collect.Lists.*;
import static org.elasticsearch.common.collect.MapBuilder.*;
import static org.elasticsearch.common.collect.Maps.newHashMap;
import static org.elasticsearch.common.collect.Sets.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
@ -59,6 +63,9 @@ public class MetaData implements Iterable<IndexMetaData> {
public static final MetaData EMPTY_META_DATA = newMetaDataBuilder().build();
private final long version;
public final static Pattern routingPattern = Pattern.compile(",");
private final ImmutableMap<String, IndexMetaData> indices;
private final ImmutableMap<String, IndexTemplateMetaData> templates;
@ -66,7 +73,9 @@ public class MetaData implements Iterable<IndexMetaData> {
private final String[] allIndices;
private final ImmutableSet<String> aliases;
private final ImmutableMap<String, ImmutableMap<String, AliasMetaData>> aliases;
private final ImmutableMap<String, ImmutableMap<String, ImmutableSet<String>>> aliasToIndexToSearchRoutingMap;
// This map indicates if an alias associated with an index is filtering alias
private final ImmutableMap<String, ImmutableMap<String, Boolean>> indexToAliasFilteringRequiredMap;
@ -90,12 +99,46 @@ public class MetaData implements Iterable<IndexMetaData> {
}
allIndices = allIndicesLst.toArray(new String[allIndicesLst.size()]);
// build aliases set
Set<String> aliases = newHashSet();
// build aliases map
MapBuilder<String, MapBuilder<String, AliasMetaData>> tmpAliasesMap = newMapBuilder();
for (IndexMetaData indexMetaData : indices.values()) {
aliases.addAll(indexMetaData.aliases().keySet());
String index = indexMetaData.index();
for (AliasMetaData aliasMd : indexMetaData.aliases().values()) {
MapBuilder<String, AliasMetaData> indexAliasMap = tmpAliasesMap.get(aliasMd.alias());
if (indexAliasMap == null) {
indexAliasMap = newMapBuilder();
tmpAliasesMap.put(aliasMd.alias(), indexAliasMap);
}
indexAliasMap.put(index, aliasMd);
}
}
this.aliases = ImmutableSet.copyOf(aliases);
MapBuilder<String, ImmutableMap<String, AliasMetaData>> aliases = newMapBuilder();
for (Map.Entry<String, MapBuilder<String, AliasMetaData>> alias : tmpAliasesMap.map().entrySet()) {
aliases.put(alias.getKey(), alias.getValue().immutableMap());
}
this.aliases = aliases.immutableMap();
// build routing aliases set
MapBuilder<String, MapBuilder<String, ImmutableSet<String>>> tmpAliasToIndexToSearchRoutingMap = newMapBuilder();
for (IndexMetaData indexMetaData : indices.values()) {
for (AliasMetaData aliasMd : indexMetaData.aliases().values()) {
MapBuilder<String, ImmutableSet<String>> indexToSearchRoutingMap = tmpAliasToIndexToSearchRoutingMap.get(aliasMd.alias());
if (indexToSearchRoutingMap == null) {
indexToSearchRoutingMap = newMapBuilder();
tmpAliasToIndexToSearchRoutingMap.put(aliasMd.alias(), indexToSearchRoutingMap);
}
if (aliasMd.searchRouting() != null) {
indexToSearchRoutingMap.put(indexMetaData.index(), ImmutableSet.copyOf(routingPattern.split(aliasMd.searchRouting())));
} else {
indexToSearchRoutingMap.put(indexMetaData.index(), ImmutableSet.<String>of());
}
}
}
MapBuilder<String, ImmutableMap<String, ImmutableSet<String>>> aliasToIndexToSearchRoutingMap = newMapBuilder();
for (Map.Entry<String, MapBuilder<String, ImmutableSet<String>>> alias : tmpAliasToIndexToSearchRoutingMap.map().entrySet()) {
aliasToIndexToSearchRoutingMap.put(alias.getKey(), alias.getValue().immutableMap());
}
this.aliasToIndexToSearchRoutingMap = aliasToIndexToSearchRoutingMap.immutableMap();
// build filtering required map
MapBuilder<String, ImmutableMap<String, Boolean>> filteringRequiredMap = newMapBuilder();
@ -146,7 +189,7 @@ public class MetaData implements Iterable<IndexMetaData> {
}
public ImmutableSet<String> aliases() {
return this.aliases;
return this.aliases.keySet();
}
public ImmutableSet<String> getAliases() {
@ -178,6 +221,184 @@ public class MetaData implements Iterable<IndexMetaData> {
return concreteIndices(indices, true);
}
/**
* Returns indexing routing for the given index.
*/
public String resolveIndexRouting(@Nullable String routing, String aliasOrIndex) {
// Check if index is specified by an alias
ImmutableMap<String, AliasMetaData> indexAliases = aliases.get(aliasOrIndex);
if (indexAliases == null || indexAliases.isEmpty()) {
return routing;
}
if (indexAliases.size() > 1) {
throw new ElasticSearchIllegalArgumentException("Alias [" + aliasOrIndex + "] has more than one indices associated with it [" + indexAliases.keySet() + "], can't execute a single index op");
}
AliasMetaData aliasMd = indexAliases.values().iterator().next();
if (aliasMd.indexRouting() != null) {
if (routing != null) {
if (routing.equals(aliasMd.indexRouting())) {
return routing;
} else {
return null;
}
}
return aliasMd.indexRouting();
} else {
return routing;
}
}
/**
* Sets the same routing for all indices
*/
private Map<String, Set<String>> resolveSearchRoutingAllIndices(String routing) {
if (routing != null) {
THashSet<String> r = new THashSet<String>(Arrays.asList(routingPattern.split(routing)));
Map<String, Set<String>> routings = newHashMap();
String[] concreteIndices = concreteAllIndices();
for (String index : concreteIndices) {
routings.put(index, r);
}
return routings;
}
return null;
}
public Map<String, Set<String>> resolveSearchRouting(@Nullable String routing, String aliasOrIndex) {
Map<String, Set<String>> routings = null;
List<String> paramRouting = null;
if (routing != null) {
paramRouting = Arrays.asList(routingPattern.split(routing));
}
ImmutableMap<String, ImmutableSet<String>> indexToRoutingMap = aliasToIndexToSearchRoutingMap.get(aliasOrIndex);
if (indexToRoutingMap != null && !indexToRoutingMap.isEmpty()) {
// It's an alias
for (Map.Entry<String, ImmutableSet<String>> indexRouting : indexToRoutingMap.entrySet()) {
if (!indexRouting.getValue().isEmpty()) {
// Routing alias
Set<String> r = new THashSet<String>(indexRouting.getValue());
if (paramRouting != null) {
r.retainAll(paramRouting);
}
if (!r.isEmpty()) {
if (routings == null) {
routings = newHashMap();
}
routings.put(indexRouting.getKey(), r);
}
} else {
// Non-routing alias
if (paramRouting != null) {
Set<String> r = new THashSet<String>(paramRouting);
if (routings == null) {
routings = newHashMap();
}
routings.put(indexRouting.getKey(), r);
}
}
}
} else {
// It's an index
if (paramRouting != null) {
Set<String> r = new THashSet<String>(paramRouting);
if (routings == null) {
routings = newHashMap();
}
routings.put(aliasOrIndex, r);
}
}
return routings;
}
public Map<String, Set<String>> resolveSearchRouting(@Nullable String routing, String[] aliasesOrIndices) {
if (aliasesOrIndices == null || aliasesOrIndices.length == 0) {
return resolveSearchRoutingAllIndices(routing);
}
Map<String, Set<String>> routings = null;
List<String> paramRouting = null;
// List of indices that don't require any routing
Set<String> norouting = newHashSet();
if (routing != null) {
paramRouting = Arrays.asList(routingPattern.split(routing));
}
if (aliasesOrIndices.length == 1) {
if (aliasesOrIndices[0].equals("_all")) {
return resolveSearchRoutingAllIndices(routing);
} else {
return resolveSearchRouting(routing, aliasesOrIndices[0]);
}
}
for (String aliasOrIndex : aliasesOrIndices) {
ImmutableMap<String, ImmutableSet<String>> indexToRoutingMap = aliasToIndexToSearchRoutingMap.get(aliasOrIndex);
if (indexToRoutingMap != null && !indexToRoutingMap.isEmpty()) {
for (Map.Entry<String, ImmutableSet<String>> indexRouting : indexToRoutingMap.entrySet()) {
if (!norouting.contains(indexRouting.getKey())) {
if (!indexRouting.getValue().isEmpty()) {
// Routing alias
if (routings == null) {
routings = newHashMap();
}
Set<String> r = routings.get(indexRouting.getKey());
if (r == null) {
r = new THashSet<String>();
routings.put(indexRouting.getKey(), r);
}
r.addAll(indexRouting.getValue());
if (paramRouting != null) {
r.retainAll(paramRouting);
}
if (r.isEmpty()) {
routings.remove(indexRouting.getKey());
}
} else {
// Non-routing alias
if (!norouting.contains(indexRouting.getKey())) {
norouting.add(indexRouting.getKey());
if (paramRouting != null) {
Set<String> r = new THashSet<String>(paramRouting);
if (routings == null) {
routings = newHashMap();
}
routings.put(indexRouting.getKey(), r);
} else {
if (routings != null) {
routings.remove(indexRouting.getKey());
}
}
}
}
}
}
} else {
// Index
if (!norouting.contains(aliasOrIndex)) {
norouting.add(aliasOrIndex);
if (paramRouting != null) {
Set<String> r = new THashSet<String>(paramRouting);
if (routings == null) {
routings = newHashMap();
}
routings.put(aliasOrIndex, r);
} else {
if (routings != null) {
routings.remove(aliasOrIndex);
}
}
}
}
}
if (routings == null || routings.isEmpty()) {
return null;
}
return routings;
}
/**
* Translates the provided indices (possibly aliased) into actual indices.
*/

View File

@ -114,7 +114,12 @@ public class MetaDataIndexAliasesService extends AbstractComponent {
return currentState;
}
}
indexMetaDataBuilder.putAlias(AliasMetaData.newAliasMetaDataBuilder(aliasAction.alias()).filter(filter).build());
indexMetaDataBuilder.putAlias(AliasMetaData.newAliasMetaDataBuilder(
aliasAction.alias())
.filter(filter)
.indexRouting(aliasAction.indexRouting())
.searchRouting(aliasAction.searchRouting())
.build());
} else if (aliasAction.actionType() == AliasAction.Type.REMOVE) {
indexMetaDataBuilder.removerAlias(aliasAction.alias());
}

View File

@ -26,6 +26,9 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.indices.IndexMissingException;
import java.util.Map;
import java.util.Set;
/**
* @author kimchy (shay.banon)
*/
@ -39,7 +42,7 @@ public interface OperationRouting {
ShardIterator getShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing, @Nullable String preference) throws IndexMissingException, IndexShardMissingException;
GroupShardsIterator deleteByQueryShards(ClusterState clusterState, String index, @Nullable String routing) throws IndexMissingException;
GroupShardsIterator deleteByQueryShards(ClusterState clusterState, String index, @Nullable Set<String> routing) throws IndexMissingException;
GroupShardsIterator searchShards(ClusterState clusterState, String[] indices, @Nullable String queryHint, @Nullable String routing, @Nullable String preference) throws IndexMissingException;
GroupShardsIterator searchShards(ClusterState clusterState, String[] indices, String[] concreteIndices, @Nullable String queryHint, @Nullable Map<String, Set<String>> routing, @Nullable String preference) throws IndexMissingException;
}

View File

@ -39,6 +39,8 @@ import org.elasticsearch.indices.IndexMissingException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
/**
@ -74,20 +76,15 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
return indexRoutingTable(clusterState, index).groupByShardsIt();
}
@Override public GroupShardsIterator deleteByQueryShards(ClusterState clusterState, String index, @Nullable String routing) throws IndexMissingException {
if (routing == null) {
return indexRoutingTable(clusterState, index).groupByShardsIt();
}
String[] routings = routingPattern.split(routing);
if (routing.length() == 0) {
@Override public GroupShardsIterator deleteByQueryShards(ClusterState clusterState, String index, @Nullable Set<String> routing) throws IndexMissingException {
if (routing == null || routing.isEmpty()) {
return indexRoutingTable(clusterState, index).groupByShardsIt();
}
// we use set here and not identity set since we might get duplicates
HashSet<ShardIterator> set = new HashSet<ShardIterator>();
IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
for (String r : routings) {
for (String r : routing) {
int shardId = shardId(clusterState, index, null, null, r);
IndexShardRoutingTable indexShard = indexRouting.shard(shardId);
if (indexShard == null) {
@ -98,36 +95,34 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
return new GroupShardsIterator(set);
}
@Override public GroupShardsIterator searchShards(ClusterState clusterState, String[] indices, @Nullable String queryHint, @Nullable String routing, @Nullable String preference) throws IndexMissingException {
if (indices == null || indices.length == 0) {
indices = clusterState.metaData().concreteAllIndices();
@Override public GroupShardsIterator searchShards(ClusterState clusterState, String[] indices, String[] concreteIndices, @Nullable String queryHint, @Nullable Map<String, Set<String>> routing, @Nullable String preference) throws IndexMissingException {
if (concreteIndices == null || concreteIndices.length == 0) {
concreteIndices = clusterState.metaData().concreteAllIndices();
}
String[] routings = null;
if (routing != null) {
routings = routingPattern.split(routing);
}
if (routings != null && routings.length > 0) {
// we use set here and not list since we might get duplicates
HashSet<ShardIterator> set = new HashSet<ShardIterator>();
for (String index : indices) {
for (String index : concreteIndices) {
IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
for (String r : routings) {
int shardId = shardId(clusterState, index, null, null, r);
IndexShardRoutingTable indexShard = indexRouting.shard(shardId);
if (indexShard == null) {
throw new IndexShardMissingException(new ShardId(index, shardId));
Set<String> effectiveRouting = routing.get(index);
if (effectiveRouting != null) {
for (String r : effectiveRouting) {
int shardId = shardId(clusterState, index, null, null, r);
IndexShardRoutingTable indexShard = indexRouting.shard(shardId);
if (indexShard == null) {
throw new IndexShardMissingException(new ShardId(index, shardId));
}
// we might get duplicates, but that's ok, they will override one another
set.add(preferenceShardIterator(indexShard, clusterState.nodes().localNodeId(), preference));
}
// we might get duplicates, but that's ok, they will override one another
set.add(preferenceShardIterator(indexShard, clusterState.nodes().localNodeId(), preference));
}
}
return new GroupShardsIterator(set);
} else {
// we use list here since we know we are not going to create duplicates
ArrayList<ShardIterator> set = new ArrayList<ShardIterator>();
for (String index : indices) {
for (String index : concreteIndices) {
IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
for (IndexShardRoutingTable indexShard : indexRouting) {
set.add(preferenceShardIterator(indexShard, clusterState.nodes().localNodeId(), preference));

View File

@ -35,6 +35,7 @@ import org.elasticsearch.rest.*;
import java.io.IOException;
import java.util.Map;
import static org.elasticsearch.cluster.metadata.AliasAction.newAddAliasAction;
import static org.elasticsearch.rest.RestRequest.Method.*;
import static org.elasticsearch.rest.RestStatus.*;
import static org.elasticsearch.rest.action.support.RestXContentBuilder.*;
@ -80,6 +81,9 @@ public class RestIndicesAliasesAction extends BaseRestHandler {
String index = null;
String alias = null;
Map<String, Object> filter = null;
String routing = null;
String indexRouting = null;
String searchRouting = null;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
@ -89,6 +93,12 @@ public class RestIndicesAliasesAction extends BaseRestHandler {
index = parser.text();
} else if ("alias".equals(currentFieldName)) {
alias = parser.text();
} else if ("routing".equals(currentFieldName)) {
routing = parser.text();
} else if ("indexRouting".equals(currentFieldName) || "index-routing".equals(currentFieldName)) {
indexRouting = parser.text();
} else if ("searchRouting".equals(currentFieldName) || "search-routing".equals(currentFieldName)) {
searchRouting = parser.text();
}
} else if (token == XContentParser.Token.START_OBJECT) {
if ("filter".equals(currentFieldName)) {
@ -103,7 +113,17 @@ public class RestIndicesAliasesAction extends BaseRestHandler {
throw new ElasticSearchIllegalArgumentException("Alias action [" + action + "] requires an [alias] to be set");
}
if (type == AliasAction.Type.ADD) {
indicesAliasesRequest.addAlias(index, alias, filter);
AliasAction aliasAction = newAddAliasAction(index, alias).filter(filter);
if (routing != null) {
aliasAction.routing(routing);
}
if (indexRouting != null) {
aliasAction.indexRouting(indexRouting);
}
if (searchRouting != null) {
aliasAction.searchRouting(searchRouting);
}
indicesAliasesRequest.addAliasAction(aliasAction);
} else if (type == AliasAction.Type.REMOVE) {
indicesAliasesRequest.removeAlias(index, alias);
}

View File

@ -0,0 +1,176 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.routing;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import static org.elasticsearch.cluster.metadata.AliasAction.newAddAliasAction;
import static org.elasticsearch.common.collect.Maps.newHashMap;
import static org.elasticsearch.common.collect.Sets.newHashSet;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
/**
* @author imotov
*/
public class AliasResolveRoutingTests extends AbstractNodesTests {
private Client client;
private ClusterService clusterService;
@BeforeClass public void createNodes() throws Exception {
startNode("node1");
startNode("node2");
client = getClient();
clusterService = ((InternalNode) node("node1")).injector().getInstance(ClusterService.class);
}
@AfterClass public void closeNodes() {
client.close();
closeAllNodes();
}
protected Client getClient() {
return client("node1");
}
@Test public void testResolveIndexRouting() throws Exception {
try {
client.admin().indices().prepareDelete("test1").execute().actionGet();
client.admin().indices().prepareDelete("test2").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test1").execute().actionGet();
client.admin().indices().prepareCreate("test2").execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
client.admin().indices().prepareAliases().addAliasAction(newAddAliasAction("test1", "alias")).execute().actionGet();
client.admin().indices().prepareAliases().addAliasAction(newAddAliasAction("test1", "alias10").routing("0")).execute().actionGet();
client.admin().indices().prepareAliases().addAliasAction(newAddAliasAction("test1", "alias110").routing("1,0")).execute().actionGet();
client.admin().indices().prepareAliases().addAliasAction(newAddAliasAction("test1", "alias12").routing("2")).execute().actionGet();
client.admin().indices().prepareAliases().addAliasAction(newAddAliasAction("test2", "alias20").routing("0")).execute().actionGet();
client.admin().indices().prepareAliases().addAliasAction(newAddAliasAction("test2", "alias21").routing("1")).execute().actionGet();
client.admin().indices().prepareAliases().addAliasAction(newAddAliasAction("test1", "alias0").routing("0")).execute().actionGet();
client.admin().indices().prepareAliases().addAliasAction(newAddAliasAction("test2", "alias0").routing("0")).execute().actionGet();
Thread.sleep(300);
assertThat(clusterService.state().metaData().resolveIndexRouting(null, "test1"), nullValue());
assertThat(clusterService.state().metaData().resolveIndexRouting(null, "alias"), nullValue());
assertThat(clusterService.state().metaData().resolveIndexRouting(null, "test1"), nullValue());
assertThat(clusterService.state().metaData().resolveIndexRouting(null, "alias10"), equalTo("0"));
assertThat(clusterService.state().metaData().resolveIndexRouting(null, "alias20"), equalTo("0"));
assertThat(clusterService.state().metaData().resolveIndexRouting(null, "alias21"), equalTo("1"));
assertThat(clusterService.state().metaData().resolveIndexRouting("3", "test1"), equalTo("3"));
assertThat(clusterService.state().metaData().resolveIndexRouting("0", "alias10"), equalTo("0"));
assertThat(clusterService.state().metaData().resolveIndexRouting("1", "alias10"), nullValue());
try {
clusterService.state().metaData().resolveIndexRouting(null, "alias0");
assert false : "should fail";
} catch (ElasticSearchIllegalArgumentException ex) {
// Expected
}
}
@Test public void testResolveSearchRouting() throws Exception {
try {
client.admin().indices().prepareDelete("test1").execute().actionGet();
client.admin().indices().prepareDelete("test2").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test1").execute().actionGet();
client.admin().indices().prepareCreate("test2").execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
client.admin().indices().prepareAliases().addAliasAction(newAddAliasAction("test1", "alias")).execute().actionGet();
client.admin().indices().prepareAliases().addAliasAction(newAddAliasAction("test1", "alias10").routing("0")).execute().actionGet();
client.admin().indices().prepareAliases().addAliasAction(newAddAliasAction("test2", "alias20").routing("0")).execute().actionGet();
client.admin().indices().prepareAliases().addAliasAction(newAddAliasAction("test2", "alias21").routing("1")).execute().actionGet();
client.admin().indices().prepareAliases().addAliasAction(newAddAliasAction("test1", "alias0").routing("0")).execute().actionGet();
client.admin().indices().prepareAliases().addAliasAction(newAddAliasAction("test2", "alias0").routing("0")).execute().actionGet();
Thread.sleep(300);
assertThat(clusterService.state().metaData().resolveSearchRouting(null, "alias"), nullValue());
assertThat(clusterService.state().metaData().resolveSearchRouting("0,1", "alias"), equalTo(newMap("test1", newSet("0", "1"))));
assertThat(clusterService.state().metaData().resolveSearchRouting(null, "alias10"), equalTo(newMap("test1", newSet("0"))));
assertThat(clusterService.state().metaData().resolveSearchRouting(null, "alias10"), equalTo(newMap("test1", newSet("0"))));
assertThat(clusterService.state().metaData().resolveSearchRouting("0", "alias10"), equalTo(newMap("test1", newSet("0"))));
assertThat(clusterService.state().metaData().resolveSearchRouting("1", "alias10"), nullValue());
assertThat(clusterService.state().metaData().resolveSearchRouting(null, "alias0"), equalTo(newMap("test1", newSet("0"), "test2", newSet("0"))));
assertThat(clusterService.state().metaData().resolveSearchRouting(null, new String[]{"alias10", "alias20"}),
equalTo(newMap("test1", newSet("0"), "test2", newSet("0"))));
assertThat(clusterService.state().metaData().resolveSearchRouting(null, new String[]{"alias10", "alias21"}),
equalTo(newMap("test1", newSet("0"), "test2", newSet("1"))));
assertThat(clusterService.state().metaData().resolveSearchRouting(null, new String[]{"alias20", "alias21"}),
equalTo(newMap("test2", newSet("0", "1"))));
assertThat(clusterService.state().metaData().resolveSearchRouting(null, new String[]{"test1", "alias10"}), nullValue());
assertThat(clusterService.state().metaData().resolveSearchRouting(null, new String[]{"alias10", "test1"}), nullValue());
assertThat(clusterService.state().metaData().resolveSearchRouting("0", new String[]{"alias10", "alias20"}),
equalTo(newMap("test1", newSet("0"), "test2", newSet("0"))));
assertThat(clusterService.state().metaData().resolveSearchRouting("0,1", new String[]{"alias10", "alias20"}),
equalTo(newMap("test1", newSet("0"), "test2", newSet("0"))));
assertThat(clusterService.state().metaData().resolveSearchRouting("1", new String[]{"alias10", "alias20"}), nullValue());
assertThat(clusterService.state().metaData().resolveSearchRouting("0", new String[]{"alias10", "alias21"}),
equalTo(newMap("test1", newSet("0"))));
assertThat(clusterService.state().metaData().resolveSearchRouting("1", new String[]{"alias10", "alias21"}),
equalTo(newMap("test2", newSet("1"))));
assertThat(clusterService.state().metaData().resolveSearchRouting("0,1,2", new String[]{"alias10", "alias21"}),
equalTo(newMap("test1", newSet("0"), "test2", newSet("1"))));
assertThat(clusterService.state().metaData().resolveSearchRouting("0,1,2", new String[]{"test1", "alias10", "alias21"}),
equalTo(newMap("test1", newSet("0", "1", "2"), "test2", newSet("1"))));
}
private <T> Set<T> newSet(T... elements) {
return newHashSet(elements);
}
private <K, V> Map<K, V> newMap(K key, V value) {
Map<K, V> r = newHashMap();
r.put(key, value);
return r;
}
private <K, V> Map<K, V> newMap(K key1, V value1, K key2, V value2) {
Map<K, V> r = newHashMap();
r.put(key1, value1);
r.put(key2, value2);
return r;
}
}

View File

@ -0,0 +1,412 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.routing;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.elasticsearch.cluster.metadata.AliasAction.newAddAliasAction;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author imotov
*/
public class AliasRoutingTests extends AbstractNodesTests {
private Client client;
@BeforeClass public void createNodes() throws Exception {
startNode("node1");
startNode("node2");
client = getClient();
}
@AfterClass public void closeNodes() {
client.close();
closeAllNodes();
}
protected Client getClient() {
return client("node1");
}
@Test public void testAliasCrudRouting() throws Exception {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test").execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
client.admin().indices().prepareAliases().addAliasAction(newAddAliasAction("test", "alias0").routing("0")).execute().actionGet();
Thread.sleep(300);
logger.info("--> indexing with id [1], and routing [0] using alias");
client.prepareIndex("alias0", "type1", "1").setSource("field", "value1").setRefresh(true).execute().actionGet();
logger.info("--> verifying get with no routing, should not find anything");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
}
logger.info("--> verifying get with routing, should find");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(true));
}
logger.info("--> verifying get with routing alias, should find");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("alias0", "type1", "1").execute().actionGet().exists(), equalTo(true));
}
logger.info("--> deleting with no routing, should not delete anything");
client.prepareDelete("test", "type1", "1").setRefresh(true).execute().actionGet();
client.admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(true));
assertThat(client.prepareGet("alias0", "type1", "1").execute().actionGet().exists(), equalTo(true));
}
logger.info("--> deleting with routing alias, should delete");
client.prepareDelete("alias0", "type1", "1").setRefresh(true).execute().actionGet();
client.admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(false));
assertThat(client.prepareGet("alias0", "type1", "1").execute().actionGet().exists(), equalTo(false));
}
logger.info("--> indexing with id [1], and routing [0] using alias");
client.prepareIndex("alias0", "type1", "1").setSource("field", "value1").setRefresh(true).execute().actionGet();
logger.info("--> verifying get with no routing, should not find anything");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
}
logger.info("--> verifying get with routing, should find");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(true));
assertThat(client.prepareGet("alias0", "type1", "1").execute().actionGet().exists(), equalTo(true));
}
logger.info("--> deleting_by_query with 1 as routing, should not delete anything");
client.prepareDeleteByQuery().setQuery(matchAllQuery()).setRouting("1").execute().actionGet();
client.admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(true));
assertThat(client.prepareGet("alias0", "type1", "1").execute().actionGet().exists(), equalTo(true));
}
logger.info("--> deleting_by_query with alias0, should delete");
client.prepareDeleteByQuery("alias0").setQuery(matchAllQuery()).execute().actionGet();
client.admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(false));
assertThat(client.prepareGet("alias0", "type1", "1").execute().actionGet().exists(), equalTo(false));
}
}
@Test public void testAliasSearchRouting() throws Exception {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test").execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
client.admin().indices().prepareAliases()
.addAliasAction(newAddAliasAction("test", "alias"))
.addAliasAction(newAddAliasAction("test", "alias0").routing("0"))
.addAliasAction(newAddAliasAction("test", "alias1").routing("1"))
.addAliasAction(newAddAliasAction("test", "alias01").searchRouting("0,1"))
.execute().actionGet();
Thread.sleep(300);
logger.info("--> indexing with id [1], and routing [0] using alias");
client.prepareIndex("alias0", "type1", "1").setSource("field", "value1").setRefresh(true).execute().actionGet();
logger.info("--> verifying get with no routing, should not find anything");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
}
logger.info("--> verifying get with routing, should find");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("alias0", "type1", "1").execute().actionGet().exists(), equalTo(true));
}
logger.info("--> search with no routing, should fine one");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(1l));
}
logger.info("--> search with wrong routing, should not find");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch().setRouting("1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(0l));
assertThat(client.prepareCount().setRouting("1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(0l));
assertThat(client.prepareSearch("alias1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(0l));
assertThat(client.prepareCount("alias1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(0l));
}
logger.info("--> search with correct routing, should find");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch().setRouting("0").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(1l));
assertThat(client.prepareCount().setRouting("0").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(1l));
assertThat(client.prepareSearch("alias0").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(1l));
assertThat(client.prepareCount("alias0").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(1l));
}
logger.info("--> indexing with id [2], and routing [1] using alias");
client.prepareIndex("alias1", "type1", "2").setSource("field", "value1").setRefresh(true).execute().actionGet();
logger.info("--> search with no routing, should fine two");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(2l));
assertThat(client.prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(2l));
}
logger.info("--> search with 0 routing, should find one");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch().setRouting("0").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(1l));
assertThat(client.prepareCount().setRouting("0").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(1l));
assertThat(client.prepareSearch("alias0").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(1l));
assertThat(client.prepareCount("alias0").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(1l));
}
logger.info("--> search with 1 routing, should find one");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch().setRouting("1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(1l));
assertThat(client.prepareCount().setRouting("1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(1l));
assertThat(client.prepareSearch("alias1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(1l));
assertThat(client.prepareCount("alias1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(1l));
}
logger.info("--> search with 0,1 routings , should find two");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch().setRouting("0", "1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(2l));
assertThat(client.prepareCount().setRouting("0", "1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(2l));
assertThat(client.prepareSearch("alias01").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(2l));
assertThat(client.prepareCount("alias01").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(2l));
}
logger.info("--> search with two routing aliases , should find two");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch("alias0", "alias1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(2l));
assertThat(client.prepareCount("alias0", "alias1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(2l));
}
logger.info("--> search with alias0, alias1 and alias01, should find two");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch("alias0", "alias1", "alias01").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(2l));
assertThat(client.prepareCount("alias0", "alias1", "alias01").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(2l));
}
logger.info("--> search with test, alias0 and alias1, should find two");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch("test", "alias0", "alias1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(2l));
assertThat(client.prepareCount("test", "alias0", "alias1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(2l));
}
}
@Test public void testAliasSearchRoutingWithTwoIndices() throws Exception {
try {
client.admin().indices().prepareDelete("test-a").execute().actionGet();
client.admin().indices().prepareDelete("test-b").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test-a").execute().actionGet();
client.admin().indices().prepareCreate("test-b").execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
client.admin().indices().prepareAliases()
.addAliasAction(newAddAliasAction("test-a", "alias-a0").routing("0"))
.addAliasAction(newAddAliasAction("test-a", "alias-a1").routing("1"))
.addAliasAction(newAddAliasAction("test-b", "alias-b0").routing("0"))
.addAliasAction(newAddAliasAction("test-b", "alias-b1").routing("1"))
.addAliasAction(newAddAliasAction("test-a", "alias-ab").searchRouting("0"))
.addAliasAction(newAddAliasAction("test-b", "alias-ab").searchRouting("1"))
.execute().actionGet();
Thread.sleep(300);
logger.info("--> indexing with id [1], and routing [0] using alias to test-a");
client.prepareIndex("alias-a0", "type1", "1").setSource("field", "value1").setRefresh(true).execute().actionGet();
logger.info("--> verifying get with no routing, should not find anything");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
}
logger.info("--> verifying get with routing, should find");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("alias-a0", "type1", "1").execute().actionGet().exists(), equalTo(true));
}
logger.info("--> indexing with id [0], and routing [1] using alias to test-b");
client.prepareIndex("alias-b1", "type1", "1").setSource("field", "value1").setRefresh(true).execute().actionGet();
logger.info("--> verifying get with no routing, should not find anything");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
}
logger.info("--> verifying get with routing, should find");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("alias-b1", "type1", "1").execute().actionGet().exists(), equalTo(true));
}
logger.info("--> search with alias-a1,alias-b0, should not find");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch("alias-a1", "alias-b0").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(0l));
assertThat(client.prepareCount("alias-a1", "alias-b0").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(0l));
}
logger.info("--> search with alias-ab, should find two");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch("alias-ab").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(2l));
assertThat(client.prepareCount("alias-ab").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(2l));
}
logger.info("--> search with alias-a0,alias-b1 should find two");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch("alias-a0", "alias-b1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(2l));
assertThat(client.prepareCount("alias-a0", "alias-b1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(2l));
}
}
@Test public void testRequiredRoutingMappingWithAlias() throws Exception {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test")
.addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("_routing").field("required", true).endObject().endObject().endObject())
.execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
logger.info("--> indexing with id [1], and routing [0]");
client.prepareIndex("test", "type1", "1").setRouting("0").setSource("field", "value1").setRefresh(true).execute().actionGet();
logger.info("--> verifying get with no routing, should not find anything");
logger.info("--> indexing with id [1], with no routing, should fail");
try {
client.prepareIndex("test", "type1", "1").setSource("field", "value1").setRefresh(true).execute().actionGet();
assert false;
} catch (ElasticSearchException e) {
assertThat(e.unwrapCause(), instanceOf(RoutingMissingException.class));
}
logger.info("--> verifying get with routing, should find");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(true));
}
logger.info("--> deleting with no routing, should broadcast the delete since _routing is required");
client.prepareDelete("test", "type1", "1").setRefresh(true).execute().actionGet();
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(false));
}
logger.info("--> indexing with id [1], and routing [0]");
client.prepareIndex("test", "type1", "1").setRouting("0").setSource("field", "value1").setRefresh(true).execute().actionGet();
logger.info("--> verifying get with no routing, should not find anything");
logger.info("--> bulk deleting with no routing, should broadcast the delete since _routing is required");
client.prepareBulk().add(Requests.deleteRequest("test").type("type1").id("1")).execute().actionGet();
client.admin().indices().prepareRefresh().execute().actionGet();
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(false));
}
}
@Test public void testIndexingAliasesOverTime() throws Exception {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test").execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
logger.info("--> creating alias with routing [3]");
client.admin().indices().prepareAliases()
.addAliasAction(newAddAliasAction("test", "alias").routing("3"))
.execute().actionGet();
Thread.sleep(300);
logger.info("--> indexing with id [0], and routing [3]");
client.prepareIndex("alias", "type1", "0").setSource("field", "value1").setRefresh(true).execute().actionGet();
logger.info("--> verifying get with no routing, should not find anything");
logger.info("--> verifying get and search with routing, should find");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "0").setRouting("3").execute().actionGet().exists(), equalTo(true));
assertThat(client.prepareSearch("alias").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(1l));
assertThat(client.prepareCount("alias").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(1l));
}
logger.info("--> creating alias with routing [4]");
client.admin().indices().prepareAliases()
.addAliasAction(newAddAliasAction("test", "alias").routing("4"))
.execute().actionGet();
Thread.sleep(300);
logger.info("--> verifying search with wrong routing should not find");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareSearch("alias").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(0l));
assertThat(client.prepareCount("alias").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(0l));
}
logger.info("--> creating alias with search routing [3,4] and index routing 4");
client.admin().indices().prepareAliases()
.addAliasAction(newAddAliasAction("test", "alias").searchRouting("3,4").indexRouting("4"))
.execute().actionGet();
Thread.sleep(300);
logger.info("--> indexing with id [1], and routing [4]");
client.prepareIndex("alias", "type1", "1").setSource("field", "value2").setRefresh(true).execute().actionGet();
logger.info("--> verifying get with no routing, should not find anything");
logger.info("--> verifying get and search with routing, should find");
for (int i = 0; i < 5; i++) {
assertThat(client.prepareGet("test", "type1", "0").setRouting("3").execute().actionGet().exists(), equalTo(true));
assertThat(client.prepareGet("test", "type1", "1").setRouting("4").execute().actionGet().exists(), equalTo(true));
assertThat(client.prepareSearch("alias").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().hits().totalHits(), equalTo(2l));
assertThat(client.prepareCount("alias").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(2l));
}
}
}

View File

@ -118,7 +118,7 @@ public class ThreeShardsEmbeddedSearchTests extends AbstractNodesTests {
.from(0).size(60).explain(true).indexBoost("test", 1.0f).indexBoost("test2", 2.0f);
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null, null)) {
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.QUERY_THEN_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -186,7 +186,7 @@ public class ThreeShardsEmbeddedSearchTests extends AbstractNodesTests {
.from(0).size(60).explain(true).sort("age", SortOrder.ASC);
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null, null)) {
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.QUERY_THEN_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -280,7 +280,7 @@ public class ThreeShardsEmbeddedSearchTests extends AbstractNodesTests {
}
Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = newHashMap();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null, null)) {
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.QUERY_AND_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -332,7 +332,7 @@ public class ThreeShardsEmbeddedSearchTests extends AbstractNodesTests {
.facet(FacetBuilders.queryFacet("test1", termQuery("name", "test1")));
Map<SearchShardTarget, QuerySearchResultProvider> queryResults = newHashMap();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null, null)) {
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.QUERY_THEN_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));

View File

@ -126,7 +126,7 @@ public class ThreeShardsUnbalancedShardsEmbeddedSearchTests extends AbstractNode
.from(0).size(60).explain(true);
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null, null)) {
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.DFS_QUERY_THEN_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -193,7 +193,7 @@ public class ThreeShardsUnbalancedShardsEmbeddedSearchTests extends AbstractNode
.from(0).size(60).explain(true).sort("age", SortOrder.ASC);
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null, null)) {
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.DFS_QUERY_THEN_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -283,7 +283,7 @@ public class ThreeShardsUnbalancedShardsEmbeddedSearchTests extends AbstractNode
// do this with dfs, since we have uneven distribution of docs between shards
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null, null)) {
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.QUERY_AND_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
@ -338,7 +338,7 @@ public class ThreeShardsUnbalancedShardsEmbeddedSearchTests extends AbstractNode
.facet(queryFacet("test1").query(termQuery("name", "test1")));
Map<SearchShardTarget, QuerySearchResultProvider> queryResults = newHashMap();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null, null)) {
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.QUERY_THEN_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));