Internal: make sure that update internal requests share the same original headers and request context

Update request internally executes index and delete operations. We need to make sure that those internal operations hold the same headers and context as the original update request. Achieved via copy constructors that accept the current request and the original request.

Closes #7766
This commit is contained in:
javanna 2014-09-16 14:35:14 +02:00 committed by Luca Cavanna
parent b9b5842acc
commit 6717de9e46
4 changed files with 55 additions and 8 deletions

View File

@ -53,6 +53,9 @@ public class DeleteRequest extends ShardReplicationOperationRequest<DeleteReques
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
public DeleteRequest() {
}
/**
* Constructs a new delete request against the specified index. The {@link #type(String)} and {@link #id(String)}
* must be set.
@ -74,8 +77,19 @@ public class DeleteRequest extends ShardReplicationOperationRequest<DeleteReques
this.id = id;
}
/**
* Copy constructor that creates a new delete request that is a copy of the one provided as an argument.
*/
public DeleteRequest(DeleteRequest request) {
super(request);
this(request, request);
}
/**
* Copy constructor that creates a new delete request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public DeleteRequest(DeleteRequest request, ActionRequest originalRequest) {
super(request, originalRequest);
this.type = request.type();
this.id = request.id();
this.routing = request.routing();
@ -84,9 +98,6 @@ public class DeleteRequest extends ShardReplicationOperationRequest<DeleteReques
this.versionType = request.versionType();
}
public DeleteRequest() {
}
/**
* Creates a delete request caused by some other request, which is provided as an
* argument so that its headers and context can be copied to the new request

View File

@ -160,6 +160,28 @@ public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest>
super(request);
}
/**
* Copy constructor that creates a new index request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public IndexRequest(IndexRequest indexRequest, ActionRequest originalRequest) {
super(indexRequest, originalRequest);
this.type = indexRequest.type;
this.id = indexRequest.id;
this.routing = indexRequest.routing;
this.parent = indexRequest.parent;
this.timestamp = indexRequest.timestamp;
this.ttl = indexRequest.ttl;
this.source = indexRequest.source;
this.sourceUnsafe = indexRequest.sourceUnsafe;
this.opType = indexRequest.opType;
this.autoGeneratedId = indexRequest.autoGeneratedId;
this.refresh = indexRequest.refresh;
this.version = indexRequest.version;
this.versionType = indexRequest.versionType;
this.contentType = indexRequest.contentType;
}
/**
* Constructs a new index request against the specific index. The {@link #type(String)}
* {@link #source(byte[])} must be set.

View File

@ -54,12 +54,26 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
}
/**
* Creates a new request that inherits headers and context from the request provided as argument.
*/
protected ShardReplicationOperationRequest(ActionRequest request) {
super(request);
}
/**
* Copy constructor that creates a new request that is a copy of the one provided as an argument.
*/
protected ShardReplicationOperationRequest(T request) {
super(request);
this(request, request);
}
/**
* Copy constructor that creates a new request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
protected ShardReplicationOperationRequest(T request, ActionRequest originalRequest) {
super(originalRequest);
this.timeout = request.timeout();
this.index = request.index();
this.threadedOperation = request.operationThreaded();

View File

@ -176,7 +176,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
final UpdateHelper.Result result = updateHelper.prepare(request.request(), indexShard);
switch (result.operation()) {
case UPSERT:
IndexRequest upsertRequest = result.action();
IndexRequest upsertRequest = new IndexRequest((IndexRequest)result.action(), request.request());
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference upsertSourceBytes = upsertRequest.source();
indexAction.execute(upsertRequest, new ActionListener<IndexResponse>() {
@ -211,7 +211,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
});
break;
case INDEX:
IndexRequest indexRequest = result.action();
IndexRequest indexRequest = new IndexRequest((IndexRequest)result.action(), request.request());
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference indexSourceBytes = indexRequest.source();
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
@ -241,7 +241,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
});
break;
case DELETE:
DeleteRequest deleteRequest = result.action();
DeleteRequest deleteRequest = new DeleteRequest((DeleteRequest)result.action(), request.request());
deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse response) {