bulk refresh and update mapping cluster events

try and bulk as much as possible refresh and update mapping events, so they will all be processed at a single go, resulting in less cluster change events, and also reduce the load of multiple changes to the same index
also, change the prio for those to HIGH, since we want URGENT ones (like create index, delete index) to execute
This commit is contained in:
Shay Banon 2013-07-24 01:55:32 +02:00
parent 6a5d2bf767
commit bb6df34671
2 changed files with 118 additions and 86 deletions

View File

@ -60,12 +60,7 @@ public class NodeMappingRefreshAction extends AbstractComponent {
public void nodeMappingRefresh(final NodeMappingRefreshRequest request) throws ElasticSearchException {
DiscoveryNodes nodes = clusterService.state().nodes();
if (nodes.localNodeMaster()) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
innerMappingRefresh(request);
}
});
innerMappingRefresh(request);
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
NodeMappingRefreshTransportHandler.ACTION, request, EmptyTransportResponseHandler.INSTANCE_SAME);

View File

@ -20,7 +20,6 @@
package org.elasticsearch.cluster.metadata;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.*;
@ -32,6 +31,7 @@ import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
@ -43,11 +43,8 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidTypeNameException;
import org.elasticsearch.indices.TypeMissingException;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -68,7 +65,7 @@ public class MetaDataMappingService extends AbstractComponent {
private final NodeMappingCreatedAction mappingCreatedAction;
private final Map<String, Set<String>> indicesAndTypesToRefresh = Maps.newHashMap();
private final BlockingQueue<Object> refreshOrUpdateQueue = ConcurrentCollections.newBlockingQueue();
@Inject
public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService, NodeMappingCreatedAction mappingCreatedAction) {
@ -78,50 +75,62 @@ public class MetaDataMappingService extends AbstractComponent {
this.mappingCreatedAction = mappingCreatedAction;
}
/**
* Refreshes mappings if they are not the same between original and parsed version
*/
public void refreshMapping(final String index, final String... types) {
synchronized (indicesAndTypesToRefresh) {
Set<String> sTypes = indicesAndTypesToRefresh.get(index);
if (sTypes == null) {
sTypes = Sets.newHashSet();
indicesAndTypesToRefresh.put(index, sTypes);
}
sTypes.addAll(Arrays.asList(types));
static class RefreshTask {
final String index;
final String[] types;
RefreshTask(String index, String[] types) {
this.index = index;
this.types = types;
}
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.URGENT, new ClusterStateUpdateTask() {
@Override
public void onFailure(String source, Throwable t) {
logger.warn("failure during [{}]", t, source);
}
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
boolean createdIndex = false;
try {
Set<String> sTypes;
synchronized (indicesAndTypesToRefresh) {
sTypes = indicesAndTypesToRefresh.remove(index);
}
// we already processed those types...
if (sTypes == null || sTypes.isEmpty()) {
return currentState;
}
static class UpdateTask {
final String index;
final String type;
final CompressedString mappingSource;
final Listener listener;
// first, check if it really needs to be updated
final IndexMetaData indexMetaData = currentState.metaData().index(index);
UpdateTask(String index, String type, CompressedString mappingSource, Listener listener) {
this.index = index;
this.type = type;
this.mappingSource = mappingSource;
this.listener = listener;
}
}
/**
* Batch method to apply all the queued refresh or update operations. The idea is to try and batch as much
* as possible so we won't create the same index all the time for example for the updates on the same mapping
* and generate a single cluster change event out of all of those.
*/
ClusterState executeRefreshOrUpdate(final ClusterState currentState) throws Exception {
List<Object> tasks = new ArrayList<Object>();
refreshOrUpdateQueue.drainTo(tasks);
if (tasks.isEmpty()) {
return currentState;
}
Set<String> indicesToRemove = Sets.newHashSet();
try {
boolean dirty = false;
MetaData.Builder mdBuilder = newMetaDataBuilder().metaData(currentState.metaData());
for (Object task : tasks) {
if (task instanceof RefreshTask) {
RefreshTask refreshTask = (RefreshTask) task;
String index = refreshTask.index;
final IndexMetaData indexMetaData = mdBuilder.get(index);
if (indexMetaData == null) {
// index got delete on us, ignore...
return currentState;
continue;
}
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
// we need to create the index here, and add the current mapping to it, so we can merge
indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
createdIndex = true;
for (String type : sTypes) {
indicesToRemove.add(index);
for (String type : refreshTask.types) {
// only add the current relevant mapping (if exists)
if (indexMetaData.mappings().containsKey(type)) {
// don't apply the default mapping, it has been applied when the mapping was created
@ -131,7 +140,7 @@ public class MetaDataMappingService extends AbstractComponent {
}
IndexMetaData.Builder indexMetaDataBuilder = newIndexMetaDataBuilder(indexMetaData);
List<String> updatedTypes = Lists.newArrayList();
for (String type : sTypes) {
for (String type : refreshTask.types) {
DocumentMapper mapper = indexService.mapperService().documentMapper(type);
if (!mapper.mappingSource().equals(indexMetaData.mappings().get(type).source())) {
updatedTypes.add(type);
@ -140,49 +149,34 @@ public class MetaDataMappingService extends AbstractComponent {
}
if (updatedTypes.isEmpty()) {
return currentState;
continue;
}
logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes);
MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData());
builder.put(indexMetaDataBuilder);
return newClusterStateBuilder().state(currentState).metaData(builder).build();
} finally {
if (createdIndex) {
indicesService.removeIndex(index, "created for mapping processing");
}
}
}
});
}
mdBuilder.put(indexMetaDataBuilder);
dirty = true;
public void updateMapping(final String index, final String type, final CompressedString mappingSource, final Listener listener) {
} else if (task instanceof UpdateTask) {
UpdateTask updateTask = (UpdateTask) task;
String index = updateTask.index;
String type = updateTask.type;
CompressedString mappingSource = updateTask.mappingSource;
clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override
public void onFailure(String source, Throwable t) {
listener.onFailure(t);
}
@Override
public ClusterState execute(final ClusterState currentState) throws Exception {
boolean createdIndex = false;
try {
// first, check if it really needs to be updated
final IndexMetaData indexMetaData = currentState.metaData().index(index);
final IndexMetaData indexMetaData = mdBuilder.get(index);
if (indexMetaData == null) {
// index got delete on us, ignore...
return currentState;
continue;
}
if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(mappingSource)) {
return currentState;
continue;
}
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
// we need to create the index here, and add the current mapping to it, so we can merge
indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
createdIndex = true;
indicesToRemove.add(index);
// only add the current relevant mapping (if exists)
if (indexMetaData.mappings().containsKey(type)) {
indexService.mapperService().merge(type, indexMetaData.mappings().get(type).source().string(), false);
@ -193,29 +187,72 @@ public class MetaDataMappingService extends AbstractComponent {
// if we end up with the same mapping as the original once, ignore
if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(updatedMapper.mappingSource())) {
return currentState;
continue;
}
// build the updated mapping source
if (logger.isDebugEnabled()) {
try {
logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, updatedMapper.mappingSource().string());
} catch (IOException e) {
} catch (Exception e) {
// ignore
}
} else if (logger.isInfoEnabled()) {
logger.info("[{}] update_mapping [{}] (dynamic)", index, type);
}
MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData());
builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(new MappingMetaData(updatedMapper)));
return newClusterStateBuilder().state(currentState).metaData(builder).build();
} finally {
if (createdIndex) {
indicesService.removeIndex(index, "created for mapping processing");
}
mdBuilder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(new MappingMetaData(updatedMapper)));
dirty = true;
} else {
logger.warn("illegal state, got wrong mapping task type [{}]", task);
}
}
if (!dirty) {
return currentState;
}
return newClusterStateBuilder().state(currentState).metaData(mdBuilder).build();
} finally {
for (String index : indicesToRemove) {
indicesService.removeIndex(index, "created for mapping processing");
}
for (Object task : tasks) {
if (task instanceof UpdateTask) {
((UpdateTask) task).listener.onResponse(new Response(true));
}
}
}
}
/**
* Refreshes mappings if they are not the same between original and parsed version
*/
public void refreshMapping(final String index, final String... types) {
refreshOrUpdateQueue.add(new RefreshTask(index, types));
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", Priority.HIGH, new ClusterStateUpdateTask() {
@Override
public void onFailure(String source, Throwable t) {
logger.warn("failure during [{}]", t, source);
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return executeRefreshOrUpdate(currentState);
}
});
}
public void updateMapping(final String index, final String type, final CompressedString mappingSource, final Listener listener) {
refreshOrUpdateQueue.add(new UpdateTask(index, type, mappingSource, listener));
clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
@Override
public void onFailure(String source, Throwable t) {
listener.onFailure(t);
}
@Override
public ClusterState execute(final ClusterState currentState) throws Exception {
return executeRefreshOrUpdate(currentState);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
@ -225,7 +262,7 @@ public class MetaDataMappingService extends AbstractComponent {
}
public void removeMapping(final RemoveRequest request, final Listener listener) {
clusterService.submitStateUpdateTask("remove-mapping [" + request.mappingType + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("remove-mapping [" + request.mappingType + "]", Priority.HIGH, new TimeoutClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return request.masterTimeout;
@ -275,7 +312,7 @@ public class MetaDataMappingService extends AbstractComponent {
public void putMapping(final PutRequest request, final Listener listener) {
final AtomicBoolean notifyOnPostProcess = new AtomicBoolean();
clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", Priority.HIGH, new TimeoutClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return request.masterTimeout;