Create Index API: Better logic to wait for ack for indices being created on nodes, closes #879.

This commit is contained in:
kimchy 2011-04-27 00:24:27 +03:00
parent 6ace8ba0d8
commit 6ecbef463f
1 changed files with 96 additions and 4 deletions

View File

@ -23,10 +23,13 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
import org.elasticsearch.common.Strings;
@ -55,11 +58,15 @@ import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
@ -73,25 +80,31 @@ public class MetaDataCreateIndexService extends AbstractComponent {
private final Environment environment;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final IndicesService indicesService;
private final ShardsAllocation shardsAllocation;
private final NodeIndexCreatedAction nodeIndexCreatedAction;
private final String riverIndexName;
@Inject public MetaDataCreateIndexService(Settings settings, Environment environment, ClusterService clusterService, IndicesService indicesService,
ShardsAllocation shardsAllocation, @RiverIndexName String riverIndexName) {
@Inject public MetaDataCreateIndexService(Settings settings, Environment environment, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService,
ShardsAllocation shardsAllocation, NodeIndexCreatedAction nodeIndexCreatedAction, @RiverIndexName String riverIndexName) {
super(settings);
this.environment = environment;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.indicesService = indicesService;
this.shardsAllocation = shardsAllocation;
this.nodeIndexCreatedAction = nodeIndexCreatedAction;
this.riverIndexName = riverIndexName;
}
public void createIndex(final Request request, final Listener listener) {
public void createIndex(final Request request, final Listener userListener) {
ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder();
for (Map.Entry<String, String> entry : request.settings.getAsMap().entrySet()) {
if (!entry.getKey().startsWith("index.")) {
@ -101,8 +114,12 @@ public class MetaDataCreateIndexService extends AbstractComponent {
}
}
request.settings(updatedSettingsBuilder.build());
final CreateIndexListener listener = new CreateIndexListener(request, userListener);
clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", new ProcessedClusterStateUpdateTask() {
final Set<String> allocatedNodes = Sets.newHashSet();
@Override public ClusterState execute(ClusterState currentState) {
try {
try {
@ -254,6 +271,44 @@ public class MetaDataCreateIndexService extends AbstractComponent {
updatedState = newClusterStateBuilder().state(updatedState).routingResult(routingResult).build();
}
// initialize the counter only for nodes the shards are allocated to
if (updatedState.routingTable().hasIndex(request.index)) {
for (IndexShardRoutingTable indexShardRoutingTable : updatedState.routingTable().index(request.index)) {
for (ShardRouting shardRouting : indexShardRoutingTable) {
// if we have a routing for this shard on a node, and its not the master node (since we already created
// an index on it), then add it
if (shardRouting.currentNodeId() != null && !updatedState.nodes().localNodeId().equals(shardRouting.currentNodeId())) {
allocatedNodes.add(shardRouting.currentNodeId());
}
}
}
}
if (!allocatedNodes.isEmpty()) {
final AtomicInteger counter = new AtomicInteger(allocatedNodes.size());
final NodeIndexCreatedAction.Listener nodeIndexCreatedListener = new NodeIndexCreatedAction.Listener() {
@Override public void onNodeIndexCreated(String index, String nodeId) {
if (index.equals(request.index)) {
if (counter.decrementAndGet() == 0) {
listener.onResponse(new Response(true, indexMetaData));
nodeIndexCreatedAction.remove(this);
}
}
}
};
nodeIndexCreatedAction.add(nodeIndexCreatedListener);
listener.future = threadPool.schedule(request.timeout, ThreadPool.Names.SAME, new Runnable() {
@Override public void run() {
listener.onResponse(new Response(false, indexMetaData));
nodeIndexCreatedAction.remove(nodeIndexCreatedListener);
}
});
}
return updatedState;
} catch (Exception e) {
logger.warn("[{}] failed to create", e, request.index);
@ -263,11 +318,48 @@ public class MetaDataCreateIndexService extends AbstractComponent {
}
@Override public void clusterStateProcessed(ClusterState clusterState) {
listener.onResponse(new Response(true, clusterState.metaData().index(request.index)));
if (allocatedNodes.isEmpty()) {
listener.onResponse(new Response(true, clusterState.metaData().index(request.index)));
}
}
});
}
class CreateIndexListener implements Listener {
private AtomicBoolean notified = new AtomicBoolean();
private final Request request;
private final Listener listener;
volatile ScheduledFuture future;
private CreateIndexListener(Request request, Listener listener) {
this.request = request;
this.listener = listener;
}
@Override public void onResponse(final Response response) {
if (notified.compareAndSet(false, true)) {
if (future != null) {
future.cancel(false);
}
listener.onResponse(response);
}
}
@Override public void onFailure(Throwable t) {
if (notified.compareAndSet(false, true)) {
if (future != null) {
future.cancel(false);
}
listener.onFailure(t);
}
}
}
private Map<String, Object> parseMapping(String mappingSource) throws Exception {
return XContentFactory.xContent(mappingSource).createParser(mappingSource).mapAndClose();
}