Add scroll support for cross cluster search (elastic/x-pack-elasticsearch#1706)

Original commit: elastic/x-pack-elasticsearch@eadffa396b
This commit is contained in:
Simon Willnauer 2017-06-14 20:38:58 +02:00 committed by GitHub
parent 9c8e12280b
commit 97693b9357
3 changed files with 220 additions and 3 deletions

View File

@ -126,8 +126,14 @@ public class AuthorizationService extends AbstractComponent {
final TransportRequest originalRequest = request;
if (request instanceof ConcreteShardRequest) {
request = ((ConcreteShardRequest<?>) request).getRequest();
assert TransportActionProxy.isProxyRequest(request) == false : "expected non-proxy request for action: " + action;
} else {
request = TransportActionProxy.unwrapRequest(request);
if (TransportActionProxy.isProxyRequest(originalRequest) && TransportActionProxy.isProxyAction(action) == false) {
throw new IllegalStateException("originalRequest is a proxy request for: [" + request + "] but action: ["
+ action + "] isn't");
}
}
request = TransportActionProxy.unwrapRequest(request);
// prior to doing any authorization lets set the originating action in the context only
putTransientIfNonExisting(ORIGINATING_ACTION_KEY, action);
@ -198,6 +204,20 @@ public class AuthorizationService extends AbstractComponent {
return;
}
throw denial(authentication, action, request);
} else if (TransportActionProxy.isProxyAction(action)) {
// we authorize proxied actions once they are "unwrapped" on the next node
if (TransportActionProxy.isProxyRequest(originalRequest) == false ) {
throw new IllegalStateException("originalRequest is not a proxy request: [" + originalRequest + "] but action: ["
+ action + "] is a proxy action");
}
if (permission.indices().check(action)) {
grant(authentication, action, request);
return;
} else {
// we do this here in addition to the denial below since we might run into an assertion on scroll requrest below if we
// don't have permission to read cross cluster but wrap a scroll request.
throw denial(authentication, action, request);
}
}
// some APIs are indices requests that are not actually associated with indices. For example,

View File

@ -40,9 +40,8 @@ import org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusAction;
import org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusRequest;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetAction;
@ -81,6 +80,7 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.license.GetLicenseAction;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.xpack.security.SecurityLifecycleService;
import org.elasticsearch.xpack.security.action.user.AuthenticateAction;
@ -126,10 +126,12 @@ import static org.elasticsearch.test.SecurityTestsUtils.assertThrowsAuthorizatio
import static org.elasticsearch.test.SecurityTestsUtils.assertThrowsAuthorizationExceptionRunAs;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@ -1001,4 +1003,82 @@ public class AuthorizationServiceTests extends ESTestCase {
when(state.metaData()).thenReturn(MetaData.EMPTY_META_DATA);
return state;
}
public void testProxyRequestFailsOnNonProxyAction() {
TransportRequest request = TransportRequest.Empty.INSTANCE;
DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT);
TransportRequest transportRequest = TransportActionProxy.wrapRequest(node, request);
User user = new User("test user", "role");
IllegalStateException illegalStateException = expectThrows(IllegalStateException.class,
() -> authorize(createAuthentication(user), "indices:some/action", transportRequest));
assertThat(illegalStateException.getMessage(),
startsWith("originalRequest is a proxy request for: [org.elasticsearch.transport.TransportRequest$"));
assertThat(illegalStateException.getMessage(), endsWith("] but action: [indices:some/action] isn't"));
}
public void testProxyRequestFailsOnNonProxyRequest() {
TransportRequest request = TransportRequest.Empty.INSTANCE;
User user = new User("test user", "role");
IllegalStateException illegalStateException = expectThrows(IllegalStateException.class,
() -> authorize(createAuthentication(user), TransportActionProxy.getProxyAction("indices:some/action"), request));
assertThat(illegalStateException.getMessage(),
startsWith("originalRequest is not a proxy request: [org.elasticsearch.transport.TransportRequest$"));
assertThat(illegalStateException.getMessage(),
endsWith("] but action: [internal:transport/proxy/indices:some/action] is a proxy action"));
}
public void testProxyRequestAuthenticationDenied() {
TransportRequest proxiedRequest = new SearchRequest();
DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT);
TransportRequest transportRequest = TransportActionProxy.wrapRequest(node, proxiedRequest);
String action = TransportActionProxy.getProxyAction(SearchTransportService.QUERY_ACTION_NAME);
User user = new User("test user", "no_indices");
roleMap.put("no_indices", new RoleDescriptor("no_indices", null, null, null));
assertThrowsAuthorizationException(
() -> authorize(createAuthentication(user), action, transportRequest), action, "test user");
verify(auditTrail).accessDenied(user, action, proxiedRequest);
verifyNoMoreInteractions(auditTrail);
}
public void testProxyRequestAuthenticationGrantedWithAllPrivileges() {
User user = new User("test user", "a_all");
roleMap.put("a_all", new RoleDescriptor("a_role", null,
new IndicesPrivileges[] { IndicesPrivileges.builder().indices("a").privileges("all").build() }, null));
mockEmptyMetaData();
DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT);
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
TransportRequest transportRequest = TransportActionProxy.wrapRequest(node, clearScrollRequest);
String action = TransportActionProxy.getProxyAction(SearchTransportService.CLEAR_SCROLL_CONTEXTS_ACTION_NAME);
authorize(createAuthentication(user), action, transportRequest);
verify(auditTrail).accessGranted(user, action, clearScrollRequest);
}
public void testProxyRequestAuthenticationGranted() {
User user = new User("test user", "a_all");
roleMap.put("a_all", new RoleDescriptor("a_role", null,
new IndicesPrivileges[] { IndicesPrivileges.builder().indices("a").privileges("read_cross_cluster").build() }, null));
mockEmptyMetaData();
DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT);
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
TransportRequest transportRequest = TransportActionProxy.wrapRequest(node, clearScrollRequest);
String action = TransportActionProxy.getProxyAction(SearchTransportService.CLEAR_SCROLL_CONTEXTS_ACTION_NAME);
authorize(createAuthentication(user), action, transportRequest);
verify(auditTrail).accessGranted(user, action, clearScrollRequest);
}
public void testProxyRequestAuthenticationDeniedWithReadPrivileges() {
User user = new User("test user", "a_all");
roleMap.put("a_all", new RoleDescriptor("a_role", null,
new IndicesPrivileges[] { IndicesPrivileges.builder().indices("a").privileges("read").build() }, null));
mockEmptyMetaData();
DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT);
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
TransportRequest transportRequest = TransportActionProxy.wrapRequest(node, clearScrollRequest);
String action = TransportActionProxy.getProxyAction(SearchTransportService.CLEAR_SCROLL_CONTEXTS_ACTION_NAME);
assertThrowsAuthorizationException(
() -> authorize(createAuthentication(user), action, transportRequest), action, "test user");
verify(auditTrail).accessDenied(user, action, clearScrollRequest);
}
}

