process internal refresh/update mapping events per index

this allows us to only create the index (if needed) for the exact duration it is needed, and not across processing of all the events
This commit is contained in:
Shay Banon 2013-07-24 21:34:33 +02:00
parent 426c2867d9
commit 993fad4c33
1 changed files with 131 additions and 107 deletions

View File

@ -20,13 +20,13 @@
package org.elasticsearch.cluster.metadata; package org.elasticsearch.cluster.metadata;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedString; import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -106,23 +106,47 @@ public class MetaDataMappingService extends AbstractComponent {
* and generate a single cluster change event out of all of those. * and generate a single cluster change event out of all of those.
*/ */
ClusterState executeRefreshOrUpdate(final ClusterState currentState) throws Exception { ClusterState executeRefreshOrUpdate(final ClusterState currentState) throws Exception {
List<Object> tasks = new ArrayList<Object>(); List<Object> allTasks = new ArrayList<Object>();
refreshOrUpdateQueue.drainTo(tasks); refreshOrUpdateQueue.drainTo(allTasks);
if (tasks.isEmpty()) { if (allTasks.isEmpty()) {
return currentState; return currentState;
} }
Set<String> indicesToRemove = Sets.newHashSet(); // break down to tasks per index, so we can optimize the on demand index service creation
// keep track of what we already refreshed, no need to refresh it again... // to only happen for the duration of a single index processing of its respective events
Set<Tuple<String, String>> processedRefreshes = Sets.newHashSet(); Map<String, List<Object>> tasksPerIndex = Maps.newHashMap();
try { for (Object task : allTasks) {
String index = null;
if (task instanceof UpdateTask) {
index = ((UpdateTask) task).index;
} else if (task instanceof RefreshTask) {
index = ((RefreshTask) task).index;
} else {
logger.warn("illegal state, got wrong mapping task type [{}]", task);
}
if (index != null) {
List<Object> indexTasks = tasksPerIndex.get(index);
if (indexTasks == null) {
indexTasks = new ArrayList<Object>();
tasksPerIndex.put(index, indexTasks);
}
indexTasks.add(task);
}
}
boolean dirty = false; boolean dirty = false;
MetaData.Builder mdBuilder = newMetaDataBuilder().metaData(currentState.metaData()); MetaData.Builder mdBuilder = newMetaDataBuilder().metaData(currentState.metaData());
for (Map.Entry<String, List<Object>> entry : tasksPerIndex.entrySet()) {
String index = entry.getKey();
List<Object> tasks = entry.getValue();
boolean removeIndex = false;
// keep track of what we already refreshed, no need to refresh it again...
Set<String> processedRefreshes = Sets.newHashSet();
try {
for (Object task : tasks) { for (Object task : tasks) {
if (task instanceof RefreshTask) { if (task instanceof RefreshTask) {
RefreshTask refreshTask = (RefreshTask) task; RefreshTask refreshTask = (RefreshTask) task;
String index = refreshTask.index;
final IndexMetaData indexMetaData = mdBuilder.get(index); final IndexMetaData indexMetaData = mdBuilder.get(index);
if (indexMetaData == null) { if (indexMetaData == null) {
// index got delete on us, ignore... // index got delete on us, ignore...
@ -132,7 +156,7 @@ public class MetaDataMappingService extends AbstractComponent {
if (indexService == null) { if (indexService == null) {
// we need to create the index here, and add the current mapping to it, so we can merge // we need to create the index here, and add the current mapping to it, so we can merge
indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id()); indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
indicesToRemove.add(index); removeIndex = true;
for (String type : refreshTask.types) { for (String type : refreshTask.types) {
// only add the current relevant mapping (if exists) // only add the current relevant mapping (if exists)
if (indexMetaData.mappings().containsKey(type)) { if (indexMetaData.mappings().containsKey(type)) {
@ -144,8 +168,7 @@ public class MetaDataMappingService extends AbstractComponent {
IndexMetaData.Builder indexMetaDataBuilder = newIndexMetaDataBuilder(indexMetaData); IndexMetaData.Builder indexMetaDataBuilder = newIndexMetaDataBuilder(indexMetaData);
List<String> updatedTypes = Lists.newArrayList(); List<String> updatedTypes = Lists.newArrayList();
for (String type : refreshTask.types) { for (String type : refreshTask.types) {
Tuple<String, String> processedRefresh = Tuple.tuple(index, type); if (processedRefreshes.contains(type)) {
if (processedRefreshes.contains(processedRefresh)) {
continue; continue;
} }
DocumentMapper mapper = indexService.mapperService().documentMapper(type); DocumentMapper mapper = indexService.mapperService().documentMapper(type);
@ -153,7 +176,7 @@ public class MetaDataMappingService extends AbstractComponent {
updatedTypes.add(type); updatedTypes.add(type);
indexMetaDataBuilder.putMapping(new MappingMetaData(mapper)); indexMetaDataBuilder.putMapping(new MappingMetaData(mapper));
} }
processedRefreshes.add(processedRefresh); processedRefreshes.add(type);
} }
if (updatedTypes.isEmpty()) { if (updatedTypes.isEmpty()) {
@ -166,7 +189,6 @@ public class MetaDataMappingService extends AbstractComponent {
} else if (task instanceof UpdateTask) { } else if (task instanceof UpdateTask) {
UpdateTask updateTask = (UpdateTask) task; UpdateTask updateTask = (UpdateTask) task;
String index = updateTask.index;
String type = updateTask.type; String type = updateTask.type;
CompressedString mappingSource = updateTask.mappingSource; CompressedString mappingSource = updateTask.mappingSource;
@ -184,7 +206,7 @@ public class MetaDataMappingService extends AbstractComponent {
if (indexService == null) { if (indexService == null) {
// we need to create the index here, and add the current mapping to it, so we can merge // we need to create the index here, and add the current mapping to it, so we can merge
indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id()); indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
indicesToRemove.add(index); removeIndex = true;
// only add the current relevant mapping (if exists) // only add the current relevant mapping (if exists)
if (indexMetaData.mappings().containsKey(type)) { if (indexMetaData.mappings().containsKey(type)) {
indexService.mapperService().merge(type, indexMetaData.mappings().get(type).source().string(), false); indexService.mapperService().merge(type, indexMetaData.mappings().get(type).source().string(), false);
@ -192,7 +214,7 @@ public class MetaDataMappingService extends AbstractComponent {
} }
DocumentMapper updatedMapper = indexService.mapperService().merge(type, mappingSource.string(), false); DocumentMapper updatedMapper = indexService.mapperService().merge(type, mappingSource.string(), false);
processedRefreshes.add(Tuple.tuple(index, type)); processedRefreshes.add(type);
// if we end up with the same mapping as the original once, ignore // if we end up with the same mapping as the original once, ignore
if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(updatedMapper.mappingSource())) { if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(updatedMapper.mappingSource())) {
@ -216,12 +238,8 @@ public class MetaDataMappingService extends AbstractComponent {
logger.warn("illegal state, got wrong mapping task type [{}]", task); logger.warn("illegal state, got wrong mapping task type [{}]", task);
} }
} }
if (!dirty) {
return currentState;
}
return newClusterStateBuilder().state(currentState).metaData(mdBuilder).build();
} finally { } finally {
for (String index : indicesToRemove) { if (removeIndex) {
indicesService.removeIndex(index, "created for mapping processing"); indicesService.removeIndex(index, "created for mapping processing");
} }
for (Object task : tasks) { for (Object task : tasks) {
@ -232,6 +250,12 @@ public class MetaDataMappingService extends AbstractComponent {
} }
} }
if (!dirty) {
return currentState;
}
return newClusterStateBuilder().state(currentState).metaData(mdBuilder).build();
}
/** /**
* Refreshes mappings if they are not the same between original and parsed version * Refreshes mappings if they are not the same between original and parsed version
*/ */