mappings: update cluster state with type mapping also for failed indexing request

When indexing of a document with a type that is not in the mappings fails,
for example because "dynamic": "strict" but doc contains a new field,
then the type is still created on the node that executed the indexing request.
However, the change was never added to the cluster state.
This commit makes sure mapping updates are always added to the cluster state
even if indexing of a document fails.

closes #8692
relates to #8650
This commit is contained in:
Britta Weber 2015-02-23 18:33:31 +01:00
parent 4ee7ed987e
commit ff8fd677cc
7 changed files with 277 additions and 64 deletions

View File

@ -0,0 +1,40 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchWrapperException;
import org.elasticsearch.common.Nullable;
public class WriteFailureException extends ElasticsearchException implements ElasticsearchWrapperException {
@Nullable
private final String mappingTypeToUpdate;
public WriteFailureException(Throwable cause, String mappingTypeToUpdate) {
super(null, cause);
assert cause != null;
this.mappingTypeToUpdate = mappingTypeToUpdate;
}
public String getMappingTypeToUpdate() {
return mappingTypeToUpdate;
}
}

View File

@ -22,11 +22,11 @@ package org.elasticsearch.action.bulk;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ElasticsearchWrapperException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.WriteFailureException;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
@ -42,7 +42,6 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
@ -161,9 +160,9 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
ops[requestIndex] = result.op;
}
} catch (WriteFailure e) {
if (e.mappingTypeToUpdate != null) {
mappingTypesToUpdate.add(e.mappingTypeToUpdate);
} catch (WriteFailureException e) {
if (e.getMappingTypeToUpdate() != null) {
mappingTypesToUpdate.add(e.getMappingTypeToUpdate());
}
throw e.getCause();
}
@ -397,17 +396,6 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
static class WriteFailure extends ElasticsearchException implements ElasticsearchWrapperException {
@Nullable
final String mappingTypeToUpdate;
WriteFailure(Throwable cause, String mappingTypeToUpdate) {
super(null, cause);
assert cause != null;
this.mappingTypeToUpdate = mappingTypeToUpdate;
}
}
private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState,
IndexShard indexShard, boolean processed) {
@ -457,7 +445,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
indexRequest.version(version);
} catch (Throwable t) {
throw new WriteFailure(t, mappingTypeToUpdate);
throw new WriteFailureException(t, mappingTypeToUpdate);
}
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.index;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.WriteFailureException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
@ -40,6 +41,7 @@ import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
@ -167,7 +169,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
@Override
protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
final IndexRequest request = shardRequest.request;
// validate, if routing is required, that we got routing
@ -185,6 +187,8 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
long version;
boolean created;
try {
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
if (index.parsedDoc().mappingsModified()) {
@ -216,8 +220,16 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
assert request.versionType().validateVersionForWrites(request.version());
return new Tuple<>(new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created), shardRequest.request);
} catch (WriteFailureException e) {
if (e.getMappingTypeToUpdate() != null){
DocumentMapper docMapper = indexService.mapperService().documentMapper(e.getMappingTypeToUpdate());
if (docMapper != null) {
mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), docMapper, indexService.indexUUID());
}
}
throw e.getCause();
}
}
@Override

View File

@ -117,7 +117,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
* @return A tuple containing not null values, as first value the result of the primary operation and as second value
* the request to be executed on the replica shards.
*/
protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest);
protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable;
protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest);

View File

