incorporate feedback

This commit is contained in:
Areek Zillur 2016-10-31 20:44:11 -04:00
parent eafd3dfc55
commit 02ecff13e4
8 changed files with 41 additions and 36 deletions

View File

@ -134,7 +134,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
// execute item request // execute item request
final Engine.Result operationResult; final Engine.Result operationResult;
final DocWriteResponse response; final DocWriteResponse response;
BulkItemRequest replicaRequest = request.items()[requestIndex]; final BulkItemRequest replicaRequest;
switch (itemRequest.opType()) { switch (itemRequest.opType()) {
case CREATE: case CREATE:
case INDEX: case INDEX:
@ -144,6 +144,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
response = indexResult.hasFailure() ? null response = indexResult.hasFailure() ? null
: new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(), : new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
indexResult.getVersion(), indexResult.isCreated()); indexResult.getVersion(), indexResult.isCreated());
replicaRequest = request.items()[requestIndex];
break; break;
case UPDATE: case UPDATE:
UpdateResultHolder updateResultHolder = executeUpdateRequest(((UpdateRequest) itemRequest), UpdateResultHolder updateResultHolder = executeUpdateRequest(((UpdateRequest) itemRequest),
@ -159,6 +160,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
response = deleteResult.hasFailure() ? null : response = deleteResult.hasFailure() ? null :
new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(), new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(),
deleteResult.getVersion(), deleteResult.isFound()); deleteResult.getVersion(), deleteResult.isFound());
replicaRequest = request.items()[requestIndex];
break; break;
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found"); default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
} }

View File

@ -166,7 +166,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
final Engine.Index operation; final Engine.Index operation;
try { try {
operation = replica.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry()); operation = replica.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
} catch (MapperParsingException | IllegalArgumentException e) { } catch (MapperParsingException e) {
return new Engine.IndexResult(e, request.version()); return new Engine.IndexResult(e, request.version());
} }
Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); Mapping update = operation.parsedDoc().dynamicMappingsUpdate();

View File

