diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index d5009544a47..3a96f3aeff3 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -359,72 +359,46 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, indexRequest.source()).type(indexRequest.type()).id(indexRequest.id()) .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl()); - long version; - boolean created; - Engine.IndexingOperation op; + final Engine.IndexingOperation operation; if (indexRequest.opType() == IndexRequest.OpType.INDEX) { - Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates() || indexRequest.canHaveDuplicates()); - Mapping update = index.parsedDoc().dynamicMappingsUpdate(); - if (update != null) { - final String indexName = indexService.index().name(); - if (indexName.equals(RiverIndexName.Conf.indexName(settings))) { - // With rivers, we have a chicken and egg problem if indexing - // the _meta document triggers a mapping update. Because we would - // like to validate the mapping update first, but on the other - // hand putting the mapping would start the river, which expects - // to find a _meta document - // So we have no choice but to index first and send mappings afterwards - MapperService mapperService = indexService.mapperService(); - mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true); - indexShard.index(index); - mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update); - } else { - mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update); - indexShard.index(index); - } - } else { - indexShard.index(index); - } - version = index.version(); - op = index; - created = index.created(); + operation = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates() || indexRequest.canHaveDuplicates()); } else { - Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, + assert indexRequest.opType() == IndexRequest.OpType.CREATE : indexRequest.opType(); + operation = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId()); - Mapping update = create.parsedDoc().dynamicMappingsUpdate(); - if (update != null) { - final String indexName = indexService.index().name(); - if (indexName.equals(RiverIndexName.Conf.indexName(settings))) { - // With rivers, we have a chicken and egg problem if indexing - // the _meta document triggers a mapping update. Because we would - // like to validate the mapping update first, but on the other - // hand putting the mapping would start the river, which expects - // to find a _meta document - // So we have no choice but to index first and send mappings afterwards - MapperService mapperService = indexService.mapperService(); - mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true); - indexShard.create(create); - mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update); - } else { - mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update); - indexShard.create(create); - } - } else { - indexShard.create(create); - } - version = create.version(); - op = create; - created = true; } + Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); + final boolean created; + if (update != null) { + final String indexName = indexService.index().name(); + if (indexName.equals(RiverIndexName.Conf.indexName(settings))) { + // With rivers, we have a chicken and egg problem if indexing + // the _meta document triggers a mapping update. Because we would + // like to validate the mapping update first, but on the other + // hand putting the mapping would start the river, which expects + // to find a _meta document + // So we have no choice but to index first and send mappings afterwards + MapperService mapperService = indexService.mapperService(); + mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true); + created = operation.execute(indexShard); + mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update); + } else { + mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update); + created = operation.execute(indexShard); + } + } else { + created = operation.execute(indexShard); + } + // update the version on request so it will happen on the replicas + final long version = operation.version(); indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery()); indexRequest.version(version); assert indexRequest.versionType().validateVersionForWrites(indexRequest.version()); - IndexResponse indexResponse = new IndexResponse(request.index(), indexRequest.type(), indexRequest.id(), version, created); - return new WriteResult(indexResponse, op); + return new WriteResult(indexResponse, operation); } private WriteResult shardDeleteOperation(BulkShardRequest request, DeleteRequest deleteRequest, IndexShard indexShard) { @@ -548,23 +522,20 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, indexRequest.source()).type(indexRequest.type()).id(indexRequest.id()) .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl()); + final Engine.IndexingOperation operation; if (indexRequest.opType() == IndexRequest.OpType.INDEX) { - Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates()); - Mapping update = index.parsedDoc().dynamicMappingsUpdate(); - if (update != null) { - throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update); - } - indexShard.index(index); + operation = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates()); } else { - Engine.Create create = indexShard.prepareCreate(sourceToParse, + assert indexRequest.opType() == IndexRequest.OpType.CREATE : indexRequest.opType(); + operation = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId()); - Mapping update = create.parsedDoc().dynamicMappingsUpdate(); - if (update != null) { - throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update); - } - indexShard.create(create); } + Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); + if (update != null) { + throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update); + } + operation.execute(indexShard); } catch (Throwable e) { // if its not an ignore replica failure, we need to make sure to bubble up the failure // so we will fail the shard diff --git a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 2fd801c025d..8e81009b653 100644 --- a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; +import org.elasticsearch.action.index.IndexRequest.OpType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction; @@ -172,62 +173,39 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id()); SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id()) .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); - long version; - boolean created; + final Engine.IndexingOperation operation; if (request.opType() == IndexRequest.OpType.INDEX) { - Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates()); - Mapping update = index.parsedDoc().dynamicMappingsUpdate(); - if (update != null) { - final String indexName = indexService.index().name(); - if (indexName.equals(RiverIndexName.Conf.indexName(settings))) { - // With rivers, we have a chicken and egg problem if indexing - // the _meta document triggers a mapping update. Because we would - // like to validate the mapping update first, but on the other - // hand putting the mapping would start the river, which expects - // to find a _meta document - // So we have no choice but to index first and send mappings afterwards - MapperService mapperService = indexService.mapperService(); - mapperService.merge(request.type(), new CompressedString(update.toBytes()), true); - indexShard.index(index); - mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update); - } else { - mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update); - indexShard.index(index); - } - } else { - indexShard.index(index); - } - version = index.version(); - created = index.created(); + operation = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates()); } else { assert request.opType() == IndexRequest.OpType.CREATE : request.opType(); - Engine.Create create = indexShard.prepareCreate(sourceToParse, + operation = indexShard.prepareCreate(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId()); - Mapping update = create.parsedDoc().dynamicMappingsUpdate(); - if (update != null) { - final String indexName = indexService.index().name(); - if (indexName.equals(RiverIndexName.Conf.indexName(settings))) { - // With rivers, we have a chicken and egg problem if indexing - // the _meta document triggers a mapping update. Because we would - // like to validate the mapping update first, but on the other - // hand putting the mapping would start the river, which expects - // to find a _meta document - // So we have no choice but to index first and send mappings afterwards - MapperService mapperService = indexService.mapperService(); - mapperService.merge(request.type(), new CompressedString(update.toBytes()), true); - indexShard.create(create); - mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update); - } else { - mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update); - indexShard.create(create); - } - } else { - indexShard.create(create); - } - version = create.version(); - created = true; } + + final boolean created; + Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); + if (update != null) { + final String indexName = indexService.index().name(); + if (indexName.equals(RiverIndexName.Conf.indexName(settings))) { + // With rivers, we have a chicken and egg problem if indexing + // the _meta document triggers a mapping update. Because we would + // like to validate the mapping update first, but on the other + // hand putting the mapping would start the river, which expects + // to find a _meta document + // So we have no choice but to index first and send mappings afterwards + MapperService mapperService = indexService.mapperService(); + mapperService.merge(request.type(), new CompressedString(update.toBytes()), true); + created = operation.execute(indexShard); + mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update); + } else { + mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update); + created = operation.execute(indexShard); + } + } else { + created = operation.execute(indexShard); + } + if (request.refresh()) { try { indexShard.refresh("refresh_flag_index"); @@ -237,6 +215,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi } // update the version on the request, so it will be used for the replicas + final long version = operation.version(); request.version(version); request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); @@ -250,22 +229,19 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi IndexShard indexShard = indexService.shardSafe(shardId.id()); SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id()) .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); + + final Engine.IndexingOperation operation; if (request.opType() == IndexRequest.OpType.INDEX) { - Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates()); - Mapping update = index.parsedDoc().dynamicMappingsUpdate(); - if (update != null) { - throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update); - } - indexShard.index(index); + operation = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates()); } else { assert request.opType() == IndexRequest.OpType.CREATE : request.opType(); - Engine.Create create = indexShard.prepareCreate(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId()); - Mapping update = create.parsedDoc().dynamicMappingsUpdate(); - if (update != null) { - throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update); - } - indexShard.create(create); + operation = indexShard.prepareCreate(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId()); } + Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); + if (update != null) { + throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update); + } + operation.execute(indexShard); if (request.refresh()) { try { indexShard.refresh("refresh_flag_index"); diff --git a/src/main/java/org/elasticsearch/index/engine/Engine.java b/src/main/java/org/elasticsearch/index/engine/Engine.java index ce79c60b527..516313ddd28 100644 --- a/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -47,6 +47,7 @@ import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; @@ -200,7 +201,7 @@ public abstract class Engine implements Closeable { public abstract void create(Create create) throws EngineException; - public abstract void index(Index index) throws EngineException; + public abstract boolean index(Index index) throws EngineException; public abstract void delete(Delete delete) throws EngineException; @@ -704,6 +705,12 @@ public abstract class Engine implements Closeable { public long endTime() { return this.endTime; } + + /** + * Execute this operation against the provided {@link IndexShard} and + * return whether the document was created. + */ + public abstract boolean execute(IndexShard shard); } public static final class Create extends IndexingOperation { @@ -732,10 +739,15 @@ public abstract class Engine implements Closeable { public boolean autoGeneratedId() { return this.autoGeneratedId; } + + @Override + public boolean execute(IndexShard shard) { + shard.create(this); + return true; + } } public static final class Index extends IndexingOperation { - private boolean created; public Index(DocumentMapper docMapper, Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime, boolean canHaveDuplicates) { super(docMapper, uid, doc, version, versionType, origin, startTime, canHaveDuplicates); @@ -754,15 +766,9 @@ public abstract class Engine implements Closeable { return Type.INDEX; } - /** - * @return true if object was created - */ - public boolean created() { - return created; - } - - public void created(boolean created) { - this.created = created; + @Override + public boolean execute(IndexShard shard) { + return shard.index(this); } } diff --git a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 7086ee6986b..267a9042ea3 100644 --- a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -364,15 +364,16 @@ public class InternalEngine extends Engine { } @Override - public void index(Index index) throws EngineException { + public boolean index(Index index) throws EngineException { + final boolean created; try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); if (index.origin() == Operation.Origin.RECOVERY) { // Don't throttle recovery operations - innerIndex(index); + created = innerIndex(index); } else { try (Releasable r = throttle.acquireThrottle()) { - innerIndex(index); + created = innerIndex(index); } } flushNeeded = true; @@ -381,6 +382,7 @@ public class InternalEngine extends Engine { throw new IndexFailedEngineException(shardId, index, t); } checkVersionMapRefresh(); + return created; } /** @@ -410,7 +412,7 @@ public class InternalEngine extends Engine { } } - private void innerIndex(Index index) throws IOException { + private boolean innerIndex(Index index) throws IOException { synchronized (dirtyLock(index.uid())) { final long currentVersion; VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes()); @@ -428,17 +430,18 @@ public class InternalEngine extends Engine { long expectedVersion = index.version(); if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) { if (index.origin() == Operation.Origin.RECOVERY) { - return; + return false; } else { throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion); } } updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); + final boolean created; index.updateVersion(updatedVersion); if (currentVersion == Versions.NOT_FOUND) { // document does not exists, we can optimize for create - index.created(true); + created = true; if (index.docs().size() > 1) { indexWriter.addDocuments(index.docs()); } else { @@ -446,7 +449,9 @@ public class InternalEngine extends Engine { } } else { if (versionValue != null) { - index.created(versionValue.delete()); // we have a delete which is not GC'ed... + created = versionValue.delete(); // we have a delete which is not GC'ed... + } else { + created = false; } if (index.docs().size() > 1) { indexWriter.updateDocuments(index.uid(), index.docs()); @@ -459,6 +464,7 @@ public class InternalEngine extends Engine { versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation)); indexingService.postIndexUnderLock(index); + return created; } } diff --git a/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java index 6771b432176..511b9ae9955 100644 --- a/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java @@ -108,7 +108,7 @@ public class ShadowEngine extends Engine { } @Override - public void index(Index index) throws EngineException { + public boolean index(Index index) throws EngineException { throw new UnsupportedOperationException(shardId + " index operation not allowed on shadow engine"); } diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 0ff0c420f02..546d66cbdaa 100644 --- a/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -470,7 +470,7 @@ public class IndexShard extends AbstractIndexShardComponent { return new Engine.Create(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, canHaveDuplicates, autoGeneratedId); } - public ParsedDocument create(Engine.Create create) throws ElasticsearchException { + public void create(Engine.Create create) throws ElasticsearchException { writeAllowed(create.origin()); create = indexingService.preCreate(create); mapperAnalyzer.setType(create.type()); @@ -485,7 +485,6 @@ public class IndexShard extends AbstractIndexShardComponent { throw ex; } indexingService.postCreate(create); - return create.parsedDoc(); } public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates) throws ElasticsearchException { @@ -501,22 +500,27 @@ public class IndexShard extends AbstractIndexShardComponent { return new Engine.Index(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, canHaveDuplicates); } - public ParsedDocument index(Engine.Index index) throws ElasticsearchException { + /** + * Index a document and return whether it was created, as opposed to just + * updated. + */ + public boolean index(Engine.Index index) throws ElasticsearchException { writeAllowed(index.origin()); index = indexingService.preIndex(index); mapperAnalyzer.setType(index.type()); + final boolean created; try { if (logger.isTraceEnabled()) { logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs()); } - engine().index(index); + created = engine().index(index); index.endTime(System.nanoTime()); } catch (Throwable ex) { indexingService.postIndex(index, ex); throw ex; } indexingService.postIndex(index); - return index.parsedDoc(); + return created; } public Engine.Delete prepareDelete(String type, String id, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException { diff --git a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 3673b7f889d..2ac8608fa96 100644 --- a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1354,34 +1354,29 @@ public class InternalEngineTests extends ElasticsearchTestCase { public void testBasicCreatedFlag() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index index = new Engine.Index(null, newUid("1"), doc); - engine.index(index); - assertTrue(index.created()); + assertTrue(engine.index(index)); index = new Engine.Index(null, newUid("1"), doc); - engine.index(index); - assertFalse(index.created()); + assertFalse(engine.index(index)); engine.delete(new Engine.Delete(null, "1", newUid("1"))); index = new Engine.Index(null, newUid("1"), doc); - engine.index(index); - assertTrue(index.created()); + assertTrue(engine.index(index)); } @Test public void testCreatedFlagAfterFlush() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index index = new Engine.Index(null, newUid("1"), doc); - engine.index(index); - assertTrue(index.created()); + assertTrue(engine.index(index)); engine.delete(new Engine.Delete(null, "1", newUid("1"))); engine.flush(); index = new Engine.Index(null, newUid("1"), doc); - engine.index(index); - assertTrue(index.created()); + assertTrue(engine.index(index)); } private static class MockAppender extends AppenderSkeleton {