@ -34,7 +34,7 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.action.WriteFailureException;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Booleans;
@ -64,6 +64,12 @@ import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
import org.elasticsearch.index.engine.RefreshFailedEngineException;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.fielddata.ShardFieldData;
@ -409,8 +415,16 @@ public class IndexShard extends AbstractIndexShardComponent {
public Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates, boolean autoGeneratedId) throws ElasticsearchException {
long startTime = System.nanoTime();
Tuple<DocumentMapper, Boolean> docMapper = mapperService.documentMapperWithAutoCreate(source.type());
try {
ParsedDocument doc = docMapper.v1().parse(source).setMappingsModified(docMapper);
return new Engine.Create(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates, autoGeneratedId);
} catch (Throwable t) {
if (docMapper.v2()) {
throw new WriteFailureException(t, docMapper.v1().type());
} else {
throw t;
}
}
}
public ParsedDocument create(Engine.Create create) throws ElasticsearchException {
@ -434,8 +448,16 @@ public class IndexShard extends AbstractIndexShardComponent {
public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates) throws ElasticsearchException {
long startTime = System.nanoTime();
Tuple<DocumentMapper, Boolean> docMapper = mapperService.documentMapperWithAutoCreate(source.type());
try {
ParsedDocument doc = docMapper.v1().parse(source).setMappingsModified(docMapper);
return new Engine.Index(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates);
} catch (Throwable t) {
if (docMapper.v2()) {
throw new WriteFailureException(t, docMapper.v1().type());
} else {
throw t;
}
}
}
public ParsedDocument index(Engine.Index index) throws ElasticsearchException {

View File

@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.mapper.dynamic;
import com.google.common.base.Predicate;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.mapper.StrictDynamicMappingException;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public class DynamicMappingIntegrationTests extends ElasticsearchIntegrationTest {
// https://github.com/elasticsearch/elasticsearch/issues/8423#issuecomment-64229717
@Test
public void testStrictAllMapping() throws Exception {
String defaultMapping = jsonBuilder().startObject().startObject("_default_")
.field("dynamic", "strict")
.endObject().endObject().string();
client().admin().indices().prepareCreate("index").addMapping("_default_", defaultMapping).get();
try {
client().prepareIndex("index", "type", "id").setSource("test", "test").get();
fail();
} catch (StrictDynamicMappingException ex) {
// this should not be created dynamically so we expect this exception
}
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(java.lang.Object input) {
GetMappingsResponse currentMapping = client().admin().indices().prepareGetMappings("index").get();
return currentMapping.getMappings().get("index").get("type") != null;
}
});
String docMapping = jsonBuilder().startObject().startObject("type")
.startObject("_all")
.field("enabled", false)
.endObject()
.endObject().endObject().string();
try {
client().admin().indices()
.preparePutMapping("index")
.setType("type")
.setSource(docMapping).get();
fail();
} catch (Exception e) {
// the mapping was created anyway with _all enabled: true, although the index request fails so we expect the update to fail
}
// make sure type was created
for (Client client : cluster()) {
GetMappingsResponse mapping = client.admin().indices().prepareGetMappings("index").setLocal(true).get();
assertNotNull(mapping.getMappings().get("index").get("type"));
}
}
}

View File

