diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 0b27b4c037c..24367fae90a 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -242,7 +242,8 @@ public class TransportShardBulkAction extends TransportWriteAction= 0 && tookInNanos > indexWarnThreshold) { indexLogger.warn("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); } else if (indexInfoThreshold >= 0 && tookInNanos > indexInfoThreshold) { diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 7ec35d73a38..d02097141b1 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -422,7 +422,8 @@ public class InternalEngine extends Engine { } } catch (Exception e) { Exception transientOperationFailure = handleOperationFailure(index, e); - result = new IndexResult(transientOperationFailure, index.version(), index.startTime() - System.nanoTime()); + result = new IndexResult(transientOperationFailure, index.version(), + index.startTime() - System.nanoTime(), index.estimatedSizeInBytes()); } return result; } @@ -550,7 +551,7 @@ public class InternalEngine extends Engine { final long expectedVersion = index.version(); if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) { // skip index operation because of version conflict on recovery - return new IndexResult(null, expectedVersion, false, index.startTime() - System.nanoTime()); + return new IndexResult(null, expectedVersion, false, index.startTime() - System.nanoTime(), index.estimatedSizeInBytes()); } else { updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); index.parsedDoc().version().setLongValue(updatedVersion); @@ -561,7 +562,7 @@ public class InternalEngine extends Engine { update(index.uid(), index.docs(), indexWriter); } location = maybeAddToTranslog(index, updatedVersion, Translog.Index::new, NEW_VERSION_VALUE); - return new IndexResult(location, updatedVersion, deleted, index.startTime() - System.nanoTime()); + return new IndexResult(location, updatedVersion, deleted, index.startTime() - System.nanoTime(), index.estimatedSizeInBytes()); } } } @@ -591,7 +592,8 @@ public class InternalEngine extends Engine { result = innerDelete(delete); } catch (Exception e) { Exception transientOperationFailure = handleOperationFailure(delete, e); - result = new DeleteResult(transientOperationFailure, delete.version(), delete.startTime() - System.nanoTime()); + result = new DeleteResult(transientOperationFailure, delete.version(), + delete.startTime() - System.nanoTime(), delete.estimatedSizeInBytes()); } maybePruneDeletedTombstones(); return result; @@ -626,12 +628,14 @@ public class InternalEngine extends Engine { final long expectedVersion = delete.version(); if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) { // skip executing delete because of version conflict on recovery - return new DeleteResult(null, expectedVersion, true, delete.startTime() - System.nanoTime()); + return new DeleteResult(null, expectedVersion, true, + delete.startTime() - System.nanoTime(), delete.estimatedSizeInBytes()); } else { updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion); found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue); location = maybeAddToTranslog(delete, updatedVersion, Translog.Delete::new, DeleteVersionValue::new); - return new DeleteResult(location, updatedVersion, found, delete.startTime() - System.nanoTime()); + return new DeleteResult(location, updatedVersion, found, + delete.startTime() - System.nanoTime(), delete.estimatedSizeInBytes()); } } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java b/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java index 042ddec924e..0e605954248 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java @@ -40,7 +40,7 @@ public interface IndexingOperationListener { /** * Called after the indexing operation occurred. */ - default void postIndex(Engine.Index index, boolean created) {} + default void postIndex(Engine.Index index, Engine.IndexResult result) {} /** * Called after the indexing operation occurred with exception. @@ -58,7 +58,7 @@ public interface IndexingOperationListener { /** * Called after the delete operation occurred. */ - default void postDelete(Engine.Delete delete) {} + default void postDelete(Engine.Delete delete, Engine.DeleteResult result) {} /** * Called after the delete operation occurred with exception. @@ -91,11 +91,11 @@ public interface IndexingOperationListener { } @Override - public void postIndex(Engine.Index index, boolean created) { + public void postIndex(Engine.Index index, Engine.IndexResult result) { assert index != null; for (IndexingOperationListener listener : listeners) { try { - listener.postIndex(index, created); + listener.postIndex(index, result); } catch (Exception e) { logger.warn((Supplier) () -> new ParameterizedMessage("postIndex listener [{}] failed", listener), e); } @@ -129,11 +129,11 @@ public interface IndexingOperationListener { } @Override - public void postDelete(Engine.Delete delete) { + public void postDelete(Engine.Delete delete, Engine.DeleteResult result) { assert delete != null; for (IndexingOperationListener listener : listeners) { try { - listener.postDelete(delete); + listener.postDelete(delete, result); } catch (Exception e) { logger.warn((Supplier) () -> new ParameterizedMessage("postDelete listener [{}] failed", listener), e); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java b/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java index f62b8f7fe3c..39a415ca8eb 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java +++ b/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java @@ -74,9 +74,9 @@ final class InternalIndexingStats implements IndexingOperationListener { } @Override - public void postIndex(Engine.Index index, boolean created) { + public void postIndex(Engine.Index index, Engine.IndexResult result) { if (!index.origin().isRecovery()) { - long took = index.endTime() - index.startTime(); + long took = result.getTook(); totalStats.indexMetric.inc(took); totalStats.indexCurrent.dec(); StatsHolder typeStats = typeStats(index.type()); @@ -106,9 +106,9 @@ final class InternalIndexingStats implements IndexingOperationListener { } @Override - public void postDelete(Engine.Delete delete) { + public void postDelete(Engine.Delete delete, Engine.DeleteResult result) { if (!delete.origin().isRecovery()) { - long took = delete.endTime() - delete.startTime(); + long took = result.getTook(); totalStats.deleteMetric.inc(took); totalStats.deleteCurrent.dec(); StatsHolder typeStats = typeStats(delete.type()); diff --git a/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java index 3b4258a8bdf..25b133f390d 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java +++ b/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java @@ -189,11 +189,6 @@ public class IndexingMemoryController extends AbstractComponent implements Index statusChecker.run(); } - /** called by IndexShard to record that this many bytes were written to translog */ - public void bytesWritten(int bytes) { - statusChecker.bytesWritten(bytes); - } - /** Asks this shard to throttle indexing to one thread */ protected void activateThrottling(IndexShard shard) { shard.activateThrottling(); @@ -205,17 +200,18 @@ public class IndexingMemoryController extends AbstractComponent implements Index } @Override - public void postIndex(Engine.Index index, boolean created) { - recordOperationBytes(index); + public void postIndex(Engine.Index index, Engine.IndexResult result) { + recordOperationBytes(result); } @Override - public void postDelete(Engine.Delete delete) { - recordOperationBytes(delete); + public void postDelete(Engine.Delete delete, Engine.DeleteResult result) { + recordOperationBytes(result); } - private void recordOperationBytes(Engine.Operation op) { - bytesWritten(op.sizeInBytes()); + /** called by IndexShard to record that this many bytes were written to translog */ + private void recordOperationBytes(Engine.Result result) { + statusChecker.bytesWritten(result.getSizeInBytes()); } private static final class ShardAndBytesUsed implements Comparable { diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 2248ff156ac..9cf6594e42c 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -408,7 +408,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { IndexingOperationListener listener = new IndexingOperationListener() { @Override - public void postIndex(Engine.Index index, boolean created) { + public void postIndex(Engine.Index index, Engine.IndexResult result) { try { assertNotNull(shardRef.get()); // this is all IMC needs to do - check current memory and refresh @@ -422,7 +422,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { @Override - public void postDelete(Engine.Delete delete) { + public void postDelete(Engine.Delete delete, Engine.DeleteResult result) { try { assertNotNull(shardRef.get()); // this is all IMC needs to do - check current memory and refresh diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 9c6f21c6958..2b417c10ae7 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -564,8 +564,8 @@ public class IndexShardTests extends IndexShardTestCase { } @Override - public void postIndex(Engine.Index index, boolean created) { - if (created) { + public void postIndex(Engine.Index index, Engine.IndexResult result) { + if (result.isCreated()) { postIndexCreate.incrementAndGet(); } else { postIndexUpdate.incrementAndGet(); @@ -584,7 +584,7 @@ public class IndexShardTests extends IndexShardTestCase { } @Override - public void postDelete(Engine.Delete delete) { + public void postDelete(Engine.Delete delete, Engine.DeleteResult result) { postDelete.incrementAndGet(); } @@ -1127,7 +1127,7 @@ public class IndexShardTests extends IndexShardTestCase { } @Override - public void postIndex(Engine.Index index, boolean created) { + public void postIndex(Engine.Index index, Engine.IndexResult result) { postIndex.incrementAndGet(); } @@ -1138,7 +1138,7 @@ public class IndexShardTests extends IndexShardTestCase { } @Override - public void postDelete(Engine.Delete delete) { + public void postDelete(Engine.Delete delete, Engine.DeleteResult result) { postDelete.incrementAndGet(); } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java index d1cf8b32f58..15b40e4e09c 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java @@ -46,7 +46,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ } @Override - public void postIndex(Engine.Index index, boolean created) { + public void postIndex(Engine.Index index, Engine.IndexResult result) { postIndex.incrementAndGet(); } @@ -62,7 +62,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ } @Override - public void postDelete(Engine.Delete delete) { + public void postDelete(Engine.Delete delete, Engine.DeleteResult result) { postDelete.incrementAndGet(); } @@ -79,12 +79,14 @@ public class IndexingOperationListenerTests extends ESTestCase{ } @Override - public void postIndex(Engine.Index index, boolean created) { - throw new RuntimeException(); } + public void postIndex(Engine.Index index, Engine.IndexResult result) { + throw new RuntimeException(); + } @Override public void postIndex(Engine.Index index, Exception ex) { - throw new RuntimeException(); } + throw new RuntimeException(); + } @Override public Engine.Delete preDelete(Engine.Delete delete) { @@ -92,8 +94,9 @@ public class IndexingOperationListenerTests extends ESTestCase{ } @Override - public void postDelete(Engine.Delete delete) { - throw new RuntimeException(); } + public void postDelete(Engine.Delete delete, Engine.DeleteResult result) { + throw new RuntimeException(); + } @Override public void postDelete(Engine.Delete delete, Exception ex) { @@ -111,7 +114,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ IndexingOperationListener.CompositeListener compositeListener = new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger); Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1")); Engine.Index index = new Engine.Index(new Term("_uid", "1"), null); - compositeListener.postDelete(delete); + compositeListener.postDelete(delete, new Engine.DeleteResult(null, 1, true, 0, 0)); assertEquals(0, preIndex.get()); assertEquals(0, postIndex.get()); assertEquals(0, postIndexException.get()); @@ -135,7 +138,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ assertEquals(2, postDelete.get()); assertEquals(2, postDeleteException.get()); - compositeListener.postIndex(index, false); + compositeListener.postIndex(index, new Engine.IndexResult(null, 0, false, 0, 0)); assertEquals(0, preIndex.get()); assertEquals(2, postIndex.get()); assertEquals(0, postIndexException.get()); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index c73f22fa73b..fbb87d9f8d1 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -441,7 +441,7 @@ public abstract class IndexShardTestCase extends ESTestCase { } protected Engine.Index indexDoc(IndexShard shard, String type, String id, String source) { - final Engine.Operation index; + final Engine.Index index; if (shard.routingEntry().primary()) { index = shard.prepareIndexOnPrimary( SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source)), @@ -452,7 +452,7 @@ public abstract class IndexShardTestCase extends ESTestCase { 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); } shard.index(index); - return ((Engine.Index) index); + return index; } protected Engine.Delete deleteDoc(IndexShard shard, String type, String id) {