improve refreshing logic to resync mappings on upgrade, reduce the amount of cluster events processing requires if the even if fired from several nodes / sources
This commit is contained in:
parent
ffc74260ac
commit
24f1f0ff96
|
@ -26,6 +26,8 @@ import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
|
||||||
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
|
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
|
||||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||||
import org.elasticsearch.common.collect.Lists;
|
import org.elasticsearch.common.collect.Lists;
|
||||||
|
import org.elasticsearch.common.collect.Maps;
|
||||||
|
import org.elasticsearch.common.collect.Sets;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.compress.CompressedString;
|
import org.elasticsearch.common.compress.CompressedString;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
@ -44,6 +46,7 @@ import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
@ -64,6 +67,8 @@ public class MetaDataMappingService extends AbstractComponent {
|
||||||
|
|
||||||
private final NodeMappingCreatedAction mappingCreatedAction;
|
private final NodeMappingCreatedAction mappingCreatedAction;
|
||||||
|
|
||||||
|
private final Map<String, Set<String>> indicesAndTypesToRefresh = Maps.newHashMap();
|
||||||
|
|
||||||
@Inject public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService, NodeMappingCreatedAction mappingCreatedAction) {
|
@Inject public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService, NodeMappingCreatedAction mappingCreatedAction) {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.clusterService = clusterService;
|
this.clusterService = clusterService;
|
||||||
|
@ -75,10 +80,27 @@ public class MetaDataMappingService extends AbstractComponent {
|
||||||
* Refreshes mappings if they are not the same between original and parsed version
|
* Refreshes mappings if they are not the same between original and parsed version
|
||||||
*/
|
*/
|
||||||
public void refreshMapping(final String index, final String... types) {
|
public void refreshMapping(final String index, final String... types) {
|
||||||
|
synchronized (indicesAndTypesToRefresh) {
|
||||||
|
Set<String> sTypes = indicesAndTypesToRefresh.get(index);
|
||||||
|
if (sTypes == null) {
|
||||||
|
sTypes = Sets.newHashSet();
|
||||||
|
indicesAndTypesToRefresh.put(index, sTypes);
|
||||||
|
}
|
||||||
|
sTypes.addAll(Arrays.asList(types));
|
||||||
|
}
|
||||||
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", new ClusterStateUpdateTask() {
|
clusterService.submitStateUpdateTask("refresh-mapping [" + index + "][" + Arrays.toString(types) + "]", new ClusterStateUpdateTask() {
|
||||||
@Override public ClusterState execute(ClusterState currentState) {
|
@Override public ClusterState execute(ClusterState currentState) {
|
||||||
boolean createdIndex = false;
|
boolean createdIndex = false;
|
||||||
try {
|
try {
|
||||||
|
Set<String> sTypes;
|
||||||
|
synchronized (indicesAndTypesToRefresh) {
|
||||||
|
sTypes = indicesAndTypesToRefresh.remove(index);
|
||||||
|
}
|
||||||
|
// we already processed those types...
|
||||||
|
if (sTypes == null || sTypes.isEmpty()) {
|
||||||
|
return currentState;
|
||||||
|
}
|
||||||
|
|
||||||
// first, check if it really needs to be updated
|
// first, check if it really needs to be updated
|
||||||
final IndexMetaData indexMetaData = currentState.metaData().index(index);
|
final IndexMetaData indexMetaData = currentState.metaData().index(index);
|
||||||
if (indexMetaData == null) {
|
if (indexMetaData == null) {
|
||||||
|
@ -91,7 +113,7 @@ public class MetaDataMappingService extends AbstractComponent {
|
||||||
// we need to create the index here, and add the current mapping to it, so we can merge
|
// we need to create the index here, and add the current mapping to it, so we can merge
|
||||||
indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
|
indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
|
||||||
createdIndex = true;
|
createdIndex = true;
|
||||||
for (String type : types) {
|
for (String type : sTypes) {
|
||||||
// only add the current relevant mapping (if exists)
|
// only add the current relevant mapping (if exists)
|
||||||
if (indexMetaData.mappings().containsKey(type)) {
|
if (indexMetaData.mappings().containsKey(type)) {
|
||||||
indexService.mapperService().add(type, indexMetaData.mappings().get(type).source().string());
|
indexService.mapperService().add(type, indexMetaData.mappings().get(type).source().string());
|
||||||
|
@ -100,7 +122,7 @@ public class MetaDataMappingService extends AbstractComponent {
|
||||||
}
|
}
|
||||||
IndexMetaData.Builder indexMetaDataBuilder = newIndexMetaDataBuilder(indexMetaData);
|
IndexMetaData.Builder indexMetaDataBuilder = newIndexMetaDataBuilder(indexMetaData);
|
||||||
List<String> updatedTypes = Lists.newArrayList();
|
List<String> updatedTypes = Lists.newArrayList();
|
||||||
for (String type : types) {
|
for (String type : sTypes) {
|
||||||
DocumentMapper mapper = indexService.mapperService().documentMapper(type);
|
DocumentMapper mapper = indexService.mapperService().documentMapper(type);
|
||||||
if (!mapper.mappingSource().equals(indexMetaData.mappings().get(type).source())) {
|
if (!mapper.mappingSource().equals(indexMetaData.mappings().get(type).source())) {
|
||||||
updatedTypes.add(type);
|
updatedTypes.add(type);
|
||||||
|
|
Loading…
Reference in New Issue