Put Mapping: Fix rare case where the response will timeout (ack=false) even though it has been applied, closes #990.

This commit is contained in:
kimchy 2011-06-02 18:23:23 +03:00
parent e8ba7a7974
commit 2bf4a6766e
1 changed files with 46 additions and 29 deletions

View File

@ -26,7 +26,6 @@ import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -37,7 +36,6 @@ import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MergeMappingException; import org.elasticsearch.index.mapper.MergeMappingException;
import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidTypeNameException; import org.elasticsearch.indices.InvalidTypeNameException;
@ -46,6 +44,7 @@ import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.ClusterState.*; import static org.elasticsearch.cluster.ClusterState.*;
@ -340,7 +339,23 @@ public class MetaDataMappingService extends AbstractComponent {
} }
} }
return newClusterStateBuilder().state(currentState).metaData(builder).build(); ClusterState updatedState = newClusterStateBuilder().state(currentState).metaData(builder).build();
// wait for responses from other nodes if needed
int counter = 0;
for (String index : request.indices) {
IndexRoutingTable indexRoutingTable = updatedState.routingTable().index(index);
if (indexRoutingTable != null) {
counter += indexRoutingTable.numberOfNodesShardsAreAllocatedOn(updatedState.nodes().masterNodeId());
}
}
if (counter == 0) {
listener.onResponse(new Response(true));
return updatedState;
}
mappingCreatedAction.add(new CountDownListener(counter, listener), request.timeout);
return updatedState;
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
return currentState; return currentState;
@ -352,32 +367,6 @@ public class MetaDataMappingService extends AbstractComponent {
} }
@Override public void clusterStateProcessed(ClusterState clusterState) { @Override public void clusterStateProcessed(ClusterState clusterState) {
int counter = 0;
for (String index : request.indices) {
IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index);
if (indexRoutingTable != null) {
counter += indexRoutingTable.numberOfNodesShardsAreAllocatedOn(clusterState.nodes().masterNodeId());
}
}
if (counter == 0) {
listener.onResponse(new Response(true));
return;
}
final AtomicInteger countDown = new AtomicInteger(counter);
mappingCreatedAction.add(new NodeMappingCreatedAction.Listener() {
@Override public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) {
if (countDown.decrementAndGet() == 0) {
mappingCreatedAction.remove(this);
listener.onResponse(new Response(true));
}
}
@Override public void onTimeout() {
listener.onResponse(new Response(false));
}
}, request.timeout);
} }
}); });
} }
@ -441,4 +430,32 @@ public class MetaDataMappingService extends AbstractComponent {
return acknowledged; return acknowledged;
} }
} }
private class CountDownListener implements NodeMappingCreatedAction.Listener {
private final AtomicBoolean notified = new AtomicBoolean();
private final AtomicInteger countDown;
private final Listener listener;
public CountDownListener(int countDown, Listener listener) {
this.countDown = new AtomicInteger(countDown);
this.listener = listener;
}
@Override public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) {
if (countDown.decrementAndGet() == 0) {
mappingCreatedAction.remove(this);
if (notified.compareAndSet(false, true)) {
listener.onResponse(new Response(true));
}
}
}
@Override public void onTimeout() {
mappingCreatedAction.remove(this);
if (notified.compareAndSet(false, true)) {
listener.onResponse(new Response(false));
}
}
}
} }