Fix ccs permission for search with a scroll id (#62053) (#62695)

CCS with remote indices only does not require any privileges on the local cluster.
This PR ensures that search with scroll follow the permission model.
This commit is contained in:
Yang Wang 2020-09-22 11:49:40 +10:00 committed by GitHub
parent 21d5236173
commit 897d2e8a02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 98 additions and 11 deletions

View File

@ -19,7 +19,9 @@
package org.elasticsearch.action.search;
class ParsedScrollId {
import java.util.Arrays;
public class ParsedScrollId {
public static final String QUERY_THEN_FETCH_TYPE = "queryThenFetch";
@ -48,4 +50,8 @@ class ParsedScrollId {
public SearchContextIdForNode[] getContext() {
return context;
}
public boolean hasLocalIndices() {
return Arrays.stream(context).anyMatch(c -> c.getClusterAlias() == null);
}
}

View File

@ -83,6 +83,10 @@ public class SearchScrollRequest extends ActionRequest implements ToXContentObje
return this;
}
public ParsedScrollId parseScrollId() {
return TransportSearchHelper.parseScrollId(scrollId);
}
/**
* If set, will enable scrolling of the search request.
*/

View File

@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.test.ESTestCase;
public class ParsedScrollIdTests extends ESTestCase {
public void testHasLocalIndices() {
final int nResults = randomIntBetween(1, 3);
final SearchContextIdForNode[] searchContextIdForNodes = new SearchContextIdForNode[nResults];
boolean hasLocal = false;
for (int i = 0; i < nResults; i++) {
String clusterAlias = randomBoolean() ? randomAlphaOfLength(8) : null;
hasLocal = hasLocal || (clusterAlias == null);
searchContextIdForNodes[i] =
new SearchContextIdForNode(clusterAlias, "node_" + i, new ShardSearchContextId(randomAlphaOfLength(8), randomLong()));
}
final ParsedScrollId parsedScrollId = new ParsedScrollId(randomAlphaOfLength(8), randomAlphaOfLength(8), searchContextIdForNodes);
assertEquals(hasLocal, parsedScrollId.hasLocalIndices());
}
}

View File

@ -12,6 +12,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.automaton.Automaton;
import org.apache.lucene.util.automaton.Operations;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
@ -20,6 +21,7 @@ import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.get.MultiGetAction;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
import org.elasticsearch.action.search.ClearScrollAction;
import org.elasticsearch.action.search.MultiSearchAction;
@ -262,7 +264,16 @@ public class RBACEngine implements AuthorizationEngine {
// index and if they cannot, we can fail the request early before we allow the execution of the action and in
// turn the shard actions
if (SearchScrollAction.NAME.equals(action)) {
authorizeIndexActionName(action, authorizationInfo, null, listener);
ActionRunnable.supply(
ActionListener.wrap(parsedScrollId -> {
if (parsedScrollId.hasLocalIndices()) {
authorizeIndexActionName(action, authorizationInfo, null, listener);
} else {
listener.onResponse(new IndexAuthorizationResult(true, null));
}
}, listener::onFailure),
((SearchScrollRequest) request)::parseScrollId
).run();
} else {
// RBACEngine simply authorizes scroll related actions without filling in any DLS/FLS permissions.
// Scroll related actions have special security logic, where the security context of the initial search

View File

@ -56,6 +56,7 @@ import org.elasticsearch.action.search.ClearScrollAction;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchAction;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.ParsedScrollId;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchScrollAction;
@ -469,6 +470,32 @@ public class AuthorizationServiceTests extends ESTestCase {
verifyNoMoreInteractions(auditTrail);
}
public void testUserWithNoRolesPerformsRemoteSearchWithScroll() {
final ParsedScrollId parsedScrollId = mock(ParsedScrollId.class);
final SearchScrollRequest searchScrollRequest = mock(SearchScrollRequest.class);
when(searchScrollRequest.parseScrollId()).thenReturn(parsedScrollId);
final Authentication authentication = createAuthentication(new User("test user"));
mockEmptyMetadata();
final String requestId = AuditUtil.getOrGenerateRequestId(threadContext);
for (final boolean hasLocalIndices: org.elasticsearch.common.collect.List.of(true, false)) {
when(parsedScrollId.hasLocalIndices()).thenReturn(hasLocalIndices);
if (hasLocalIndices) {
assertThrowsAuthorizationException(
() -> authorize(authentication, SearchScrollAction.NAME, searchScrollRequest),
"indices:data/read/scroll", "test user"
);
verify(auditTrail).accessDenied(eq(requestId), eq(authentication),
eq("indices:data/read/scroll"), eq(searchScrollRequest),
authzInfoRoles(Role.EMPTY.names()));
} else {
authorize(authentication, SearchScrollAction.NAME, searchScrollRequest);
verify(auditTrail).accessGranted(eq(requestId), eq(authentication), eq(SearchScrollAction.NAME), eq(searchScrollRequest),
authzInfoRoles(Role.EMPTY.names()));
}
verifyNoMoreInteractions(auditTrail);
}
}
/**
* This test mimics {@link #testUserWithNoRolesCanPerformRemoteSearch()} except that
* while the referenced index _looks_ like a remote index, the remote cluster name has not
@ -659,7 +686,10 @@ public class AuthorizationServiceTests extends ESTestCase {
verify(auditTrail).accessGranted(eq(requestId), eq(authentication), eq(ClearScrollAction.NAME), eq(clearScrollRequest),
authzInfoRoles(new String[]{role.getName()}));
final SearchScrollRequest searchScrollRequest = new SearchScrollRequest();
final ParsedScrollId parsedScrollId = mock(ParsedScrollId.class);
when(parsedScrollId.hasLocalIndices()).thenReturn(true);
final SearchScrollRequest searchScrollRequest = mock(SearchScrollRequest.class);
when(searchScrollRequest.parseScrollId()).thenReturn(parsedScrollId);
authorize(authentication, SearchScrollAction.NAME, searchScrollRequest);
verify(auditTrail).accessGranted(eq(requestId), eq(authentication), eq(SearchScrollAction.NAME), eq(searchScrollRequest),
authzInfoRoles(new String[]{role.getName()}));

View File

@ -14,18 +14,13 @@ setup:
"password": "s3krit",
"roles" : [ "x_cluster_role" ]
}
- do:
- do: # Remote indices only CCS does not require any privileges on the local cluster
security.put_role:
name: "x_cluster_role"
body: >
{
"cluster": ["all"],
"indices": [
{
"names": ["local_index", "my_remote_cluster:test_i*", "my_remote_cluster:aliased_test_index", "test_remote_cluster:test_i*", "my_remote_cluster:secure_alias"],
"privileges": ["read"]
}
]
"cluster": [],
"indices": []
}
---
teardown: