better error handling when processing refresh/update mapping
don't fail the whole bulk of updates, just the specific ones, and warn log it
This commit is contained in:
parent
dd86db3347
commit
df4ffbe723
|
@ -160,80 +160,87 @@ public class MetaDataMappingService extends AbstractComponent {
|
||||||
|
|
||||||
if (task instanceof RefreshTask) {
|
if (task instanceof RefreshTask) {
|
||||||
RefreshTask refreshTask = (RefreshTask) task;
|
RefreshTask refreshTask = (RefreshTask) task;
|
||||||
IndexService indexService = indicesService.indexService(index);
|
try {
|
||||||
if (indexService == null) {
|
IndexService indexService = indicesService.indexService(index);
|
||||||
// we need to create the index here, and add the current mapping to it, so we can merge
|
if (indexService == null) {
|
||||||
indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
|
// we need to create the index here, and add the current mapping to it, so we can merge
|
||||||
removeIndex = true;
|
indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
|
||||||
|
removeIndex = true;
|
||||||
|
for (String type : refreshTask.types) {
|
||||||
|
// only add the current relevant mapping (if exists)
|
||||||
|
if (indexMetaData.mappings().containsKey(type)) {
|
||||||
|
// don't apply the default mapping, it has been applied when the mapping was created
|
||||||
|
indexService.mapperService().merge(type, indexMetaData.mappings().get(type).source(), false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(indexMetaData);
|
||||||
|
List<String> updatedTypes = Lists.newArrayList();
|
||||||
for (String type : refreshTask.types) {
|
for (String type : refreshTask.types) {
|
||||||
|
if (processedRefreshes.contains(type)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
DocumentMapper mapper = indexService.mapperService().documentMapper(type);
|
||||||
|
if (!mapper.mappingSource().equals(indexMetaData.mappings().get(type).source())) {
|
||||||
|
updatedTypes.add(type);
|
||||||
|
indexMetaDataBuilder.putMapping(new MappingMetaData(mapper));
|
||||||
|
}
|
||||||
|
processedRefreshes.add(type);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (updatedTypes.isEmpty()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes);
|
||||||
|
mdBuilder.put(indexMetaDataBuilder);
|
||||||
|
dirty = true;
|
||||||
|
} catch (Throwable t) {
|
||||||
|
logger.warn("[{}] failed to refresh-mapping in cluster state, types [{}]", index, refreshTask.types);
|
||||||
|
}
|
||||||
|
} else if (task instanceof UpdateTask) {
|
||||||
|
UpdateTask updateTask = (UpdateTask) task;
|
||||||
|
try {
|
||||||
|
String type = updateTask.type;
|
||||||
|
CompressedString mappingSource = updateTask.mappingSource;
|
||||||
|
|
||||||
|
if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(mappingSource)) {
|
||||||
|
logger.debug("[{}] update_mapping [{}] ignoring mapping update task as its source is equal to ours", index, updateTask.type);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
IndexService indexService = indicesService.indexService(index);
|
||||||
|
if (indexService == null) {
|
||||||
|
// we need to create the index here, and add the current mapping to it, so we can merge
|
||||||
|
indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
|
||||||
|
removeIndex = true;
|
||||||
// only add the current relevant mapping (if exists)
|
// only add the current relevant mapping (if exists)
|
||||||
if (indexMetaData.mappings().containsKey(type)) {
|
if (indexMetaData.mappings().containsKey(type)) {
|
||||||
// don't apply the default mapping, it has been applied when the mapping was created
|
|
||||||
indexService.mapperService().merge(type, indexMetaData.mappings().get(type).source(), false);
|
indexService.mapperService().merge(type, indexMetaData.mappings().get(type).source(), false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(indexMetaData);
|
DocumentMapper updatedMapper = indexService.mapperService().merge(type, mappingSource, false);
|
||||||
List<String> updatedTypes = Lists.newArrayList();
|
processedRefreshes.add(type);
|
||||||
for (String type : refreshTask.types) {
|
|
||||||
if (processedRefreshes.contains(type)) {
|
// if we end up with the same mapping as the original once, ignore
|
||||||
|
if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(updatedMapper.mappingSource())) {
|
||||||
|
logger.debug("[{}] update_mapping [{}] ignoring mapping update task as it results in the same source as what we have", index, updateTask.type);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
DocumentMapper mapper = indexService.mapperService().documentMapper(type);
|
|
||||||
if (!mapper.mappingSource().equals(indexMetaData.mappings().get(type).source())) {
|
// build the updated mapping source
|
||||||
updatedTypes.add(type);
|
if (logger.isDebugEnabled()) {
|
||||||
indexMetaDataBuilder.putMapping(new MappingMetaData(mapper));
|
logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, updatedMapper.mappingSource());
|
||||||
|
} else if (logger.isInfoEnabled()) {
|
||||||
|
logger.info("[{}] update_mapping [{}] (dynamic)", index, type);
|
||||||
}
|
}
|
||||||
processedRefreshes.add(type);
|
|
||||||
|
mdBuilder.put(IndexMetaData.builder(indexMetaData).putMapping(new MappingMetaData(updatedMapper)));
|
||||||
|
dirty = true;
|
||||||
|
} catch (Throwable t) {
|
||||||
|
logger.warn("[{}] failed to update-mapping in cluster state, type [{}]", index, updateTask.type);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (updatedTypes.isEmpty()) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes);
|
|
||||||
mdBuilder.put(indexMetaDataBuilder);
|
|
||||||
dirty = true;
|
|
||||||
|
|
||||||
} else if (task instanceof UpdateTask) {
|
|
||||||
UpdateTask updateTask = (UpdateTask) task;
|
|
||||||
String type = updateTask.type;
|
|
||||||
CompressedString mappingSource = updateTask.mappingSource;
|
|
||||||
|
|
||||||
if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(mappingSource)) {
|
|
||||||
logger.debug("[{}] update_mapping [{}] ignoring mapping update task as its source is equal to ours", index, updateTask.type);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
IndexService indexService = indicesService.indexService(index);
|
|
||||||
if (indexService == null) {
|
|
||||||
// we need to create the index here, and add the current mapping to it, so we can merge
|
|
||||||
indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
|
|
||||||
removeIndex = true;
|
|
||||||
// only add the current relevant mapping (if exists)
|
|
||||||
if (indexMetaData.mappings().containsKey(type)) {
|
|
||||||
indexService.mapperService().merge(type, indexMetaData.mappings().get(type).source(), false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
DocumentMapper updatedMapper = indexService.mapperService().merge(type, mappingSource, false);
|
|
||||||
processedRefreshes.add(type);
|
|
||||||
|
|
||||||
// if we end up with the same mapping as the original once, ignore
|
|
||||||
if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(updatedMapper.mappingSource())) {
|
|
||||||
logger.debug("[{}] update_mapping [{}] ignoring mapping update task as it results in the same source as what we have", index, updateTask.type);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// build the updated mapping source
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, updatedMapper.mappingSource());
|
|
||||||
} else if (logger.isInfoEnabled()) {
|
|
||||||
logger.info("[{}] update_mapping [{}] (dynamic)", index, type);
|
|
||||||
}
|
|
||||||
|
|
||||||
mdBuilder.put(IndexMetaData.builder(indexMetaData).putMapping(new MappingMetaData(updatedMapper)));
|
|
||||||
dirty = true;
|
|
||||||
} else {
|
} else {
|
||||||
logger.warn("illegal state, got wrong mapping task type [{}]", task);
|
logger.warn("illegal state, got wrong mapping task type [{}]", task);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue