Postponed acknowledging put mapping requests to after master has finished processed them

Also - TransportMasterNodeOperationAction was potentially use stale cluster state

Closes #3487
This commit is contained in:
Boaz Leskes 2013-08-12 16:10:59 +02:00
parent 4b25e6b63e
commit ab6163898f
6 changed files with 47 additions and 24 deletions

View File

@ -57,6 +57,7 @@ public class TransportGetMappingsAction extends TransportClusterInfoAction<GetMa
@Override
protected void doMasterOperation(final GetMappingsRequest request, final ClusterState state, final ActionListener<GetMappingsResponse> listener) throws ElasticSearchException {
logger.debug("Serving getMapping request based on version {}", state.version());
ImmutableMap<String, ImmutableMap<String, MappingMetaData>> result = state.metaData().findMappings(
request.indices(), request.types()
);

View File

@ -141,7 +141,7 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
@Override
public void run() {
try {
masterOperation(request, clusterState, listener);
masterOperation(request, clusterService.state(), listener);
} catch (Throwable e) {
listener.onFailure(e);
}

View File

@ -294,6 +294,7 @@ public class MetaDataMappingService extends AbstractComponent {
public void removeMapping(final RemoveRequest request, final Listener listener) {
clusterService.submitStateUpdateTask("remove-mapping [" + request.mappingType + "]", Priority.HIGH, new TimeoutClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return request.masterTimeout;
@ -342,8 +343,11 @@ public class MetaDataMappingService extends AbstractComponent {
}
public void putMapping(final PutRequest request, final Listener listener) {
final AtomicBoolean notifyOnPostProcess = new AtomicBoolean();
clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", Priority.HIGH, new TimeoutClusterStateUpdateTask() {
CountDownListener countDownListener; // used to count ack responses before confirming operation is complete
@Override
public TimeValue timeout() {
return request.masterTimeout;
@ -479,8 +483,8 @@ public class MetaDataMappingService extends AbstractComponent {
ClusterState updatedState = newClusterStateBuilder().state(currentState).metaData(builder).build();
// wait for responses from other nodes if needed
int counter = 0;
// counter the number of nodes participating so we can wait for responses from other nodes if needed
int counter = 1; // this mast node
for (String index : request.indices) {
IndexRoutingTable indexRoutingTable = updatedState.routingTable().index(index);
if (indexRoutingTable != null) {
@ -488,11 +492,9 @@ public class MetaDataMappingService extends AbstractComponent {
}
}
if (counter == 0) {
notifyOnPostProcess.set(true);
return updatedState;
}
mappingCreatedAction.add(new CountDownListener(counter, listener), request.timeout);
countDownListener = new CountDownListener(counter, listener);
mappingCreatedAction.add(countDownListener, request.timeout);
return updatedState;
} finally {
for (String index : indicesToClose) {
@ -503,8 +505,9 @@ public class MetaDataMappingService extends AbstractComponent {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (notifyOnPostProcess.get()) {
listener.onResponse(new Response(true));
if (countDownListener != null) {
// notify we did stuff on our end.
countDownListener.onNodeMappingCreated(null);
}
}
});
@ -594,6 +597,8 @@ public class MetaDataMappingService extends AbstractComponent {
@Override
public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) {
// response may be null - see clusterStateProcessed implementation in {@link MetaDataMappingService#putMapping}
if (countDown.decrementAndGet() == 0) {
mappingCreatedAction.remove(this);
if (notified.compareAndSet(false, true)) {

View File

@ -250,7 +250,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
UpdateTask updateTask = (UpdateTask) pending.task;
source = updateTask.source;
timeInQueue = now - updateTask.addedAt;
} else {
} else {
source = "unknown";
timeInQueue = -1;
}
@ -367,10 +367,12 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
// we publish here before we send a notification to all the listeners, since if it fails
// we don't want to notify
if (newClusterState.nodes().localNodeMaster()) {
logger.debug("Publishing cluster state version {}", newClusterState.version());
discoveryService.publish(newClusterState);
}
// update the current cluster state
logger.debug("Updating cluster state version {}: {}", newClusterState.version(), newClusterState);
clusterState = newClusterState;
for (ClusterStateListener listener : priorityClusterStateListeners) {
@ -406,6 +408,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
sb.append(newClusterState.routingTable().prettyPrint());
sb.append(newClusterState.readOnlyRoutingNodes().prettyPrint());
logger.warn(sb.toString(), e);
// TODO: do we want to call updateTask.onFailure here?
}
}
}

View File

@ -149,6 +149,7 @@ public class PublishClusterStateAction extends AbstractComponent {
}
in.setVersion(request.version);
ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
logger.debug("Received clusterstate version {}", clusterState.version());
listener.onNewClusterState(clusterState);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

View File

@ -1,6 +1,5 @@
package org.elasticsearch.test.integration.indices.mapping;
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
@ -112,51 +111,65 @@ public class UpdateMappingTests extends AbstractSharedClusterTest {
@SuppressWarnings("unchecked")
@Test
@AwaitsFix(bugUrl="too flaky - bleskes is on it?")
public void updateDefaultMappingSettings() throws Exception {
// TODO: bleskes: move back to combined index and mapping creation (pending bug fix concerning concurrent not-acked mapping requests)
createIndex("test");
client().admin().indices().preparePutMapping("test").setType(MapperService.DEFAULT_MAPPING).setSource(
logger.info("Creating _default_ mappings");
PutMappingResponse putResponse = client().admin().indices().preparePutMapping("test").setType(MapperService.DEFAULT_MAPPING).setSource(
JsonXContent.contentBuilder().startObject().startObject(MapperService.DEFAULT_MAPPING)
.field("date_detection", false)
.endObject().endObject()
).get();
GetMappingsResponse response = client().admin().indices().prepareGetMappings("test").addTypes(MapperService.DEFAULT_MAPPING).get();
Map<String, Object> defaultMapping = response.getMappings().get("test").get(MapperService.DEFAULT_MAPPING).sourceAsMap();
assertThat(putResponse.isAcknowledged(), equalTo(true));
logger.info("DONE: Creating _default_ mappings");
GetMappingsResponse getResponse = client().admin().indices().prepareGetMappings("test").addTypes(MapperService.DEFAULT_MAPPING).get();
Map<String, Object> defaultMapping = getResponse.getMappings().get("test").get(MapperService.DEFAULT_MAPPING).sourceAsMap();
assertThat(defaultMapping, hasKey("date_detection"));
logger.info("Emptying _default_ mappings");
// now remove it
client().admin().indices().preparePutMapping("test").setType(MapperService.DEFAULT_MAPPING).setSource(
putResponse = client().admin().indices().preparePutMapping("test").setType(MapperService.DEFAULT_MAPPING).setSource(
JsonXContent.contentBuilder().startObject().startObject(MapperService.DEFAULT_MAPPING)
.endObject().endObject()
).get();
assertThat(putResponse.isAcknowledged(), equalTo(true));
logger.info("Done Emptying _default_ mappings");
response = client().admin().indices().prepareGetMappings("test").addTypes(MapperService.DEFAULT_MAPPING).get();
defaultMapping = response.getMappings().get("test").get(MapperService.DEFAULT_MAPPING).sourceAsMap();
getResponse = client().admin().indices().prepareGetMappings("test").addTypes(MapperService.DEFAULT_MAPPING).get();
defaultMapping = getResponse.getMappings().get("test").get(MapperService.DEFAULT_MAPPING).sourceAsMap();
assertThat(defaultMapping, not(hasKey("date_detection")));
// now test you can change stuff that are normally unchangable
client().admin().indices().preparePutMapping("test").setType(MapperService.DEFAULT_MAPPING).setSource(
logger.info("Creating _default_ mappings with an analyzed field");
putResponse = client().admin().indices().preparePutMapping("test").setType(MapperService.DEFAULT_MAPPING).setSource(
JsonXContent.contentBuilder().startObject().startObject(MapperService.DEFAULT_MAPPING)
.startObject("properties").startObject("f").field("type", "string").field("index", "analyzed").endObject().endObject()
.endObject().endObject()
).get();
assertThat(putResponse.isAcknowledged(), equalTo(true));
client().admin().indices().preparePutMapping("test").setType(MapperService.DEFAULT_MAPPING).setSource(
logger.info("Changing _default_ mappings field from analyzed to non-analyzed");
putResponse = client().admin().indices().preparePutMapping("test").setType(MapperService.DEFAULT_MAPPING).setSource(
JsonXContent.contentBuilder().startObject().startObject(MapperService.DEFAULT_MAPPING)
.startObject("properties").startObject("f").field("type", "string").field("index", "not_analyzed").endObject().endObject()
.endObject().endObject()
).get();
response = client().admin().indices().prepareGetMappings("test").addTypes(MapperService.DEFAULT_MAPPING).get();
defaultMapping = response.getMappings().get("test").get(MapperService.DEFAULT_MAPPING).sourceAsMap();
assertThat(putResponse.isAcknowledged(), equalTo(true));
logger.info("Done changing _default_ mappings field from analyzed to non-analyzed");
getResponse = client().admin().indices().prepareGetMappings("test").addTypes(MapperService.DEFAULT_MAPPING).get();
defaultMapping = getResponse.getMappings().get("test").get(MapperService.DEFAULT_MAPPING).sourceAsMap();
Map<String, Object> fieldSettings = (Map<String, Object>) ((Map) defaultMapping.get("properties")).get("f");
assertThat(fieldSettings, hasEntry("index", (Object) "not_analyzed"));
// but we still validate the _default_ type
logger.info("Confirming _default_ mappings validation");
assertThrows(client().admin().indices().preparePutMapping("test").setType(MapperService.DEFAULT_MAPPING).setSource(
JsonXContent.contentBuilder().startObject().startObject(MapperService.DEFAULT_MAPPING)
.startObject("properties").startObject("f").field("type", "DOESNT_EXIST").endObject().endObject()