From fe5cdd30d5bfd5b9457e717b3409e65a0b17b434 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Thu, 23 Jun 2016 16:34:22 -0400 Subject: [PATCH] Set created flag in index operation Now document created flag is set in the index operation instead of being returned from engine operation. This change makes the engine index and delete operations have the same signature. --- .../action/index/TransportIndexAction.java | 4 +-- .../elasticsearch/index/engine/Engine.java | 11 +++++- .../index/engine/InternalEngine.java | 35 ++++++++----------- .../index/engine/ShadowEngine.java | 2 +- .../elasticsearch/index/shard/IndexShard.java | 18 +++------- .../index/engine/InternalEngineTests.java | 15 +++++--- 6 files changed, 43 insertions(+), 42 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 785a5d23157..c7eef2758cc 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -188,7 +188,7 @@ public class TransportIndexAction extends TransportWriteAction(response, operation.getTranslogLocation()); } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 12b021ddb71..47df50f6212 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -277,7 +277,7 @@ public abstract class Engine implements Closeable { } } - public abstract boolean index(Index operation) throws EngineException; + public abstract void index(Index operation) throws EngineException; public abstract void delete(Delete delete) throws EngineException; @@ -847,6 +847,7 @@ public abstract class Engine implements Closeable { public static class Index extends Operation { private final ParsedDocument doc; + private boolean created; public Index(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) { super(uid, version, versionType, origin, startTime); @@ -905,6 +906,14 @@ public abstract class Engine implements Closeable { return this.doc.source(); } + public boolean isCreated() { + return created; + } + + public void setCreated(boolean created) { + this.created = created; + } + @Override protected int estimatedSizeInBytes() { return (id().length() + type().length()) * 2 + source().length() + 12; diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index eba6fa10802..f91b4c19dd1 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -394,16 +394,15 @@ public class InternalEngine extends Engine { } @Override - public boolean index(Index index) { - final boolean created; + public void index(Index index) { try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); if (index.origin().isRecovery()) { // Don't throttle recovery operations - created = innerIndex(index); + innerIndex(index); } else { try (Releasable r = throttle.acquireThrottle()) { - created = innerIndex(index); + innerIndex(index); } } } catch (IllegalStateException | IOException e) { @@ -414,10 +413,9 @@ public class InternalEngine extends Engine { } throw new IndexFailedEngineException(shardId, index.type(), index.id(), e); } - return created; } - private boolean innerIndex(Index index) throws IOException { + private void innerIndex(Index index) throws IOException { try (Releasable ignored = acquireLock(index.uid())) { lastWriteNanos = index.startTime(); final long currentVersion; @@ -432,15 +430,16 @@ public class InternalEngine extends Engine { } final long expectedVersion = index.version(); - if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) return false; + if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) { + index.setCreated(false); + return; + } final long updatedVersion = updateVersion(index, currentVersion, expectedVersion); - final boolean created = indexOrUpdate(index, currentVersion, versionValue); + indexOrUpdate(index, currentVersion, versionValue); maybeAddToTranslog(index, updatedVersion, Translog.Index::new, NEW_VERSION_VALUE); - - return created; } } @@ -450,16 +449,14 @@ public class InternalEngine extends Engine { return updatedVersion; } - private boolean indexOrUpdate(final Index index, final long currentVersion, final VersionValue versionValue) throws IOException { - final boolean created; + private void indexOrUpdate(final Index index, final long currentVersion, final VersionValue versionValue) throws IOException { if (currentVersion == Versions.NOT_FOUND) { // document does not exists, we can optimize for create - created = true; + index.setCreated(true); index(index, indexWriter); } else { - created = update(index, versionValue, indexWriter); + update(index, versionValue, indexWriter); } - return created; } private static void index(final Index index, final IndexWriter indexWriter) throws IOException { @@ -470,19 +467,17 @@ public class InternalEngine extends Engine { } } - private static boolean update(final Index index, final VersionValue versionValue, final IndexWriter indexWriter) throws IOException { - final boolean created; + private static void update(final Index index, final VersionValue versionValue, final IndexWriter indexWriter) throws IOException { if (versionValue != null) { - created = versionValue.delete(); // we have a delete which is not GC'ed... + index.setCreated(versionValue.delete()); // we have a delete which is not GC'ed... } else { - created = false; + index.setCreated(false); } if (index.docs().size() > 1) { indexWriter.updateDocuments(index.uid(), index.docs()); } else { indexWriter.updateDocument(index.uid(), index.docs().get(0)); } - return created; } @Override diff --git a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java index 2d5a134493a..3c1172b113c 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java @@ -106,7 +106,7 @@ public class ShadowEngine extends Engine { @Override - public boolean index(Index index) throws EngineException { + public void index(Index index) throws EngineException { throw new UnsupportedOperationException(shardId + " index operation not allowed on shadow engine"); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 1dff3ad8b9b..0a6279ea40d 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -518,34 +518,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return new Engine.Index(uid, doc, version, versionType, origin, startTime); } - /** - * Index a document and return whether it was created, as opposed to just - * updated. - */ - public boolean index(Engine.Index index) { + public void index(Engine.Index index) { ensureWriteAllowed(index); Engine engine = getEngine(); - return index(engine, index); + index(engine, index); } - private boolean index(Engine engine, Engine.Index index) { + private void index(Engine engine, Engine.Index index) { active.set(true); index = indexingOperationListeners.preIndex(index); - final boolean created; try { if (logger.isTraceEnabled()) { logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs()); } - created = engine.index(index); + engine.index(index); index.endTime(System.nanoTime()); } catch (Exception e) { indexingOperationListeners.postIndex(index, e); throw e; } - - indexingOperationListeners.postIndex(index, created); - - return created; + indexingOperationListeners.postIndex(index, index.isCreated()); } public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version, VersionType versionType) { diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 6ae432dfbf7..e365c3310b2 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1480,28 +1480,33 @@ public class InternalEngineTests extends ESTestCase { public void testBasicCreatedFlag() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid("1"), doc); - assertTrue(engine.index(index)); + engine.index(index); + assertTrue(index.isCreated()); index = new Engine.Index(newUid("1"), doc); - assertFalse(engine.index(index)); + engine.index(index); + assertFalse(index.isCreated()); engine.delete(new Engine.Delete(null, "1", newUid("1"))); index = new Engine.Index(newUid("1"), doc); - assertTrue(engine.index(index)); + engine.index(index); + assertTrue(index.isCreated()); } public void testCreatedFlagAfterFlush() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid("1"), doc); - assertTrue(engine.index(index)); + engine.index(index); + assertTrue(index.isCreated()); engine.delete(new Engine.Delete(null, "1", newUid("1"))); engine.flush(); index = new Engine.Index(newUid("1"), doc); - assertTrue(engine.index(index)); + engine.index(index); + assertTrue(index.isCreated()); } private static class MockAppender extends AppenderSkeleton {