@ -18,6 +18,10 @@
*/
package org.elasticsearch.index.mapper.dynamic;
import com.google.common.base.Predicate;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.FieldMappers;
@ -29,6 +33,7 @@ import org.junit.Test;
import java.io.IOException;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
@ -36,7 +41,7 @@ public class DynamicMappingTests extends ElasticsearchSingleNodeTest {
@Test
public void testDynamicTrue() throws IOException {
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
String mapping = jsonBuilder().startObject().startObject("type")
.field("dynamic", "true")
.startObject("properties")
.startObject("field1").field("type", "string").endObject()
@ -45,7 +50,7 @@ public class DynamicMappingTests extends ElasticsearchSingleNodeTest {
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser().parse(mapping);
ParsedDocument doc = defaultMapper.parse("type", "1", XContentFactory.jsonBuilder()
ParsedDocument doc = defaultMapper.parse("type", "1", jsonBuilder()
.startObject()
.field("field1", "value1")
.field("field2", "value2")
@ -57,7 +62,7 @@ public class DynamicMappingTests extends ElasticsearchSingleNodeTest {
@Test
public void testDynamicFalse() throws IOException {
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
String mapping = jsonBuilder().startObject().startObject("type")
.field("dynamic", "false")
.startObject("properties")
.startObject("field1").field("type", "string").endObject()
@ -66,7 +71,7 @@ public class DynamicMappingTests extends ElasticsearchSingleNodeTest {
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser().parse(mapping);
ParsedDocument doc = defaultMapper.parse("type", "1", XContentFactory.jsonBuilder()
ParsedDocument doc = defaultMapper.parse("type", "1", jsonBuilder()
.startObject()
.field("field1", "value1")
.field("field2", "value2")
@ -79,7 +84,7 @@ public class DynamicMappingTests extends ElasticsearchSingleNodeTest {
@Test
public void testDynamicStrict() throws IOException {
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
String mapping = jsonBuilder().startObject().startObject("type")
.field("dynamic", "strict")
.startObject("properties")
.startObject("field1").field("type", "string").endObject()
@ -89,7 +94,7 @@ public class DynamicMappingTests extends ElasticsearchSingleNodeTest {
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser().parse(mapping);
try {
defaultMapper.parse("type", "1", XContentFactory.jsonBuilder()
defaultMapper.parse("type", "1", jsonBuilder()
.startObject()
.field("field1", "value1")
.field("field2", "value2")
@ -113,7 +118,7 @@ public class DynamicMappingTests extends ElasticsearchSingleNodeTest {
@Test
public void testDynamicFalseWithInnerObjectButDynamicSetOnRoot() throws IOException {
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
String mapping = jsonBuilder().startObject().startObject("type")
.field("dynamic", "false")
.startObject("properties")
.startObject("obj1").startObject("properties")
@ -124,7 +129,7 @@ public class DynamicMappingTests extends ElasticsearchSingleNodeTest {
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser().parse(mapping);
ParsedDocument doc = defaultMapper.parse("type", "1", XContentFactory.jsonBuilder()
ParsedDocument doc = defaultMapper.parse("type", "1", jsonBuilder()
.startObject().startObject("obj1")
.field("field1", "value1")
.field("field2", "value2")
@ -137,7 +142,7 @@ public class DynamicMappingTests extends ElasticsearchSingleNodeTest {
@Test
public void testDynamicStrictWithInnerObjectButDynamicSetOnRoot() throws IOException {
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
String mapping = jsonBuilder().startObject().startObject("type")
.field("dynamic", "strict")
.startObject("properties")
.startObject("obj1").startObject("properties")
@ -149,7 +154,7 @@ public class DynamicMappingTests extends ElasticsearchSingleNodeTest {
DocumentMapper defaultMapper = createIndex("test").mapperService().documentMapperParser().parse(mapping);
try {
defaultMapper.parse("type", "1", XContentFactory.jsonBuilder()
defaultMapper.parse("type", "1", jsonBuilder()
.startObject().startObject("obj1")
.field("field1", "value1")
.field("field2", "value2")
@ -167,4 +172,74 @@ public class DynamicMappingTests extends ElasticsearchSingleNodeTest {
FieldMappers mappers = service.mapperService().indexName("empty_field");
assertTrue(mappers != null && mappers.isEmpty() == false);
}
@Test
public void testIndexingFailureDoesStillCreateType() throws IOException, InterruptedException {
XContentBuilder mapping = jsonBuilder().startObject().startObject("_default_")
.field("dynamic", "strict")
.endObject().endObject();
IndexService indexService = createIndex("test", ImmutableSettings.EMPTY, "_default_", mapping);
try {
client().prepareIndex().setIndex("test").setType("type").setSource(jsonBuilder().startObject().field("test", "test").endObject()).get();
fail();
} catch (StrictDynamicMappingException e) {
}
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(java.lang.Object input) {
GetMappingsResponse currentMapping = client().admin().indices().prepareGetMappings("test").get();
return currentMapping.getMappings().get("test").get("type") != null;
}
});
GetMappingsResponse getMappingsResponse = client().admin().indices().prepareGetMappings("test").get();
assertNotNull(getMappingsResponse.getMappings().get("test").get("type"));
DocumentMapper mapper = indexService.mapperService().documentMapper("type");
assertNotNull(mapper);
}
@Test
public void testTypeCreatedProperly() throws IOException, InterruptedException {
XContentBuilder mapping = jsonBuilder().startObject().startObject("_default_")
.field("dynamic", "strict")
.startObject("properties")
.startObject("test_string")
.field("type", "string")
.endObject()
.endObject()
.endObject().endObject();
IndexService indexService = createIndex("test", ImmutableSettings.EMPTY, "_default_", mapping);
try {
client().prepareIndex().setIndex("test").setType("type").setSource(jsonBuilder().startObject().field("test", "test").endObject()).get();
fail();
} catch (StrictDynamicMappingException e) {
}
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(java.lang.Object input) {
GetMappingsResponse currentMapping = client().admin().indices().prepareGetMappings("test").get();
return currentMapping.getMappings().get("test").get("type") != null;
}
});
//type should be in mapping
GetMappingsResponse getMappingsResponse = client().admin().indices().prepareGetMappings("test").get();
assertNotNull(getMappingsResponse.getMappings().get("test").get("type"));
client().prepareIndex().setIndex("test").setType("type").setSource(jsonBuilder().startObject().field("test_string", "test").endObject()).get();
client().admin().indices().prepareRefresh("test").get();
assertThat(client().prepareSearch("test").get().getHits().getTotalHits(), equalTo(1l));
DocumentMapper mapper = indexService.mapperService().documentMapper("type");
assertNotNull(mapper);
getMappingsResponse = client().admin().indices().prepareGetMappings("test").get();
assertNotNull(getMappingsResponse.getMappings().get("test").get("type"));
}
}