From 196e3c360261cc4cf25a4d859b41bca3b189c57d Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 26 Mar 2014 09:47:44 +0100 Subject: [PATCH] Capture and set start time in Delete By Query operations This is important for queries/filters that use `now` in date based queries/filters Closes #5540 --- .../IndexDeleteByQueryRequest.java | 19 ++++++++++++++++++- .../ShardDeleteByQueryRequest.java | 16 ++++++++++++++++ .../TransportDeleteByQueryAction.java | 4 ++-- .../TransportShardDeleteByQueryAction.java | 4 ++-- ...portIndicesReplicationOperationAction.java | 5 +++-- .../deleteByQuery/DeleteByQueryTests.java | 12 ++++++++++++ 6 files changed, 53 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/IndexDeleteByQueryRequest.java b/src/main/java/org/elasticsearch/action/deletebyquery/IndexDeleteByQueryRequest.java index 76fee0e55fb..105fd76399d 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/IndexDeleteByQueryRequest.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/IndexDeleteByQueryRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.deletebyquery; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest; import org.elasticsearch.common.Nullable; @@ -45,8 +46,11 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest< private Set routing; @Nullable private String[] filteringAliases; + private long nowInMillis; - IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index, @Nullable Set routing, @Nullable String[] filteringAliases) { + IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index, @Nullable Set routing, @Nullable String[] filteringAliases, + long nowInMillis + ) { this.index = index; this.timeout = request.timeout(); this.source = request.source(); @@ -55,6 +59,7 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest< this.consistencyLevel = request.consistencyLevel(); this.routing = routing; this.filteringAliases = filteringAliases; + this.nowInMillis = nowInMillis; } IndexDeleteByQueryRequest() { @@ -85,6 +90,10 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest< return filteringAliases; } + long nowInMillis() { + return nowInMillis; + } + public IndexDeleteByQueryRequest timeout(TimeValue timeout) { this.timeout = timeout; return this; @@ -114,6 +123,11 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest< filteringAliases[i] = in.readString(); } } + if (in.getVersion().onOrAfter(Version.V_1_2_0)) { + nowInMillis = in.readVLong(); + } else { + nowInMillis = System.currentTimeMillis(); + } } public void writeTo(StreamOutput out) throws IOException { @@ -139,5 +153,8 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest< } else { out.writeVInt(0); } + if (out.getVersion().onOrAfter(Version.V_1_2_0)) { + out.writeVLong(nowInMillis); + } } } diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryRequest.java b/src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryRequest.java index 7d42bc5389b..d3bcba009c8 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryRequest.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.deletebyquery; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; import org.elasticsearch.common.Nullable; @@ -47,6 +48,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest< private Set routing; @Nullable private String[] filteringAliases; + private long nowInMillis; ShardDeleteByQueryRequest(IndexDeleteByQueryRequest request, int shardId) { super(request); @@ -59,6 +61,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest< timeout = request.timeout(); this.routing = request.routing(); filteringAliases = request.filteringAliases(); + nowInMillis = request.nowInMillis(); } ShardDeleteByQueryRequest() { @@ -93,6 +96,10 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest< return filteringAliases; } + long nowInMillis() { + return nowInMillis; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -113,6 +120,12 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest< filteringAliases[i] = in.readString(); } } + + if (in.getVersion().onOrAfter(Version.V_1_2_0)) { + nowInMillis = in.readVLong(); + } else { + nowInMillis = System.currentTimeMillis(); + } } @Override @@ -137,6 +150,9 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest< } else { out.writeVInt(0); } + if (out.getVersion().onOrAfter(Version.V_1_2_0)) { + out.writeVLong(nowInMillis); + } } @Override diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java b/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java index 9be61c5a75f..ee6eacaaef2 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java @@ -100,8 +100,8 @@ public class TransportDeleteByQueryAction extends TransportIndicesReplicationOpe } @Override - protected IndexDeleteByQueryRequest newIndexRequestInstance(DeleteByQueryRequest request, String index, Set routing) { + protected IndexDeleteByQueryRequest newIndexRequestInstance(DeleteByQueryRequest request, String index, Set routing, long startTimeInMillis) { String[] filteringAliases = clusterService.state().metaData().filteringAliases(index, request.indices()); - return new IndexDeleteByQueryRequest(request, index, routing, filteringAliases); + return new IndexDeleteByQueryRequest(request, index, routing, filteringAliases, startTimeInMillis); } } diff --git a/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java b/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java index 4dddb549873..1092c0387c5 100644 --- a/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java +++ b/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java @@ -115,7 +115,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication IndexService indexService = indicesService.indexServiceSafe(shardRequest.request.index()); IndexShard indexShard = indexService.shardSafe(shardRequest.shardId); - SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()), null, + SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()).nowInMillis(request.nowInMillis()), null, indexShard.acquireSearcher("delete_by_query"), indexService, indexShard, scriptService, cacheRecycler, pageCacheRecycler, bigArrays)); try { @@ -138,7 +138,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication IndexService indexService = indicesService.indexServiceSafe(shardRequest.request.index()); IndexShard indexShard = indexService.shardSafe(shardRequest.shardId); - SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()), null, + SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()).nowInMillis(request.nowInMillis()), null, indexShard.acquireSearcher("delete_by_query", IndexShard.Mode.WRITE), indexService, indexShard, scriptService, cacheRecycler, pageCacheRecycler, bigArrays)); try { diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java index 5c0dd266599..53ea0522314 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java @@ -84,6 +84,7 @@ public abstract class TransportIndicesReplicationOperationAction indexResponses = new AtomicReferenceArray(concreteIndices.length); + final long startTimeInMillis = System.currentTimeMillis(); Map> routingMap = resolveRouting(clusterState, request); if (concreteIndices == null || concreteIndices.length == 0) { @@ -94,7 +95,7 @@ public abstract class TransportIndicesReplicationOperationAction() { @@ -127,7 +128,7 @@ public abstract class TransportIndicesReplicationOperationAction routing); + protected abstract IndexRequest newIndexRequestInstance(Request request, String index, Set routing, long startTimeInMillis); protected abstract boolean accumulateExceptions(); diff --git a/src/test/java/org/elasticsearch/deleteByQuery/DeleteByQueryTests.java b/src/test/java/org/elasticsearch/deleteByQuery/DeleteByQueryTests.java index f98a68b43bc..fd39fdc53fc 100644 --- a/src/test/java/org/elasticsearch/deleteByQuery/DeleteByQueryTests.java +++ b/src/test/java/org/elasticsearch/deleteByQuery/DeleteByQueryTests.java @@ -142,4 +142,16 @@ public class DeleteByQueryTests extends ElasticsearchIntegrationTest { } + @Test + public void testDateMath() throws Exception { + index("test", "type", "1", "d", "2013-01-01"); + ensureGreen(); + refresh(); + assertHitCount(client().prepareCount("test").get(), 1); + client().prepareDeleteByQuery("test").setQuery(QueryBuilders.rangeQuery("d").to("now-1h")).get(); + refresh(); + assertHitCount(client().prepareCount("test").get(), 0); + } + + }