Make keep alive optional in PointInTimeBuilder (#62720)

Remove the keepAlive parameter from the constructor of PointInTimeBuilder
as it's optional.
This commit is contained in:
Nhat Nguyen 2020-09-22 18:51:24 -04:00
parent 7ffea4621d
commit 663b85b98f
14 changed files with 192 additions and 129 deletions

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.search.Scroll; import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
@ -479,7 +480,7 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
return source; return source;
} }
public SearchSourceBuilder.PointInTimeBuilder pointInTimeBuilder() { public PointInTimeBuilder pointInTimeBuilder() {
if (source != null) { if (source != null) {
return source.pointInTimeBuilder(); return source.pointInTimeBuilder();
} }

View File

@ -29,6 +29,7 @@ import org.elasticsearch.script.Script;
import org.elasticsearch.search.Scroll; import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.collapse.CollapseBuilder; import org.elasticsearch.search.collapse.CollapseBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
@ -554,13 +555,10 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
} }
/** /**
* Specifies the search context that Elasticsearch should use to perform the query * If specified, Elasticsearch will execute this search request using reader contexts from that point in time.
*
* @param searchContextId the base64 encoded string of the search context id
* @param keepAlive the extended time to live for the search context
*/ */
public SearchRequestBuilder setSearchContext(String searchContextId, TimeValue keepAlive) { public SearchRequestBuilder setPointInTime(PointInTimeBuilder pointInTimeBuilder) {
sourceBuilder().pointInTimeBuilder(new SearchSourceBuilder.PointInTimeBuilder(searchContextId, keepAlive)); sourceBuilder().pointInTimeBuilder(pointInTimeBuilder);
return this; return this;
} }

View File

@ -0,0 +1,128 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.builder;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Objects;
/**
* A search request with a point in time will execute using the reader contexts associated with that point time
* instead of the latest reader contexts.
*/
public final class PointInTimeBuilder implements Writeable, ToXContentObject {
private static final ParseField ID_FIELD = new ParseField("id");
private static final ParseField KEEP_ALIVE_FIELD = new ParseField("keep_alive");
private static final ObjectParser<XContentParams, Void> PARSER;
static {
PARSER = new ObjectParser<>(SearchSourceBuilder.POINT_IN_TIME.getPreferredName(), XContentParams::new);
PARSER.declareString((params, id) -> params.id = id, ID_FIELD);
PARSER.declareField((params, keepAlive) -> params.keepAlive = keepAlive,
(p, c) -> TimeValue.parseTimeValue(p.text(), KEEP_ALIVE_FIELD.getPreferredName()),
KEEP_ALIVE_FIELD, ObjectParser.ValueType.STRING);
}
private static final class XContentParams {
private String id;
private TimeValue keepAlive;
}
private final String id;
private TimeValue keepAlive;
public PointInTimeBuilder(String id) {
this.id = Objects.requireNonNull(id);
}
public PointInTimeBuilder(StreamInput in) throws IOException {
id = in.readString();
keepAlive = in.readOptionalTimeValue();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
out.writeOptionalTimeValue(keepAlive);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(SearchSourceBuilder.POINT_IN_TIME.getPreferredName());
builder.field(ID_FIELD.getPreferredName(), id);
if (keepAlive != null) {
builder.field(KEEP_ALIVE_FIELD.getPreferredName(), keepAlive);
}
builder.endObject();
return builder;
}
public static PointInTimeBuilder fromXContent(XContentParser parser) throws IOException {
final XContentParams params = PARSER.parse(parser, null);
if (params.id == null) {
throw new IllegalArgumentException("point int time id is not provided");
}
return new PointInTimeBuilder(params.id).setKeepAlive(params.keepAlive);
}
/**
* Returns the id of this point in time
*/
public String getId() {
return id;
}
/**
* If specified, the search layer will keep this point in time around for at least the given keep-alive.
* Otherwise, the point in time will be kept around until the original keep alive elapsed.
*/
public PointInTimeBuilder setKeepAlive(TimeValue keepAlive) {
this.keepAlive = keepAlive;
return this;
}
@Nullable
public TimeValue getKeepAlive() {
return keepAlive;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final PointInTimeBuilder that = (PointInTimeBuilder) o;
return Objects.equals(id, that.id) && Objects.equals(keepAlive, that.keepAlive);
}
@Override
public int hashCode() {
return Objects.hash(id, keepAlive);
}
}

