Put Mappings CountDownListener validates cluster state version of incoming change confirmations.

Closes #3508
This commit is contained in:
Boaz Leskes 2013-08-14 17:04:19 +02:00
parent 3eed2625e2
commit 3ac3c7d12c
4 changed files with 37 additions and 19 deletions

View File

@ -76,6 +76,7 @@ public class NodeMappingCreatedAction extends AbstractComponent {
public void nodeMappingCreated(final NodeMappingCreatedResponse response) throws ElasticSearchException {
DiscoveryNodes nodes = clusterService.state().nodes();
logger.debug("Sending mapping created for index {}, type {} (cluster state version: {})", response.index, response.type, response.clusterStateVersion);
if (nodes.localNodeMaster()) {
threadPool.generic().execute(new Runnable() {
@Override
@ -128,14 +129,16 @@ public class NodeMappingCreatedAction extends AbstractComponent {
private String index;
private String type;
private String nodeId;
private long clusterStateVersion;
private NodeMappingCreatedResponse() {
}
public NodeMappingCreatedResponse(String index, String type, String nodeId) {
public NodeMappingCreatedResponse(String index, String type, String nodeId, long clusterStateVersion) {
this.index = index;
this.type = type;
this.nodeId = nodeId;
this.clusterStateVersion = clusterStateVersion;
}
public String index() {
@ -150,12 +153,17 @@ public class NodeMappingCreatedAction extends AbstractComponent {
return nodeId;
}
public long clusterStateVersion() {
return clusterStateVersion;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
out.writeString(type);
out.writeString(nodeId);
out.writeVLong(clusterStateVersion);
}
@Override
@ -164,6 +172,7 @@ public class NodeMappingCreatedAction extends AbstractComponent {
index = in.readString();
type = in.readString();
nodeId = in.readString();
clusterStateVersion = in.readVLong();
}
}
}

View File

@ -492,7 +492,11 @@ public class MetaDataMappingService extends AbstractComponent {
}
}
countDownListener = new CountDownListener(counter, request.indices, request.mappingType, listener);
// TODO: adding one to the version is based on knowledge on how the parent class will increment the version
// move this to the base class or add another callback before publishing the new cluster state so we
// capture it's version.
countDownListener = new CountDownListener(counter, currentState.version() + 1, listener);
mappingCreatedAction.add(countDownListener, request.timeout);
return updatedState;
@ -589,22 +593,22 @@ public class MetaDataMappingService extends AbstractComponent {
private final AtomicBoolean notified = new AtomicBoolean();
private final AtomicInteger countDown;
private final Listener listener;
private final List<String> indices;
private final String type;
private final long minClusterStateVersion;
public CountDownListener(int countDown, String[] indices, String type, Listener listener) {
this.indices = Arrays.asList(indices);
this.type = type;
/**
* @param countDown initial counter value
* @param minClusterStateVersion the minimum cluster state version for which accept responses
* @param listener listener to call when counter reaches 0.
*/
public CountDownListener(int countDown, long minClusterStateVersion, Listener listener) {
this.countDown = new AtomicInteger(countDown);
this.listener = listener;
this.minClusterStateVersion = minClusterStateVersion;
}
@Override
public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) {
if (indices.indexOf(response.index()) < 0) {
return;
}
if (type != null && !type.equals(response.type())) {
if (response.clusterStateVersion() < minClusterStateVersion) {
return;
}
decrementCounter();

View File

@ -133,8 +133,9 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
@Override
public void clusterChanged(final ClusterChangedEvent event) {
if (!indicesService.changesAllowed())
if (!indicesService.changesAllowed()) {
return;
}
if (!lifecycle.started()) {
return;
@ -402,7 +403,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
logger.debug("[{}] parsed mapping [{}], and got different sources\noriginal:\n{}\nparsed:\n{}", index, mappingType, mappingSource, mapperService.documentMapper(mappingType).mappingSource());
requiresRefresh = true;
}
nodeMappingCreatedAction.nodeMappingCreated(new NodeMappingCreatedAction.NodeMappingCreatedResponse(index, mappingType, event.state().nodes().localNodeId()));
nodeMappingCreatedAction.nodeMappingCreated(
new NodeMappingCreatedAction.NodeMappingCreatedResponse(index, mappingType, event.state().nodes().localNodeId(), event.state().version()));
} else {
DocumentMapper existingMapper = mapperService.documentMapper(mappingType);
if (!mappingSource.equals(existingMapper.mappingSource())) {
@ -417,7 +419,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// this might happen when upgrading from 0.15 to 0.16
logger.debug("[{}] parsed mapping [{}], and got different sources\noriginal:\n{}\nparsed:\n{}", index, mappingType, mappingSource, mapperService.documentMapper(mappingType).mappingSource());
}
nodeMappingCreatedAction.nodeMappingCreated(new NodeMappingCreatedAction.NodeMappingCreatedResponse(index, mappingType, event.state().nodes().localNodeId()));
nodeMappingCreatedAction.nodeMappingCreated(
new NodeMappingCreatedAction.NodeMappingCreatedResponse(index, mappingType, event.state().nodes().localNodeId(), event.state().version()));
}
}
} catch (Throwable e) {
@ -486,8 +489,9 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
private void applyNewOrUpdatedShards(final ClusterChangedEvent event) throws ElasticSearchException {
if (!indicesService.changesAllowed())
if (!indicesService.changesAllowed()) {
return;
}
RoutingTable routingTable = event.state().routingTable();
RoutingNode routingNodes = event.state().readOnlyRoutingNodes().nodesToShards().get(event.state().nodes().localNodeId());

View File

@ -245,8 +245,6 @@ public class UpdateMappingTests extends AbstractSharedClusterTest {
@Test
public void updateMappingConcurrently() throws Throwable {
// Test that we can concurrently update different indexes and types.
// NOTE: concurrently updating the mapping of the same type and index can still return before all (relevant) nodes are updated.
// The fix for that tracked on issues #3508
int shardNo = Math.max(5, numberOfNodes());
prepareCreate("test1").setSettings("index.number_of_shards", shardNo).execute().actionGet();
@ -263,6 +261,7 @@ public class UpdateMappingTests extends AbstractSharedClusterTest {
for (int j = 0; j < threads.length; j++) {
threads[j] = new Thread(new Runnable() {
@SuppressWarnings("unchecked")
@Override
public void run() {
try {
@ -276,11 +275,12 @@ public class UpdateMappingTests extends AbstractSharedClusterTest {
Client client1 = clientArray.get(i % clientArray.size());
Client client2 = clientArray.get((i + 1) % clientArray.size());
String indexName = i % 2 == 0 ? "test2" : "test1";
String typeName = Thread.currentThread().getName() + "_" + i;
String typeName = "type" + (i % 10);
String fieldName = Thread.currentThread().getName() + "_" + i;
PutMappingResponse response = client1.admin().indices().preparePutMapping(indexName).setType(typeName).setSource(
JsonXContent.contentBuilder().startObject().startObject(typeName)
.startObject("properties").startObject("f").field("type", "string").endObject().endObject()
.startObject("properties").startObject(fieldName).field("type", "string").endObject().endObject()
.endObject().endObject()
).get();
@ -288,6 +288,7 @@ public class UpdateMappingTests extends AbstractSharedClusterTest {
GetMappingsResponse getMappingResponse = client2.admin().indices().prepareGetMappings(indexName).get();
Map<String, MappingMetaData> mappings = getMappingResponse.getMappings().get(indexName);
assertThat(mappings.keySet(), Matchers.hasItem(typeName));
assertThat(((Map<String, Object>) mappings.get(typeName).getSourceAsMap().get("properties")).keySet(), Matchers.hasItem(fieldName));
}
} catch (Throwable t) {
threadException[0] = t;