add internal origin to delete by query

This commit is contained in:
Shay Banon 2013-09-22 23:55:47 +02:00
parent 0f17a4c61f
commit e7e39936b8
3 changed files with 16 additions and 3 deletions

View File

@ -92,7 +92,8 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
protected PrimaryResponse<ShardDeleteByQueryResponse, ShardDeleteByQueryRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { protected PrimaryResponse<ShardDeleteByQueryResponse, ShardDeleteByQueryRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
ShardDeleteByQueryRequest request = shardRequest.request; ShardDeleteByQueryRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId); IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.querySource(), request.filteringAliases(), request.types()); Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.querySource(), request.filteringAliases(), request.types())
.origin(Engine.Operation.Origin.PRIMARY);
indexShard.deleteByQuery(deleteByQuery); indexShard.deleteByQuery(deleteByQuery);
return new PrimaryResponse<ShardDeleteByQueryResponse, ShardDeleteByQueryRequest>(shardRequest.request, new ShardDeleteByQueryResponse(), null); return new PrimaryResponse<ShardDeleteByQueryResponse, ShardDeleteByQueryRequest>(shardRequest.request, new ShardDeleteByQueryResponse(), null);
} }
@ -102,7 +103,8 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) { protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
ShardDeleteByQueryRequest request = shardRequest.request; ShardDeleteByQueryRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId); IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.querySource(), request.filteringAliases(), request.types()); Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.querySource(), request.filteringAliases(), request.types())
.origin(Engine.Operation.Origin.REPLICA);
indexShard.deleteByQuery(deleteByQuery); indexShard.deleteByQuery(deleteByQuery);
} }

View File

@ -748,6 +748,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
private final Filter aliasFilter; private final Filter aliasFilter;
private final String[] types; private final String[] types;
private final Filter parentFilter; private final Filter parentFilter;
private Operation.Origin origin = Operation.Origin.PRIMARY;
private long startTime; private long startTime;
private long endTime; private long endTime;
@ -789,6 +790,15 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return parentFilter; return parentFilter;
} }
public DeleteByQuery origin(Operation.Origin origin) {
this.origin = origin;
return this;
}
public Operation.Origin origin() {
return this.origin;
}
public DeleteByQuery startTime(long startTime) { public DeleteByQuery startTime(long startTime) {
this.startTime = startTime; this.startTime = startTime;
return this; return this;

View File

@ -691,7 +691,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
break; break;
case DELETE_BY_QUERY: case DELETE_BY_QUERY:
Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) operation; Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) operation;
engine.delete(prepareDeleteByQuery(deleteByQuery.source(), deleteByQuery.filteringAliases(), deleteByQuery.types())); engine.delete(prepareDeleteByQuery(deleteByQuery.source(), deleteByQuery.filteringAliases(), deleteByQuery.types())
.origin(Engine.Operation.Origin.RECOVERY));
break; break;
default: default:
throw new ElasticSearchIllegalStateException("No operation defined for [" + operation + "]"); throw new ElasticSearchIllegalStateException("No operation defined for [" + operation + "]");