Index Aliases, closes #88.

This commit is contained in:
kimchy 2010-03-25 02:00:53 +02:00
parent 4851ddde13
commit 8f324678e8
53 changed files with 1128 additions and 130 deletions

View File

@ -19,28 +19,11 @@
package org.elasticsearch.action;
import org.elasticsearch.cluster.ClusterState;
/**
* @author kimchy (shay.banon)
*/
public class Actions {
public static String[] processIndices(ClusterState state, String[] indices) {
if (indices == null || indices.length == 0) {
return state.routingTable().indicesRouting().keySet().toArray(new String[state.routingTable().indicesRouting().keySet().size()]);
}
if (indices.length == 1) {
if (indices[0].length() == 0) {
return state.routingTable().indicesRouting().keySet().toArray(new String[state.routingTable().indicesRouting().keySet().size()]);
}
if (indices[0].equals("_all")) {
return state.routingTable().indicesRouting().keySet().toArray(new String[state.routingTable().indicesRouting().keySet().size()]);
}
}
return indices;
}
public static ActionRequestValidationException addValidationError(String error, ActionRequestValidationException validationException) {
if (validationException == null) {
validationException = new ActionRequestValidationException();

View File

@ -29,6 +29,7 @@ import org.elasticsearch.action.admin.cluster.ping.replication.TransportReplicat
import org.elasticsearch.action.admin.cluster.ping.replication.TransportShardReplicationPingAction;
import org.elasticsearch.action.admin.cluster.ping.single.TransportSinglePingAction;
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.admin.indices.flush.TransportFlushAction;
@ -74,6 +75,7 @@ public class TransportActionModule extends AbstractModule {
bind(TransportCreateIndexAction.class).asEagerSingleton();
bind(TransportPutMappingAction.class).asEagerSingleton();
bind(TransportDeleteIndexAction.class).asEagerSingleton();
bind(TransportIndicesAliasesAction.class).asEagerSingleton();
bind(TransportShardGatewaySnapshotAction.class).asEagerSingleton();
bind(TransportIndexGatewaySnapshotAction.class).asEagerSingleton();

View File

@ -51,6 +51,7 @@ public class TransportActions {
public static final String REFRESH = "indices/refresh";
public static final String OPTIMIZE = "indices/optimize";
public static final String STATUS = "indices/status";
public static final String ALIASES = "indices/aliases";
public static class Gateway {
public static final String SNAPSHOT = "indices/gateway/snapshot";

View File

@ -55,6 +55,10 @@ public class ClusterHealthRequest extends MasterNodeOperationRequest {
return indices;
}
void indices(String[] indices) {
this.indices = indices;
}
public TimeValue timeout() {
return timeout;
}

View File

@ -36,8 +36,6 @@ import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.settings.Settings;
import static org.elasticsearch.action.Actions.*;
/**
* @author kimchy (shay.banon)
*/
@ -110,8 +108,9 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
RoutingTableValidation validation = clusterState.routingTable().validate(clusterState.metaData());
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), validation.failures());
String[] indices = processIndices(clusterState, request.indices());
for (String index : indices) {
request.indices(clusterState.metaData().concreteIndices(request.indices()));
for (String index : request.indices()) {
IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index);
IndexMetaData indexMetaData = clusterState.metaData().index(index);
if (indexRoutingTable == null) {

View File

@ -39,7 +39,6 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static com.google.common.collect.Lists.*;
import static org.elasticsearch.action.Actions.*;
/**
* @author kimchy (Shay Banon)
@ -63,7 +62,7 @@ public class TransportBroadcastPingAction extends TransportBroadcastOperationAct
}
@Override protected GroupShardsIterator shards(BroadcastPingRequest request, ClusterState clusterState) {
return indicesService.searchShards(clusterState, processIndices(clusterState, request.indices()), request.queryHint());
return indicesService.searchShards(clusterState, request.indices(), request.queryHint());
}
@Override protected BroadcastPingResponse newResponse(BroadcastPingRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.cluster.ping.replication;
import com.google.inject.Inject;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.indices.IndicesService;
@ -59,7 +60,7 @@ public class TransportShardReplicationPingAction extends TransportShardReplicati
@Override protected void shardOperationOnBackup(ShardOperationRequest shardRequest) {
}
@Override protected ShardsIterator shards(ShardReplicationPingRequest request) {
@Override protected ShardsIterator shards(ClusterState clusterState, ShardReplicationPingRequest request) {
return clusterService.state().routingTable().index(request.index()).shard(request.shardId()).shardsIt();
}
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.alias;
import com.google.common.collect.Lists;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import java.io.IOException;
import java.util.List;
import static org.elasticsearch.action.Actions.*;
import static org.elasticsearch.cluster.metadata.AliasAction.*;
/**
* A request to add/remove aliases for one or more indices.
*
* @author kimchy (shay.banon)
*/
public class IndicesAliasesRequest extends MasterNodeOperationRequest {
private List<AliasAction> aliasActions = Lists.newArrayList();
public IndicesAliasesRequest() {
}
/**
* Adds an alias to the index.
*
* @param index The index
* @param alias The alias
*/
public IndicesAliasesRequest addAlias(String index, String alias) {
aliasActions.add(new AliasAction(AliasAction.Type.ADD, index, alias));
return this;
}
/**
* Adds an alias to the index.
*
* @param index The index
* @param alias The alias
*/
public IndicesAliasesRequest removeAlias(String index, String alias) {
aliasActions.add(new AliasAction(AliasAction.Type.REMOVE, index, alias));
return this;
}
List<AliasAction> aliasActions() {
return this.aliasActions;
}
@Override public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (aliasActions.isEmpty()) {
validationException = addValidationError("Must specify at least one alias action", validationException);
}
return validationException;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
aliasActions.add(readAliasAction(in));
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(aliasActions.size());
for (AliasAction aliasAction : aliasActions) {
aliasAction.writeTo(out);
}
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.alias;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import java.io.IOException;
/**
* A response for a create index action.
*
* @author kimchy (shay.banon)
*/
public class IndicesAliasesResponse implements ActionResponse, Streamable {
IndicesAliasesResponse() {
}
@Override public void readFrom(StreamInput in) throws IOException {
}
@Override public void writeTo(StreamOutput out) throws IOException {
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.alias;
import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.MetaDataService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.settings.Settings;
/**
* @author kimchy (shay.banon)
*/
public class TransportIndicesAliasesAction extends TransportMasterNodeOperationAction<IndicesAliasesRequest, IndicesAliasesResponse> {
private final MetaDataService metaDataService;
@Inject public TransportIndicesAliasesAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataService metaDataService) {
super(settings, transportService, clusterService, threadPool);
this.metaDataService = metaDataService;
}
@Override protected String transportAction() {
return TransportActions.Admin.Indices.ALIASES;
}
@Override protected IndicesAliasesRequest newRequest() {
return new IndicesAliasesRequest();
}
@Override protected IndicesAliasesResponse newResponse() {
return new IndicesAliasesResponse();
}
@Override protected IndicesAliasesResponse masterOperation(IndicesAliasesRequest request) throws ElasticSearchException {
MetaDataService.IndicesAliasesResult indicesAliasesResult = metaDataService.indicesAliases(request.aliasActions());
return new IndicesAliasesResponse();
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.action.admin.indices.create;
import com.google.common.collect.Maps;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
@ -33,6 +32,7 @@ import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static com.google.common.collect.Maps.*;
import static org.elasticsearch.action.Actions.*;
import static org.elasticsearch.util.TimeValue.*;
import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;
@ -54,7 +54,7 @@ public class CreateIndexRequest extends MasterNodeOperationRequest {
private Settings settings = EMPTY_SETTINGS;
private Map<String, String> mappings = Maps.newHashMap();
private Map<String, String> mappings = newHashMap();
private TimeValue timeout = new TimeValue(10, TimeUnit.SECONDS);
@ -106,6 +106,14 @@ public class CreateIndexRequest extends MasterNodeOperationRequest {
return this;
}
/**
* The settings to created the index with.
*/
public CreateIndexRequest settings(Settings.Builder settings) {
this.settings = settings.build();
return this;
}
/**
* Adds mapping that will be added when the index gets created.
*

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.indices.gateway.snapshot;
import com.google.inject.Inject;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
@ -64,7 +65,7 @@ public class TransportShardGatewaySnapshotAction extends TransportShardReplicati
// silently ignore, we disable it with #ignoreBackups anyhow
}
@Override protected ShardsIterator shards(ShardGatewaySnapshotRequest request) {
@Override protected ShardsIterator shards(ClusterState clusterState, ShardGatewaySnapshotRequest request) {
return clusterService.state().routingTable().index(request.index()).shard(request.shardId()).shardsIt();
}

View File

@ -78,17 +78,25 @@ public class PutMappingRequest extends MasterNodeOperationRequest {
return validationException;
}
/**
* Sets the indices this put mapping operation will execute on.
*/
public PutMappingRequest indices(String[] indices) {
this.indices = indices;
return this;
}
/**
* The indices the mappings will be put.
*/
String[] indices() {
public String[] indices() {
return indices;
}
/**
* The mapping type.
*/
String type() {
public String type() {
return mappingType;
}

View File

@ -21,25 +21,15 @@ package org.elasticsearch.action.admin.indices.mapping.put;
import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaDataService;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.settings.Settings;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.action.Actions.*;
/**
* Put mapping action.
*
@ -49,16 +39,10 @@ public class TransportPutMappingAction extends TransportMasterNodeOperationActio
private final MetaDataService metaDataService;
private final TransportCreateIndexAction createIndexAction;
private final boolean autoCreateIndex;
@Inject public TransportPutMappingAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataService metaDataService, TransportCreateIndexAction createIndexAction) {
ThreadPool threadPool, MetaDataService metaDataService) {
super(settings, transportService, clusterService, threadPool);
this.metaDataService = metaDataService;
this.createIndexAction = createIndexAction;
this.autoCreateIndex = settings.getAsBoolean("action.autoCreateIndex", true);
}
@ -75,40 +59,13 @@ public class TransportPutMappingAction extends TransportMasterNodeOperationActio
}
@Override protected PutMappingResponse masterOperation(PutMappingRequest request) throws ElasticSearchException {
final String[] indices = processIndices(clusterService.state(), request.indices());
ClusterState clusterState = clusterService.state();
// update to concrete indices
request.indices(clusterState.metaData().concreteIndices(request.indices()));
final String[] indices = request.indices();
MetaDataService.PutMappingResult result = metaDataService.putMapping(indices, request.type(), request.mappingSource(), request.ignoreConflicts(), request.timeout());
return new PutMappingResponse(result.acknowledged());
}
@Override protected void doExecute(final PutMappingRequest request, final ActionListener<PutMappingResponse> listener) {
final String[] indices = processIndices(clusterService.state(), request.indices());
if (autoCreateIndex) {
final CountDownLatch latch = new CountDownLatch(indices.length);
for (String index : indices) {
if (!clusterService.state().metaData().hasIndex(index)) {
createIndexAction.execute(new CreateIndexRequest(index), new ActionListener<CreateIndexResponse>() {
@Override public void onResponse(CreateIndexResponse response) {
latch.countDown();
}
@Override public void onFailure(Throwable e) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
latch.countDown();
} else {
listener.onFailure(e);
}
}
});
} else {
latch.countDown();
}
}
try {
latch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// ignore
}
}
super.doExecute(request, listener);
}
}

View File

@ -40,7 +40,6 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static com.google.common.collect.Lists.*;
import static org.elasticsearch.action.Actions.*;
/**
* @author kimchy (shay.banon)
@ -76,7 +75,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
}
@Override protected GroupShardsIterator shards(CountRequest request, ClusterState clusterState) {
return indicesService.searchShards(clusterState, processIndices(clusterState, request.indices()), request.queryHint());
return indicesService.searchShards(clusterState, request.indices(), request.queryHint());
}
@Override protected CountResponse newResponse(CountRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {

View File

@ -82,6 +82,14 @@ public class DeleteRequest extends ShardReplicationOperationRequest {
return validationException;
}
/**
* Sets the index the delete will happen on.
*/
@Override public DeleteRequest index(String index) {
super.index(index);
return this;
}
/**
* Should the listener be called on a separate thread if needed.
*/

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.indices.IndexAlreadyExistsException;
@ -57,7 +58,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
@Override protected void doExecute(final DeleteRequest deleteRequest, final ActionListener<DeleteResponse> listener) {
if (autoCreateIndex) {
if (!clusterService.state().metaData().hasIndex(deleteRequest.index())) {
if (!clusterService.state().metaData().hasConcreteIndex(deleteRequest.index())) {
createIndexAction.execute(new CreateIndexRequest(deleteRequest.index()), new ActionListener<CreateIndexResponse>() {
@Override public void onResponse(CreateIndexResponse result) {
TransportDeleteAction.super.doExecute(deleteRequest, listener);
@ -101,7 +102,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
indexShard(shardRequest).delete(request.type(), request.id());
}
@Override protected ShardsIterator shards(DeleteRequest request) {
@Override protected ShardsIterator shards(ClusterState clusterState, DeleteRequest request) {
return indicesService.indexServiceSafe(request.index()).operationRouting()
.deleteShards(clusterService.state(), request.type(), request.id());
}

View File

@ -23,6 +23,7 @@ import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardsIterator;
@ -65,7 +66,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
indexShard(shardRequest).deleteByQuery(request.querySource(), request.queryParserName(), request.types());
}
@Override protected ShardsIterator shards(ShardDeleteByQueryRequest request) {
@Override protected ShardsIterator shards(ClusterState clusterState, ShardDeleteByQueryRequest request) {
GroupShardsIterator group = indicesService.indexServiceSafe(request.index()).operationRouting().deleteByQueryShards(clusterService.state());
for (ShardsIterator shards : group) {
if (shards.shardId().id() == request.shardId()) {

View File

@ -141,6 +141,14 @@ public class IndexRequest extends ShardReplicationOperationRequest {
return validationException;
}
/**
* Sets the index the index operation will happen on.
*/
@Override public IndexRequest index(String index) {
super.index(index);
return this;
}
/**
* Should the listener be called on a separate thread if needed.
*/

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.routing.ShardsIterator;
@ -80,25 +81,23 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
indexRequest.opType(IndexRequest.OpType.CREATE);
}
}
if (autoCreateIndex) {
if (!clusterService.state().metaData().hasIndex(indexRequest.index())) {
createIndexAction.execute(new CreateIndexRequest(indexRequest.index()), new ActionListener<CreateIndexResponse>() {
@Override public void onResponse(CreateIndexResponse result) {
TransportIndexAction.super.doExecute(indexRequest, listener);
}
if (autoCreateIndex && !clusterService.state().metaData().hasConcreteIndex(indexRequest.index())) {
createIndexAction.execute(new CreateIndexRequest(indexRequest.index()), new ActionListener<CreateIndexResponse>() {
@Override public void onResponse(CreateIndexResponse result) {
TransportIndexAction.super.doExecute(indexRequest, listener);
}
@Override public void onFailure(Throwable e) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
// we have the index, do it
TransportIndexAction.super.doExecute(indexRequest, listener);
} else {
listener.onFailure(e);
}
@Override public void onFailure(Throwable e) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
// we have the index, do it
TransportIndexAction.super.doExecute(indexRequest, listener);
} else {
listener.onFailure(e);
}
});
} else {
super.doExecute(indexRequest, listener);
}
}
});
} else {
super.doExecute(indexRequest, listener);
}
}
@ -114,7 +113,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
return TransportActions.INDEX;
}
@Override protected ShardsIterator shards(IndexRequest request) {
@Override protected ShardsIterator shards(ClusterState clusterState, IndexRequest request) {
return indicesService.indexServiceSafe(request.index()).operationRouting()
.indexShards(clusterService.state(), request.type(), request.id());
}

View File

@ -105,6 +105,14 @@ public class SearchRequest implements ActionRequest {
return listenerThreaded;
}
/**
* Sets the indices the search will be executed on.
*/
public SearchRequest indices(String[] indices) {
this.indices = indices;
return this;
}
/**
* Should the listener be called on a separate thread if needed.
*/

View File

@ -42,7 +42,6 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.Actions.*;
import static org.elasticsearch.action.search.type.TransportSearchHelper.*;
/**
@ -103,7 +102,9 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
nodes = clusterState.nodes();
shardsIts = indicesService.searchShards(clusterState, processIndices(clusterState, request.indices()), request.queryHint());
request.indices(clusterState.metaData().concreteIndices(request.indices()));
shardsIts = indicesService.searchShards(clusterState, request.indices(), request.queryHint());
expectedSuccessfulOps = shardsIts.size();
expectedTotalOps = shardsIts.totalSize();
}

View File

@ -52,6 +52,11 @@ public abstract class BroadcastOperationRequest implements ActionRequest {
return indices;
}
public BroadcastOperationRequest indices(String[] indices) {
this.indices = indices;
return this;
}
public String queryHint() {
return queryHint;
}

View File

@ -114,6 +114,10 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
this.listener = listener;
clusterState = clusterService.state();
// update to concrete indices
request.indices(clusterState.metaData().concreteIndices(request.indices()));
nodes = clusterState.nodes();
shardsIts = shards(request, clusterState);
expectedOps = shardsIts.size();

View File

@ -46,6 +46,11 @@ public class IndicesReplicationOperationRequest implements ActionRequest {
return this.indices;
}
public IndicesReplicationOperationRequest indices(String[] indices) {
this.indices = indices;
return this;
}
@Override public ActionRequestValidationException validate() {
return null;
}

View File

@ -52,6 +52,11 @@ public abstract class ShardReplicationOperationRequest implements ActionRequest
return this.index;
}
public ShardReplicationOperationRequest index(String index) {
this.index = index;
return this;
}
/**
* Should the listener be called on a separate thread if needed.
*/

View File

@ -22,9 +22,9 @@ package org.elasticsearch.action.support.replication;
import com.google.inject.Inject;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.Actions;
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
@ -35,7 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public abstract class TransportIndicesReplicationOperationAction<Request extends IndicesReplicationOperationRequest, Response extends ActionResponse, IndexRequest extends IndexReplicationOperationRequest, IndexResponse extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse>
extends BaseAction<Request, Response> {
@ -57,7 +57,12 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
}
@Override protected void doExecute(final Request request, final ActionListener<Response> listener) {
String[] indices = Actions.processIndices(clusterService.state(), request.indices());
ClusterState clusterState = clusterService.state();
// update to actual indices
request.indices(clusterState.metaData().concreteIndices(request.indices()));
String[] indices = request.indices();
final AtomicInteger indexCounter = new AtomicInteger();
final AtomicInteger completionCounter = new AtomicInteger(indices.length);
final AtomicReferenceArray<Object> indexResponses = new AtomicReferenceArray<Object>(indices.length);

View File

@ -97,7 +97,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected abstract void shardOperationOnBackup(ShardOperationRequest shardRequest);
protected abstract ShardsIterator shards(Request request) throws ElasticSearchException;
protected abstract ShardsIterator shards(ClusterState clusterState, Request request) throws ElasticSearchException;
/**
* Should the operations be performed on the backups as well. Defaults to <tt>false</tt> meaning operations
@ -221,9 +221,11 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
*/
public boolean start(final boolean fromClusterEvent) throws ElasticSearchException {
ClusterState clusterState = clusterService.state();
// update to the concrete index
request.index(clusterState.metaData().concreteIndex(request.index()));
nodes = clusterState.nodes();
try {
shards = shards(request);
shards = shards(clusterState, request);
} catch (Exception e) {
listener.onFailure(e);
return true;

View File

@ -66,6 +66,11 @@ public abstract class SingleOperationRequest implements ActionRequest {
return index;
}
SingleOperationRequest index(String index) {
this.index = index;
return this;
}
public String type() {
return type;
}

View File

@ -99,7 +99,10 @@ public abstract class TransportSingleOperationAction<Request extends SingleOpera
nodes = clusterState.nodes();
this.shards = indicesService.indexServiceSafe(request.index).operationRouting()
// update to the concrete shard to use
request.index(clusterState.metaData().concreteIndex(request.index()));
this.shards = indicesService.indexServiceSafe(request.index()).operationRouting()
.getShards(clusterState, request.type(), request.id());
this.shardsIt = shards.iterator();
}

View File

@ -55,7 +55,6 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.regex.Pattern;
import static com.google.common.collect.Lists.*;
import static org.elasticsearch.action.Actions.*;
/**
* @author kimchy (shay.banon)
@ -426,6 +425,6 @@ public class TransportTermsAction extends TransportBroadcastOperationAction<Term
}
@Override protected GroupShardsIterator shards(TermsRequest request, ClusterState clusterState) {
return indicesService.searchShards(clusterState, processIndices(clusterState, request.indices()), request.queryHint());
return indicesService.searchShards(clusterState, request.indices(), request.queryHint());
}
}

View File

@ -21,6 +21,8 @@ package org.elasticsearch.client;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@ -189,4 +191,22 @@ public interface IndicesAdminClient {
* @see org.elasticsearch.client.Requests#gatewaySnapshotRequest(String...)
*/
void gatewaySnapshot(GatewaySnapshotRequest request, ActionListener<GatewaySnapshotResponse> listener);
/**
* Allows to add/remove aliases from indices.
*
* @param request The index aliases request
* @return The result future
* @see Requests#indexAliasesRequest()
*/
ActionFuture<IndicesAliasesResponse> indicesAliases(IndicesAliasesRequest request);
/**
* Allows to add/remove aliases from indices.
*
* @param request The index aliases request
* @param listener A listener to be notified with a result
* @see Requests#indexAliasesRequest()
*/
void aliases(IndicesAliasesRequest request, ActionListener<IndicesAliasesResponse> listener);
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingReques
import org.elasticsearch.action.admin.cluster.ping.replication.ReplicationPingRequest;
import org.elasticsearch.action.admin.cluster.ping.single.SinglePingRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
@ -202,6 +203,15 @@ public class Requests {
return new PutMappingRequest(indices);
}
/**
* Creates an index aliases request allowing to add and remove aliases.
*
* @return The index aliases request
*/
public static IndicesAliasesRequest indexAliasesRequest() {
return new IndicesAliasesRequest();
}
/**
* Creates a refresh indices request.
*

View File

@ -22,6 +22,9 @@ package org.elasticsearch.client.server;
import com.google.inject.Inject;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
@ -71,10 +74,13 @@ public class ServerIndicesAdminClient extends AbstractComponent implements Indic
private final TransportGatewaySnapshotAction gatewaySnapshotAction;
private final TransportIndicesAliasesAction indicesAliasesAction;
@Inject public ServerIndicesAdminClient(Settings settings, TransportIndicesStatusAction indicesStatusAction,
TransportCreateIndexAction createIndexAction, TransportDeleteIndexAction deleteIndexAction,
TransportRefreshAction refreshAction, TransportFlushAction flushAction, TransportOptimizeAction optimizeAction,
TransportPutMappingAction putMappingAction, TransportGatewaySnapshotAction gatewaySnapshotAction) {
TransportPutMappingAction putMappingAction, TransportGatewaySnapshotAction gatewaySnapshotAction,
TransportIndicesAliasesAction indicesAliasesAction) {
super(settings);
this.indicesStatusAction = indicesStatusAction;
this.createIndexAction = createIndexAction;
@ -84,6 +90,7 @@ public class ServerIndicesAdminClient extends AbstractComponent implements Indic
this.optimizeAction = optimizeAction;
this.putMappingAction = putMappingAction;
this.gatewaySnapshotAction = gatewaySnapshotAction;
this.indicesAliasesAction = indicesAliasesAction;
}
@Override public ActionFuture<IndicesStatusResponse> status(IndicesStatusRequest request) {
@ -149,4 +156,12 @@ public class ServerIndicesAdminClient extends AbstractComponent implements Indic
@Override public void gatewaySnapshot(GatewaySnapshotRequest request, ActionListener<GatewaySnapshotResponse> listener) {
gatewaySnapshotAction.execute(request, listener);
}
@Override public ActionFuture<IndicesAliasesResponse> indicesAliases(IndicesAliasesRequest request) {
return indicesAliasesAction.execute(request);
}
@Override public void aliases(IndicesAliasesRequest request, ActionListener<IndicesAliasesResponse> listener) {
indicesAliasesAction.execute(request, listener);
}
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.client.transport.action.admin.cluster.ping.broadcast.Cl
import org.elasticsearch.client.transport.action.admin.cluster.ping.replication.ClientTransportReplicationPingAction;
import org.elasticsearch.client.transport.action.admin.cluster.ping.single.ClientTransportSinglePingAction;
import org.elasticsearch.client.transport.action.admin.cluster.state.ClientTransportClusterStateAction;
import org.elasticsearch.client.transport.action.admin.indices.alias.ClientTransportIndicesAliasesAction;
import org.elasticsearch.client.transport.action.admin.indices.create.ClientTransportCreateIndexAction;
import org.elasticsearch.client.transport.action.admin.indices.delete.ClientTransportDeleteIndexAction;
import org.elasticsearch.client.transport.action.admin.indices.flush.ClientTransportFlushAction;
@ -67,6 +68,8 @@ public class ClientTransportActionModule extends AbstractModule {
bind(ClientTransportDeleteIndexAction.class).asEagerSingleton();
bind(ClientTransportPutMappingAction.class).asEagerSingleton();
bind(ClientTransportGatewaySnapshotAction.class).asEagerSingleton();
bind(ClientTransportIndicesAliasesAction.class).asEagerSingleton();
bind(ClientTransportNodesInfoAction.class).asEagerSingleton();
bind(ClientTransportNodesShutdownAction.class).asEagerSingleton();
bind(ClientTransportSinglePingAction.class).asEagerSingleton();

View File

@ -0,0 +1,42 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.transport.action.admin.indices.alias;
import com.google.inject.Inject;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.client.transport.action.support.BaseClientTransportAction;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.settings.Settings;
/**
* @author kimchy (shay.banon)
*/
public class ClientTransportIndicesAliasesAction extends BaseClientTransportAction<IndicesAliasesRequest, IndicesAliasesResponse> {
@Inject public ClientTransportIndicesAliasesAction(Settings settings, TransportService transportService) {
super(settings, transportService, IndicesAliasesResponse.class);
}
@Override protected String action() {
return TransportActions.Admin.Indices.ALIASES;
}
}

View File

@ -28,7 +28,7 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.settings.Settings;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class ClientTransportCreateIndexAction extends BaseClientTransportAction<CreateIndexRequest, CreateIndexResponse> {

View File

@ -23,6 +23,8 @@ import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@ -41,6 +43,7 @@ import org.elasticsearch.action.admin.indices.status.IndicesStatusRequest;
import org.elasticsearch.action.admin.indices.status.IndicesStatusResponse;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.transport.TransportClientNodesService;
import org.elasticsearch.client.transport.action.admin.indices.alias.ClientTransportIndicesAliasesAction;
import org.elasticsearch.client.transport.action.admin.indices.create.ClientTransportCreateIndexAction;
import org.elasticsearch.client.transport.action.admin.indices.delete.ClientTransportDeleteIndexAction;
import org.elasticsearch.client.transport.action.admin.indices.flush.ClientTransportFlushAction;
@ -76,11 +79,14 @@ public class InternalTransportIndicesAdminClient extends AbstractComponent imple
private final ClientTransportGatewaySnapshotAction gatewaySnapshotAction;
private final ClientTransportIndicesAliasesAction indicesAliasesAction;
@Inject public InternalTransportIndicesAdminClient(Settings settings, TransportClientNodesService nodesService,
ClientTransportIndicesStatusAction indicesStatusAction,
ClientTransportCreateIndexAction createIndexAction, ClientTransportDeleteIndexAction deleteIndexAction,
ClientTransportRefreshAction refreshAction, ClientTransportFlushAction flushAction, ClientTransportOptimizeAction optimizeAction,
ClientTransportPutMappingAction putMappingAction, ClientTransportGatewaySnapshotAction gatewaySnapshotAction) {
ClientTransportPutMappingAction putMappingAction, ClientTransportGatewaySnapshotAction gatewaySnapshotAction,
ClientTransportIndicesAliasesAction indicesAliasesAction) {
super(settings);
this.nodesService = nodesService;
this.indicesStatusAction = indicesStatusAction;
@ -91,6 +97,7 @@ public class InternalTransportIndicesAdminClient extends AbstractComponent imple
this.optimizeAction = optimizeAction;
this.putMappingAction = putMappingAction;
this.gatewaySnapshotAction = gatewaySnapshotAction;
this.indicesAliasesAction = indicesAliasesAction;
}
@Override public ActionFuture<IndicesStatusResponse> status(final IndicesStatusRequest request) {
@ -228,4 +235,21 @@ public class InternalTransportIndicesAdminClient extends AbstractComponent imple
}
});
}
@Override public ActionFuture<IndicesAliasesResponse> indicesAliases(final IndicesAliasesRequest request) {
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<IndicesAliasesResponse>>() {
@Override public ActionFuture<IndicesAliasesResponse> doWithNode(Node node) throws ElasticSearchException {
return indicesAliasesAction.execute(node, request);
}
});
}
@Override public void aliases(final IndicesAliasesRequest request, final ActionListener<IndicesAliasesResponse> listener) {
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(Node node) throws ElasticSearchException {
indicesAliasesAction.execute(node, request, listener);
return null;
}
});
}
}

View File

@ -0,0 +1,104 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class AliasAction implements Streamable {
public static enum Type {
ADD((byte) 0),
REMOVE((byte) 1);
private final byte value;
Type(byte value) {
this.value = value;
}
public byte value() {
return value;
}
public static Type fromValue(byte value) {
if (value == 0) {
return ADD;
} else if (value == 1) {
return REMOVE;
} else {
throw new ElasticSearchIllegalArgumentException("No type for action [" + value + "]");
}
}
}
private Type actionType;
private String index;
private String alias;
private AliasAction() {
}
public AliasAction(Type actionType, String index, String alias) {
this.actionType = actionType;
this.index = index;
this.alias = alias;
}
public Type actionType() {
return actionType;
}
public String index() {
return index;
}
public String alias() {
return alias;
}
public static AliasAction readAliasAction(StreamInput in) throws IOException {
AliasAction aliasAction = new AliasAction();
aliasAction.readFrom(in);
return aliasAction;
}
@Override public void readFrom(StreamInput in) throws IOException {
actionType = Type.fromValue(in.readByte());
index = in.readUTF();
alias = in.readUTF();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeByte(actionType.value());
out.writeUTF(index);
out.writeUTF(alias);
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster.metadata;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonToken;
import org.elasticsearch.util.MapBuilder;
@ -39,7 +40,7 @@ import java.util.Map;
import static org.elasticsearch.util.settings.ImmutableSettings.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
@Immutable
public class IndexMetaData {
@ -50,6 +51,8 @@ public class IndexMetaData {
private final String index;
private final ImmutableSet<String> aliases;
private final Settings settings;
private final ImmutableMap<String, String> mappings;
@ -63,6 +66,8 @@ public class IndexMetaData {
this.settings = settings;
this.mappings = mappings;
this.totalNumberOfShards = numberOfShards() * (numberOfReplicas() + 1);
this.aliases = ImmutableSet.of(settings.getAsArray("index.aliases"));
}
public String index() {
@ -85,6 +90,10 @@ public class IndexMetaData {
return settings;
}
public ImmutableSet<String> aliases() {
return this.aliases;
}
public ImmutableMap<String, String> mappings() {
return mappings;
}

View File

@ -20,9 +20,14 @@
package org.elasticsearch.cluster.metadata;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.UnmodifiableIterator;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonToken;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.util.MapBuilder;
import org.elasticsearch.util.Nullable;
import org.elasticsearch.util.concurrent.Immutable;
@ -34,11 +39,14 @@ import org.elasticsearch.util.json.ToJson;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import java.util.*;
import static com.google.common.collect.Lists.*;
import static com.google.common.collect.Sets.*;
import static org.elasticsearch.util.MapBuilder.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
@Immutable
public class MetaData implements Iterable<IndexMetaData> {
@ -52,6 +60,13 @@ public class MetaData implements Iterable<IndexMetaData> {
private final transient int totalNumberOfShards;
private final String[] allIndices;
private final ImmutableSet<String> aliases;
private final ImmutableMap<String, String[]> aliasAndIndexToIndexMap;
private final ImmutableMap<String, ImmutableSet<String>> aliasAndIndexToIndexMap2;
private MetaData(ImmutableMap<String, IndexMetaData> indices, int maxNumberOfShardsPerNode) {
this.indices = ImmutableMap.copyOf(indices);
this.maxNumberOfShardsPerNode = maxNumberOfShardsPerNode;
@ -60,12 +75,118 @@ public class MetaData implements Iterable<IndexMetaData> {
totalNumberOfShards += indexMetaData.totalNumberOfShards();
}
this.totalNumberOfShards = totalNumberOfShards;
// build all indices map
List<String> allIndicesLst = Lists.newArrayList();
for (IndexMetaData indexMetaData : indices.values()) {
allIndicesLst.add(indexMetaData.index());
}
allIndices = allIndicesLst.toArray(new String[allIndicesLst.size()]);
// build aliases set
Set<String> aliases = newHashSet();
for (IndexMetaData indexMetaData : indices.values()) {
aliases.addAll(indexMetaData.aliases());
}
this.aliases = ImmutableSet.copyOf(aliases);
// build aliasAndIndex to Index map
MapBuilder<String, Set<String>> tmpAliasAndIndexToIndexBuilder = newMapBuilder();
for (IndexMetaData indexMetaData : indices.values()) {
Set<String> lst = tmpAliasAndIndexToIndexBuilder.get(indexMetaData.index());
if (lst == null) {
lst = newHashSet();
tmpAliasAndIndexToIndexBuilder.put(indexMetaData.index(), lst);
}
lst.add(indexMetaData.index());
for (String alias : indexMetaData.aliases()) {
lst = tmpAliasAndIndexToIndexBuilder.get(alias);
if (lst == null) {
lst = newHashSet();
tmpAliasAndIndexToIndexBuilder.put(alias, lst);
}
lst.add(indexMetaData.index());
}
}
MapBuilder<String, String[]> aliasAndIndexToIndexBuilder = newMapBuilder();
for (Map.Entry<String, Set<String>> entry : tmpAliasAndIndexToIndexBuilder.map().entrySet()) {
aliasAndIndexToIndexBuilder.put(entry.getKey(), entry.getValue().toArray(new String[entry.getValue().size()]));
}
this.aliasAndIndexToIndexMap = aliasAndIndexToIndexBuilder.immutableMap();
MapBuilder<String, ImmutableSet<String>> aliasAndIndexToIndexBuilder2 = newMapBuilder();
for (Map.Entry<String, Set<String>> entry : tmpAliasAndIndexToIndexBuilder.map().entrySet()) {
aliasAndIndexToIndexBuilder2.put(entry.getKey(), ImmutableSet.copyOf(entry.getValue()));
}
this.aliasAndIndexToIndexMap2 = aliasAndIndexToIndexBuilder2.immutableMap();
}
public ImmutableSet<String> aliases() {
return this.aliases;
}
/**
* Returns all the concrete indices.
*/
public String[] concreteAllIndices() {
return allIndices;
}
/**
* Translates the provided indices (possibly aliased) into actual indices.
*/
public String[] concreteIndices(String[] indices) throws IndexMissingException {
if (indices == null || indices.length == 0) {
return concreteAllIndices();
}
if (indices.length == 1) {
if (indices[0].length() == 0) {
return concreteAllIndices();
}
if (indices[0].equals("_all")) {
return concreteAllIndices();
}
}
ArrayList<String> actualIndices = newArrayListWithExpectedSize(indices.length);
for (String index : indices) {
String[] actualLst = aliasAndIndexToIndexMap.get(index);
if (actualLst == null) {
throw new IndexMissingException(new Index(index));
}
for (String x : actualLst) {
actualIndices.add(x);
}
}
return actualIndices.toArray(new String[actualIndices.size()]);
}
public String concreteIndex(String index) throws IndexMissingException, ElasticSearchIllegalArgumentException {
// a quick check, if this is an actual index, if so, return it
if (indices.containsKey(index)) {
return index;
}
// not an actual index, fetch from an alias
String[] lst = aliasAndIndexToIndexMap.get(index);
if (lst == null) {
throw new IndexMissingException(new Index(index));
}
if (lst.length > 1) {
throw new ElasticSearchIllegalArgumentException("Alias [" + index + "] has more than one indices associated with it [" + Arrays.toString(lst) + "], can't execute a single index op");
}
return lst[0];
}
public boolean hasIndex(String index) {
return indices.containsKey(index);
}
public boolean hasConcreteIndex(String index) {
return aliasAndIndexToIndexMap2.get(index) != null;
}
public IndexMetaData index(String index) {
return indices.get(index);
}
@ -106,6 +227,10 @@ public class MetaData implements Iterable<IndexMetaData> {
return this;
}
public IndexMetaData get(String index) {
return indices.get(index);
}
public Builder remove(String index) {
indices.remove(index);
return this;

View File

@ -19,7 +19,6 @@
package org.elasticsearch.cluster.metadata;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterService;
@ -48,12 +47,14 @@ import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.settings.ImmutableSettings;
import org.elasticsearch.util.settings.Settings;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static com.google.common.collect.Maps.*;
import static com.google.common.collect.Sets.*;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
@ -65,6 +66,7 @@ import static org.elasticsearch.util.settings.ImmutableSettings.*;
*/
public class MetaDataService extends AbstractComponent {
private final ClusterService clusterService;
private final ShardsRoutingStrategy shardsRoutingStrategy;
@ -91,8 +93,50 @@ public class MetaDataService extends AbstractComponent {
// TODO should find nicer solution than sync here, since we block for timeout (same for other ops)
public synchronized IndicesAliasesResult indicesAliases(final List<AliasAction> aliasActions) {
ClusterState clusterState = clusterService.state();
for (AliasAction aliasAction : aliasActions) {
if (!clusterState.metaData().hasIndex(aliasAction.index())) {
throw new IndexMissingException(new Index(aliasAction.index()));
}
}
clusterService.submitStateUpdateTask("index-aliases", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData());
for (AliasAction aliasAction : aliasActions) {
IndexMetaData indexMetaData = builder.get(aliasAction.index());
if (indexMetaData == null) {
throw new IndexMissingException(new Index(aliasAction.index()));
}
Set<String> indexAliases = newHashSet(indexMetaData.settings().getAsArray("index.aliases"));
if (aliasAction.actionType() == AliasAction.Type.ADD) {
indexAliases.add(aliasAction.alias());
} else if (aliasAction.actionType() == AliasAction.Type.REMOVE) {
indexAliases.remove(aliasAction.alias());
}
Settings settings = settingsBuilder().put(indexMetaData.settings())
.putArray("index.aliases", indexAliases.toArray(new String[indexAliases.size()]))
.build();
builder.put(newIndexMetaDataBuilder(indexMetaData).settings(settings));
}
return newClusterStateBuilder().state(currentState).metaData(builder).build();
}
});
return new IndicesAliasesResult();
}
public synchronized CreateIndexResult createIndex(final String index, final Settings indexSettings, final Map<String, String> mappings, TimeValue timeout) throws IndexAlreadyExistsException {
if (clusterService.state().routingTable().hasIndex(index)) {
ClusterState clusterState = clusterService.state();
if (clusterState.routingTable().hasIndex(index)) {
throw new IndexAlreadyExistsException(new Index(index));
}
if (clusterState.metaData().hasIndex(index)) {
throw new IndexAlreadyExistsException(new Index(index));
}
if (index.contains(" ")) {
@ -113,6 +157,9 @@ public class MetaDataService extends AbstractComponent {
if (!Strings.validFileName(index)) {
throw new InvalidIndexNameException(new Index(index), index, "must not contain the following characters " + Strings.INVALID_FILENAME_CHARS);
}
if (clusterState.metaData().aliases().contains(index)) {
throw new InvalidIndexNameException(new Index(index), index, "an alias with the same name already exists");
}
final CountDownLatch latch = new CountDownLatch(clusterService.state().nodes().size());
NodeIndexCreatedAction.Listener nodeCreatedListener = new NodeIndexCreatedAction.Listener() {
@ -306,7 +353,7 @@ public class MetaDataService extends AbstractComponent {
}
final CountDownLatch latch = new CountDownLatch(clusterService.state().nodes().size() * indices.length);
final Set<String> indicesSet = Sets.newHashSet(indices);
final Set<String> indicesSet = newHashSet(indices);
final String fMappingType = mappingType;
NodeMappingCreatedAction.Listener listener = new NodeMappingCreatedAction.Listener() {
@Override public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) {
@ -385,4 +432,10 @@ public class MetaDataService extends AbstractComponent {
return acknowledged;
}
}
public static class IndicesAliasesResult {
public IndicesAliasesResult() {
}
}
}

View File

@ -44,7 +44,8 @@ import static org.elasticsearch.util.json.JsonBuilder.*;
/**
* @author kimchy (shay.banon)
*/
public class JsonDocumentMapper implements DocumentMapper, ToJson {
public class
JsonDocumentMapper implements DocumentMapper, ToJson {
public static class Builder {

View File

@ -27,6 +27,7 @@ import org.elasticsearch.rest.action.admin.cluster.ping.broadcast.RestBroadcastP
import org.elasticsearch.rest.action.admin.cluster.ping.replication.RestReplicationPingAction;
import org.elasticsearch.rest.action.admin.cluster.ping.single.RestSinglePingAction;
import org.elasticsearch.rest.action.admin.cluster.state.RestClusterStateAction;
import org.elasticsearch.rest.action.admin.indices.alias.RestIndicesAliasesAction;
import org.elasticsearch.rest.action.admin.indices.create.RestCreateIndexAction;
import org.elasticsearch.rest.action.admin.indices.delete.RestDeleteIndexAction;
import org.elasticsearch.rest.action.admin.indices.flush.RestFlushAction;
@ -64,6 +65,7 @@ public class RestActionModule extends AbstractModule {
bind(RestReplicationPingAction.class).asEagerSingleton();
bind(RestIndicesStatusAction.class).asEagerSingleton();
bind(RestIndicesAliasesAction.class).asEagerSingleton();
bind(RestCreateIndexAction.class).asEagerSingleton();
bind(RestDeleteIndexAction.class).asEagerSingleton();

View File

@ -0,0 +1,138 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.indices.alias;
import com.google.inject.Inject;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonToken;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestJsonBuilder;
import org.elasticsearch.util.json.Jackson;
import org.elasticsearch.util.json.JsonBuilder;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import static org.elasticsearch.rest.RestRequest.Method.*;
import static org.elasticsearch.rest.RestResponse.Status.*;
/**
* @author kimchy (shay.banon)
*/
public class RestIndicesAliasesAction extends BaseRestHandler {
@Inject public RestIndicesAliasesAction(Settings settings, Client client, RestController controller) {
super(settings, client);
controller.registerHandler(POST, "/_aliases", this);
}
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
try {
// {
// actions : [
// { add : { index : "test1", alias : "alias1" } }
// { remove : { index : "test1", alias : "alias1" } }
// ]
// }
JsonParser jp = Jackson.defaultJsonFactory().createJsonParser(request.contentAsStream());
JsonToken token = jp.nextToken();
if (token == null) {
throw new ElasticSearchIllegalArgumentException("No action is specified");
}
while ((token = jp.nextToken()) != JsonToken.END_OBJECT) {
if (token == JsonToken.START_ARRAY) {
while ((token = jp.nextToken()) != JsonToken.END_ARRAY) {
if (token == JsonToken.FIELD_NAME) {
String action = jp.getCurrentName();
AliasAction.Type type;
if ("add".equals(action)) {
type = AliasAction.Type.ADD;
} else if ("remove".equals(action)) {
type = AliasAction.Type.REMOVE;
} else {
throw new ElasticSearchIllegalArgumentException("Alias action [" + action + "] not supported");
}
String index = null;
String alias = null;
String currentFieldName = null;
while ((token = jp.nextToken()) != JsonToken.END_OBJECT) {
if (token == JsonToken.FIELD_NAME) {
currentFieldName = jp.getCurrentName();
} else if (token == JsonToken.VALUE_STRING) {
if ("index".equals(currentFieldName)) {
index = jp.getText();
} else if ("alias".equals(currentFieldName)) {
alias = jp.getText();
}
}
}
if (index == null) {
throw new ElasticSearchIllegalArgumentException("Alias action [" + action + "] requires an [index] to be set");
}
if (alias == null) {
throw new ElasticSearchIllegalArgumentException("Alias action [" + action + "] requires an [alias] to be set");
}
if (type == AliasAction.Type.ADD) {
indicesAliasesRequest.addAlias(index, alias);
} else if (type == AliasAction.Type.REMOVE) {
indicesAliasesRequest.removeAlias(index, alias);
}
}
}
}
}
} catch (Exception e) {
try {
channel.sendResponse(new JsonThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.warn("Failed to send response", e1);
return;
}
}
client.admin().indices().aliases(indicesAliasesRequest, new ActionListener<IndicesAliasesResponse>() {
@Override public void onResponse(IndicesAliasesResponse response) {
try {
JsonBuilder builder = RestJsonBuilder.restJsonBuilder(request);
builder.startObject()
.field("ok", true)
.endObject();
channel.sendResponse(new JsonRestResponse(request, OK, builder));
} catch (Exception e) {
onFailure(e);
}
}
@Override public void onFailure(Throwable e) {
try {
channel.sendResponse(new JsonThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
}
}

View File

@ -41,7 +41,7 @@ import static org.elasticsearch.rest.RestResponse.Status.*;
import static org.elasticsearch.util.TimeValue.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class RestCreateIndexAction extends BaseRestHandler {

View File

@ -70,6 +70,8 @@ public class RestIndicesStatusAction extends BaseRestHandler {
for (IndexStatus indexStatus : response.indices().values()) {
builder.startObject(indexStatus.index());
builder.array("aliases", indexStatus.settings().getAsArray("index.aliases"));
builder.startObject("settings");
for (Map.Entry<String, String> entry : indexStatus.settings().getAsMap().entrySet()) {
builder.field(entry.getKey(), entry.getValue());

View File

@ -254,6 +254,29 @@ public class ImmutableSettings implements Settings {
return Collections.unmodifiableMap(retVal);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ImmutableSettings that = (ImmutableSettings) o;
if (classLoader != null ? !classLoader.equals(that.classLoader) : that.classLoader != null) return false;
if (globalSettings != null ? !globalSettings.equals(that.globalSettings) : that.globalSettings != null)
return false;
if (settings != null ? !settings.equals(that.settings) : that.settings != null) return false;
return true;
}
@Override
public int hashCode() {
int result = settings != null ? settings.hashCode() : 0;
result = 31 * result + (globalSettings != null ? globalSettings.hashCode() : 0);
result = 31 * result + (classLoader != null ? classLoader.hashCode() : 0);
return result;
}
private static ClassLoader buildClassLoader() {
return Classes.getDefaultClassLoader();
}
@ -421,6 +444,27 @@ public class ImmutableSettings implements Settings {
return this;
}
/**
* Sets the setting with the provided setting key and an array of values.
*
* @param setting The setting key
* @param values The values
* @return The builder
*/
public Builder putArray(String setting, String... values) {
int counter = 0;
while (true) {
String value = map.remove(setting + '.' + (counter++));
if (value == null) {
break;
}
}
for (int i = 0; i < values.length; i++) {
put(setting + "." + i, values[i]);
}
return this;
}
/**
* Sets the setting group.
*/

View File

@ -0,0 +1,114 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.aliases;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.test.integration.AbstractServersTests;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.elasticsearch.client.Requests.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
@Test
public class IndexAliasesTests extends AbstractServersTests {
protected Client client1;
protected Client client2;
@BeforeMethod public void startServers() {
startServer("server1");
startServer("server2");
client1 = getClient1();
client2 = getClient2();
}
@AfterMethod public void closeServers() {
client1.close();
client2.close();
closeAllServers();
}
protected Client getClient1() {
return client("server1");
}
protected Client getClient2() {
return client("server2");
}
@Test public void testAliases() throws Exception {
logger.info("Creating index [test]");
client1.admin().indices().create(createIndexRequest("test")).actionGet();
logger.info("Running Cluster Health");
ClusterHealthResponse clusterHealth = client1.admin().cluster().health(clusterHealth().waitForGreenStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
try {
logger.info("Indexing against [alias1], should fail");
client1.index(indexRequest("alias1").type("type1").id("1").source(source("1", "test"))).actionGet();
assert false : "index [alias1] should not exists";
} catch (IndexMissingException e) {
assertThat(e.index().name(), equalTo("alias1"));
}
logger.info("Aliasing index [test] with [alias1]");
client1.admin().indices().indicesAliases(indexAliasesRequest().addAlias("test", "alias1")).actionGet();
Thread.sleep(300);
logger.info("Indexing against [alias1], should work now");
IndexResponse indexResponse = client1.index(indexRequest("alias1").type("type1").id("1").source(source("1", "test"))).actionGet();
assertThat(indexResponse.index(), equalTo("test"));
logger.info("Creating index [test]");
client1.admin().indices().create(createIndexRequest("test_x")).actionGet();
logger.info("Running Cluster Health");
clusterHealth = client1.admin().cluster().health(clusterHealth().waitForGreenStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
logger.info("Remove [alias1], Aliasing index [test_x] with [alias1]");
client1.admin().indices().indicesAliases(indexAliasesRequest().removeAlias("test", "alias1").addAlias("test_x", "alias1")).actionGet();
Thread.sleep(300);
logger.info("Indexing against [alias1], should work against [test_x]");
indexResponse = client1.index(indexRequest("alias1").type("type1").id("1").source(source("1", "test"))).actionGet();
assertThat(indexResponse.index(), equalTo("test_x"));
}
private String source(String id, String nameValue) {
return "{ type1 : { \"id\" : \"" + id + "\", \"name\" : \"" + nameValue + "\" } }";
}
}

View File

@ -0,0 +1,8 @@
cluster:
routing:
schedule: 100ms
index:
numberOfShards: 5
numberOfReplicas: 1
action:
autoCreateIndex: false

View File

@ -0,0 +1,38 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.document;
import static org.elasticsearch.client.Requests.*;
import static org.elasticsearch.util.settings.ImmutableSettings.*;
/**
* @author kimchy (shay.banon)
*/
public class AliasedIndexDocumentActionsTests extends DocumentActionsTests {
protected void createIndex() {
logger.info("Creating index [test1] with alias [test]");
client1.admin().indices().create(createIndexRequest("test1").settings(settingsBuilder().putArray("index.aliases", "test"))).actionGet();
}
@Override protected String getConcreteIndexName() {
return "test1";
}
}

View File

@ -0,0 +1,8 @@
cluster:
routing:
schedule: 100ms
index:
numberOfShards: 5
numberOfReplicas: 1
action:
autoCreateIndex: false

View File

@ -49,14 +49,24 @@ import static org.hamcrest.Matchers.*;
*/
public class DocumentActionsTests extends AbstractServersTests {
private Client client1;
private Client client2;
protected Client client1;
protected Client client2;
@BeforeMethod public void startServers() {
startServer("server1");
startServer("server2");
client1 = getClient1();
client2 = getClient2();
createIndex();
}
protected void createIndex() {
logger.info("Creating index test");
client1.admin().indices().create(createIndexRequest("test")).actionGet();
}
protected String getConcreteIndexName() {
return "test";
}
@AfterMethod public void closeServers() {
@ -74,9 +84,6 @@ public class DocumentActionsTests extends AbstractServersTests {
}
@Test public void testIndexActions() throws Exception {
logger.info("Creating index test");
client1.admin().indices().create(createIndexRequest("test")).actionGet();
logger.info("Running Cluster Health");
ClusterHealthResponse clusterHealth = client1.admin().cluster().health(clusterHealth().waitForGreenStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
@ -85,6 +92,7 @@ public class DocumentActionsTests extends AbstractServersTests {
logger.info("Indexing [type1/1]");
IndexResponse indexResponse = client1.index(indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet();
assertThat(indexResponse.index(), equalTo(getConcreteIndexName()));
assertThat(indexResponse.id(), equalTo("1"));
assertThat(indexResponse.type(), equalTo("type1"));
logger.info("Refreshing");
@ -102,10 +110,12 @@ public class DocumentActionsTests extends AbstractServersTests {
logger.info("Get [type1/1]");
for (int i = 0; i < 5; i++) {
getResult = client1.get(getRequest("test").type("type1").id("1").operationThreaded(false)).actionGet();
assertThat(getResult.index(), equalTo(getConcreteIndexName()));
assertThat("cycle #" + i, getResult.sourceAsString(), equalTo(source("1", "test")));
assertThat("cycle(map) #" + i, (String) ((Map) getResult.sourceAsMap().get("type1")).get("name"), equalTo("test"));
getResult = client1.get(getRequest("test").type("type1").id("1").operationThreaded(true)).actionGet();
assertThat("cycle #" + i, getResult.sourceAsString(), equalTo(source("1", "test")));
assertThat(getResult.index(), equalTo(getConcreteIndexName()));
}
logger.info("Get [type1/2] (should be empty)");
@ -116,6 +126,7 @@ public class DocumentActionsTests extends AbstractServersTests {
logger.info("Delete [type1/1]");
DeleteResponse deleteResponse = client1.delete(deleteRequest("test").type("type1").id("1")).actionGet();
assertThat(deleteResponse.index(), equalTo(getConcreteIndexName()));
assertThat(deleteResponse.id(), equalTo("1"));
assertThat(deleteResponse.type(), equalTo("type1"));
logger.info("Refreshing");
@ -142,9 +153,11 @@ public class DocumentActionsTests extends AbstractServersTests {
logger.info("Get [type1/1] and [type1/2]");
for (int i = 0; i < 5; i++) {
getResult = client1.get(getRequest("test").type("type1").id("1")).actionGet();
assertThat(getResult.index(), equalTo(getConcreteIndexName()));
assertThat("cycle #" + i, getResult.sourceAsString(), equalTo(source("1", "test")));
getResult = client1.get(getRequest("test").type("type1").id("2")).actionGet();
assertThat("cycle #" + i, getResult.sourceAsString(), equalTo(source("2", "test2")));
assertThat(getResult.index(), equalTo(getConcreteIndexName()));
}
logger.info("Count");
@ -176,16 +189,18 @@ public class DocumentActionsTests extends AbstractServersTests {
logger.info("Delete by query");
DeleteByQueryResponse queryResponse = client2.deleteByQuery(deleteByQueryRequest("test").querySource(termQuery("name", "test2"))).actionGet();
assertThat(queryResponse.index("test").successfulShards(), equalTo(5));
assertThat(queryResponse.index("test").failedShards(), equalTo(0));
assertThat(queryResponse.index(getConcreteIndexName()).successfulShards(), equalTo(5));
assertThat(queryResponse.index(getConcreteIndexName()).failedShards(), equalTo(0));
client1.admin().indices().refresh(refreshRequest("test")).actionGet();
logger.info("Get [type1/1] and [type1/2], should be empty");
for (int i = 0; i < 5; i++) {
getResult = client1.get(getRequest("test").type("type1").id("1")).actionGet();
assertThat(getResult.index(), equalTo(getConcreteIndexName()));
assertThat("cycle #" + i, getResult.sourceAsString(), equalTo(source("1", "test")));
getResult = client1.get(getRequest("test").type("type1").id("2")).actionGet();
assertThat("cycle #" + i, getResult.exists(), equalTo(false));
assertThat(getResult.index(), equalTo(getConcreteIndexName()));
}
}