View File

@ -31,7 +31,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
@ -1693,84 +1692,4 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R
} }
} }
/**
* Specify whether this search should use specific reader contexts instead of the latest ones.
*/
public static final class PointInTimeBuilder implements Writeable, ToXContentObject {
private static final ParseField ID_FIELD = new ParseField("id");
private static final ParseField KEEP_ALIVE_FIELD = new ParseField("keep_alive");
private static final ObjectParser<XContentParams, Void> PARSER;
static {
PARSER = new ObjectParser<>(POINT_IN_TIME.getPreferredName(), XContentParams::new);
PARSER.declareString((params, id) -> params.id = id, ID_FIELD);
PARSER.declareField((params, keepAlive) -> params.keepAlive = keepAlive,
(p, c) -> TimeValue.parseTimeValue(p.text(), KEEP_ALIVE_FIELD.getPreferredName()),
KEEP_ALIVE_FIELD, ObjectParser.ValueType.STRING);
}
private static final class XContentParams {
private String id;
private TimeValue keepAlive;
}
private final String id;
private final TimeValue keepAlive;
public PointInTimeBuilder(String id, TimeValue keepAlive) {
this.id = Objects.requireNonNull(id);
this.keepAlive = keepAlive;
}
public PointInTimeBuilder(StreamInput in) throws IOException {
id = in.readString();
keepAlive = in.readOptionalTimeValue();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
out.writeOptionalTimeValue(keepAlive);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(POINT_IN_TIME.getPreferredName());
builder.field(ID_FIELD.getPreferredName(), id);
if (keepAlive != null) {
builder.field(KEEP_ALIVE_FIELD.getPreferredName(), keepAlive);
}
builder.endObject();
return builder;
}
public static PointInTimeBuilder fromXContent(XContentParser parser) throws IOException {
final XContentParams params = PARSER.parse(parser, null);
if (params.id == null) {
throw new IllegalArgumentException("point int time id is not provided");
}
return new PointInTimeBuilder(params.id, params.keepAlive);
}
public TimeValue getKeepAlive() {
return keepAlive;
}
public String getId() {
return id;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final PointInTimeBuilder that = (PointInTimeBuilder) o;
return Objects.equals(id, that.id) && Objects.equals(keepAlive, that.keepAlive);
}
@Override
public int hashCode() {
return Objects.hash(id, keepAlive);
}
}
} }

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.util.ArrayUtils;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.AbstractSearchTestCase; import org.elasticsearch.search.AbstractSearchTestCase;
import org.elasticsearch.search.Scroll; import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.rescore.QueryRescorerBuilder; import org.elasticsearch.search.rescore.QueryRescorerBuilder;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
@ -208,8 +209,7 @@ public class SearchRequestTests extends AbstractSearchTestCase {
{ {
// Reader context with scroll // Reader context with scroll
SearchRequest searchRequest = new SearchRequest() SearchRequest searchRequest = new SearchRequest()
.source(new SearchSourceBuilder().pointInTimeBuilder( .source(new SearchSourceBuilder().pointInTimeBuilder(new PointInTimeBuilder("id")))
new SearchSourceBuilder.PointInTimeBuilder("id", TimeValue.timeValueMillis(randomIntBetween(1, 10)))))
.scroll(TimeValue.timeValueMillis(randomIntBetween(1, 100))); .scroll(TimeValue.timeValueMillis(randomIntBetween(1, 100)));
ActionRequestValidationException validationErrors = searchRequest.validate(); ActionRequestValidationException validationErrors = searchRequest.validate();
assertNotNull(validationErrors); assertNotNull(validationErrors);

View File

@ -387,7 +387,7 @@ public class SearchSourceBuilderTests extends AbstractSearchTestCase {
XContentType xContentType = randomFrom(XContentType.values()); XContentType xContentType = randomFrom(XContentType.values());
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
TimeValue keepAlive = randomBoolean() ? TimeValue.timeValueHours(1) : null; TimeValue keepAlive = randomBoolean() ? TimeValue.timeValueHours(1) : null;
searchSourceBuilder.pointInTimeBuilder(new SearchSourceBuilder.PointInTimeBuilder("id", keepAlive)); searchSourceBuilder.pointInTimeBuilder(new PointInTimeBuilder("id").setKeepAlive(keepAlive));
XContentBuilder builder = XContentFactory.contentBuilder(xContentType); XContentBuilder builder = XContentFactory.contentBuilder(xContentType);
searchSourceBuilder.toXContent(builder, ToXContent.EMPTY_PARAMS); searchSourceBuilder.toXContent(builder, ToXContent.EMPTY_PARAMS);
BytesReference bytes = BytesReference.bytes(builder); BytesReference bytes = BytesReference.bytes(builder);

View File

@ -35,6 +35,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script; import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType; import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.collapse.CollapseBuilder; import org.elasticsearch.search.collapse.CollapseBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
@ -374,8 +375,11 @@ public class RandomSearchRequestGenerator {
builder.collapse(randomCollapseBuilder.get()); builder.collapse(randomCollapseBuilder.get());
} }
if (randomBoolean()) { if (randomBoolean()) {
TimeValue keepAlive = randomBoolean() ? TimeValue.timeValueMinutes(randomIntBetween(1, 60)) : null; PointInTimeBuilder pit = new PointInTimeBuilder(randomAlphaOfLengthBetween(3, 10));
builder.pointInTimeBuilder(new SearchSourceBuilder.PointInTimeBuilder(randomAlphaOfLengthBetween(3, 10), keepAlive)); if (randomBoolean()) {
pit.setKeepAlive(TimeValue.timeValueMinutes(randomIntBetween(1, 60)));
}
builder.pointInTimeBuilder(pit);
} }
return builder; return builder;
} }

View File

@ -24,6 +24,7 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin; import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter; import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
@ -235,7 +236,11 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
null, null,
null); null);
pitId = client().execute(OpenPointInTimeAction.INSTANCE, openPIT).actionGet().getSearchContextId(); pitId = client().execute(OpenPointInTimeAction.INSTANCE, openPIT).actionGet().getSearchContextId();
source.pointInTimeBuilder(new SearchSourceBuilder.PointInTimeBuilder(pitId, TimeValue.timeValueMinutes(1))); final PointInTimeBuilder pit = new PointInTimeBuilder(pitId);
if (randomBoolean()) {
pit.setKeepAlive(TimeValue.timeValueMillis(randomIntBetween(1, 3600)));
}
source.pointInTimeBuilder(pit);
request = new SubmitAsyncSearchRequest(source); request = new SubmitAsyncSearchRequest(source);
} else { } else {
pitId = null; pitId = null;

View File

@ -11,6 +11,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.test.AbstractMultiClustersTestCase; import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
@ -79,7 +80,7 @@ public class CCSPointInTimeIT extends AbstractMultiClustersTestCase {
SearchResponse resp = localClient.prepareSearch() SearchResponse resp = localClient.prepareSearch()
.setPreference(null) .setPreference(null)
.setQuery(new MatchAllQueryBuilder()) .setQuery(new MatchAllQueryBuilder())
.setSearchContext(pitId, TimeValue.timeValueMinutes(2)) .setPointInTime(new PointInTimeBuilder(pitId))
.setSize(1000) .setSize(1000)
.get(); .get();
assertNoFailures(resp); assertNoFailures(resp);

View File

@ -31,6 +31,7 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.search.SearchContextMissingException; import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction; import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction;
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest; import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest;
@ -92,9 +93,9 @@ public class PointInTimeIT extends ESIntegTestCase {
client().prepareIndex("test", "_doc").setId(id).setSource("value", i).get(); client().prepareIndex("test", "_doc").setId(id).setSource("value", i).get();
} }
refresh("test"); refresh("test");
String readerId = openPointInTime(new String[] { "test" }, TimeValue.timeValueMinutes(2)); String pitId = openPointInTime(new String[] { "test" }, TimeValue.timeValueMinutes(2));
SearchResponse resp1 = client().prepareSearch().setPreference(null).setSearchContext(readerId, TimeValue.timeValueMinutes(2)).get(); SearchResponse resp1 = client().prepareSearch().setPreference(null).setPointInTime(new PointInTimeBuilder(pitId)).get();
assertThat(resp1.pointInTimeId(), equalTo(readerId)); assertThat(resp1.pointInTimeId(), equalTo(pitId));
assertHitCount(resp1, numDocs); assertHitCount(resp1, numDocs);
int deletedDocs = 0; int deletedDocs = 0;
for (int i = 0; i < numDocs; i++) { for (int i = 0; i < numDocs; i++) {
@ -114,13 +115,13 @@ public class PointInTimeIT extends ESIntegTestCase {
SearchResponse resp3 = client().prepareSearch() SearchResponse resp3 = client().prepareSearch()
.setPreference(null) .setPreference(null)
.setQuery(new MatchAllQueryBuilder()) .setQuery(new MatchAllQueryBuilder())
.setSearchContext(resp1.pointInTimeId(), TimeValue.timeValueMinutes(2)) .setPointInTime(new PointInTimeBuilder(pitId))
.get(); .get();
assertNoFailures(resp3); assertNoFailures(resp3);
assertHitCount(resp3, numDocs); assertHitCount(resp3, numDocs);
assertThat(resp3.pointInTimeId(), equalTo(readerId)); assertThat(resp3.pointInTimeId(), equalTo(pitId));
} finally { } finally {
closePointInTime(readerId); closePointInTime(pitId);
} }
} }
@ -139,7 +140,8 @@ public class PointInTimeIT extends ESIntegTestCase {
String pitId = openPointInTime(new String[]{"*"}, TimeValue.timeValueMinutes(2)); String pitId = openPointInTime(new String[]{"*"}, TimeValue.timeValueMinutes(2));
try { try {
SearchResponse resp = client().prepareSearch() SearchResponse resp = client().prepareSearch()
.setPreference(null).setSearchContext(pitId, TimeValue.timeValueMinutes(2)) .setPreference(null)
.setPointInTime(new PointInTimeBuilder(pitId))
.get(); .get();
assertNoFailures(resp); assertNoFailures(resp);
assertHitCount(resp, numDocs); assertHitCount(resp, numDocs);
@ -156,7 +158,7 @@ public class PointInTimeIT extends ESIntegTestCase {
assertNoFailures(resp); assertNoFailures(resp);
assertHitCount(resp, numDocs + moreDocs); assertHitCount(resp, numDocs + moreDocs);
resp = client().prepareSearch().setPreference(null).setSearchContext(pitId, TimeValue.timeValueMinutes(1)).get(); resp = client().prepareSearch().setPreference(null).setPointInTime(new PointInTimeBuilder(pitId)).get();
assertNoFailures(resp); assertNoFailures(resp);
assertHitCount(resp, numDocs); assertHitCount(resp, numDocs);
assertNotNull(resp.pointInTimeId()); assertNotNull(resp.pointInTimeId());
@ -178,7 +180,8 @@ public class PointInTimeIT extends ESIntegTestCase {
String pitId = openPointInTime(new String[]{"test"}, TimeValue.timeValueMinutes(2)); String pitId = openPointInTime(new String[]{"test"}, TimeValue.timeValueMinutes(2));
try { try {
SearchResponse resp = client().prepareSearch() SearchResponse resp = client().prepareSearch()
.setPreference(null).setSearchContext(pitId, TimeValue.timeValueMinutes(2)) .setPreference(null)
.setPointInTime(new PointInTimeBuilder(pitId))
.get(); .get();
assertNoFailures(resp); assertNoFailures(resp);
assertHitCount(resp, numDocs); assertHitCount(resp, numDocs);
@ -199,7 +202,8 @@ public class PointInTimeIT extends ESIntegTestCase {
refresh(); refresh();
} }
resp = client().prepareSearch() resp = client().prepareSearch()
.setPreference(null).setSearchContext(pitId, TimeValue.timeValueMinutes(2)) .setPreference(null)
.setPointInTime(new PointInTimeBuilder(pitId))
.get(); .get();
assertNoFailures(resp); assertNoFailures(resp);
assertHitCount(resp, numDocs); assertHitCount(resp, numDocs);
@ -215,7 +219,8 @@ public class PointInTimeIT extends ESIntegTestCase {
assertThat(assignedNodes, everyItem(not(in(excludedNodes)))); assertThat(assignedNodes, everyItem(not(in(excludedNodes))));
}, 30, TimeUnit.SECONDS); }, 30, TimeUnit.SECONDS);
resp = client().prepareSearch() resp = client().prepareSearch()
.setPreference(null).setSearchContext(pitId, TimeValue.timeValueMinutes(2)) .setPreference(null)
.setPointInTime(new PointInTimeBuilder(pitId))
.get(); .get();
assertNoFailures(resp); assertNoFailures(resp);
assertHitCount(resp, numDocs); assertHitCount(resp, numDocs);
@ -236,10 +241,10 @@ public class PointInTimeIT extends ESIntegTestCase {
client().prepareIndex("index", "_doc").setId(id).setSource("value", i).get(); client().prepareIndex("index", "_doc").setId(id).setSource("value", i).get();
} }
refresh(); refresh();
String readerId = openPointInTime(new String[] { "index" }, TimeValue.timeValueSeconds(5)); String pit = openPointInTime(new String[] { "index" }, TimeValue.timeValueSeconds(5));
SearchResponse resp1 = client().prepareSearch() SearchResponse resp1 = client().prepareSearch()
.setPreference(null) .setPreference(null)
.setSearchContext(readerId, TimeValue.timeValueMillis(randomIntBetween(0, 10))) .setPointInTime(new PointInTimeBuilder(pit))
.get(); .get();
assertNoFailures(resp1); assertNoFailures(resp1);
assertHitCount(resp1, index1); assertHitCount(resp1, index1);
@ -255,7 +260,7 @@ public class PointInTimeIT extends ESIntegTestCase {
SearchPhaseExecutionException.class, SearchPhaseExecutionException.class,
() -> client().prepareSearch() () -> client().prepareSearch()
.setPreference(null) .setPreference(null)
.setSearchContext(resp1.pointInTimeId(), TimeValue.timeValueMinutes(1)) .setPointInTime(new PointInTimeBuilder(pit))
.get() .get()
); );
for (ShardSearchFailure failure : e.shardFailures()) { for (ShardSearchFailure failure : e.shardFailures()) {
@ -279,8 +284,8 @@ public class PointInTimeIT extends ESIntegTestCase {
client().prepareIndex("index-2", "_doc").setId(id).setSource("value", i).get(); client().prepareIndex("index-2", "_doc").setId(id).setSource("value", i).get();
} }
refresh(); refresh();
String readerId = openPointInTime(new String[] { "index-*" }, TimeValue.timeValueMinutes(2)); String pit = openPointInTime(new String[] { "index-*" }, TimeValue.timeValueMinutes(2));
SearchResponse resp1 = client().prepareSearch().setPreference(null).setSearchContext(readerId, TimeValue.timeValueMinutes(2)).get(); SearchResponse resp1 = client().prepareSearch().setPreference(null).setPointInTime(new PointInTimeBuilder(pit)).get();
assertNoFailures(resp1); assertNoFailures(resp1);
assertHitCount(resp1, index1 + index2); assertHitCount(resp1, index1 + index2);
client().admin().indices().prepareDelete("index-1").get(); client().admin().indices().prepareDelete("index-1").get();
@ -294,7 +299,7 @@ public class PointInTimeIT extends ESIntegTestCase {
IndexNotFoundException.class, IndexNotFoundException.class,
() -> client().prepareSearch() () -> client().prepareSearch()
.setPreference(null) .setPreference(null)
.setSearchContext(resp1.pointInTimeId(), TimeValue.timeValueMinutes(1)) .setPointInTime(new PointInTimeBuilder(pit))
.get() .get()
); );
closePointInTime(resp1.pointInTimeId()); closePointInTime(resp1.pointInTimeId());
@ -308,7 +313,7 @@ public class PointInTimeIT extends ESIntegTestCase {
assertAcked( assertAcked(
prepareCreate("test").setSettings(settings).addMapping("_doc", "created_date", "type=date,format=yyyy-MM-dd")); prepareCreate("test").setSettings(settings).addMapping("_doc", "created_date", "type=date,format=yyyy-MM-dd"));
ensureGreen("test"); ensureGreen("test");
String readerId = openPointInTime(new String[] { "test*" }, TimeValue.timeValueMinutes(2)); String pitId = openPointInTime(new String[] { "test*" }, TimeValue.timeValueMinutes(2));
try { try {
for (String node : internalCluster().nodesInclude("test")) { for (String node : internalCluster().nodesInclude("test")) {
for (IndexService indexService : internalCluster().getInstance(IndicesService.class, node)) { for (IndexService indexService : internalCluster().getInstance(IndicesService.class, node)) {
@ -324,7 +329,7 @@ public class PointInTimeIT extends ESIntegTestCase {
.setPreference(null) .setPreference(null)
.setPreFilterShardSize(randomIntBetween(2, 3)) .setPreFilterShardSize(randomIntBetween(2, 3))
.setMaxConcurrentShardRequests(randomIntBetween(1, 2)) .setMaxConcurrentShardRequests(randomIntBetween(1, 2))
.setSearchContext(readerId, TimeValue.timeValueMinutes(2)) .setPointInTime(new PointInTimeBuilder(pitId))
.get(); .get();
assertThat(resp.getHits().getHits(), arrayWithSize(0)); assertThat(resp.getHits().getHits(), arrayWithSize(0));
for (String node : internalCluster().nodesInclude("test")) { for (String node : internalCluster().nodesInclude("test")) {
@ -336,7 +341,7 @@ public class PointInTimeIT extends ESIntegTestCase {
} }
} }
} finally { } finally {
closePointInTime(readerId); closePointInTime(pitId);
} }
} }

View File

@ -10,7 +10,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeResponse; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeResponse;
@ -54,7 +54,7 @@ public class PITAwareQueryClient extends BasicQueryClient {
private void searchWithPIT(SearchRequest search, ActionListener<SearchResponse> listener) { private void searchWithPIT(SearchRequest search, ActionListener<SearchResponse> listener) {
// don't increase the keep alive // don't increase the keep alive
search.source().pointInTimeBuilder(new SearchSourceBuilder.PointInTimeBuilder(pitId, null)); search.source().pointInTimeBuilder(new PointInTimeBuilder(pitId));
// get the pid on each request // get the pid on each request
super.search(search, wrap(r -> { super.search(search, wrap(r -> {
pitId = r.pointInTimeId(); pitId = r.pointInTimeId();

View File

@ -10,6 +10,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest;
@ -118,13 +119,12 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
} while (searchResponse.getHits().getHits().length > 0); } while (searchResponse.getHits().getHits().length > 0);
client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get(); client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();
String readerId = openReaders(TimeValue.timeValueMinutes(1), "index"); String pitId = openReaders(TimeValue.timeValueMinutes(1), "index");
try { try {
// now readerId
for (int from = 0; from < 3; from++) { for (int from = 0; from < 3; from++) {
searchResponse = client().prepareSearch() searchResponse = client().prepareSearch()
.setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED) .setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED)
.setSearchContext(readerId, TimeValue.timeValueMinutes(1)) .setPointInTime(new PointInTimeBuilder(pitId))
.setSize(1) .setSize(1)
.setFrom(from) .setFrom(from)
.get(); .get();
@ -139,7 +139,7 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
} }
} }
} finally { } finally {
client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(searchResponse.pointInTimeId())).get(); client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pitId)).get();
} }
} }

View File

@ -12,6 +12,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest;
import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.MultiSearchResponse;
@ -65,7 +66,7 @@ import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDI
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.integration.FieldLevelSecurityTests.openSearchContext; import static org.elasticsearch.integration.FieldLevelSecurityTests.openPointInTime;
import static org.elasticsearch.join.query.JoinQueryBuilders.hasChildQuery; import static org.elasticsearch.join.query.JoinQueryBuilders.hasChildQuery;
import static org.elasticsearch.join.query.JoinQueryBuilders.hasParentQuery; import static org.elasticsearch.join.query.JoinQueryBuilders.hasParentQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -786,7 +787,7 @@ public class DocumentLevelSecurityTests extends SecurityIntegTestCase {
} }
refresh(); refresh();
String readerId = openSearchContext("user1", TimeValue.timeValueMinutes(1), "test"); String pitId = openPointInTime("user1", TimeValue.timeValueMinutes(1), "test");
SearchResponse response = null; SearchResponse response = null;
try { try {
for (int from = 0; from < numVisible; from++) { for (int from = 0; from < numVisible; from++) {
@ -795,7 +796,7 @@ public class DocumentLevelSecurityTests extends SecurityIntegTestCase {
.prepareSearch() .prepareSearch()
.setSize(1) .setSize(1)
.setFrom(from) .setFrom(from)
.setSearchContext(readerId, TimeValue.timeValueMinutes(1)) .setPointInTime(new PointInTimeBuilder(pitId))
.setQuery(termQuery("field1", "value1")) .setQuery(termQuery("field1", "value1"))
.get(); .get();
assertNoFailures(response); assertNoFailures(response);

View File

@ -11,6 +11,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest;
import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.MultiSearchResponse;
@ -736,7 +737,7 @@ public class FieldLevelSecurityTests extends SecurityIntegTestCase {
} }
} }
static String openSearchContext(String userName, TimeValue keepAlive, String... indices) { static String openPointInTime(String userName, TimeValue keepAlive, String... indices) {
OpenPointInTimeRequest request = new OpenPointInTimeRequest( OpenPointInTimeRequest request = new OpenPointInTimeRequest(
indices, OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS, keepAlive, null, null); indices, OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS, keepAlive, null, null);
final OpenPointInTimeResponse response = client() final OpenPointInTimeResponse response = client()
@ -745,7 +746,7 @@ public class FieldLevelSecurityTests extends SecurityIntegTestCase {
return response.getSearchContextId(); return response.getSearchContextId();
} }
public void testReaderId() throws Exception { public void testPointInTimeId() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test") assertAcked(client().admin().indices().prepareCreate("test")
.setSettings(Settings.builder().put(IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.getKey(), true)) .setSettings(Settings.builder().put(IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.getKey(), true))
.addMapping("_doc", "field1", "type=text", "field2", "type=text", "field3", "type=text") .addMapping("_doc", "field1", "type=text", "field2", "type=text", "field3", "type=text")
@ -759,14 +760,14 @@ public class FieldLevelSecurityTests extends SecurityIntegTestCase {
} }
refresh("test"); refresh("test");
String readerId = openSearchContext("user1", TimeValue.timeValueMinutes(1), "test"); String pitId = openPointInTime("user1", TimeValue.timeValueMinutes(1), "test");
SearchResponse response = null; SearchResponse response = null;
try { try {
for (int from = 0; from < numDocs; from++) { for (int from = 0; from < numDocs; from++) {
response = client() response = client()
.filterWithHeader(Collections.singletonMap(BASIC_AUTH_HEADER, basicAuthHeaderValue("user1", USERS_PASSWD))) .filterWithHeader(Collections.singletonMap(BASIC_AUTH_HEADER, basicAuthHeaderValue("user1", USERS_PASSWD)))
.prepareSearch() .prepareSearch()
.setSearchContext(readerId, TimeValue.timeValueMinutes(1L)) .setPointInTime(new PointInTimeBuilder(pitId))
.setSize(1) .setSize(1)
.setFrom(from) .setFrom(from)
.setQuery(constantScoreQuery(termQuery("field1", "value1"))) .setQuery(constantScoreQuery(termQuery("field1", "value1")))
@ -778,7 +779,7 @@ public class FieldLevelSecurityTests extends SecurityIntegTestCase {
assertThat(response.getHits().getAt(0).getSourceAsMap().get("field1"), is("value1")); assertThat(response.getHits().getAt(0).getSourceAsMap().get("field1"), is("value1"));
} }
} finally { } finally {
client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(readerId)).actionGet(); client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pitId)).actionGet();
} }
} }