Add support for providing absolute start time to SearchRequest (#37142)

We have recently added support for providing a local cluster alias to a
SearchRequest through a package protected constructor. When executing
cross-cluster search requests with local reduction on each cluster, the
CCS coordinating node will have to provide such cluster alias to each
remote cluster, as well as the absolute start time of the search action
in milliseconds from the time epoch, to be used when evaluating date
math expressions both while executing queries / scripts as well as when
resolving index names.

This commit adds support for providing the start time together with the
cluster alias. It is a final member in the search request, which will
only be set when using cross-cluster search with local reduction (also
known as alternate execution mode). When not provided, the coordinating
node will determine the current time and pass it through (by calling
`System.currentTimeMillis`).

Relates to #32125
This commit is contained in:
Luca Cavanna 2019-01-07 10:28:31 +01:00 committed by GitHub
parent 6347461146
commit 2f4dafa69f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 125 additions and 30 deletions

View File

@ -62,7 +62,10 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128;
public static final int DEFAULT_BATCHED_REDUCE_SIZE = 512;
private static final long DEFAULT_ABSOLUTE_START_MILLIS = -1;
private final String localClusterAlias;
private final long absoluteStartMillis;
private SearchType searchType = SearchType.DEFAULT;
@ -95,6 +98,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
public SearchRequest() {
this.localClusterAlias = null;
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
}
/**
@ -115,6 +119,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
this.source = searchRequest.source;
this.types = searchRequest.types;
this.localClusterAlias = searchRequest.localClusterAlias;
this.absoluteStartMillis = searchRequest.absoluteStartMillis;
}
/**
@ -138,12 +143,17 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
}
/**
* Creates a new search request by providing the alias of the cluster where it will be executed. Used when a {@link SearchRequest}
* is created and executed as part of a cross-cluster search request performing local reduction on each cluster.
* The coordinating CCS node provides the alias to prefix index names with in the returned search results.
* Creates a new search request by providing the alias of the cluster where it will be executed, as well as the current time in
* milliseconds from the epoch time. Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search
* request performing local reduction on each cluster. The coordinating CCS node provides the alias to prefix index names with in
* the returned search results, and the current time to be used on the remote clusters to ensure that the same value is used.
*/
SearchRequest(String localClusterAlias) {
SearchRequest(String localClusterAlias, long absoluteStartMillis) {
this.localClusterAlias = Objects.requireNonNull(localClusterAlias, "cluster alias must not be null");
if (absoluteStartMillis < 0) {
throw new IllegalArgumentException("absoluteStartMillis must not be negative but was [" + absoluteStartMillis + "]");
}
this.absoluteStartMillis = absoluteStartMillis;
}
/**
@ -155,10 +165,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
public SearchRequest(StreamInput in) throws IOException {
super(in);
searchType = SearchType.fromId(in.readByte());
indices = new String[in.readVInt()];
for (int i = 0; i < indices.length; i++) {
indices[i] = in.readString();
}
indices = in.readStringArray();
routing = in.readOptionalString();
preference = in.readOptionalString();
scroll = in.readOptionalWriteable(Scroll::new);
@ -175,8 +182,14 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
//TODO update version after backport
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
localClusterAlias = in.readOptionalString();
if (localClusterAlias != null) {
absoluteStartMillis = in.readVLong();
} else {
absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
}
} else {
localClusterAlias = null;
absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
}
}
@ -184,10 +197,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(searchType.id());
out.writeVInt(indices.length);
for (String index : indices) {
out.writeString(index);
}
out.writeStringArray(indices);
out.writeOptionalString(routing);
out.writeOptionalString(preference);
out.writeOptionalWriteable(scroll);
@ -204,6 +214,9 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
//TODO update version after backport
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeOptionalString(localClusterAlias);
if (localClusterAlias != null) {
out.writeVLong(absoluteStartMillis);
}
}
}
@ -243,6 +256,17 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
return localClusterAlias;
}
/**
* Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to
* ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search
* request. When created through {@link #SearchRequest(String, long)}, this method returns the provided current time, otherwise
* it will return {@link System#currentTimeMillis()}.
*
*/
long getOrCreateAbsoluteStartMillis() {
return absoluteStartMillis == DEFAULT_ABSOLUTE_START_MILLIS ? System.currentTimeMillis() : absoluteStartMillis;
}
/**
* Sets the indices the search will be executed on.
*/
@ -435,7 +459,6 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
return this.allowPartialSearchResults;
}
/**
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
* mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
@ -498,13 +521,6 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
return preFilterShardSize;
}
/**
* Returns <code>true</code> iff the maxConcurrentShardRequest is set.
*/
boolean isMaxConcurrentShardRequestsSet() {
return maxConcurrentShardRequests != 0;
}
/**
* @return true if the request only has suggest
*/
@ -538,7 +554,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
}
@Override
public void readFrom(StreamInput in) throws IOException {
public void readFrom(StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@ -564,14 +580,15 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
Objects.equals(preFilterShardSize, that.preFilterShardSize) &&
Objects.equals(indicesOptions, that.indicesOptions) &&
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) &&
Objects.equals(localClusterAlias, that.localClusterAlias);
Objects.equals(localClusterAlias, that.localClusterAlias) &&
absoluteStartMillis == that.absoluteStartMillis;
}
@Override
public int hashCode() {
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
allowPartialSearchResults, localClusterAlias);
allowPartialSearchResults, localClusterAlias, absoluteStartMillis);
}
@Override
@ -590,6 +607,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
", preFilterShardSize=" + preFilterShardSize +
", allowPartialSearchResults=" + allowPartialSearchResults +
", localClusterAlias=" + localClusterAlias +
", getOrCreateAbsoluteStartMillis=" + absoluteStartMillis +
", source=" + source + '}';
}
}

