Internal: make sure that all delete mapping internal requests share the same original headers and context

Delete mapping executes flush, delete by query and refresh operations internally. Those internal requests are now initialized by passing in the original delete mapping request so that its headers and request context are kept around.

Closes #7736
This commit is contained in:
javanna 2014-09-16 11:23:45 +02:00 committed by Luca Cavanna
parent f43a8e2961
commit b9b5842acc
5 changed files with 45 additions and 5 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.indices.flush;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -47,6 +48,14 @@ public class FlushRequest extends BroadcastOperationRequest<FlushRequest> {
}
/**
* Copy constructor that creates a new flush request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public FlushRequest(ActionRequest originalRequest) {
super(originalRequest);
}
/**
* Constructs a new flush request against one or more indices. If nothing is provided, all indices will
* be flushed.

View File

@ -23,10 +23,13 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.flush.TransportFlushAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequest;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.TransportDeleteByQueryAction;
@ -35,7 +38,6 @@ import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.QuerySourceBuilder;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
@ -112,7 +114,7 @@ public class TransportDeleteMappingAction extends TransportMasterNodeOperationAc
@Override
protected void masterOperation(final DeleteMappingRequest request, final ClusterState state, final ActionListener<DeleteMappingResponse> listener) throws ElasticsearchException {
final String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices());
flushAction.execute(Requests.flushRequest(concreteIndices), new ActionListener<FlushResponse>() {
flushAction.execute(new FlushRequest(request).indices(concreteIndices), new ActionListener<FlushResponse>() {
@Override
public void onResponse(FlushResponse flushResponse) {
if (logger.isTraceEnabled()) {
@ -138,7 +140,9 @@ public class TransportDeleteMappingAction extends TransportMasterNodeOperationAc
request.types(types.toArray(new String[types.size()]));
QuerySourceBuilder querySourceBuilder = new QuerySourceBuilder()
.setQuery(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(), filterBuilder));
deleteByQueryAction.execute(Requests.deleteByQueryRequest(concreteIndices).types(request.types()).source(querySourceBuilder), new ActionListener<DeleteByQueryResponse>() {
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(request).indices(concreteIndices).types(request.types()).source(querySourceBuilder);
deleteByQueryAction.execute(deleteByQueryRequest, new ActionListener<DeleteByQueryResponse>() {
@Override
public void onResponse(DeleteByQueryResponse deleteByQueryResponse) {
if (logger.isTraceEnabled()) {
@ -151,7 +155,7 @@ public class TransportDeleteMappingAction extends TransportMasterNodeOperationAc
}
}
}
refreshAction.execute(Requests.refreshRequest(concreteIndices), new ActionListener<RefreshResponse>() {
refreshAction.execute(new RefreshRequest(request).indices(concreteIndices), new ActionListener<RefreshResponse>() {
@Override
public void onResponse(RefreshResponse refreshResponse) {
if (logger.isTraceEnabled()) {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.indices.refresh;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -41,6 +42,14 @@ public class RefreshRequest extends BroadcastOperationRequest<RefreshRequest> {
RefreshRequest() {
}
/**
* Copy constructor that creates a new refresh request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public RefreshRequest(ActionRequest originalRequest) {
super(originalRequest);
}
public RefreshRequest(String... indices) {
super(indices);
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.deletebyquery;
import com.google.common.base.Charsets;
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.QuerySourceBuilder;
import org.elasticsearch.action.support.replication.IndicesReplicationOperationRequest;
@ -72,6 +73,14 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest<Del
public DeleteByQueryRequest() {
}
/**
* Copy constructor that creates a new delete by query request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public DeleteByQueryRequest(ActionRequest originalRequest) {
super(originalRequest);
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
@ -113,6 +122,7 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest<Del
/**
* The source to execute in the form of a map.
*/
@SuppressWarnings("unchecked")
public DeleteByQueryRequest source(Map source) {
try {
XContentBuilder builder = XContentFactory.contentBuilder(Requests.CONTENT_TYPE);

View File

@ -33,7 +33,7 @@ import java.io.IOException;
/**
*
*/
public class IndicesReplicationOperationRequest<T extends IndicesReplicationOperationRequest> extends ActionRequest<T> implements IndicesRequest {
public abstract class IndicesReplicationOperationRequest<T extends IndicesReplicationOperationRequest> extends ActionRequest<T> implements IndicesRequest {
protected TimeValue timeout = ShardReplicationOperationRequest.DEFAULT_TIMEOUT;
protected String[] indices;
@ -46,6 +46,13 @@ public class IndicesReplicationOperationRequest<T extends IndicesReplicationOper
return timeout;
}
protected IndicesReplicationOperationRequest() {
}
protected IndicesReplicationOperationRequest(ActionRequest actionRequest) {
super(actionRequest);
}
/**
* A timeout to wait if the delete by query operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
@ -74,6 +81,7 @@ public class IndicesReplicationOperationRequest<T extends IndicesReplicationOper
return indicesOptions;
}
@SuppressWarnings("unchecked")
public T indicesOptions(IndicesOptions indicesOptions) {
if (indicesOptions == null) {
throw new IllegalArgumentException("IndicesOptions must not be null");