SOLR-12859: Fixed DocExpirationUpdateProcessorFactory to work with BasicAuth and other auth plugins that delegate to PKI for server initiated node-to-node communication.

This commit is contained in:
Chris Hostetter 2020-01-22 09:32:25 -07:00
parent 8fd3fbd93c
commit 95dfddc7d4
5 changed files with 310 additions and 72 deletions

View File

@ -225,6 +225,9 @@ Bug Fixes
* SOLR-14192: Race condition between SchemaManager and ZkIndexSchemaReader. (ab)
* SOLR-12859: Fixed DocExpirationUpdateProcessorFactory to work with BasicAuth and other auth plugins
that delegate to PKI for server initiated node-to-node communication. (hossman)
Other Changes
---------------------

View File

@ -16,9 +16,11 @@
*/
package org.apache.solr.request;
import java.security.Principal;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.MultiMapSolrParams;
@ -34,7 +36,9 @@ import org.apache.solr.core.SolrCore;
*/
public class LocalSolrQueryRequest extends SolrQueryRequestBase {
public final static Map emptyArgs = new HashMap(0,1);
public String userPrincipalName = null;
protected static SolrParams makeParams(String query, String qtype, int start, int limit, Map args) {
Map<String,String[]> map = new HashMap<>();
for (Iterator iter = args.entrySet().iterator(); iter.hasNext();) {
@ -66,6 +70,38 @@ public class LocalSolrQueryRequest extends SolrQueryRequestBase {
/**
 * Creates a local request for the given core with the supplied parameters.
 * Both arguments are handed unchanged to the superclass constructor.
 *
 * @param core the core this request is associated with
 * @param args the request parameters
 */
public LocalSolrQueryRequest(SolrCore core, SolrParams args) {
super(core, args);
}
/**
 * Returns a principal wrapping the currently configured user principal name.
 *
 * <p>A new {@code LocalPrincipal} is created on every call; its name is whatever
 * value is currently stored in {@code userPrincipalName} (which may be {@code null}).
 *
 * @return a never-null principal whose name may be {@code null}
 */
@Override
public Principal getUserPrincipal() {
  final Principal localPrincipal = new LocalPrincipal(this.userPrincipalName);
  return localPrincipal;
}
/**
 * Sets the 'name' reported by the User Principal of this local request, for use when
 * creating local requests inside a solr node that has security enabled.
 * This method is experimental and subject to removal.
 *
 * @param s the principal name to store; it is returned by the principal from {@link #getUserPrincipal}
 * @see org.apache.solr.security.PKIAuthenticationPlugin#NODE_IS_USER
 * @see #getUserPrincipal
 * @lucene.internal
 * @lucene.experimental
 */
public void setUserPrincipalName(String s) {
  userPrincipalName = s;
}
private final class LocalPrincipal implements Principal {
private final String user;
public LocalPrincipal(String user) {
this.user = user;
}
public String getName() {
return user;
}
@Override public int hashCode() {
return Objects.hashCode(user);
}
@Override public boolean equals(Object other) {
return Objects.equals(this.getClass(), other.getClass())
&& Objects.equals(this.getName(), ((LocalPrincipal)other).getName() );
}
}
}

View File

@ -231,7 +231,9 @@ public class PKIAuthenticationPlugin extends AuthenticationPlugin implements Htt
final HttpListenerFactory.RequestResponseListener listener = new HttpListenerFactory.RequestResponseListener() {
@Override
public void onQueued(Request request) {
log.trace("onQueued: {}", request);
if (cores.getAuthenticationPlugin() == null) {
log.trace("no authentication plugin, skipping");
return;
}
if (!cores.getAuthenticationPlugin().interceptInternodeRequest(request)) {
@ -282,7 +284,7 @@ public class PKIAuthenticationPlugin extends AuthenticationPlugin implements Htt
if (reqInfo != null) {
Principal principal = reqInfo.getUserPrincipal();
if (principal == null) {
log.debug("principal is null");
log.debug("generateToken: principal is null");
//there was a request, but it was not authenticated,
//so we do not need to set a principal
return Optional.empty();
@ -293,6 +295,7 @@ public class PKIAuthenticationPlugin extends AuthenticationPlugin implements Htt
if (!isSolrThread()) {
//if this is not running inside a Solr threadpool (as in testcases)
// then no need to add any header
log.debug("generateToken: not a solr (server) thread");
return Optional.empty();
}
//this request seems to be originated from Solr itself
@ -304,6 +307,7 @@ public class PKIAuthenticationPlugin extends AuthenticationPlugin implements Htt
byte[] payload = s.getBytes(UTF_8);
byte[] payloadCipher = publicKeyHandler.keyPair.encrypt(ByteBuffer.wrap(payload));
String base64Cipher = Base64.byteArrayToBase64(payloadCipher);
log.trace("generateToken: usr={} token={}", usr, base64Cipher);
return Optional.of(base64Cipher);
}

View File

@ -45,6 +45,7 @@ import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.security.PKIAuthenticationPlugin;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
@ -381,9 +382,13 @@ public final class DocExpirationUpdateProcessorFactory
public void run() {
// setup the request context early so the logging (including any from
// shouldWeDoPeriodicDelete() ) includes the core context info
final SolrQueryRequest req = new LocalSolrQueryRequest
final LocalSolrQueryRequest req = new LocalSolrQueryRequest
(factory.core, Collections.<String,String[]>emptyMap());
try {
// HACK: to indicate to PKI that this is a server initiated request for the purposes
// of distributed request/credential forwarding...
req.setUserPrincipalName(PKIAuthenticationPlugin.NODE_IS_USER);
final SolrQueryResponse rsp = new SolrQueryResponse();
rsp.addResponseHeader(new SimpleOrderedMap<>(1));
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp));

View File

@ -18,29 +18,42 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.singletonMap;
import static java.util.Collections.singletonList;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.handler.ReplicationHandler;
import org.apache.solr.security.BasicAuthPlugin;
import org.apache.solr.security.RuleBasedAuthorizationPlugin;
import org.apache.solr.update.processor.DocExpirationUpdateProcessorFactory;
import org.apache.solr.util.TimeOut;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.apache.solr.security.Sha256AuthenticationProvider.getSaltedHashedValue;
import org.junit.After;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -50,76 +63,203 @@ public class DistribDocExpirationUpdateProcessorTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String COLLECTION = "expiry";
private String COLLECTION = null;
private String USER = null;
private String PASS = null;
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(2)
.addConfig("conf", TEST_PATH().resolve("configsets").resolve("doc-expiry").resolve("conf"))
.configure();
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1)
.processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
cluster.getSolrClient().waitForState(COLLECTION, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 2, 1));
/**
 * Runs after each test method: shuts down the cluster, then clears the
 * collection name and the credentials used by the test that just ran.
 */
@After
public void cleanup() throws Exception {
shutdownCluster();
// reset only after shutdown has completed; these are skipped if shutdownCluster() throws
COLLECTION = null;
USER = null;
PASS = null;
}
@Test
public void test() throws Exception {
// some docs with no expiration
UpdateRequest req1 = new UpdateRequest();
for (int i = 1; i <= 100; i++) {
req1.add(sdoc("id", i));
/**
 * Modifies the request to include authentication params if needed, and returns the request
 */
private <T extends SolrRequest> T setAuthIfNeeded(T req) {
if (null != USER) {
assert null != PASS;
req.setBasicAuthCredentials(USER, PASS);
}
req1.commit(cluster.getSolrClient(), COLLECTION);
return req;
}
public void setupCluster(boolean security) throws Exception {
// we want at most one core per node to force lots of network traffic to try and tickle distributed bugs
final Builder b = configureCluster(4)
.addConfig("conf", TEST_PATH().resolve("configsets").resolve("doc-expiry").resolve("conf"));
// this doc better not already exist
waitForNoResults(0, params("q","id:999","rows","0","_trace","sanity_check"));
COLLECTION = "expiring";
if (security) {
USER = "solr";
PASS = "SolrRocksAgain";
COLLECTION += "_secure";
final String SECURITY_JSON = Utils.toJSONString
(Utils.makeMap("authorization",
Utils.makeMap("class", RuleBasedAuthorizationPlugin.class.getName(),
"user-role", singletonMap(USER,"admin"),
"permissions", singletonList(Utils.makeMap("name","all",
"role","admin"))),
"authentication",
Utils.makeMap("class", BasicAuthPlugin.class.getName(),
"blockUnknown",true,
"credentials", singletonMap(USER, getSaltedHashedValue(PASS)))));
b.withSecurityJson(SECURITY_JSON);
}
b.configure();
setAuthIfNeeded(CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2))
.process(cluster.getSolrClient());
cluster.getSolrClient().waitForState(COLLECTION, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 2, 2));
}
/**
 * Runs the shared doc-expiration scenario against a cluster set up without security.
 */
public void testNoAuth() throws Exception {
  final boolean withSecurity = false;
  setupCluster(withSecurity);
  runTest();
}
public void testBasicAuth() throws Exception {
setupCluster(true);
// sanity check that our cluster really does require authentication
assertEquals("sanity check of non authenticated request",
401,
expectThrows(SolrException.class, () -> {
final long ignored = cluster.getSolrClient().query
(COLLECTION,
params("q", "*:*",
"rows", "0",
"_trace", "no_auth_sanity_check")).getResults().getNumFound();
}).code());
// record the indexversion for each server so we can check later
// that it only changes for one shard
final Map<String,Long> initIndexVersions = getIndexVersionOfAllReplicas();
assertTrue("WTF? no versions?", 0 < initIndexVersions.size());
// add a doc with a short TTL
new UpdateRequest().add(sdoc("id", "999", "tTl_s","+30SECONDS")).commit(cluster.getSolrClient(), COLLECTION);
// wait for one doc to be deleted
waitForNoResults(180, params("q","id:999","rows","0","_trace","did_it_expire_yet"));
// verify only one shard changed
final Map<String,Long> finalIndexVersions = getIndexVersionOfAllReplicas();
assertEquals("WTF? not same num versions?",
initIndexVersions.size(),
finalIndexVersions.size());
runTest();
}
private void runTest() throws Exception {
final int totalNumDocs = atLeast(50);
final Set<String> nodesThatChange = new HashSet<String>();
final Set<String> shardsThatChange = new HashSet<String>();
// Add a bunch of docs; some with extremely short expiration, some with no expiration
// these should be randomly distributed to each shard
long numDocsThatNeverExpire = 0;
{
final UpdateRequest req = setAuthIfNeeded(new UpdateRequest());
for (int i = 1; i <= totalNumDocs; i++) {
final SolrInputDocument doc = sdoc("id", i);
if (random().nextBoolean()) {
doc.addField("should_expire_s","yup");
doc.addField("tTl_s","+1SECONDS");
} else {
numDocsThatNeverExpire++;
}
req.add(doc);
}
req.commit(cluster.getSolrClient(), COLLECTION);
}
// NOTE: don't assume we can find exactly totalNumDocs right now, some may have already been deleted...
// it should not take long for us to get to the point where all 'should_expire_s:yup' docs are gone
waitForNoResults(30, params("q","should_expire_s:yup","rows","0","_trace","init_batch_check"));
{
// ...*NOW* we can assert that exactly numDocsThatNeverExpire should exist...
final QueryRequest req = setAuthIfNeeded(new QueryRequest
(params("q", "*:*",
"rows", "0",
"_trace", "count_non_expire_docs")));
// NOTE: it's possible that replicas could be out of sync but this query may get lucky and
// only hit leaders. we'll compare the counts of every replica in every shard later on...
assertEquals(numDocsThatNeverExpire,
req.process(cluster.getSolrClient(), COLLECTION).getResults().getNumFound());
}
//
// now that we've confirmed the basics work, let's check some fine grain stuff...
//
// first off, sanity check that this special docId doesn't some how already exist
waitForNoResults(0, params("q","id:special99","rows","0","_trace","sanity_check99"));
{
// force a hard commit on all shards (the prior auto-expire would have only done a soft commit)
// so we can ensure our indexVersion won't change unnecessarily on the un-affected
// shard when we add & (hard) commit our special doc...
final UpdateRequest req = setAuthIfNeeded(new UpdateRequest());
req.commit(cluster.getSolrClient(), COLLECTION);
}
// record important data for each replica core so we can check later
// that it only changes for the replicas of a single shard after we add/expire a single special doc
log.info("Fetching ReplicaData BEFORE special doc addition/expiration");
final Map<String,ReplicaData> initReplicaData = getTestDataForAllReplicas();
assertTrue("WTF? no replica data?", 0 < initReplicaData.size());
// add & hard commit a special doc with a short TTL
setAuthIfNeeded(new UpdateRequest()).add(sdoc("id", "special99", "should_expire_s","yup","tTl_s","+30SECONDS"))
.commit(cluster.getSolrClient(), COLLECTION);
// wait for our special docId to be deleted
waitForNoResults(180, params("q","id:special99","rows","0","_trace","did_special_doc_expire_yet"));
// now check all of the replicas to verify a few things:
// - only the replicas of one shard changed -- no unnecessary churn on other shards
// - every replica of each single shard should have the same number of docs
// - the total number of docs should match numDocsThatNeverExpire
log.info("Fetching ReplicaData AFTER special doc addition/expiration");
final Map<String,ReplicaData> finalReplicaData = getTestDataForAllReplicas();
assertEquals("WTF? not same num replicas?",
initReplicaData.size(),
finalReplicaData.size());
final Set<String> coresThatChange = new HashSet<>();
final Set<String> shardsThatChange = new HashSet<>();
int coresCompared = 0;
DocCollection collectionState = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);
for (Replica replica : collectionState.getReplicas()) {
coresCompared++;
String name = replica.getName();
String core = replica.getCoreName();
Long initVersion = initIndexVersions.get(core);
Long finalVersion = finalIndexVersions.get(core);
assertNotNull(name + ": no init version for core: " + core, initVersion);
assertNotNull(name + ": no final version for core: " + core, finalVersion);
int totalDocsOnAllShards = 0;
final DocCollection collectionState = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);
for (Slice shard : collectionState) {
boolean firstReplica = true;
for (Replica replica : shard) {
coresCompared++;
assertEquals(shard.getName(), replica.getSlice()); // sanity check
final String core = replica.getCoreName();
final ReplicaData initData = initReplicaData.get(core);
final ReplicaData finalData = finalReplicaData.get(core);
assertNotNull(shard.getName() + ": no init data for core: " + core, initData);
assertNotNull(shard.getName() + ": no final data for core: " + core, finalData);
if (!initVersion.equals(finalVersion)) {
nodesThatChange.add(core + "("+name+")");
shardsThatChange.add(name);
if (!initData.equals(finalData)) {
log.error("ReplicaData changed: {} != {}", initData, finalData);
coresThatChange.add(core + "("+shard.getName()+")");
shardsThatChange.add(shard.getName());
}
if (firstReplica) {
totalDocsOnAllShards += finalData.numDocs;
firstReplica = false;
}
}
}
assertEquals("Exactly one shard should have changed, instead: " + shardsThatChange
+ " nodes=(" + nodesThatChange + ")",
+ " cores=(" + coresThatChange + ")",
1, shardsThatChange.size());
assertEquals("somehow we missed some cores?",
initIndexVersions.size(), coresCompared);
initReplicaData.size(), coresCompared);
assertEquals("Final tally has incorrect numDocsThatNeverExpire",
numDocsThatNeverExpire, totalDocsOnAllShards);
// TODO: above logic verifies that deleteByQuery happens on all nodes, and ...
// doesn't affect searcher re-open on shards w/o expired docs ... can we also verify
// that *only* one node is sending the deletes ?
@ -128,11 +268,10 @@ public class DistribDocExpirationUpdateProcessorTest extends SolrCloudTestCase {
}
/**
* returns a map whose key is the coreNodeName and whose value is what the replication
* handler returns for the indexversion
* returns a map whose key is the coreNodeName and whose value is data about that core needed for the test
*/
private Map<String,Long> getIndexVersionOfAllReplicas() throws IOException, SolrServerException {
Map<String,Long> results = new HashMap<String,Long>();
private Map<String,ReplicaData> getTestDataForAllReplicas() throws IOException, SolrServerException {
Map<String,ReplicaData> results = new HashMap<>();
DocCollection collectionState = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);
@ -145,7 +284,7 @@ public class DistribDocExpirationUpdateProcessorTest extends SolrCloudTestCase {
params.set("command", "indexversion");
params.set("_trace", "getIndexVersion");
params.set("qt", ReplicationHandler.PATH);
QueryRequest req = new QueryRequest(params);
QueryRequest req = setAuthIfNeeded(new QueryRequest(params));
NamedList<Object> res = client.request(req);
assertNotNull("null response from server: " + coreName, res);
@ -153,17 +292,21 @@ public class DistribDocExpirationUpdateProcessorTest extends SolrCloudTestCase {
Object version = res.get("indexversion");
assertNotNull("null version from server: " + coreName, version);
assertTrue("version isn't a long: " + coreName, version instanceof Long);
results.put(coreName, (Long) version);
long numDocs = client.query(params("q", "*:*", "distrib", "false", "rows", "0", "_trace", "counting_docs"))
.getResults().getNumFound();
log.info("core=" + coreName + "; ver=" + version +
"; numDocs=" + numDocs);
long numDocs =
setAuthIfNeeded(new QueryRequest
(params("q", "*:*",
"distrib", "false",
"rows", "0",
"_trace", "counting_docs"))).process(client).getResults().getNumFound();
final ReplicaData data = new ReplicaData(replica.getSlice(),coreName,(Long)version,numDocs);
log.info("{}", data);
results.put(coreName, data);
}
}
return results;
}
@ -176,14 +319,61 @@ public class DistribDocExpirationUpdateProcessorTest extends SolrCloudTestCase {
SolrParams params)
throws SolrServerException, InterruptedException, IOException {
final QueryRequest req = setAuthIfNeeded(new QueryRequest(params));
final TimeOut timeout = new TimeOut(maxTimeLimitSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME);
long numFound = cluster.getSolrClient().query(COLLECTION, params).getResults().getNumFound();
long numFound = req.process(cluster.getSolrClient(), COLLECTION).getResults().getNumFound();
while (0L < numFound && ! timeout.hasTimedOut()) {
Thread.sleep(Math.max(1, Math.min(5000, timeout.timeLeft(TimeUnit.MILLISECONDS))));
numFound = cluster.getSolrClient().query(COLLECTION, params).getResults().getNumFound();
numFound = req.process(cluster.getSolrClient(), COLLECTION).getResults().getNumFound();
}
assertEquals("Give up waiting for no results: " + params,
0L, numFound);
}
/**
 * Immutable snapshot of the per-core values this test compares before and after
 * an expiration: the shard and core names, the index version, and the doc count.
 * Instances are equal only when all four values match.
 */
private static class ReplicaData {
  public final String shardName;
  public final String coreName;
  public final long indexVersion;
  public final long numDocs;

  /**
   * @param shardName    name of the shard the core belongs to; asserted non-null
   * @param coreName     name of the core; asserted non-null
   * @param indexVersion index version recorded for the core
   * @param numDocs      number of documents recorded for the core
   */
  public ReplicaData(final String shardName,
                     final String coreName,
                     final long indexVersion,
                     final long numDocs) {
    assert shardName != null;
    assert coreName != null;

    this.shardName = shardName;
    this.coreName = coreName;
    this.indexVersion = indexVersion;
    this.numDocs = numDocs;
  }

  @Override
  public String toString() {
    final StringBuilder text = new StringBuilder("ReplicaData(shard=");
    text.append(shardName);
    text.append(",core=").append(coreName);
    text.append(",indexVer=").append(indexVersion);
    text.append(",numDocs=").append(numDocs);
    text.append(")");
    return text.toString();
  }

  @Override
  public boolean equals(Object other) {
    // anything that is not a ReplicaData (including null) is never equal
    if (!(other instanceof ReplicaData)) {
      return false;
    }
    final ReplicaData rhs = (ReplicaData) other;
    return shardName.equals(rhs.shardName)
        && coreName.equals(rhs.coreName)
        && indexVersion == rhs.indexVersion
        && numDocs == rhs.numDocs;
  }

  @Override
  public int hashCode() {
    return Objects.hash(shardName, coreName, indexVersion, numDocs);
  }
}
}