wait for mappings to be created on other nodes before returning from the put mapping request

This commit is contained in:
kimchy 2011-01-15 02:13:57 +02:00
parent 64e5784623
commit 2381f668f8
3 changed files with 60 additions and 3 deletions

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
@ -60,8 +61,16 @@ public class NodeMappingCreatedAction extends AbstractComponent {
transportService.registerHandler(NodeMappingCreatedTransportHandler.ACTION, new NodeMappingCreatedTransportHandler());
}
public void add(Listener listener) {
public void add(final Listener listener, TimeValue timeout) {
listeners.add(listener);
threadPool.schedule(new Runnable() {
@Override public void run() {
boolean removed = listeners.remove(listener);
if (removed) {
listener.onTimeout();
}
}
}, timeout);
}
public void remove(Listener listener) {
@ -91,6 +100,8 @@ public class NodeMappingCreatedAction extends AbstractComponent {
public static interface Listener {
void onNodeMappingCreated(NodeMappingCreatedResponse response);
void onTimeout();
}
private class NodeMappingCreatedTransportHandler extends BaseTransportRequestHandler<NodeMappingCreatedResponse> {

View File

@ -23,6 +23,8 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
@ -39,6 +41,7 @@ import org.elasticsearch.indices.InvalidTypeNameException;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
@ -55,10 +58,13 @@ public class MetaDataMappingService extends AbstractComponent {
private final IndicesService indicesService;
@Inject public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService) {
private final NodeMappingCreatedAction mappingCreatedAction;
@Inject public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService, NodeMappingCreatedAction mappingCreatedAction) {
super(settings);
this.clusterService = clusterService;
this.indicesService = indicesService;
this.mappingCreatedAction = mappingCreatedAction;
}
public void updateMapping(final String index, final String type, final CompressedString mappingSource) {
@ -267,7 +273,35 @@ public class MetaDataMappingService extends AbstractComponent {
}
@Override public void clusterStateProcessed(ClusterState clusterState) {
listener.onResponse(new Response(true));
int counter = 0;
for (String index : request.indices) {
IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index);
if (indexRoutingTable != null) {
counter += indexRoutingTable.numberOfNodesShardsAreAllocatedOn();
}
}
if (counter > 0) {
counter = counter - 1; // we already added the mapping on the master here...
}
if (counter == 0) {
listener.onResponse(new Response(true));
return;
}
final AtomicInteger countDown = new AtomicInteger(counter);
mappingCreatedAction.add(new NodeMappingCreatedAction.Listener() {
@Override public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) {
if (countDown.decrementAndGet() == 0) {
mappingCreatedAction.remove(this);
listener.onResponse(new Response(true));
}
}
@Override public void onTimeout() {
listener.onResponse(new Response(false));
}
}, request.timeout);
}
});
}

View File

@ -102,6 +102,18 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
return shards.values().iterator();
}
public int numberOfNodesShardsAreAllocatedOn() {
Set<String> nodes = Sets.newHashSet();
for (IndexShardRoutingTable shardRoutingTable : this) {
for (ShardRouting shardRouting : shardRoutingTable) {
if (shardRouting.assignedToNode()) {
nodes.add(shardRouting.currentNodeId());
}
}
}
return nodes.size();
}
public ImmutableMap<Integer, IndexShardRoutingTable> shards() {
return shards;
}