View File

@ -180,10 +180,9 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
@Override
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
final long absoluteStartMillis = System.currentTimeMillis();
final long relativeStartNanos = System.nanoTime();
final SearchTimeProvider timeProvider =
new SearchTimeProvider(absoluteStartMillis, relativeStartNanos, System::nanoTime);
new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
if (source != searchRequest.source()) {
// only set it if it changed - we don't allow null values to be set but it might be already null be we want to catch

View File

@ -40,6 +40,9 @@ import java.util.Base64;
import java.util.List;
import static org.elasticsearch.test.EqualsHashCodeTestUtils.checkEqualsAndHashCode;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class SearchRequestTests extends AbstractSearchTestCase {
@ -48,12 +51,19 @@ public class SearchRequestTests extends AbstractSearchTestCase {
if (randomBoolean()) {
return super.createSearchRequest();
}
//clusterAlias does not have public getter/setter hence we randomize it only in this test specifically.
SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10));
//clusterAlias and absoluteStartMillis do not have public getters/setters hence we randomize them only in this test specifically.
SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong());
RandomSearchRequestGenerator.randomSearchRequest(searchRequest, this::createSearchSourceBuilder);
return searchRequest;
}
public void testClusterAliasValidation() {
expectThrows(NullPointerException.class, () -> new SearchRequest(null, 0));
expectThrows(IllegalArgumentException.class, () -> new SearchRequest("", -1));
SearchRequest searchRequest = new SearchRequest("", 0);
assertNull(searchRequest.validate());
}
public void testSerialization() throws Exception {
SearchRequest searchRequest = createSearchRequest();
SearchRequest deserializedRequest = copyWriteable(searchRequest, namedWriteableRegistry, SearchRequest::new);
@ -69,8 +79,10 @@ public class SearchRequestTests extends AbstractSearchTestCase {
//TODO update version after backport
if (version.before(Version.V_7_0_0)) {
assertNull(deserializedRequest.getLocalClusterAlias());
assertAbsoluteStartMillisIsCurrentTime(deserializedRequest);
} else {
assertEquals(searchRequest.getLocalClusterAlias(), deserializedRequest.getLocalClusterAlias());
assertEquals(searchRequest.getOrCreateAbsoluteStartMillis(), deserializedRequest.getOrCreateAbsoluteStartMillis());
}
}
@ -78,13 +90,21 @@ public class SearchRequestTests extends AbstractSearchTestCase {
public void testReadFromPre7_0_0() throws IOException {
String msg = "AAEBBWluZGV4AAAAAQACAAAA/////w8AAAAAAAAA/////w8AAAAAAAACAAAAAAABAAMCBAUBAAKABACAAQIAAA==";
try (StreamInput in = StreamInput.wrap(Base64.getDecoder().decode(msg))) {
in.setVersion(Version.V_6_6_0);
in.setVersion(VersionUtils.randomVersionBetween(random(), Version.V_6_4_0, VersionUtils.getPreviousVersion(Version.V_7_0_0)));
SearchRequest searchRequest = new SearchRequest(in);
assertArrayEquals(new String[]{"index"}, searchRequest.indices());
assertNull(searchRequest.getLocalClusterAlias());
assertAbsoluteStartMillisIsCurrentTime(searchRequest);
}
}
private static void assertAbsoluteStartMillisIsCurrentTime(SearchRequest searchRequest) {
long before = System.currentTimeMillis();
long absoluteStartMillis = searchRequest.getOrCreateAbsoluteStartMillis();
long after = System.currentTimeMillis();
assertThat(absoluteStartMillis, allOf(greaterThanOrEqualTo(before), lessThanOrEqualTo(after)));
}
public void testIllegalArguments() {
SearchRequest searchRequest = new SearchRequest();
assertNotNull(searchRequest.indices());

View File

@ -21,14 +21,18 @@ package org.elasticsearch.action.search;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESSingleNodeTestCase;
public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {
public void testLocalClusterAlias() {
long nowInMillis = System.currentTimeMillis();
IndexRequest indexRequest = new IndexRequest("test");
indexRequest.id("1");
indexRequest.source("field", "value");
@ -37,7 +41,7 @@ public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {
assertEquals(RestStatus.CREATED, indexResponse.status());
{
SearchRequest searchRequest = new SearchRequest("local");
SearchRequest searchRequest = new SearchRequest("local", nowInMillis);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits().value);
SearchHit[] hits = searchResponse.getHits().getHits();
@ -48,7 +52,7 @@ public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {
assertEquals("1", hit.getId());
}
{
SearchRequest searchRequest = new SearchRequest("");
SearchRequest searchRequest = new SearchRequest("", nowInMillis);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits().value);
SearchHit[] hits = searchResponse.getHits().getHits();
@ -59,4 +63,58 @@ public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {
assertEquals("1", hit.getId());
}
}
public void testAbsoluteStartMillis() {
{
IndexRequest indexRequest = new IndexRequest("test-1970.01.01");
indexRequest.id("1");
indexRequest.source("date", "1970-01-01");
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
IndexResponse indexResponse = client().index(indexRequest).actionGet();
assertEquals(RestStatus.CREATED, indexResponse.status());
}
{
IndexRequest indexRequest = new IndexRequest("test-1982.01.01");
indexRequest.id("1");
indexRequest.source("date", "1982-01-01");
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
IndexResponse indexResponse = client().index(indexRequest).actionGet();
assertEquals(RestStatus.CREATED, indexResponse.status());
}
{
SearchRequest searchRequest = new SearchRequest();
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(2, searchResponse.getHits().getTotalHits().value);
}
{
SearchRequest searchRequest = new SearchRequest("<test-{now/d}>");
searchRequest.indicesOptions(IndicesOptions.fromOptions(true, true, true, true));
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(0, searchResponse.getTotalShards());
}
{
SearchRequest searchRequest = new SearchRequest("", 0);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(2, searchResponse.getHits().getTotalHits().value);
}
{
SearchRequest searchRequest = new SearchRequest("", 0);
searchRequest.indices("<test-{now/d}>");
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits().value);
assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
}
{
SearchRequest searchRequest = new SearchRequest("", 0);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("date");
rangeQuery.gte("1970-01-01");
rangeQuery.lt("1982-01-01");
sourceBuilder.query(rangeQuery);
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits().value);
assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
}
}
}