diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java
new file mode 100644
index 00000000000..b360278b2cb
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/CloudAuthStreamTest.java
@@ -0,0 +1,611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.security.BasicAuthPlugin;
+import org.apache.solr.security.RuleBasedAuthorizationPlugin;
+import org.apache.solr.util.TimeOut;
+
+import static org.apache.solr.security.Sha256AuthenticationProvider.getSaltedHashedValue;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests various streaming expressions (via the SolrJ {@link SolrStream} API) against a SolrCloud cluster
+ * using both Authentication and Role based Authorization
+ */
+public class CloudAuthStreamTest extends SolrCloudTestCase {
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final String COLLECTION_X = "collection_x";
+ private static final String COLLECTION_Y = "collection_y";
+
+ private static final String READ_ONLY_USER = "read_only_user";
+ private static final String WRITE_X_USER = "write_x_user";
+ private static final String WRITE_Y_USER = "write_y_user";
+ private static final String ADMIN_USER = "admin_user";
+
+ private static String solrUrl = null;
+
+ /**
+ * Helper that returns the original {@link SolrRequest} with its original type
+ * so it can be chained. This method knows that for the purposes of this test, every user name
+ * is its own password
+ *
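+ * <p>For example, {@code setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER)}
+ * returns an {@link UpdateRequest}, so update calls such as {@code add(...)} and
+ * {@code commit(...)} can be chained directly onto the authenticated request.</p>
+ *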
+ * @see SolrRequest#setBasicAuthCredentials
+ */
+ private static <T extends SolrRequest> T setBasicAuthCredentials(T req, String user) {
+ assert null != user;
+ req.setBasicAuthCredentials(user, user);
+ return req;
+ }
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ final List<String> users = Arrays.asList(READ_ONLY_USER, WRITE_X_USER, WRITE_Y_USER, ADMIN_USER);
+ // For simplicity: every user uses a password the same as their name...
+ final Map<String,String> credentials = users.stream()
+ .collect(Collectors.toMap(Function.identity(), s -> getSaltedHashedValue(s)));
+
+ // For simplicity: Every user is their own role...
+ final Map<String,String> roles = users.stream()
+ .collect(Collectors.toMap(Function.identity(), Function.identity()));
+
+ final String SECURITY_JSON = Utils.toJSONString
+ (Utils.makeMap("authorization",
+ Utils.makeMap("class", RuleBasedAuthorizationPlugin.class.getName(),
+ "user-role", roles,
+ // NOTE: permissions order matters!
+ "permissions", Arrays.asList(// any authn user can 'read' or hit /stream
+ Utils.makeMap("name","read",
+ "role","*"),
+ Utils.makeMap("name","stream",
+ "collection", "*",
+ "path", "/stream",
+ "role","*"),
+ // per collection write perms
+ Utils.makeMap("name","update",
+ "collection", COLLECTION_X,
+ "role", WRITE_X_USER),
+ Utils.makeMap("name","update",
+ "collection", COLLECTION_Y,
+ "role", WRITE_Y_USER),
+ Utils.makeMap("name","all",
+ "role",ADMIN_USER))),
+ "authentication",
+ Utils.makeMap("class", BasicAuthPlugin.class.getName(),
+ "blockUnknown",true,
+ "credentials", credentials)));
+
+ // we want at most one core per node to force lots of network traffic to try and tickle distributed bugs
+ configureCluster(5)
+ .withSecurityJson(SECURITY_JSON)
+ .configure();
+
+ for (String collection : Arrays.asList(COLLECTION_X, COLLECTION_Y)) {
+ CollectionAdminRequest.createCollection(collection, "_default", 2, 2)
+ .setBasicAuthCredentials(ADMIN_USER, ADMIN_USER)
+ .process(cluster.getSolrClient());
+ }
+
+ for (String collection : Arrays.asList(COLLECTION_X, COLLECTION_Y)) {
+ cluster.getSolrClient().waitForState(collection, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
+ (n, c) -> DocCollection.isFullyActive(n, c, 2, 2));
+ }
+
+ solrUrl = cluster.getRandomJetty(random()).getProxyBaseUrl().toString();
+
+ log.info("All stream requests will be sent to random solrUrl: {}", solrUrl);
+ }
+
+ @AfterClass
+ public static void clearVariables() {
+ solrUrl = null;
+ }
+
+ @After
+ public void clearCollections() throws Exception {
+ log.info("Clearing Collections @After test method...");
+ assertEquals(0,
+ setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER)
+ .deleteByQuery("*:*")
+ .commit(cluster.getSolrClient(), COLLECTION_X).getStatus());
+ assertEquals(0,
+ setBasicAuthCredentials(new UpdateRequest(), WRITE_Y_USER)
+ .deleteByQuery("*:*")
+ .commit(cluster.getSolrClient(), COLLECTION_Y).getStatus());
+ }
+
+ /**
+ * Simple sanity checks that authentication is working the way the test expects
+ */
+ public void testSanityCheckAuth() throws Exception {
+
+ assertEquals("sanity check of non authenticated query request",
+ 401,
+ expectThrows(SolrException.class, () -> {
+ final long ignored =
+ (new QueryRequest(params("q", "*:*",
+ "rows", "0",
+ "_trace", "no_auth_sanity_check")))
+ .process(cluster.getSolrClient(), COLLECTION_X).getResults().getNumFound();
+ }).code());
+
+ assertEquals("sanity check of update to X from write_X user",
+ 0,
+ (setBasicAuthCredentials(new UpdateRequest(), WRITE_X_USER)
+ .add(sdoc("id", "1_from_write_X_user"))
+ .commit(cluster.getSolrClient(), COLLECTION_X)).getStatus());
+
+ assertEquals("sanity check of update to X from read only user",
+ 500, // should be 403, but CloudSolrClient lies on updates for now: SOLR-14222
+ expectThrows(SolrException.class, () -> {
+ final int ignored = (setBasicAuthCredentials(new UpdateRequest(), READ_ONLY_USER)
+ .add(sdoc("id", "2_from_read_only_user"))
+ .commit(cluster.getSolrClient(), COLLECTION_X)).getStatus();
+ }).code());
+
+ assertEquals("sanity check of update to X from write_Y user",
+ 500, // should be 403, but CloudSolrClient lies on updates for now: SOLR-14222
+ expectThrows(SolrException.class, () -> {
+ final int ignored = (setBasicAuthCredentials(new UpdateRequest(), WRITE_Y_USER)
+ .add(sdoc("id", "3_from_write_Y_user"))
+ .commit(cluster.getSolrClient(), COLLECTION_X)).getStatus();
+ }).code());
+
+ assertEquals("sanity check of update to Y from write_Y user",
+ 0,
+ (setBasicAuthCredentials(new UpdateRequest(), WRITE_Y_USER)
+ .add(sdoc("id", "1_from_write_Y_user"))
+ .commit(cluster.getSolrClient(), COLLECTION_Y)).getStatus());
+
+ for (String user : Arrays.asList(READ_ONLY_USER, WRITE_Y_USER, WRITE_X_USER)) {
+ for (String collection : Arrays.asList(COLLECTION_X, COLLECTION_Y)) {
+ assertEquals("sanity check: query "+collection+" from user: "+user,
+ 1, countDocsInCollection(collection, user));
+ }
+ }
+ }
+
+ public void testEchoStream() throws Exception {
+ final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
+ params("qt", "/stream",
+ "expr", "echo(hello world)"));
+ solrStream.setCredentials(READ_ONLY_USER, READ_ONLY_USER);
+ final List<Tuple> tuples = getTuples(solrStream);
+ assertEquals(1, tuples.size());
+ assertEquals("hello world", tuples.get(0).get("echo"));
+ }
+
+ public void testEchoStreamNoCredentials() throws Exception {
+ final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
+ params("qt", "/stream",
+ "expr", "echo(hello world)"));
+ // NOTE: no credentials
+
+ // NOTE: Can't make any assertions about Exception: SOLR-14226
+ expectThrows(Exception.class, () -> {
+ final List<Tuple> ignored = getTuples(solrStream);
+ });
+ }
+
+ public void testEchoStreamInvalidCredentials() throws Exception {
+ final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
+ params("qt", "/stream",
+ "expr", "echo(hello world)"));
+ solrStream.setCredentials(READ_ONLY_USER, "BOGUS_PASSWORD");
+
+ // NOTE: Can't make any assertions about Exception: SOLR-14226
+ expectThrows(Exception.class, () -> {
+ final List<Tuple> ignored = getTuples(solrStream);
+ });
+ }
+
+ public void testSimpleUpdateStream() throws Exception {
+ final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
+ params("qt", "/stream", "expr",
+ "update("+COLLECTION_X+",batchSize=1," +
+ "tuple(id='42',a_i=1,b_i=5))"));
+ solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
+ final List<Tuple> tuples = getTuples(solrStream);
+ assertEquals(1, tuples.size());
+ assertEquals(1L, tuples.get(0).get("totalIndexed"));
+
+ assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+ }
+
+ public void testSimpleUpdateStreamInvalidCredentials() throws Exception {
+ final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
+ params("qt", "/stream", "expr",
+ "update("+COLLECTION_X+",batchSize=1," +
+ "tuple(id='42',a_i=1,b_i=5))"));
+ // "WRITE" credentials should be required for 'update(...)'
+ solrStream.setCredentials(WRITE_X_USER, "BOGUS_PASSWORD");
+
+ // NOTE: Can't make any assertions about Exception: SOLR-14226
+ expectThrows(Exception.class, () -> {
+ final List<Tuple> ignored = getTuples(solrStream);
+ });
+
+ assertEquals(0L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+ }
+
+ public void testSimpleUpdateStreamInsufficientCredentials() throws Exception {
+ // both of these users have valid credentials and authz to read COLLECTION_X, but neither has
+ // authz to write to X...
+ for (String user : Arrays.asList(READ_ONLY_USER, WRITE_Y_USER)) {
+ final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
+ params("qt", "/stream", "expr",
+ "update("+COLLECTION_X+",batchSize=1," +
+ "tuple(id='42',a_i=1,b_i=5))"));
+
+ solrStream.setCredentials(user, user);
+
+ // NOTE: Can't make any assertions about Exception: SOLR-14226
+ expectThrows(Exception.class, () -> {
+ final List<Tuple> ignored = getTuples(solrStream);
+ });
+ }
+
+ assertEquals(0L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+ }
+
+ public void testIndirectUpdateStream() throws Exception {
+ { // WRITE_X user should be able to update X via a (dummy) stream from Y...
+ final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_Y,
+ params("qt", "/stream", "expr",
+ "update("+COLLECTION_X+",batchSize=1," +
+ "tuple(id='42',a_i=1,b_i=5))"));
+ solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
+ final List<Tuple> tuples = getTuples(solrStream);
+ assertEquals(1, tuples.size());
+ assertEquals(1L, tuples.get(0).get("totalIndexed"));
+ }
+
+ { // Now add some "real" docs directly to Y...
+ final UpdateRequest update = setBasicAuthCredentials(new UpdateRequest(), WRITE_Y_USER);
+ for (int i = 1; i <= 42; i++) {
+ update.add(sdoc("id",i+"y","foo_i",""+i));
+ }
+ assertEquals("initial docs in Y",
+ 0, update.commit(cluster.getSolrClient(), COLLECTION_Y).getStatus());
+ }
+
+ { // WRITE_X user should be able to update X via a (search) stream from Y (routed via Y)
+ final String expr
+ = "update("+COLLECTION_X+", batchSize=50, " // note batch size
+ + " search("+COLLECTION_Y+", "
+ + " q=\"foo_i:[* TO 10]\", " // 10 matches = 1 batch
+ + " rows=100, "
+ + " fl=\"id,foo_i\", "
+ + " sort=\"foo_i desc\")) "
+ ;
+
+ final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_Y, // NOTE: Y route
+ params("qt", "/stream",
+ "expr", expr));
+ solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
+ final List<Tuple> tuples = getTuples(solrStream);
+ assertEquals(1, tuples.size());
+ assertEquals(10L, tuples.get(0).get("batchIndexed"));
+ assertEquals(10L, tuples.get(0).get("totalIndexed"));
+ }
+
+ { // WRITE_X user should be able to update X via a (search) stream from Y (routed via X)...
+ final String expr
+ = "update("+COLLECTION_X+", batchSize=5, " // note batch size
+ + " search("+COLLECTION_Y+", "
+ + " q=\"foo_i:[30 TO *]\", " // 13 matches = 3 batches
+ + " rows=100, "
+ + " fl=\"id,foo_i\", "
+ + " sort=\"foo_i desc\")) "
+ ;
+
+ final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X, // NOTE: X route
+ params("qt", "/stream",
+ "expr", expr));
+ solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
+ final List<Tuple> tuples = getTuples(solrStream);
+ assertEquals(3, tuples.size());
+
+ assertEquals( 5L, tuples.get(0).get("batchIndexed"));
+ assertEquals( 5L, tuples.get(0).get("totalIndexed"));
+
+ assertEquals( 5L, tuples.get(1).get("batchIndexed"));
+ assertEquals(10L, tuples.get(1).get("totalIndexed"));
+
+ assertEquals( 3L, tuples.get(2).get("batchIndexed"));
+ assertEquals(13L, tuples.get(2).get("totalIndexed"));
+ }
+
+ assertEquals(1L + 10L + 13L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+
+ }
+
+ public void testIndirectUpdateStreamInsufficientCredentials() throws Exception {
+
+ // regardless of how it's routed, WRITE_Y should NOT have authz to stream updates to X...
+ for (String path : Arrays.asList(COLLECTION_X, COLLECTION_Y)) {
+ final SolrStream solrStream = new SolrStream(solrUrl + "/" + path,
+ params("qt", "/stream", "expr",
+ "update("+COLLECTION_X+",batchSize=1," +
+ "tuple(id='42',a_i=1,b_i=5))"));
+ solrStream.setCredentials(WRITE_Y_USER, WRITE_Y_USER);
+
+ // NOTE: Can't make any assertions about Exception: SOLR-14226
+ expectThrows(Exception.class, () -> {
+ final List<Tuple> ignored = getTuples(solrStream);
+ });
+ }
+
+ assertEquals(0L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+ }
+
+ public void testExecutorUpdateStream() throws Exception {
+ final String expr
+ = "executor(threads=1, "
+ + " tuple(expr_s=\"update("+COLLECTION_X+", batchSize=5, "
+ + " tuple(id='42',a_i=1,b_i=5)) "
+ + " \")) "
+ ;
+ final SolrStream solrStream = new SolrStream(solrUrl + "/" + COLLECTION_X,
+ params("qt", "/stream",
+ "expr", expr));
+ solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
+ final List<Tuple> tuples = getTuples(solrStream);
+ assertEquals(0, tuples.size());
+
+ assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+ }
+
+ public void testExecutorUpdateStreamInsufficientCredentials() throws Exception {
+ int id = 0;
+ // both of these users have valid credentials and authz to read COLLECTION_X, but neither has
+ // authz to write to X...
+ for (String user : Arrays.asList(READ_ONLY_USER, WRITE_Y_USER)) {
+ // ... regardless of how the request is routed...
+ for (String path : Arrays.asList(COLLECTION_X, COLLECTION_Y)) {
+ final String trace = user + ":" + path;
+ final String expr
+ = "executor(threads=1, "
+ + " tuple(expr_s=\"update("+COLLECTION_X+", batchSize=5, "
+ + " tuple(id='"+(++id)+"',foo_s='"+trace+"')) "
+ + " \")) "
+ ;
+ final SolrStream solrStream = new SolrStream(solrUrl + "/" + path,
+ params("qt", "/stream",
+ "_trace", "executor_via_" + trace,
+ "expr", expr));
+ solrStream.setCredentials(user, user);
+
+ // NOTE: Because of the background threads, no failures will be returned to the client...
+ final List<Tuple> tuples = getTuples(solrStream);
+ assertEquals(0, tuples.size());
+
+ // we have to assert that the updates failed solely based on the side effects...
+ assertEquals("doc count after execute update via " + trace,
+ 0L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+
+ }
+ }
+
+ // sanity check
+ assertEquals("final doc count",
+ 0L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+ }
+
+ public void testDaemonUpdateStream() throws Exception {
+ final String daemonUrl = getRandomCoreUrl(COLLECTION_X);
+ log.info("Using Daemon @ {}", daemonUrl);
+
+ {
+ final String expr
+ // NOTE: in spite of what is implied by 'terminate=true', this daemon will
+ // NEVER terminate on its own as long as the updates are successful
+ // (apparently that requires usage of a nested topic() stream to set a "sleepMillis"?!?!?!)
+ = "daemon(id=daemonId,runInterval=1000,terminate=true, "
+ + " update("+COLLECTION_X+",tuple(id=42,a_i=1,b_i=5))) "
+ ;
+ final SolrStream solrStream = new SolrStream(daemonUrl,
+ params("qt", "/stream",
+ "expr", expr));
+ solrStream.setCredentials(WRITE_X_USER, WRITE_X_USER);
+ final List<Tuple> tuples = getTuples(solrStream);
+ assertEquals(1, tuples.size()); // daemon starting status
+ }
+ try {
+ // We have to poll the daemon 'list' to know when it has run...
+ long iterations = 0;
+ final TimeOut timeout = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+ while ( ! timeout.hasTimedOut() ) {
+ final SolrStream daemonCheck = new SolrStream(daemonUrl,
+ params("qt", "/stream",
+ "action", "list"));
+ daemonCheck.setCredentials(WRITE_X_USER, WRITE_X_USER);
+ final List<Tuple> tuples = getTuples(daemonCheck);
+ assertEquals(1, tuples.size()); // our daemon
+ iterations = tuples.get(0).getLong("iterations");
+ if (1 < iterations) {
+ // once the daemon has had a chance to run, break out of TimeOut
+ break;
+ }
+ Thread.sleep(Math.max(1, Math.min(5000, timeout.timeLeft(TimeUnit.MILLISECONDS))));
+ }
+ assertTrue("Didn't see any iterations after waiting an excessive amount of time: " + iterations,
+ 0 < iterations);
+ } finally {
+ // kill the daemon...
+ final SolrStream daemonKiller = new SolrStream(daemonUrl,
+ params("qt", "/stream",
+ "action", "kill",
+ "id", "daemonId"));
+ daemonKiller.setCredentials(WRITE_X_USER, WRITE_X_USER);
+ final List<Tuple> tuples = getTuples(daemonKiller);
+ assertEquals(1, tuples.size()); // daemon death status
+ }
+
+ assertEquals(1L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+ }
+
+ public void testDaemonUpdateStreamInsufficientCredentials() throws Exception {
+ final String daemonUrl = getRandomCoreUrl(COLLECTION_X);
+ log.info("Using Daemon @ {}", daemonUrl);
+
+ // both of these users have valid credentials and authz to read COLLECTION_X, but neither has
+ // authz to write to X...
+ for (String user : Arrays.asList(READ_ONLY_USER, WRITE_Y_USER)) {
+ final String daemonId = "daemon_" + user;
+ {
+ final String expr
+ = "daemon(id="+daemonId+",runInterval=1000,terminate=true, "
+ + " update("+COLLECTION_X+",tuple(id=42,a_i=1,b_i=5))) "
+ ;
+ final SolrStream solrStream = new SolrStream(daemonUrl,
+ params("qt", "/stream",
+ "_trace", "start_" + daemonId,
+ "expr", expr));
+ solrStream.setCredentials(user, user);
+ final List<Tuple> tuples = getTuples(solrStream);
+ assertEquals(1, tuples.size()); // daemon starting status
+ }
+ try {
+ // We have to poll the daemon 'list' to know when it has run / terminated...
+ Object state = null;
+ final TimeOut timeout = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+ while ( ! timeout.hasTimedOut() ) {
+ final SolrStream daemonCheck = new SolrStream(daemonUrl,
+ params("qt", "/stream",
+ "_trace", "check_" + daemonId,
+ "action", "list"));
+ daemonCheck.setCredentials(user, user);
+ final List<Tuple> tuples = getTuples(daemonCheck);
+ assertEquals(1, tuples.size()); // our daemon
+ log.info("Current daemon status: {}", tuples.get(0).fields);
+ assertEquals(daemonId + " should have never had a successful iteration",
+ Long.valueOf(0L), tuples.get(0).getLong("iterations"));
+ state = tuples.get(0).get("state");
+ if ("TERMINATED".equals(state)) {
+ // once the daemon has failed, break out of TimeOut
+ break;
+ }
+ Thread.sleep(Math.max(1, Math.min(5000, timeout.timeLeft(TimeUnit.MILLISECONDS))));
+ }
+ assertEquals("Timed out w/o ever getting TERMINATED state from " + daemonId,
+ "TERMINATED", state);
+ } finally {
+ // kill the daemon...
+ final SolrStream daemonKiller = new SolrStream(daemonUrl,
+ params("qt", "/stream",
+ "_trace", "kill_" + daemonId,
+ "action", "kill",
+ "id", daemonId));
+ daemonKiller.setCredentials(user, user);
+ final List<Tuple> tuples = getTuples(daemonKiller);
+ assertEquals(1, tuples.size()); // daemon death status
+ }
+
+ assertEquals("doc count after daemon update for " + user,
+ 0L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+ }
+
+ // sanity check
+ assertEquals("final doc count",
+ 0L, commitAndCountDocsInCollection(COLLECTION_X, WRITE_X_USER));
+
+ }
+
+ /**
+ * Helper method that uses the specified user to (first commit, and then) count the total
+ * number of documents in the collection
+ */
+ protected static long commitAndCountDocsInCollection(final String collection,
+ final String user) throws Exception {
+ assertEquals(0, setBasicAuthCredentials(new UpdateRequest(), user).commit(cluster.getSolrClient(),
+ collection).getStatus());
+ return countDocsInCollection(collection, user);
+ }
+
+ /**
+ * Helper method that uses the specified user to count the total number of documents in the collection
+ */
+ protected static long countDocsInCollection(final String collection,
+ final String user) throws Exception {
+ return setBasicAuthCredentials(new QueryRequest(params("q", "*:*",
+ "rows", "0",
+ "_trace", "count_via_" + user + ":" + collection)),
+ user)
+ .process(cluster.getSolrClient(), collection).getResults().getNumFound();
+ }
+
+ /**
+ * Slurps a stream into a List
+ */
+ protected static List<Tuple> getTuples(final TupleStream tupleStream) throws IOException {
+ List<Tuple> tuples = new ArrayList<>();
+ try {
+ log.trace("TupleStream: {}", tupleStream);
+ tupleStream.open();
+ for (Tuple t = tupleStream.read(); !t.EOF; t = tupleStream.read()) {
+ log.trace("Tuple: {}", t.fields);
+ tuples.add(t);
+ }
+ } finally {
+ tupleStream.close();
+ }
+ return tuples;
+ }
+
+ /**
+ * Sigh. DaemonStream requires polling the same core where the stream was executed.
+ */
+ protected static String getRandomCoreUrl(final String collection) throws Exception {
+ final List<String> replicaUrls =
+ cluster.getSolrClient().getZkStateReader().getClusterState()
+ .getCollectionOrNull(collection).getReplicas().stream()
+ .map(Replica::getCoreUrl).collect(Collectors.toList());
+ Collections.shuffle(replicaUrls, random());
+ return replicaUrls.get(0);
+ }
+
+}