Call callback on actual mapping processed

only callback the registered callback listeners when mapping have actually been processed...
closes #6748
This commit is contained in:
Shay Banon 2014-07-06 02:19:49 +02:00
parent 8d793391da
commit b471aeb24c
1 changed files with 53 additions and 31 deletions

View File

@ -25,12 +25,10 @@ import com.google.common.collect.Sets;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
@ -116,19 +114,19 @@ public class MetaDataMappingService extends AbstractComponent {
* as possible so we won't create the same index all the time for example for the updates on the same mapping
* and generate a single cluster change event out of all of those.
*/
ClusterState executeRefreshOrUpdate(final ClusterState currentState, final long insertionOrder) throws Exception {
Tuple<ClusterState, List<MappingTask>> executeRefreshOrUpdate(final ClusterState currentState, final long insertionOrder) throws Exception {
final List<MappingTask> allTasks = new ArrayList<>();
synchronized (refreshOrUpdateMutex) {
if (refreshOrUpdateQueue.isEmpty()) {
return currentState;
return Tuple.tuple(currentState, allTasks);
}
// we already processed this task in a bulk manner in a previous cluster event, simply ignore
// it so we will let other tasks get in and processed ones, we will handle the queued ones
// later on in a subsequent cluster state event
if (insertionOrder < refreshOrUpdateProcessedInsertOrder) {
return currentState;
return Tuple.tuple(currentState, allTasks);
}
allTasks.addAll(refreshOrUpdateQueue);
@ -138,7 +136,7 @@ public class MetaDataMappingService extends AbstractComponent {
}
if (allTasks.isEmpty()) {
return currentState;
return Tuple.tuple(currentState, allTasks);
}
// break down to tasks per index, so we can optimize the on demand index service creation
@ -245,25 +243,10 @@ public class MetaDataMappingService extends AbstractComponent {
}
}
// fork sending back updates, so we won't wait to send them back on the cluster state, there
// might be a few of those...
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
for (Object task : allTasks) {
if (task instanceof UpdateTask) {
UpdateTask uTask = (UpdateTask) task;
ClusterStateUpdateResponse response = new ClusterStateUpdateResponse(true);
uTask.listener.onResponse(response);
}
}
}
});
if (!dirty) {
return currentState;
return Tuple.tuple(currentState, allTasks);
}
return ClusterState.builder(currentState).metaData(mdBuilder).build();
return Tuple.tuple(ClusterState.builder(currentState).metaData(mdBuilder).build(), allTasks);
}
private boolean processIndexMappingTasks(List<MappingTask> tasks, IndexService indexService, IndexMetaData.Builder builder) {
@ -349,7 +332,9 @@ public class MetaDataMappingService extends AbstractComponent {
insertOrder = ++refreshOrUpdateInsertOrder;
refreshOrUpdateQueue.add(new RefreshTask(index, indexUUID, types));
}
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.HIGH, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
private volatile List<MappingTask> allTasks;
@Override
public void onFailure(String source, Throwable t) {
logger.warn("failure during [{}]", t, source);
@ -357,7 +342,23 @@ public class MetaDataMappingService extends AbstractComponent {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return executeRefreshOrUpdate(currentState, insertOrder);
Tuple<ClusterState, List<MappingTask>> tuple = executeRefreshOrUpdate(currentState, insertOrder);
this.allTasks = tuple.v2();
return tuple.v1();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (allTasks == null) {
return;
}
for (Object task : allTasks) {
if (task instanceof UpdateTask) {
UpdateTask uTask = (UpdateTask) task;
ClusterStateUpdateResponse response = new ClusterStateUpdateResponse(true);
uTask.listener.onResponse(response);
}
}
}
});
}
@ -368,15 +369,36 @@ public class MetaDataMappingService extends AbstractComponent {
insertOrder = ++refreshOrUpdateInsertOrder;
refreshOrUpdateQueue.add(new UpdateTask(index, indexUUID, type, mappingSource, order, nodeId, listener));
}
clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "] / node [" + nodeId + "], order [" + order + "]", Priority.HIGH, new ClusterStateUpdateTask() {
@Override
clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "] / node [" + nodeId + "], order [" + order + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
private volatile List<MappingTask> allTasks;
public void onFailure(String source, Throwable t) {
listener.onFailure(t);
}
@Override
public ClusterState execute(final ClusterState currentState) throws Exception {
return executeRefreshOrUpdate(currentState, insertOrder);
Tuple<ClusterState, List<MappingTask>> tuple = executeRefreshOrUpdate(currentState, insertOrder);
this.allTasks = tuple.v2();
return tuple.v1();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (allTasks == null) {
return;
}
for (Object task : allTasks) {
if (task instanceof UpdateTask) {
UpdateTask uTask = (UpdateTask) task;
ClusterStateUpdateResponse response = new ClusterStateUpdateResponse(true);
try {
uTask.listener.onResponse(response);
} catch (Throwable t) {
logger.debug("failed ot ping back on response of mapping processing for task [{}]", t, uTask.listener);
}
}
}
}
});
}
@ -400,7 +422,7 @@ public class MetaDataMappingService extends AbstractComponent {
for (String indexName : request.indices()) {
IndexMetaData indexMetaData = currentState.metaData().index(indexName);
IndexMetaData.Builder indexBuilder = IndexMetaData.builder(indexMetaData);
if (indexMetaData != null) {
boolean isLatestIndexWithout = true;
for (String type : request.types()) {