Batch processing of mapping updates can miss merged mappings when batching multiple types

When we batch mapping changes, we need to use the same index metadata builder across all the tasks for an index; otherwise we might erroneously remove mappings. Also, when we check whether a higher-order mapping update can replace a queued one, we need to verify that it is for the same mapping type.
Parent: a760f1f54a
Commit: 10cdb0ae22
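The patch makes two fixes. First, all mapping tasks batched for an index now merge into one shared IndexMetaData.Builder, instead of each task building replacement metadata from the same original snapshot, where the last task's builder wins and earlier merged mappings are lost. Second, a queued update task may only be superseded by a newer update task for the same mapping type. A toy sketch of the first failure mode, using plain Java maps and hypothetical names rather than the Elasticsearch classes:

import java.util.HashMap;
import java.util.Map;

// Toy model of the bug: two batched tasks each add a mapping for a
// different type to the same index. Names here are illustrative only.
public class SharedBuilderDemo {

    static final String[][] TASKS = {{"type1", "{...}"}, {"type2", "{...}"}};

    // BROKEN: each task builds its replacement metadata from the same
    // immutable snapshot, so the last builder wins and "type1" is lost.
    static Map<String, String> perTaskBuilders(Map<String, String> snapshot) {
        Map<String, String> result = snapshot;
        for (String[] task : TASKS) {
            Map<String, String> builder = new HashMap<>(snapshot);
            builder.put(task[0], task[1]);
            result = builder; // overwrites the previous task's work
        }
        return result;
    }

    // FIXED: one builder is threaded through every task for the index,
    // so all merged mappings survive into the final metadata.
    static Map<String, String> sharedBuilder(Map<String, String> snapshot) {
        Map<String, String> builder = new HashMap<>(snapshot);
        for (String[] task : TASKS) {
            builder.put(task[0], task[1]);
        }
        return builder;
    }

    public static void main(String[] args) {
        Map<String, String> snapshot = new HashMap<>();
        System.out.println(perTaskBuilders(snapshot).keySet()); // only type2 survives
        System.out.println(sharedBuilder(snapshot).keySet());   // both type1 and type2 survive
    }
}

The diff applies the same shape below: processIndexMappingTasks receives one IndexMetaData.Builder per index and every task merges into it before the cluster state is rebuilt.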
@@ -466,6 +466,10 @@ public class IndexMetaData {
             return this;
         }
 
+        public MappingMetaData mapping(String type) {
+            return mappings.get(type);
+        }
+
         public Builder removeMapping(String mappingType) {
             mappings.remove(mappingType);
             return this;
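The new IndexMetaData.Builder.mapping(type) accessor lets the batched update path read mappings already staged in the builder, including ones put there by earlier tasks in the same batch, rather than only the mappings of the last committed IndexMetaData.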
@@ -161,9 +161,10 @@ public class MetaDataMappingService extends AbstractComponent {
 
             boolean dirty = false;
             MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
 
             for (Map.Entry<String, List<MappingTask>> entry : tasksPerIndex.entrySet()) {
                 String index = entry.getKey();
-                final IndexMetaData indexMetaData = mdBuilder.get(index);
+                IndexMetaData indexMetaData = mdBuilder.get(index);
+
                 if (indexMetaData == null) {
                     // index got deleted on us, ignore...
                     logger.debug("[{}] ignoring tasks - index meta data doesn't exist", index);
@@ -188,13 +189,15 @@ public class MetaDataMappingService extends AbstractComponent {
                         MappingTask existing = tasks.get(i);
                         if (existing instanceof UpdateTask) {
                             UpdateTask eTask = (UpdateTask) existing;
-                            // if we have the order, and the node id, then we can compare, and replace if applicable
-                            if (eTask.order != -1 && eTask.nodeId != null) {
-                                if (eTask.nodeId.equals(uTask.nodeId) && uTask.order > eTask.order) {
-                                    // a newer update task, we can replace so we execute it one!
-                                    tasks.set(i, uTask);
-                                    add = false;
-                                    break;
+                            if (eTask.type.equals(uTask.type)) {
+                                // if we have the order, and the node id, then we can compare, and replace if applicable
+                                if (eTask.order != -1 && eTask.nodeId != null) {
+                                    if (eTask.nodeId.equals(uTask.nodeId) && uTask.order > eTask.order) {
+                                        // a newer update task, we can replace so we execute it one!
+                                        tasks.set(i, uTask);
+                                        add = false;
+                                        break;
+                                    }
                                 }
                             }
                         }
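The new eTask.type.equals(uTask.type) guard is the second fix from the commit message: previously a queued update task could be replaced by a higher-order task from the same node for a different mapping type, silently dropping the first type's update.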
@@ -207,98 +210,37 @@ public class MetaDataMappingService extends AbstractComponent {
                     }
                 }
 
+                // construct the actual index if needed, and make sure the relevant mappings are there
                 boolean removeIndex = false;
-                // keep track of what we already refreshed, no need to refresh it again...
-                Set<String> processedRefreshes = Sets.newHashSet();
-                try {
+                IndexService indexService = indicesService.indexService(index);
+                if (indexService == null) {
+                    // we need to create the index here, and add the current mapping to it, so we can merge
+                    indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
+                    removeIndex = true;
+                    Set<String> typesToIntroduce = Sets.newHashSet();
                     for (MappingTask task : tasks) {
-                        if (task instanceof RefreshTask) {
-                            RefreshTask refreshTask = (RefreshTask) task;
-                            try {
-                                IndexService indexService = indicesService.indexService(index);
-                                if (indexService == null) {
-                                    // we need to create the index here, and add the current mapping to it, so we can merge
-                                    indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
-                                    removeIndex = true;
-                                    for (String type : refreshTask.types) {
-                                        // only add the current relevant mapping (if exists)
-                                        if (indexMetaData.mappings().containsKey(type)) {
-                                            // don't apply the default mapping, it has been applied when the mapping was created
-                                            indexService.mapperService().merge(type, indexMetaData.mappings().get(type).source(), false);
-                                        }
-                                    }
-                                }
-                                IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(indexMetaData);
-                                List<String> updatedTypes = Lists.newArrayList();
-                                for (String type : refreshTask.types) {
-                                    if (processedRefreshes.contains(type)) {
-                                        continue;
-                                    }
-                                    DocumentMapper mapper = indexService.mapperService().documentMapper(type);
-                                    if (!mapper.mappingSource().equals(indexMetaData.mappings().get(type).source())) {
-                                        updatedTypes.add(type);
-                                        indexMetaDataBuilder.putMapping(new MappingMetaData(mapper));
-                                    }
-                                    processedRefreshes.add(type);
-                                }
-
-                                if (updatedTypes.isEmpty()) {
-                                    continue;
-                                }
-
-                                logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes);
-                                mdBuilder.put(indexMetaDataBuilder);
-                                dirty = true;
-                            } catch (Throwable t) {
-                                logger.warn("[{}] failed to refresh-mapping in cluster state, types [{}]", index, refreshTask.types);
-                            }
-                        } else if (task instanceof UpdateTask) {
-                            UpdateTask updateTask = (UpdateTask) task;
-                            try {
-                                String type = updateTask.type;
-                                CompressedString mappingSource = updateTask.mappingSource;
-
-                                if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(mappingSource)) {
-                                    logger.debug("[{}] update_mapping [{}] ignoring mapping update task as its source is equal to ours", index, updateTask.type);
-                                    continue;
-                                }
-
-                                IndexService indexService = indicesService.indexService(index);
-                                if (indexService == null) {
-                                    // we need to create the index here, and add the current mapping to it, so we can merge
-                                    indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
-                                    removeIndex = true;
-                                    // only add the current relevant mapping (if exists)
-                                    if (indexMetaData.mappings().containsKey(type)) {
-                                        indexService.mapperService().merge(type, indexMetaData.mappings().get(type).source(), false);
-                                    }
-                                }
-
-                                DocumentMapper updatedMapper = indexService.mapperService().merge(type, mappingSource, false);
-                                processedRefreshes.add(type);
-
-                                // if we end up with the same mapping as the original once, ignore
-                                if (indexMetaData.mappings().containsKey(type) && indexMetaData.mapping(type).source().equals(updatedMapper.mappingSource())) {
-                                    logger.debug("[{}] update_mapping [{}] ignoring mapping update task as it results in the same source as what we have", index, updateTask.type);
-                                    continue;
-                                }
-
-                                // build the updated mapping source
-                                if (logger.isDebugEnabled()) {
-                                    logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, updatedMapper.mappingSource());
-                                } else if (logger.isInfoEnabled()) {
-                                    logger.info("[{}] update_mapping [{}] (dynamic)", index, type);
-                                }
-
-                                mdBuilder.put(IndexMetaData.builder(indexMetaData).putMapping(new MappingMetaData(updatedMapper)));
-                                dirty = true;
-                            } catch (Throwable t) {
-                                logger.warn("[{}] failed to update-mapping in cluster state, type [{}]", index, updateTask.type);
-                            }
-                        } else {
-                            logger.warn("illegal state, got wrong mapping task type [{}]", task);
+                        if (task instanceof UpdateTask) {
+                            typesToIntroduce.add(((UpdateTask) task).type);
+                        } else if (task instanceof RefreshTask) {
+                            Collections.addAll(typesToIntroduce, ((RefreshTask) task).types);
                         }
                     }
+                    for (String type : typesToIntroduce) {
+                        // only add the current relevant mapping (if exists)
+                        if (indexMetaData.mappings().containsKey(type)) {
+                            // don't apply the default mapping, it has been applied when the mapping was created
+                            indexService.mapperService().merge(type, indexMetaData.mappings().get(type).source(), false);
+                        }
+                    }
+                }
+
+                IndexMetaData.Builder builder = IndexMetaData.builder(indexMetaData);
+                try {
+                    boolean indexDirty = processIndexMappingTasks(tasks, indexService, builder);
+                    if (indexDirty) {
+                        mdBuilder.put(builder);
+                        dirty = true;
+                    }
                 } finally {
                     if (removeIndex) {
                         indicesService.removeIndex(index, "created for mapping processing");
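With this change the index is created at most once per batch and seeded with every type any task touches, and the per-task merging moves into processIndexMappingTasks (below), which threads the single IndexMetaData.Builder through all of the index's tasks and reports whether anything changed.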
@@ -321,13 +263,86 @@ public class MetaDataMappingService extends AbstractComponent {
                 }
             });
 
 
             if (!dirty) {
                 return currentState;
             }
             return ClusterState.builder(currentState).metaData(mdBuilder).build();
         }
 
+    private boolean processIndexMappingTasks(List<MappingTask> tasks, IndexService indexService, IndexMetaData.Builder builder) {
+        boolean dirty = false;
+        String index = indexService.index().name();
+        // keep track of what we already refreshed, no need to refresh it again...
+        Set<String> processedRefreshes = Sets.newHashSet();
+        for (MappingTask task : tasks) {
+            if (task instanceof RefreshTask) {
+                RefreshTask refreshTask = (RefreshTask) task;
+                try {
+                    List<String> updatedTypes = Lists.newArrayList();
+                    for (String type : refreshTask.types) {
+                        if (processedRefreshes.contains(type)) {
+                            continue;
+                        }
+                        DocumentMapper mapper = indexService.mapperService().documentMapper(type);
+                        if (mapper == null) {
+                            continue;
+                        }
+                        if (!mapper.mappingSource().equals(builder.mapping(type).source())) {
+                            updatedTypes.add(type);
+                            builder.putMapping(new MappingMetaData(mapper));
+                        }
+                        processedRefreshes.add(type);
+                    }
+
+                    if (updatedTypes.isEmpty()) {
+                        continue;
+                    }
+
+                    logger.warn("[{}] re-syncing mappings with cluster state for types [{}]", index, updatedTypes);
+                    dirty = true;
+                } catch (Throwable t) {
+                    logger.warn("[{}] failed to refresh-mapping in cluster state, types [{}]", index, refreshTask.types);
+                }
+            } else if (task instanceof UpdateTask) {
+                UpdateTask updateTask = (UpdateTask) task;
+                try {
+                    String type = updateTask.type;
+                    CompressedString mappingSource = updateTask.mappingSource;
+
+                    MappingMetaData mappingMetaData = builder.mapping(type);
+                    if (mappingMetaData != null && mappingMetaData.source().equals(mappingSource)) {
+                        logger.debug("[{}] update_mapping [{}] ignoring mapping update task as its source is equal to ours", index, updateTask.type);
+                        continue;
+                    }
+
+                    DocumentMapper updatedMapper = indexService.mapperService().merge(type, mappingSource, false);
+                    processedRefreshes.add(type);
+
+                    // if we end up with the same mapping as the original once, ignore
+                    if (mappingMetaData != null && mappingMetaData.source().equals(updatedMapper.mappingSource())) {
+                        logger.debug("[{}] update_mapping [{}] ignoring mapping update task as it results in the same source as what we have", index, updateTask.type);
+                        continue;
+                    }
+
+                    // build the updated mapping source
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("[{}] update_mapping [{}] (dynamic) with source [{}]", index, type, updatedMapper.mappingSource());
+                    } else if (logger.isInfoEnabled()) {
+                        logger.info("[{}] update_mapping [{}] (dynamic)", index, type);
+                    }
+
+                    builder.putMapping(new MappingMetaData(updatedMapper));
+                    dirty = true;
+                } catch (Throwable t) {
+                    logger.warn("[{}] failed to update-mapping in cluster state, type [{}]", index, updateTask.type);
+                }
+            } else {
+                logger.warn("illegal state, got wrong mapping task type [{}]", task);
+            }
+        }
+        return dirty;
+    }
+
     /**
      * Refreshes mappings if they are not the same between original and parsed version
      */
@@ -655,13 +655,13 @@ public class DocumentMapper implements ToXContent {
         return new MergeResult(mergeContext.buildConflicts());
     }
 
-    public void refreshSource() throws FailedToGenerateSourceMapperException {
+    public CompressedString refreshSource() throws FailedToGenerateSourceMapperException {
         try {
             XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
             builder.startObject();
             toXContent(builder, ToXContent.EMPTY_PARAMS);
             builder.endObject();
-            this.mappingSource = new CompressedString(builder.bytes());
+            return mappingSource = new CompressedString(builder.bytes());
         } catch (Exception e) {
             throw new FailedToGenerateSourceMapperException(e.getMessage(), e);
         }
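DocumentMapper.refreshSource() now returns the freshly generated CompressedString in addition to storing it, so a caller that just regenerated the source can use the result directly instead of reading the field back.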
@@ -1,19 +1,18 @@
 package org.elasticsearch.indices.mapping;
 
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import com.google.common.base.Predicate;
-import org.elasticsearch.action.ActionListener;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
 import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
 import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.count.CountResponse;
 import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -23,13 +22,12 @@ import org.elasticsearch.index.mapper.MapperParsingException;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.MergeMappingException;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
-import org.elasticsearch.threadpool.ThreadPool;
 import org.hamcrest.Matchers;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -41,7 +39,7 @@ import static org.hamcrest.Matchers.*;
 public class UpdateMappingTests extends ElasticsearchIntegrationTest {
 
     @Test
-    public void dynamicUpdates_Index() throws Exception {
+    public void dynamicUpdates() throws Exception {
         client().admin().indices().prepareCreate("test")
                 .setSettings(
                         ImmutableSettings.settingsBuilder()
@@ -50,32 +48,15 @@ public class UpdateMappingTests extends ElasticsearchIntegrationTest {
                 ).execute().actionGet();
         client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
 
-        ThreadPool.Info info = cluster().getInstance(ThreadPool.class).info(ThreadPool.Names.INDEX);
-        int recCount = info.getMax() + (int) info.getQueueSize().getSingles();
-        final CountDownLatch latch = new CountDownLatch(recCount);
+        int recCount = randomIntBetween(200, 600);
+        int numberOfTypes = randomIntBetween(1, 5);
+        List<IndexRequestBuilder> indexRequests = Lists.newArrayList();
         for (int rec = 0; rec < recCount; rec++) {
-            client().prepareIndex("test", "type", "rec" + rec).setSource("field" + rec, "some_value").execute(new ActionListener<IndexResponse>() {
-                @Override
-                public void onResponse(IndexResponse indexResponse) {
-                    latch.countDown();
-                }
-
-                @Override
-                public void onFailure(Throwable e) {
-                    logger.error("failed to index in test", e);
-                    latch.countDown();
-                }
-            });
+            String type = "type" + (rec % numberOfTypes);
+            String fieldName = "field_" + type + "_" + rec;
+            indexRequests.add(client().prepareIndex("test", type, Integer.toString(rec)).setSource(fieldName, "some_value"));
         }
-        latch.await();
-        logger.info("wait till the mappings have been processed...");
-        awaitBusy(new Predicate<Object>() {
-            @Override
-            public boolean apply(Object input) {
-                PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get();
-                return pendingTasks.pendingTasks().isEmpty();
-            }
-        });
+        indexRandom(true, indexRequests);
 
         logger.info("checking all the documents are there");
         RefreshResponse refreshResponse = client().admin().indices().prepareRefresh().execute().actionGet();
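Indexing in the test now goes through indexRandom with deterministic ids spread across numberOfTypes types, replacing the hand-rolled async listeners, latch, and pending-task polling.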
@@ -84,76 +65,37 @@ public class UpdateMappingTests extends ElasticsearchIntegrationTest {
         assertThat(response.getCount(), equalTo((long) recCount));
 
         logger.info("checking all the fields are in the mappings");
-        String source = client().admin().cluster().prepareState().get().getState().getMetaData().getIndices().get("test").getMappings().get("type").source().string();
-        for (int rec = 0; rec < recCount; rec++) {
-            assertThat(source, containsString("\"field" + rec + "\""));
-        }
-    }
-
-    @Test
-    public void dynamicUpdates_Bulk() throws Exception {
-        client().admin().indices().prepareCreate("test")
-                .setSettings(
-                        ImmutableSettings.settingsBuilder()
-                                .put("index.number_of_shards", 1)
-                                .put("index.number_of_replicas", 0)
-                ).execute().actionGet();
-        client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
-
-        ThreadPool.Info info = cluster().getInstance(ThreadPool.class).info(ThreadPool.Names.BULK);
-        int bulkCount = info.getMax() + (int) info.getQueueSize().getSingles();
-        int bulkSize = between(4, 10);
-        int recCount = bulkCount * bulkSize;
-        int idCounter = 0;
-        final CountDownLatch latch = new CountDownLatch(bulkCount);
-        for (int i = 0; i < bulkCount; i++) {
-            BulkRequestBuilder bulk = client().prepareBulk();
-            for (int rec = 0; rec < bulkSize; rec++) {
-                int id = idCounter++;
-                bulk.add(new IndexRequestBuilder(client())
-                        .setOpType(IndexRequest.OpType.INDEX)
-                        .setIndex("test")
-                        .setType("type")
-                        .setId("rec" + id)
-                        .setSource("field" + id, "some_value"));
+        reRunTest:
+        while (true) {
+            Map<String, String> typeToSource = Maps.newHashMap();
+            ClusterState state = client().admin().cluster().prepareState().get().getState();
+            for (ObjectObjectCursor<String, MappingMetaData> cursor : state.getMetaData().getIndices().get("test").getMappings()) {
+                typeToSource.put(cursor.key, cursor.value.source().string());
             }
-            bulk.execute(new ActionListener<BulkResponse>() {
-                @Override
-                public void onResponse(BulkResponse bulkItemResponses) {
-                    if (bulkItemResponses.hasFailures()) {
-                        System.out.println("failed to index in test: " + bulkItemResponses.buildFailureMessage());
+
+            for (int rec = 0; rec < recCount; rec++) {
+                String type = "type" + (rec % numberOfTypes);
+                String fieldName = "field_" + type + "_" + rec;
+                fieldName = "\"" + fieldName + "\""; // quote it, so we make sure we catch the exact one
+                if (!typeToSource.containsKey(type) || !typeToSource.get(type).contains(fieldName)) {
+                    awaitBusy(new Predicate<Object>() {
+                        @Override
+                        public boolean apply(Object input) {
+                            PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get();
+                            return pendingTasks.pendingTasks().isEmpty();
+                        }
+                    });
+                    // its going to break, before we do, make sure that the cluster state hasn't changed on us...
+                    ClusterState state2 = client().admin().cluster().prepareState().get().getState();
+                    if (state.version() != state2.version()) {
+                        logger.info("not the same version, used for test {}, new one {}, re-running test, first wait for mapping to wait", state.version(), state2.version());
+                        continue reRunTest;
                     }
-                    latch.countDown();
+                    logger.info("failing, type {}, field {}, mapping {}", type, fieldName, typeToSource.get(type));
+                    assertThat(typeToSource.get(type), containsString(fieldName));
                 }
-
-                @Override
-                public void onFailure(Throwable e) {
-                    logger.error("failed to index in test", e);
-                    latch.countDown();
-                }
-            });
-        }
-        latch.await();
-
-        logger.info("wait till the mappings have been processed...");
-        awaitBusy(new Predicate<Object>() {
-            @Override
-            public boolean apply(Object input) {
-                PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get();
-                return pendingTasks.pendingTasks().isEmpty();
             }
-        });
-
-        logger.info("checking all the documents are there");
-        RefreshResponse refreshResponse = client().admin().indices().prepareRefresh().execute().actionGet();
-        assertThat(refreshResponse.getFailedShards(), equalTo(0));
-        CountResponse response = client().prepareCount("test").execute().actionGet();
-        assertThat(response.getCount(), equalTo((long) recCount));
-
-        logger.info("checking all the fields are in the mappings");
-        String source = client().admin().cluster().prepareState().get().getState().getMetaData().getIndices().get("test").getMappings().get("type").source().string();
-        for (int rec = 0; rec < recCount; rec++) {
-            assertThat(source, containsString("\"field" + rec + "\""));
+            break;
         }
     }
 
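The separate index and bulk variants collapse into the single dynamicUpdates test above: it verifies that every generated field landed in its type's mapping, and if the cluster state version changed while checking, the reRunTest loop restarts the verification against the fresh state rather than failing spuriously.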
@@ -227,12 +169,12 @@ public class UpdateMappingTests extends ElasticsearchIntegrationTest {
     @SuppressWarnings("unchecked")
     @Test
     public void updateIncludeExclude() throws Exception {
         assertAcked(prepareCreate("test").addMapping("type",
                 jsonBuilder().startObject().startObject("type").startObject("properties")
                         .startObject("normal").field("type", "long").endObject()
                         .startObject("exclude").field("type", "long").endObject()
                         .startObject("include").field("type", "long").endObject()
                         .endObject().endObject().endObject()));
         ensureGreen(); // make sure that replicas are initialized so the refresh command will work them too
 
         logger.info("Index doc");