New /stream test cases showing authn+authz edge cases in cloud mode

This triggers various places in the Streaming Expressions code that use background threads
to confirm that the expected credentails (or lack of) are propogarded along.

Test currently has comments + workarounds for 2 known client issues:
 - SOLR-14226: SolrStream reports AuthN/AuthZ failures (401|403) as IOException w/o details
 - SOLR-14222: CloudSolrClient converts (update) 403 error to 500 error
This commit is contained in:
Chris Hostetter 2020-01-30 10:01:03 -07:00
parent 4b5105e167
commit 517438e356

View File

@ -0,0 +1,611 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.TimeOut;
import static;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* tests various streaming expressions (via the SolrJ {@link SolrStream} API) against a SolrCloud cluster
* using both Authenticationand Role based Authorization
public class CloudAuthStreamTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String COLLECTION_X = "collection_x";
private static final String COLLECTION_Y = "collection_y";
private static final String READ_ONLY_USER = "read_only_user";
private static final String WRITE_X_USER = "write_x_user";
private static final String WRITE_Y_USER = "write_y_user";
private static final String ADMIN_USER = "admin_user";
private static String solrUrl = null;
* Helper that returns the original {@link SolrRequest} <em>with it's original type</em>
* so it can be chained. This menthod knows that for the purpose of this test, every user name
* is it's own password
* @see SolrRequest#setBasicAuthCredentials
private static <T extends SolrRequest> T setBasicAuthCredentials(T req, String user) {
assert null != user;
req.setBasicAuthCredentials(user, user);
return req;
public static void setupCluster() throws Exception {
final List<String> users = Arrays.asList(READ_ONLY_USER, WRITE_X_USER, WRITE_Y_USER, ADMIN_USER);
// For simplicity: every user uses a password the same as their name...
final Map<String,String> credentials =
.collect(Collectors.toMap(Function.identity(), s -> getSaltedHashedValue(s)));
// For simplicity: Every user is their own role...
final Map<String,String> roles =
.collect(Collectors.toMap(Function.identity(), Function.identity()));
final String SECURITY_JSON = Utils.toJSONString
Utils.makeMap("class", RuleBasedAuthorizationPlugin.class.getName(),
"user-role", roles,
// NOTE: permissions order matters!
"permissions", Arrays.asList(// any authn user can 'read' or hit /stream
"collection", "*",
"path", "/stream",
// per collection write perms
"collection", COLLECTION_X,
"role", WRITE_X_USER),
"collection", COLLECTION_Y,
"role", WRITE_Y_USER),
Utils.makeMap("class", BasicAuthPlugin.class.getName(),
"credentials", credentials)));
// we want at most one core per node to force lots of network traffic to try and tickle distributed bugs
for (String collection : Arrays.asList(COLLECTION_X, COLLECTION_Y)) {
CollectionAdminRequest.createCollection(collection, "_default", 2, 2)
.setBasicAuthCredentials(ADMIN_USER, ADMIN_USER)
for (String collection : Arrays.asList(COLLECTION_X, COLLECTION_Y)) {
cluster.getSolrClient().waitForState(collection, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 2, 2));
solrUrl = cluster.getRandomJetty(random()).getProxyBaseUrl().toString();"All stream requests will be sent to random solrUrl: {}", solrUrl);
public static void clearVariables() {
solrUrl = null;
public void clearCollections() throws Exception {"Clearing Collections @After test method...");
setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER)
.commit(cluster.getSolrClient(), COLLECTION_X).getStatus());
setBasicAuthCredentials(new UpdateRequest(), WRITE_Y_USER)
.commit(cluster.getSolrClient(), COLLECTION_Y).getStatus());
* Simple sanity checks that authentication is working the way the test expects
public void testSanityCheckAuth() throws Exception {
assertEquals("sanity check of non authenticated query request",
expectThrows(SolrException.class, () -> {
final long ignored =
(new QueryRequest(params("q", "*:*",
"rows", "0",
"_trace", "no_auth_sanity_check")))
.process(cluster.getSolrClient(), COLLECTION_X).getResults().getNumFound();
assertEquals("sanity check of update to X from write_X user",
(setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER)
.add(sdoc("id", "1_from_write_X_user"))
.commit(cluster.getSolrClient(), COLLECTION_X)).getStatus());
assertEquals("sanity check of update to X from read only user",
500, // should be 403, but CloudSolrClient lies on updates for now: SOLR-14222
expectThrows(SolrException.class, () -> {
final int ignored = (setBasicAuthCredentials(new UpdateRequest(), READ_ONLY_USER)
.add(sdoc("id", "2_from_read_only_user"))
.commit(cluster.getSolrClient(), COLLECTION_X)).getStatus();
assertEquals("sanity check of update to X from write_Y user",
500, // should be 403, but CloudSolrClient lies on updates for now: SOLR-14222
expectThrows(SolrException.class, () -> {
final int ignored = (setBasicAuthCredentials(new UpdateRequest(), WRITE_Y_USER)
.add(sdoc("id", "3_from_write_Y_user"))
.commit(cluster.getSolrClient(), COLLECTION_X)).getStatus();
assertEquals("sanity check of update to Y from write_Y user",
(setBasicAuthCredentials(new UpdateRequest(), WRITE_Y_USER)
.add(sdoc("id", "1_from_write_Y_user"))
.commit(cluster.getSolrClient(), COLLECTION_Y)).getStatus());
for (String user : Arrays.asList(READ_ONLY_USER, WRITE_Y_USER, WRITE_X_USER)) {
for (String collection : Arrays.asList(COLLECTION_X, COLLECTION_Y)) {
assertEquals("sanity check: query "+collection+" from user: "+user,
1, countDocsInCollection(collection, user));
public void testEchoStream() throws Exception {
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
params("qt", "/stream",
"expr", "echo(hello world)"));
solrStream.setCredentials(READ_ONLY_USER, READ_ONLY_USER);
final List<Tuple> tuples = getTuples(solrStream);
assertEquals(1, tuples.size());
assertEquals("hello world", tuples.get(0).get("echo"));
public void testEchoStreamNoCredentials() throws Exception {
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
params("qt", "/stream",
"expr", "echo(hello world)"));
// NOTE: no credentials
// NOTE: Can't make any assertions about Exception: SOLR-14226
expectThrows(Exception.class, () -> {
final List<Tuple> ignored = getTuples(solrStream);
public void testEchoStreamInvalidCredentials() throws Exception {
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
params("qt", "/stream",
"expr", "echo(hello world)"));
solrStream.setCredentials(READ_ONLY_USER, "BOGUS_PASSWORD");
// NOTE: Can't make any assertions about Exception: SOLR-14226
expectThrows(Exception.class, () -> {
final List<Tuple> ignored = getTuples(solrStream);
public void testSimpleUpdateStream() throws Exception {
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
params("qt", "/stream", "expr",
"update("+COLLECTION_X+",batchSize=1," +
solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
final List<Tuple> tuples = getTuples(solrStream);
assertEquals(1, tuples.size());
assertEquals(1L, tuples.get(0).get("totalIndexed"));
assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
public void testSimpleUpdateStreamInvalidCredentials() throws Exception {
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
params("qt", "/stream", "expr",
"update("+COLLECTION_X+",batchSize=1," +
// "WRITE" credentials should be required for 'update(...)'
solrStream.setCredentials(WRITE_X_USER, "BOGUS_PASSWORD");
// NOTE: Can't make any assertions about Exception: SOLR-14226
expectThrows(Exception.class, () -> {
final List<Tuple> ignored = getTuples(solrStream);
assertEquals(0L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
public void testSimpleUpdateStreamInsufficientCredentials() throws Exception {
// both of these users have valid credentials and authz read COLLECTION_X, but neither has
// authz to write to X...
for (String user : Arrays.asList(READ_ONLY_USER, WRITE_Y_USER)) {
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
params("qt", "/stream", "expr",
"update("+COLLECTION_X+",batchSize=1," +
solrStream.setCredentials(user, user);
// NOTE: Can't make any assertions about Exception: SOLR-14226
expectThrows(Exception.class, () -> {
final List<Tuple> ignored = getTuples(solrStream);
assertEquals(0L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
public void testIndirectUpdateStream() throws Exception {
{ // WRITE_X user should be able to update X via a (dummy) stream from Y...
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_Y,
params("qt", "/stream", "expr",
"update("+COLLECTION_X+",batchSize=1," +
solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
final List<Tuple> tuples = getTuples(solrStream);
assertEquals(1, tuples.size());
assertEquals(1L, tuples.get(0).get("totalIndexed"));
{ // Now add some "real" docs directly to Y...
final UpdateRequest update = setBasicAuthCredentials(new UpdateRequest(), WRITE_Y_USER);
for (int i = 1; i <= 42; i++) {
assertEquals("initial docs in Y",
0, update.commit(cluster.getSolrClient(), COLLECTION_Y).getStatus());
{ // WRITE_X user should be able to update X via a (search) stream from Y (routed via Y)
final String expr
= "update("+COLLECTION_X+", batchSize=50, " // note batch size
+ " search("+COLLECTION_Y+", "
+ " q=\"foo_i:[* TO 10]\", " // 10 matches = 1 batch
+ " rows=100, "
+ " fl=\"id,foo_i\", "
+ " sort=\"foo_i desc\")) "
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_Y, // NOTE: Y route
params("qt", "/stream",
"expr", expr));
solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
final List<Tuple> tuples = getTuples(solrStream);
assertEquals(1, tuples.size());
assertEquals(10L, tuples.get(0).get("batchIndexed"));
assertEquals(10L, tuples.get(0).get("totalIndexed"));
{ // WRITE_X user should be able to update X via a (search) stream from Y (routed via X)...
final String expr
= "update("+COLLECTION_X+", batchSize=5, " // note batch size
+ " search("+COLLECTION_Y+", "
+ " q=\"foo_i:[30 TO *]\", " // 13 matches = 3 batches
+ " rows=100, "
+ " fl=\"id,foo_i\", "
+ " sort=\"foo_i desc\")) "
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X, // NOTE: X route
params("qt", "/stream",
"expr", expr));
solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
final List<Tuple> tuples = getTuples(solrStream);
assertEquals(3, tuples.size());
assertEquals( 5L, tuples.get(0).get("batchIndexed"));
assertEquals( 5L, tuples.get(0).get("totalIndexed"));
assertEquals( 5L, tuples.get(1).get("batchIndexed"));
assertEquals(10L, tuples.get(1).get("totalIndexed"));
assertEquals( 3L, tuples.get(2).get("batchIndexed"));
assertEquals(13L, tuples.get(2).get("totalIndexed"));
assertEquals(1L + 10L + 13L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
public void testIndirectUpdateStreamInsufficientCredentials() throws Exception {
// regardless of how it's routed, WRITE_Y should NOT have authz to stream updates to X...
for (String path : Arrays.asList(COLLECTION_X, COLLECTION_Y)) {
final SolrStream solrStream = new SolrStream(solrUrl + "/" + path,
params("qt", "/stream", "expr",
"update("+COLLECTION_X+",batchSize=1," +
solrStream.setCredentials(WRITE_Y_USER, WRITE_Y_USER);
// NOTE: Can't make any assertions about Exception: SOLR-14226
expectThrows(Exception.class, () -> {
final List<Tuple> ignored = getTuples(solrStream);
assertEquals(0L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
public void testExecutorUpdateStream() throws Exception {
final String expr
= "executor(threads=1, "
+ " tuple(expr_s=\"update("+COLLECTION_X+", batchSize=5, "
+ " tuple(id='42',a_i=1,b_i=5)) "
+ " \")) "
final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
params("qt", "/stream",
"expr", expr));
solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
final List<Tuple> tuples = getTuples(solrStream);
assertEquals(0, tuples.size());
assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
public void testExecutorUpdateStreamInsufficientCredentials() throws Exception {
int id = 0;
// both of these users have valid credentials and authz read COLLECTION_X, but neither has
// authz to write to X...
for (String user : Arrays.asList(READ_ONLY_USER, WRITE_Y_USER)) {
// ... regardless of how the request is routed...
for (String path : Arrays.asList(COLLECTION_X, COLLECTION_Y)) {
final String trace = user + ":" + path;
final String expr
= "executor(threads=1, "
+ " tuple(expr_s=\"update("+COLLECTION_X+", batchSize=5, "
+ " tuple(id='"+(++id)+"',foo_s='"+trace+"')) "
+ " \")) "
final SolrStream solrStream = new SolrStream(solrUrl + "/" + path,
params("qt", "/stream",
"_trace", "executor_via_" + trace,
"expr", expr));
solrStream.setCredentials(user, user);
// NOTE: Becaue of the backgroun threads, no failures will to be returned to client...
final List<Tuple> tuples = getTuples(solrStream);
assertEquals(0, tuples.size());
// we have to assert that the updates failed solely based on the side effects...
assertEquals("doc count after execute update via " + trace,
0L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
// sanity check
assertEquals("final doc count",
0L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
public void testDaemonUpdateStream() throws Exception {
final String daemonUrl = getRandomCoreUrl(COLLECTION_X);"Using Daemon @ {}", daemonUrl);
final String expr
// NOTE: inspite of what is implied by 'terminate=true', this daemon will
// NEVER terminate on it's own as long as the updates are successful
// (aparently that requires usage of anest topic() stream to set a "sleepMillis"?!?!?!)
= "daemon(id=daemonId,runInterval=1000,terminate=true, "
+ " update("+COLLECTION_X+",tuple(id=42,a_i=1,b_i=5))) "
final SolrStream solrStream = new SolrStream(daemonUrl,
params("qt", "/stream",
"expr", expr));
solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
final List<Tuple> tuples = getTuples(solrStream);
assertEquals(1, tuples.size()); // daemon starting status
try {
// We have to poll the daemon 'list' to know once it's run...
long iterations = 0;
final TimeOut timeout = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while ( ! timeout.hasTimedOut() ) {
final SolrStream daemonCheck = new SolrStream(daemonUrl,
params("qt", "/stream",
"action", "list"));
daemonCheck.setCredentials(WRITE_X_USER, WRITE_X_USER);
final List<Tuple> tuples = getTuples(daemonCheck);
assertEquals(1, tuples.size()); // our daemon;
iterations = tuples.get(0).getLong("iterations");
if (1 < iterations) {
// once the daemon has had a chance to run, break out of TimeOut
Thread.sleep(Math.max(1, Math.min(5000, timeout.timeLeft(TimeUnit.MILLISECONDS))));
assertTrue("Didn't see any iterations after waiting an excessive amount of time: " + iterations,
0 < iterations);
} finally {
// kill the damon...
final SolrStream daemonKiller = new SolrStream(daemonUrl,
params("qt", "/stream",
"action", "kill",
"id", "daemonId"));
daemonKiller.setCredentials(WRITE_X_USER, WRITE_X_USER);
final List<Tuple> tuples = getTuples(daemonKiller);
assertEquals(1, tuples.size()); // daemon death status
assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
public void testDaemonUpdateStreamInsufficientCredentials() throws Exception {
final String daemonUrl = getRandomCoreUrl(COLLECTION_X);"Using Daemon @ {}", daemonUrl);
// both of these users have valid credentials and authz read COLLECTION_X, but neither has
// authz to write to X...
for (String user : Arrays.asList(READ_ONLY_USER, WRITE_Y_USER)) {
final String daemonId = "daemon_" + user;
final String expr
= "daemon(id="+daemonId+",runInterval=1000,terminate=true, "
+ " update("+COLLECTION_X+",tuple(id=42,a_i=1,b_i=5))) "
final SolrStream solrStream = new SolrStream(daemonUrl,
params("qt", "/stream",
"_trace", "start_" + daemonId,
"expr", expr));
solrStream.setCredentials(user, user);
final List<Tuple> tuples = getTuples(solrStream);
assertEquals(1, tuples.size()); // daemon starting status
try {
// We have to poll the daemon 'list' to know once it's run / terminated...
Object state = null;
final TimeOut timeout = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while ( ! timeout.hasTimedOut() ) {
final SolrStream daemonCheck = new SolrStream(daemonUrl,
params("qt", "/stream",
"_trace", "check_" + daemonId,
"action", "list"));
daemonCheck.setCredentials(user, user);
final List<Tuple> tuples = getTuples(daemonCheck);
assertEquals(1, tuples.size()); // our daemon;"Current daemon status: {}", tuples.get(0).fields);
assertEquals(daemonId + " should have never had a successful iteration",
Long.valueOf(0L), tuples.get(0).getLong("iterations"));
state = tuples.get(0).get("state");
if ("TERMINATED".equals(state)) {
// once the daemon has failed, break out of TimeOut
Thread.sleep(Math.max(1, Math.min(5000, timeout.timeLeft(TimeUnit.MILLISECONDS))));
assertEquals("Timed out w/o ever getting TERMINATED state from " + daemonId,
"TERMINATED", state);
} finally {
// kill the damon...
final SolrStream daemonKiller = new SolrStream(daemonUrl,
params("qt", "/stream",
"_trace", "kill_" + daemonId,
"action", "kill",
"id", daemonId));
daemonKiller.setCredentials(user, user);
final List<Tuple> tuples = getTuples(daemonKiller);
assertEquals(1, tuples.size()); // daemon death status
assertEquals("doc count after daemon update for " + user,
0L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
// sanity check
assertEquals("final doc count",
0L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
* Helper method that uses the specified user to (first commit, and then) count the total
* number of documents in the collection
protected static long commitAndCountDocsInCollection(final String collection,
final String user) throws Exception {
assertEquals(0, setBasicAuthCredentials(new UpdateRequest(), user).commit(cluster.getSolrClient(),
return countDocsInCollection(collection, user);
* Helper method that uses the specified user to count the total number of documents in the collection
protected static long countDocsInCollection(final String collection,
final String user) throws Exception {
return setBasicAuthCredentials(new QueryRequest(params("q", "*:*",
"rows", "0",
"_trace", "count_via_" + user + ":" + collection)),
.process(cluster.getSolrClient(), collection).getResults().getNumFound();
* Slurps a stream into a List
protected static List<Tuple> getTuples(final TupleStream tupleStream) throws IOException {
List<Tuple> tuples = new ArrayList<Tuple>();
try {
log.trace("TupleStream: {}", tupleStream);;
for (Tuple t =; !t.EOF; t = {
log.trace("Tuple: {}", t.fields);
} finally {
return tuples;
* Sigh. DaemonStream requires polling the same core where the stream was exectured.
protected static String getRandomCoreUrl(final String collection) throws Exception {
final List<String> replicaUrls =
Collections.shuffle(replicaUrls, random());
return replicaUrls.get(0);