Better UUID for reader context (#62799)

We can use a single and stronger UUID for all reader contexts
created by the same SearchService.

Backport of #62715
This commit is contained in:
Nhat Nguyen 2020-09-23 12:50:18 -04:00 committed by GitHub
parent 7ba0c95191
commit 38c8a55df8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 63 additions and 56 deletions

View File

@ -51,7 +51,7 @@ final class TransportSearchHelper {
out.writeVInt(searchPhaseResults.asList().size());
for (SearchPhaseResult searchPhaseResult : searchPhaseResults.asList()) {
if (includeContextUUID) {
out.writeString(searchPhaseResult.getContextId().getReaderId());
out.writeString(searchPhaseResult.getContextId().getSessionId());
}
out.writeLong(searchPhaseResult.getContextId().getId());
SearchShardTarget searchShardTarget = searchPhaseResult.getSearchShardTarget();

View File

@ -36,6 +36,7 @@ import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
@ -209,6 +210,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
private final MultiBucketConsumerService multiBucketConsumerService;
private final AtomicInteger openScrollContexts = new AtomicInteger();
private final String sessionId = UUIDs.randomBase64UUID();
public SearchService(ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase,
@ -600,14 +602,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
private ReaderContext getReaderContext(ShardSearchContextId id) {
final ReaderContext reader = activeReaders.get(id.getId());
if (reader == null) {
return null;
if (sessionId.equals(id.getSessionId()) == false && id.getSessionId().isEmpty() == false) {
throw new SearchContextMissingException(id);
}
if (reader.id().getReaderId().equals(id.getReaderId()) || id.getReaderId().isEmpty()) {
return reader;
}
return null;
return activeReaders.get(id.getId());
}
private ReaderContext findReaderContext(ShardSearchContextId id, TransportRequest request) throws SearchContextMissingException {
@ -627,8 +625,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
final ReaderContext createOrGetReaderContext(ShardSearchRequest request, boolean keepStatesInContext) {
if (request.readerId() != null) {
assert keepStatesInContext == false;
final ReaderContext readerContext = findReaderContext(request.readerId(), request);
return readerContext;
return findReaderContext(request.readerId(), request);
}
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard shard = indexService.getShard(request.shardId().id());
@ -653,15 +650,15 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
}
final long keepAlive = getKeepAlive(request);
final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet());
if (keepStatesInContext || request.scroll() != null) {
readerContext = new LegacyReaderContext(idGenerator.incrementAndGet(), indexService, shard, reader, request, keepAlive);
readerContext = new LegacyReaderContext(id, indexService, shard, reader, request, keepAlive);
if (request.scroll() != null) {
readerContext.addOnClose(decreaseScrollContexts);
decreaseScrollContexts = null;
}
} else {
readerContext = new ReaderContext(idGenerator.incrementAndGet(), indexService, shard, reader, keepAlive,
request.keepAlive() == null);
readerContext = new ReaderContext(id, indexService, shard, reader, keepAlive, request.keepAlive() == null);
}
reader = null;
final ReaderContext finalReaderContext = readerContext;
@ -701,8 +698,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
ReaderContext readerContext = null;
try {
searcherSupplier = shard.acquireSearcherSupplier();
readerContext = new ReaderContext(
idGenerator.incrementAndGet(), indexService, shard, searcherSupplier, keepAlive.millis(), false);
final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet());
readerContext = new ReaderContext(id, indexService, shard, searcherSupplier, keepAlive.millis(), false);
final ReaderContext finalReaderContext = readerContext;
searcherSupplier = null; // transfer ownership to reader context
searchOperationListener.onNewReaderContext(readerContext);
@ -751,7 +748,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
final IndexShard indexShard = indexService.getShard(request.shardId().getId());
final Engine.SearcherSupplier reader = indexShard.acquireSearcherSupplier();
try (ReaderContext readerContext = new ReaderContext(idGenerator.incrementAndGet(), indexService, indexShard, reader, -1L, true)) {
final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet());
try (ReaderContext readerContext = new ReaderContext(id, indexService, indexShard, reader, -1L, true)) {
DefaultSearchContext searchContext = createSearchContext(readerContext, request, timeout);
searchContext.addReleasable(readerContext.markAsUsed(0L));
return searchContext;

View File

@ -31,11 +31,11 @@ public class LegacyReaderContext extends ReaderContext {
private final ShardSearchRequest shardSearchRequest;
private final ScrollContext scrollContext;
private final Engine.Searcher searcher;
private AggregatedDfs aggregatedDfs;
private RescoreDocIds rescoreDocIds;
public LegacyReaderContext(long id, IndexService indexService, IndexShard indexShard, Engine.SearcherSupplier reader,
public LegacyReaderContext(ShardSearchContextId id, IndexService indexService, IndexShard indexShard, Engine.SearcherSupplier reader,
ShardSearchRequest shardSearchRequest, long keepAliveInMillis) {
super(id, indexService, indexShard, reader, keepAliveInMillis, false);
assert shardSearchRequest.readerId() == null;

View File

@ -19,7 +19,6 @@
package org.elasticsearch.search.internal;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
@ -64,13 +63,13 @@ public class ReaderContext implements Releasable {
private Map<String, Object> context;
public ReaderContext(long id,
public ReaderContext(ShardSearchContextId id,
IndexService indexService,
IndexShard indexShard,
Engine.SearcherSupplier searcherSupplier,
long keepAliveInMillis,
boolean singleSession) {
this.id = new ShardSearchContextId(UUIDs.base64UUID(), id);
this.id = id;
this.indexService = indexService;
this.indexShard = indexShard;
this.searcherSupplier = searcherSupplier;

View File

@ -28,20 +28,20 @@ import java.io.IOException;
import java.util.Objects;
public final class ShardSearchContextId implements Writeable {
private final String readerId;
private final String sessionId;
private final long id;
public ShardSearchContextId(String readerId, long id) {
this.readerId = Objects.requireNonNull(readerId);
public ShardSearchContextId(String sessionId, long id) {
this.sessionId = Objects.requireNonNull(sessionId);
this.id = id;
}
public ShardSearchContextId(StreamInput in) throws IOException {
this.id = in.readLong();
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
this.readerId = in.readString();
this.sessionId = in.readString();
} else {
this.readerId = "";
this.sessionId = "";
}
}
@ -49,12 +49,12 @@ public final class ShardSearchContextId implements Writeable {
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(id);
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
out.writeString(readerId);
out.writeString(sessionId);
}
}
public String getReaderId() {
return readerId;
public String getSessionId() {
return sessionId;
}
public long getId() {
@ -66,16 +66,16 @@ public final class ShardSearchContextId implements Writeable {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShardSearchContextId other = (ShardSearchContextId) o;
return id == other.id && readerId.equals(other.readerId);
return id == other.id && sessionId.equals(other.sessionId);
}
@Override
public int hashCode() {
return Objects.hash(readerId, id);
return Objects.hash(sessionId, id);
}
@Override
public String toString() {
return "[" + readerId + "][" + id + "]";
return "[" + sessionId + "][" + id + "]";
}
}

View File

@ -360,9 +360,9 @@ public class ExceptionSerializationTests extends ESTestCase {
SearchContextMissingException ex = serialize(new SearchContextMissingException(contextId), version);
assertThat(ex.contextId().getId(), equalTo(contextId.getId()));
if (version.onOrAfter(Version.V_7_7_0)) {
assertThat(ex.contextId().getReaderId(), equalTo(contextId.getReaderId()));
assertThat(ex.contextId().getSessionId(), equalTo(contextId.getSessionId()));
} else {
assertThat(ex.contextId().getReaderId(), equalTo(""));
assertThat(ex.contextId().getSessionId(), equalTo(""));
}
}

View File

@ -81,18 +81,18 @@ public class SearchContextIdTests extends ESTestCase {
assertThat(node1.getClusterAlias(), equalTo("cluster_x"));
assertThat(node1.getNode(), equalTo("node_1"));
assertThat(node1.getSearchContextId().getId(), equalTo(1L));
assertThat(node1.getSearchContextId().getReaderId(), equalTo("a"));
assertThat(node1.getSearchContextId().getSessionId(), equalTo("a"));
SearchContextIdForNode node2 = context.shards().get(new ShardId("idy", "uuid2", 42));
assertThat(node2.getClusterAlias(), equalTo("cluster_y"));
assertThat(node2.getNode(), equalTo("node_2"));
assertThat(node2.getSearchContextId().getId(), equalTo(12L));
assertThat(node2.getSearchContextId().getReaderId(), equalTo("b"));
assertThat(node2.getSearchContextId().getSessionId(), equalTo("b"));
SearchContextIdForNode node3 = context.shards().get(new ShardId("idy", "uuid2", 43));
assertThat(node3.getClusterAlias(), nullValue());
assertThat(node3.getNode(), equalTo("node_3"));
assertThat(node3.getSearchContextId().getId(), equalTo(42L));
assertThat(node3.getSearchContextId().getReaderId(), equalTo("c"));
assertThat(node3.getSearchContextId().getSessionId(), equalTo("c"));
}
}

View File

@ -63,27 +63,27 @@ public class TransportSearchHelperTests extends ESTestCase {
assertEquals("cluster_x", parseScrollId.getContext()[0].getClusterAlias());
assertEquals(1, parseScrollId.getContext()[0].getSearchContextId().getId());
if (includeUUID) {
assertThat(parseScrollId.getContext()[0].getSearchContextId().getReaderId(), equalTo("a"));
assertThat(parseScrollId.getContext()[0].getSearchContextId().getSessionId(), equalTo("a"));
} else {
assertThat(parseScrollId.getContext()[0].getSearchContextId().getReaderId(), equalTo(""));
assertThat(parseScrollId.getContext()[0].getSearchContextId().getSessionId(), equalTo(""));
}
assertEquals("node_2", parseScrollId.getContext()[1].getNode());
assertEquals("cluster_y", parseScrollId.getContext()[1].getClusterAlias());
assertEquals(12, parseScrollId.getContext()[1].getSearchContextId().getId());
if (includeUUID) {
assertThat(parseScrollId.getContext()[1].getSearchContextId().getReaderId(), equalTo("b"));
assertThat(parseScrollId.getContext()[1].getSearchContextId().getSessionId(), equalTo("b"));
} else {
assertThat(parseScrollId.getContext()[1].getSearchContextId().getReaderId(), equalTo(""));
assertThat(parseScrollId.getContext()[1].getSearchContextId().getSessionId(), equalTo(""));
}
assertEquals("node_3", parseScrollId.getContext()[2].getNode());
assertNull(parseScrollId.getContext()[2].getClusterAlias());
assertEquals(42, parseScrollId.getContext()[2].getSearchContextId().getId());
if (includeUUID) {
assertThat(parseScrollId.getContext()[2].getSearchContextId().getReaderId(), equalTo("c"));
assertThat(parseScrollId.getContext()[2].getSearchContextId().getSessionId(), equalTo("c"));
} else {
assertThat(parseScrollId.getContext()[2].getSearchContextId().getReaderId(), equalTo(""));
assertThat(parseScrollId.getContext()[2].getSearchContextId().getSessionId(), equalTo(""));
}
}
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
@ -52,6 +53,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.LegacyReaderContext;
import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.rescore.RescoreContext;
import org.elasticsearch.search.slice.SliceBuilder;
@ -142,7 +144,7 @@ public class DefaultSearchContextTests extends ESTestCase {
SearchShardTarget target = new SearchShardTarget("node", shardId, null, OriginalIndices.NONE);
ReaderContext readerWithoutScroll = new ReaderContext(
randomNonNegativeLong(), indexService, indexShard, searcherSupplier.get(), randomNonNegativeLong(), false);
newContextId(), indexService, indexShard, searcherSupplier.get(), randomNonNegativeLong(), false);
DefaultSearchContext contextWithoutScroll = new DefaultSearchContext(readerWithoutScroll, shardSearchRequest, target, null,
bigArrays, null, timeout, null, false, Version.CURRENT);
@ -159,7 +161,7 @@ public class DefaultSearchContextTests extends ESTestCase {
// resultWindow greater than maxResultWindow and scrollContext isn't null
when(shardSearchRequest.scroll()).thenReturn(new Scroll(TimeValue.timeValueMillis(randomInt(1000))));
ReaderContext readerContext = new LegacyReaderContext(
randomNonNegativeLong(), indexService, indexShard, searcherSupplier.get(), shardSearchRequest, randomNonNegativeLong());
newContextId(), indexService, indexShard, searcherSupplier.get(), shardSearchRequest, randomNonNegativeLong());
DefaultSearchContext context1 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null,
bigArrays, null, timeout, null, false, Version.CURRENT);
context1.from(300);
@ -192,7 +194,7 @@ public class DefaultSearchContextTests extends ESTestCase {
readerContext.close();
readerContext = new ReaderContext(
randomNonNegativeLong(), indexService, indexShard, searcherSupplier.get(), randomNonNegativeLong(), false);
newContextId(), indexService, indexShard, searcherSupplier.get(), randomNonNegativeLong(), false);
// rescore is null but sliceBuilder is not null
DefaultSearchContext context2 = new DefaultSearchContext(readerContext, shardSearchRequest, target,
null, bigArrays, null, timeout, null, false, Version.CURRENT);
@ -222,7 +224,7 @@ public class DefaultSearchContextTests extends ESTestCase {
when(shardSearchRequest.indexRoutings()).thenReturn(new String[0]);
readerContext.close();
readerContext = new ReaderContext(randomNonNegativeLong(), indexService, indexShard,
readerContext = new ReaderContext(newContextId(), indexService, indexShard,
searcherSupplier.get(), randomNonNegativeLong(), false);
DefaultSearchContext context4 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null, bigArrays, null,
timeout, null, false, Version.CURRENT);
@ -276,7 +278,7 @@ public class DefaultSearchContextTests extends ESTestCase {
};
SearchShardTarget target = new SearchShardTarget("node", shardId, null, OriginalIndices.NONE);
ReaderContext readerContext = new ReaderContext(
randomNonNegativeLong(), indexService, indexShard, searcherSupplier, randomNonNegativeLong(), false);
newContextId(), indexService, indexShard, searcherSupplier, randomNonNegativeLong(), false);
DefaultSearchContext context = new DefaultSearchContext(
readerContext, shardSearchRequest, target, null, bigArrays, null, timeout, null, false, Version.CURRENT);
@ -291,4 +293,8 @@ public class DefaultSearchContextTests extends ESTestCase {
threadPool.shutdown();
}
}
private ShardSearchContextId newContextId() {
return new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong());
}
}

View File

@ -1058,16 +1058,17 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
assertThat(searchService.getActiveContexts(), equalTo(contextIds.size()));
while (contextIds.isEmpty() == false) {
final ShardSearchContextId contextId = randomFrom(contextIds);
assertFalse(searchService.freeReaderContext(new ShardSearchContextId(UUIDs.randomBase64UUID(), contextId.getId())));
expectThrows(SearchContextMissingException.class,
() -> searchService.freeReaderContext(new ShardSearchContextId(UUIDs.randomBase64UUID(), contextId.getId())));
assertThat(searchService.getActiveContexts(), equalTo(contextIds.size()));
if (randomBoolean()) {
assertTrue(searchService.freeReaderContext(contextId));
} else {
assertTrue(searchService.freeReaderContext((new ShardSearchContextId("", contextId.getId()))));
assertTrue(searchService.freeReaderContext((
new ShardSearchContextId(contextId.getSessionId(), contextId.getId()))));
}
contextIds.remove(contextId);
assertThat(searchService.getActiveContexts(), equalTo(contextIds.size()));
assertFalse(searchService.freeReaderContext(new ShardSearchContextId("", contextId.getId())));
assertFalse(searchService.freeReaderContext(contextId));
assertThat(searchService.getActiveContexts(), equalTo(contextIds.size()));
}
@ -1089,7 +1090,7 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
}
private ReaderContext createReaderContext(IndexService indexService, IndexShard indexShard) {
return new ReaderContext(randomNonNegativeLong(), indexService, indexShard,
indexShard.acquireSearcherSupplier(), randomNonNegativeLong(), false);
return new ReaderContext(new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong()),
indexService, indexShard, indexShard.acquireSearcherSupplier(), randomNonNegativeLong(), false);
}
}

View File

@ -68,7 +68,8 @@ public class SecuritySearchOperationListenerTests extends ESSingleNodeTestCase {
final ShardSearchRequest shardSearchRequest = mock(ShardSearchRequest.class);
when(shardSearchRequest.scroll()).thenReturn(new Scroll(TimeValue.timeValueMinutes(between(1, 10))));
try (LegacyReaderContext readerContext =
new LegacyReaderContext(0L, indexService, shard, shard.acquireSearcherSupplier(), shardSearchRequest, Long.MAX_VALUE)) {
new LegacyReaderContext(new ShardSearchContextId(UUIDs.randomBase64UUID(), 0L), indexService, shard,
shard.acquireSearcherSupplier(), shardSearchRequest, Long.MAX_VALUE)) {
XPackLicenseState licenseState = mock(XPackLicenseState.class);
when(licenseState.isSecurityEnabled()).thenReturn(false);
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
@ -89,7 +90,8 @@ public class SecuritySearchOperationListenerTests extends ESSingleNodeTestCase {
final ShardSearchRequest shardSearchRequest = mock(ShardSearchRequest.class);
when(shardSearchRequest.scroll()).thenReturn(new Scroll(TimeValue.timeValueMinutes(between(1, 10))));
try (LegacyReaderContext readerContext =
new LegacyReaderContext(0L, indexService, shard, shard.acquireSearcherSupplier(), shardSearchRequest, Long.MAX_VALUE)) {
new LegacyReaderContext(new ShardSearchContextId(UUIDs.randomBase64UUID(), 0L),
indexService, shard, shard.acquireSearcherSupplier(), shardSearchRequest, Long.MAX_VALUE)) {
XPackLicenseState licenseState = mock(XPackLicenseState.class);
when(licenseState.isSecurityEnabled()).thenReturn(true);
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
@ -117,7 +119,8 @@ public class SecuritySearchOperationListenerTests extends ESSingleNodeTestCase {
final ShardSearchRequest shardSearchRequest = mock(ShardSearchRequest.class);
when(shardSearchRequest.scroll()).thenReturn(new Scroll(TimeValue.timeValueMinutes(between(1, 10))));
try (LegacyReaderContext readerContext =
new LegacyReaderContext(0L, indexService, shard, shard.acquireSearcherSupplier(), shardSearchRequest, Long.MAX_VALUE)) {
new LegacyReaderContext(new ShardSearchContextId(UUIDs.randomBase64UUID(), 0L), indexService, shard,
shard.acquireSearcherSupplier(), shardSearchRequest, Long.MAX_VALUE)) {
readerContext.putInContext(AuthenticationField.AUTHENTICATION_KEY,
new Authentication(new User("test", "role"), new RealmRef("realm", "file", "node"), null));
final IndicesAccessControl indicesAccessControl = mock(IndicesAccessControl.class);