From 02ecff13e4e192378bf06305e3fd961515aab981 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Mon, 31 Oct 2016 20:44:11 -0400 Subject: [PATCH] incorporate feedback --- .../action/bulk/TransportShardBulkAction.java | 4 ++- .../action/index/TransportIndexAction.java | 2 +- .../TransportReplicationAction.java | 4 ++- .../replication/TransportWriteAction.java | 3 +++ .../elasticsearch/index/engine/Engine.java | 10 +------ .../shard/IndexingOperationListener.java | 18 ++++++++++--- .../indices/IndexingMemoryController.java | 10 ++----- .../TransportWriteActionTests.java | 26 ++++++++++--------- 8 files changed, 41 insertions(+), 36 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 6324c684a6f..18fe1c15d78 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -134,7 +134,7 @@ public class TransportShardBulkAction extends TransportWriteActionfinalResponseIfSuccessful or finalFailure to be not-null */ public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponseIfSuccessful, Exception finalFailure) { - assert finalFailure != null ^ finalResponseIfSuccessful != null : "either a response or a failure has to be not null"; + assert finalFailure != null ^ finalResponseIfSuccessful != null + : "either a response or a failure has to be not null, " + + "found [" + finalFailure + "] failure and ["+ finalResponseIfSuccessful + "] response"; this.replicaRequest = replicaRequest; this.finalResponseIfSuccessful = finalResponseIfSuccessful; this.finalFailure = finalFailure; diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index acc512f1a07..111edf5606b 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -89,6 +89,9 @@ public abstract class TransportWriteAction< @Nullable Location location, @Nullable Exception operationFailure, IndexShard primary) { super(request, finalResponse, operationFailure); + if (location != null) { + assert operationFailure == null : "expected no failures when translog location is not null"; + } if (operationFailure != null) { this.finishedAsyncActions = true; } else { diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 0cd1a3022b1..0df026ad617 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -334,17 +334,9 @@ public abstract class Engine implements Closeable { return operationType; } - /** get size of the translog operation if translog location has been set */ - public int getSizeInBytes() { - if (translogLocation != null) { - return translogLocation.size; - } else { - throw new IllegalStateException("result has null location, use Operation#estimatedSizeInBytes instead"); - } - } - void setTranslogLocation(Translog.Location translogLocation) { if (freeze == false) { + assert failure == null : "failure has to be null to set translog location"; this.translogLocation = translogLocation; } else { throw new IllegalStateException("result is already frozen"); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java b/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java index 0e605954248..e0114a918ff 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java @@ -38,12 +38,17 @@ public interface IndexingOperationListener { } /** - * Called after the indexing operation occurred. + * Called after the indexing operation occurred. Implementations should + * check {@link Engine.IndexResult#hasFailure()} for operation failures + * and delegate to {@link #postIndex(Engine.Index, Exception)} with + * {@link Engine.IndexResult#getFailure()} if appropriate */ default void postIndex(Engine.Index index, Engine.IndexResult result) {} /** - * Called after the indexing operation occurred with exception. + * Called after the indexing operation occurred with exception that + * is not specific to the {@link Engine.Index} i.e. persistent engine + * failures etc. */ default void postIndex(Engine.Index index, Exception ex) {} @@ -56,12 +61,17 @@ public interface IndexingOperationListener { /** - * Called after the delete operation occurred. + * Called after the delete operation occurred. Implementations should + * check {@link Engine.DeleteResult#hasFailure()} for operation failures + * and delegate to {@link #postDelete(Engine.Delete, Exception)} with + * {@link Engine.DeleteResult#getFailure()} if appropriate */ default void postDelete(Engine.Delete delete, Engine.DeleteResult result) {} /** - * Called after the delete operation occurred with exception. + * Called after the delete operation occurred with exception that + * is not specific to the {@link Engine.Delete} i.e. persistent engine + * failures etc. */ default void postDelete(Engine.Delete delete, Exception ex) {} diff --git a/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java index dd950806b6f..11dbfb36f4f 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java +++ b/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java @@ -209,16 +209,10 @@ public class IndexingMemoryController extends AbstractComponent implements Index recordOperationBytes(delete, result); } - /** called by IndexShard to record that this many bytes were written to translog */ + /** called by IndexShard to record estimated bytes written to translog for the operation */ private void recordOperationBytes(Engine.Operation operation, Engine.Result result) { if (result.hasFailure() == false) { - final int sizeInBytes; - if (result.getTranslogLocation() != null) { - sizeInBytes = result.getSizeInBytes(); - } else { - sizeInBytes = operation.estimatedSizeInBytes(); - } - statusChecker.bytesWritten(sizeInBytes); + statusChecker.bytesWritten(operation.estimatedSizeInBytes()); } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 712d1ebe993..571bbfa72e0 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.junit.Before; import org.mockito.ArgumentCaptor; @@ -131,22 +132,23 @@ public class TransportWriteActionTests extends ESTestCase { } public void testDocumentFailureInShardOperationOnPrimary() throws Exception { - handleDocumentFailure(new TestAction(true, true), TestAction::shardOperationOnPrimary, TestAction.WritePrimaryResult::respond); + TestRequest request = new TestRequest(); + TestAction testAction = new TestAction(true, true); + TransportWriteAction.WritePrimaryResult writePrimaryResult = + testAction.shardOperationOnPrimary(request, indexShard); + CapturingActionListener listener = new CapturingActionListener<>(); + writePrimaryResult.respond(listener); + assertNull(listener.response); + assertNotNull(listener.failure); } public void testDocumentFailureInShardOperationOnReplica() throws Exception { - handleDocumentFailure(new TestAction(randomBoolean(), true), TestAction::shardOperationOnReplica, - TestAction.WriteReplicaResult::respond); - } - - private void handleDocumentFailure(TestAction testAction, - ThrowingTriFunction action, - BiConsumer> responder) - throws Exception { TestRequest request = new TestRequest(); - Result result = action.apply(testAction, request, indexShard); - CapturingActionListener listener = new CapturingActionListener<>(); - responder.accept(result, listener); + TestAction testAction = new TestAction(randomBoolean(), true); + TransportWriteAction.WriteReplicaResult writeReplicaResult = + testAction.shardOperationOnReplica(request, indexShard); + CapturingActionListener listener = new CapturingActionListener<>(); + writeReplicaResult.respond(listener); assertNull(listener.response); assertNotNull(listener.failure); }