From 64a897e5f2ad0274e04db7831bcc8c5334bfee1a Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Tue, 25 Oct 2016 09:58:14 -0400 Subject: [PATCH] add setters for translog location and took in engine operation result --- .../action/bulk/TransportShardBulkAction.java | 2 +- .../action/index/TransportIndexAction.java | 8 +-- .../elasticsearch/index/engine/Engine.java | 57 +++++++++++------- .../index/engine/InternalEngine.java | 59 +++++++++---------- .../index/translog/Translog.java | 8 +-- .../shard/IndexingOperationListenerTests.java | 4 +- 6 files changed, 75 insertions(+), 63 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 24367fae90a..61b9669f9a1 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -243,7 +243,7 @@ public class TransportShardBulkAction extends TransportWriteAction new VersionValue(u); - - @FunctionalInterface - private interface VersionValueSupplier { - VersionValue apply(long updatedVersion, long time); - } - - private Translog.Location maybeAddToTranslog( - final T op, - final long updatedVersion, - final Function toTranslogOp, - final VersionValueSupplier toVersionValue) throws IOException { - Translog.Location location = null; - if (op.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { - location = translog.add(toTranslogOp.apply(op)); - } - versionMap.putUnderLock(op.uid().bytes(), toVersionValue.apply(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis())); - return location; - } - @Override public IndexResult index(Index index) { IndexResult result; @@ -423,7 +403,7 @@ public class InternalEngine extends Engine { } catch (Exception e) { Exception documentFailure = extractDocumentFailure(index, e); result = new IndexResult(documentFailure, index.version(), - index.startTime() - System.nanoTime(), index.estimatedSizeInBytes()); + index.estimatedSizeInBytes()); } return result; } @@ -551,7 +531,7 @@ public class InternalEngine extends Engine { final long expectedVersion = index.version(); if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) { // skip index operation because of version conflict on recovery - return new IndexResult(null, expectedVersion, false, index.startTime() - System.nanoTime(), index.estimatedSizeInBytes()); + return new IndexResult(expectedVersion, false, index.estimatedSizeInBytes()); } else { updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); index.parsedDoc().version().setLongValue(updatedVersion); @@ -561,8 +541,17 @@ public class InternalEngine extends Engine { } else { update(index.uid(), index.docs(), indexWriter); } - location = maybeAddToTranslog(index, updatedVersion, Translog.Index::new, NEW_VERSION_VALUE); - return new IndexResult(location, updatedVersion, deleted, index.startTime() - System.nanoTime(), index.estimatedSizeInBytes()); + IndexResult indexResult = new IndexResult(updatedVersion, deleted, index.estimatedSizeInBytes()); + if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { + location = translog.add(new Translog.Index(index, indexResult)); + } else { + location = null; + } + versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion)); + indexResult.setLocation(location); + indexResult.setTook(index.startTime() - System.nanoTime()); + indexResult.freeze(); + return indexResult; } } } @@ -593,7 +582,7 @@ public class InternalEngine extends Engine { } catch (Exception e) { Exception documentFailure = extractDocumentFailure(delete, e); result = new DeleteResult(documentFailure, delete.version(), - delete.startTime() - System.nanoTime(), delete.estimatedSizeInBytes()); + delete.estimatedSizeInBytes()); } maybePruneDeletedTombstones(); return result; @@ -628,14 +617,24 @@ public class InternalEngine extends Engine { final long expectedVersion = delete.version(); if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) { // skip executing delete because of version conflict on recovery - return new DeleteResult(null, expectedVersion, true, - delete.startTime() - System.nanoTime(), delete.estimatedSizeInBytes()); + return new DeleteResult(expectedVersion, true, + delete.estimatedSizeInBytes()); } else { updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion); found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue); - location = maybeAddToTranslog(delete, updatedVersion, Translog.Delete::new, DeleteVersionValue::new); - return new DeleteResult(location, updatedVersion, found, - delete.startTime() - System.nanoTime(), delete.estimatedSizeInBytes()); + DeleteResult deleteResult = new DeleteResult(updatedVersion, found, + delete.estimatedSizeInBytes()); + if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { + location = translog.add(new Translog.Delete(delete, deleteResult)); + } else { + location = null; + } + versionMap.putUnderLock(delete.uid().bytes(), + new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis())); + deleteResult.setLocation(location); + deleteResult.setTook(delete.startTime() - System.nanoTime()); + deleteResult.freeze(); + return deleteResult; } } } diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 056716a29bd..9cf60dbe422 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -830,13 +830,13 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } } - public Index(Engine.Index index) { + public Index(Engine.Index index, Engine.IndexResult indexResult) { this.id = index.id(); this.type = index.type(); this.source = index.source(); this.routing = index.routing(); this.parent = index.parent(); - this.version = index.version(); + this.version = indexResult.getVersion(); this.timestamp = index.timestamp(); this.ttl = index.ttl(); this.versionType = index.versionType(); @@ -994,9 +994,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC assert versionType.validateVersionForWrites(this.version); } - public Delete(Engine.Delete delete) { + public Delete(Engine.Delete delete, Engine.DeleteResult deleteResult) { this.uid = delete.uid(); - this.version = delete.version(); + this.version = deleteResult.getVersion(); this.versionType = delete.versionType(); } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java index d3cf1640baa..742fadc9a80 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java @@ -112,7 +112,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ IndexingOperationListener.CompositeListener compositeListener = new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger); Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1")); Engine.Index index = new Engine.Index(new Term("_uid", "1"), null); - compositeListener.postDelete(delete, new Engine.DeleteResult(null, 1, true, 0, 0)); + compositeListener.postDelete(delete, new Engine.DeleteResult(1, true, 0)); assertEquals(0, preIndex.get()); assertEquals(0, postIndex.get()); assertEquals(0, postIndexException.get()); @@ -136,7 +136,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ assertEquals(2, postDelete.get()); assertEquals(2, postDeleteException.get()); - compositeListener.postIndex(index, new Engine.IndexResult(null, 0, false, 0, 0)); + compositeListener.postIndex(index, new Engine.IndexResult(0, false, 0)); assertEquals(0, preIndex.get()); assertEquals(2, postIndex.get()); assertEquals(0, postIndexException.get());