@ -378,7 +378,9 @@ public abstract class TransportReplicationAction<
* expects <code>finalResponseIfSuccessful</code> or <code>finalFailure</code> to be not-null * expects <code>finalResponseIfSuccessful</code> or <code>finalFailure</code> to be not-null
*/ */
public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponseIfSuccessful, Exception finalFailure) { public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponseIfSuccessful, Exception finalFailure) {
assert finalFailure != null ^ finalResponseIfSuccessful != null : "either a response or a failure has to be not null"; assert finalFailure != null ^ finalResponseIfSuccessful != null
: "either a response or a failure has to be not null, " +
"found [" + finalFailure + "] failure and ["+ finalResponseIfSuccessful + "] response";
this.replicaRequest = replicaRequest; this.replicaRequest = replicaRequest;
this.finalResponseIfSuccessful = finalResponseIfSuccessful; this.finalResponseIfSuccessful = finalResponseIfSuccessful;
this.finalFailure = finalFailure; this.finalFailure = finalFailure;

View File

@ -89,6 +89,9 @@ public abstract class TransportWriteAction<
@Nullable Location location, @Nullable Exception operationFailure, @Nullable Location location, @Nullable Exception operationFailure,
IndexShard primary) { IndexShard primary) {
super(request, finalResponse, operationFailure); super(request, finalResponse, operationFailure);
if (location != null) {
assert operationFailure == null : "expected no failures when translog location is not null";
}
if (operationFailure != null) { if (operationFailure != null) {
this.finishedAsyncActions = true; this.finishedAsyncActions = true;
} else { } else {

View File

@ -334,17 +334,9 @@ public abstract class Engine implements Closeable {
return operationType; return operationType;
} }
/** get size of the translog operation if translog location has been set */
public int getSizeInBytes() {
if (translogLocation != null) {
return translogLocation.size;
} else {
throw new IllegalStateException("result has null location, use Operation#estimatedSizeInBytes instead");
}
}
void setTranslogLocation(Translog.Location translogLocation) { void setTranslogLocation(Translog.Location translogLocation) {
if (freeze == false) { if (freeze == false) {
assert failure == null : "failure has to be null to set translog location";
this.translogLocation = translogLocation; this.translogLocation = translogLocation;
} else { } else {
throw new IllegalStateException("result is already frozen"); throw new IllegalStateException("result is already frozen");

View File

@ -38,12 +38,17 @@ public interface IndexingOperationListener {
} }
/** /**
* Called after the indexing operation occurred. * Called after the indexing operation occurred. Implementations should
* check {@link Engine.IndexResult#hasFailure()} for operation failures
* and delegate to {@link #postIndex(Engine.Index, Exception)} with
* {@link Engine.IndexResult#getFailure()} if appropriate
*/ */
default void postIndex(Engine.Index index, Engine.IndexResult result) {} default void postIndex(Engine.Index index, Engine.IndexResult result) {}
/** /**
* Called after the indexing operation occurred with exception. * Called after the indexing operation occurred with exception that
* is not specific to the {@link Engine.Index} i.e. persistent engine
* failures etc.
*/ */
default void postIndex(Engine.Index index, Exception ex) {} default void postIndex(Engine.Index index, Exception ex) {}
@ -56,12 +61,17 @@ public interface IndexingOperationListener {
/** /**
* Called after the delete operation occurred. * Called after the delete operation occurred. Implementations should
* check {@link Engine.DeleteResult#hasFailure()} for operation failures
* and delegate to {@link #postDelete(Engine.Delete, Exception)} with
* {@link Engine.DeleteResult#getFailure()} if appropriate
*/ */
default void postDelete(Engine.Delete delete, Engine.DeleteResult result) {} default void postDelete(Engine.Delete delete, Engine.DeleteResult result) {}
/** /**
* Called after the delete operation occurred with exception. * Called after the delete operation occurred with exception that
* is not specific to the {@link Engine.Delete} i.e. persistent engine
* failures etc.
*/ */
default void postDelete(Engine.Delete delete, Exception ex) {} default void postDelete(Engine.Delete delete, Exception ex) {}

View File

@ -209,16 +209,10 @@ public class IndexingMemoryController extends AbstractComponent implements Index
recordOperationBytes(delete, result); recordOperationBytes(delete, result);
} }
/** called by IndexShard to record that this many bytes were written to translog */ /** called by IndexShard to record estimated bytes written to translog for the operation */
private void recordOperationBytes(Engine.Operation operation, Engine.Result result) { private void recordOperationBytes(Engine.Operation operation, Engine.Result result) {
if (result.hasFailure() == false) { if (result.hasFailure() == false) {
final int sizeInBytes; statusChecker.bytesWritten(operation.estimatedSizeInBytes());
if (result.getTranslogLocation() != null) {
sizeInBytes = result.getSizeInBytes();
} else {
sizeInBytes = operation.estimatedSizeInBytes();
}
statusChecker.bytesWritten(sizeInBytes);
} }
} }

View File

@ -30,6 +30,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.junit.Before; import org.junit.Before;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
@ -131,22 +132,23 @@ public class TransportWriteActionTests extends ESTestCase {
} }
public void testDocumentFailureInShardOperationOnPrimary() throws Exception { public void testDocumentFailureInShardOperationOnPrimary() throws Exception {
handleDocumentFailure(new TestAction(true, true), TestAction::shardOperationOnPrimary, TestAction.WritePrimaryResult::respond); TestRequest request = new TestRequest();
TestAction testAction = new TestAction(true, true);
TransportWriteAction<TestRequest, TestRequest, TestResponse>.WritePrimaryResult writePrimaryResult =
testAction.shardOperationOnPrimary(request, indexShard);
CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
writePrimaryResult.respond(listener);
assertNull(listener.response);
assertNotNull(listener.failure);
} }
public void testDocumentFailureInShardOperationOnReplica() throws Exception { public void testDocumentFailureInShardOperationOnReplica() throws Exception {
handleDocumentFailure(new TestAction(randomBoolean(), true), TestAction::shardOperationOnReplica,
TestAction.WriteReplicaResult::respond);
}
private <Result, Response> void handleDocumentFailure(TestAction testAction,
ThrowingTriFunction<TestAction, TestRequest, IndexShard, Result> action,
BiConsumer<Result, CapturingActionListener<Response>> responder)
throws Exception {
TestRequest request = new TestRequest(); TestRequest request = new TestRequest();
Result result = action.apply(testAction, request, indexShard); TestAction testAction = new TestAction(randomBoolean(), true);
CapturingActionListener<Response> listener = new CapturingActionListener<>(); TransportWriteAction<TestRequest, TestRequest, TestResponse>.WriteReplicaResult writeReplicaResult =
responder.accept(result, listener); testAction.shardOperationOnReplica(request, indexShard);
CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
writeReplicaResult.respond(listener);
assertNull(listener.response); assertNull(listener.response);
assertNotNull(listener.failure); assertNotNull(listener.failure);
} }