View File

@ -0,0 +1,117 @@
---
setup:
- skip:
features: headers
- do:
cluster.health:
wait_for_status: yellow
- do:
xpack.security.put_user:
username: "joe"
body: >
{
"password": "s3krit",
"roles" : [ "x_cluster_role" ]
}
- do:
xpack.security.put_role:
name: "x_cluster_role"
body: >
{
"cluster": ["all"],
"indices": [
{
"names": ["local_index", "my_remote_cluster:test_i*", "my_remote_cluster:aliased_test_index", "test_remote_cluster:test_i*", "my_remote_cluster:secure_alias"],
"privileges": ["read"]
}
]
}
---
teardown:
- do:
xpack.security.delete_user:
username: "joe"
ignore: 404
- do:
xpack.security.delete_role:
name: "x_cluster_role"
ignore: 404
---
"Scroll on the mixed cluster":
- do:
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
search:
index: my_remote_cluster:test_index
size: 4
scroll: 1m
sort: filter_field
body:
query:
match_all: {}
- set: {_scroll_id: scroll_id}
- match: {hits.total: 6 }
- length: {hits.hits: 4 }
- match: {hits.hits.0._source.filter_field: 0 }
- match: {hits.hits.1._source.filter_field: 0 }
- match: {hits.hits.2._source.filter_field: 0 }
- match: {hits.hits.3._source.filter_field: 0 }
- do:
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
scroll:
body: { "scroll_id": "$scroll_id", "scroll": "1m"}
- match: {hits.total: 6 }
- length: {hits.hits: 2 }
- match: {hits.hits.0._source.filter_field: 1 }
- match: {hits.hits.1._source.filter_field: 1 }
- do:
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
scroll:
scroll_id: $scroll_id
scroll: 1m
- match: {hits.total: 6 }
- length: {hits.hits: 0 }
- do:
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
clear_scroll:
scroll_id: $scroll_id
---
"Steal Scroll ID on the mixed cluster":
- do:
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
search:
index: my_remote_cluster:test_index
size: 4
scroll: 1m
sort: filter_field
body:
query:
match_all: {}
- set: {_scroll_id: scroll_id}
- match: {hits.total: 6 }
- length: {hits.hits: 4 }
- match: {hits.hits.0._source.filter_field: 0 }
- match: {hits.hits.1._source.filter_field: 0 }
- match: {hits.hits.2._source.filter_field: 0 }
- match: {hits.hits.3._source.filter_field: 0 }
- do: # steal the scroll ID cross cluster and make sure it fails
catch: /search_context_missing_exception/
scroll:
body: { "scroll_id": "$scroll_id", "scroll": "1m"}
- do:
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
catch: missing
clear_scroll:
scroll_id: $scroll_id
- match: { num_freed : 0 }