From 475b8cafc5265173cb100e84c895e7946ade902a Mon Sep 17 00:00:00 2001
From: Chris Hostetter
Date: Wed, 22 Jan 2020 09:32:25 -0700
Subject: [PATCH] SOLR-12859: Fixed DocExpirationUpdateProcessorFactory to work
 with BasicAuth and other auth plugins that delegate to PKI for server
 initiated node-to-node communication.

(cherry picked from commit 95dfddc7d4eaaa5997ccd69797dbe1fd966f32ac)
---
 solr/CHANGES.txt                              |   3 +
 .../solr/request/LocalSolrQueryRequest.java   |  40 ++-
 .../security/PKIAuthenticationPlugin.java     |   6 +-
 .../DocExpirationUpdateProcessorFactory.java  |   7 +-
 ...stribDocExpirationUpdateProcessorTest.java | 326 ++++++++++++++----
 5 files changed, 310 insertions(+), 72 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 0bb9c5b4ca7..fc5d4e0ba44 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -139,6 +139,9 @@ Bug Fixes

 * SOLR-14192: Race condition between SchemaManager and ZkIndexSchemaReader. (ab)

+* SOLR-12859: Fixed DocExpirationUpdateProcessorFactory to work with BasicAuth and other auth plugins
+  that delegate to PKI for server initiated node-to-node communication. (hossman)
+
 Other Changes
 ---------------------

diff --git a/solr/core/src/java/org/apache/solr/request/LocalSolrQueryRequest.java b/solr/core/src/java/org/apache/solr/request/LocalSolrQueryRequest.java
index 889877ae491..427d71c933a 100644
--- a/solr/core/src/java/org/apache/solr/request/LocalSolrQueryRequest.java
+++ b/solr/core/src/java/org/apache/solr/request/LocalSolrQueryRequest.java
@@ -16,9 +16,11 @@
  */
 package org.apache.solr.request;

+import java.security.Principal;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;

 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.MultiMapSolrParams;
@@ -34,7 +36,9 @@ import org.apache.solr.core.SolrCore;
  */
 public class LocalSolrQueryRequest extends SolrQueryRequestBase {
   public final static Map emptyArgs = new HashMap(0,1);
-
+
+  public String userPrincipalName = null;
+
   protected static SolrParams makeParams(String query, String qtype, int start, int limit, Map args) {
     Map map = new HashMap<>();
     for (Iterator iter = args.entrySet().iterator(); iter.hasNext();) {
@@ -66,6 +70,38 @@ public class LocalSolrQueryRequest extends SolrQueryRequestBase {
   public LocalSolrQueryRequest(SolrCore core, SolrParams args) {
     super(core, args);
   }
-
+
+  @Override
+  public Principal getUserPrincipal() {
+    return new LocalPrincipal(this.userPrincipalName);
+  }
+
+  /**
+   * Allows setting the 'name' of the User Principal for the purposes of creating local requests
+   * in a solr node when security is enabled.  It is experimental and subject to removal.
+   *
+   * @see org.apache.solr.security.PKIAuthenticationPlugin#NODE_IS_USER
+   * @see #getUserPrincipal
+   * @lucene.internal
+   * @lucene.experimental
+   */
+  public void setUserPrincipalName(String s) {
+    this.userPrincipalName = s;
+  }
+
+  private final class LocalPrincipal implements Principal {
+    private final String user;
+    public LocalPrincipal(String user) {
+      this.user = user;
+    }
+    public String getName() {
+      return user;
+    }
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(user);
+    }
+    @Override
+    public boolean equals(Object other) {
+      return (null != other)
+        && Objects.equals(this.getClass(), other.getClass())
+        && Objects.equals(this.getName(), ((LocalPrincipal)other).getName());
+    }
+  }
 }
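For illustration, a minimal usage sketch of the new (experimental) hook added above. This is hypothetical code, not part of the patch; 'core' is an assumed SolrCore reference:

    // mark a local, server-initiated request so PKI can identify it later
    LocalSolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
    req.setUserPrincipalName(PKIAuthenticationPlugin.NODE_IS_USER);

    // getUserPrincipal() now reports that name, which is how
    // PKIAuthenticationPlugin recognizes the request as node-initiated
    assert PKIAuthenticationPlugin.NODE_IS_USER.equals(req.getUserPrincipal().getName());
    req.close();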
diff --git a/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java b/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java
index b14068c9317..538334a6c84 100644
--- a/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java
+++ b/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java
@@ -231,7 +231,9 @@ public class PKIAuthenticationPlugin extends AuthenticationPlugin implements Htt
     final HttpListenerFactory.RequestResponseListener listener = new HttpListenerFactory.RequestResponseListener() {
       @Override
       public void onQueued(Request request) {
+        log.trace("onQueued: {}", request);
         if (cores.getAuthenticationPlugin() == null) {
+          log.trace("no authentication plugin, skipping");
           return;
         }
         if (!cores.getAuthenticationPlugin().interceptInternodeRequest(request)) {
@@ -282,7 +284,7 @@ public class PKIAuthenticationPlugin extends AuthenticationPlugin implements Htt
     if (reqInfo != null) {
       Principal principal = reqInfo.getUserPrincipal();
       if (principal == null) {
-        log.debug("principal is null");
+        log.debug("generateToken: principal is null");
         //this had a request but it was not authenticated,
         //so we do not need to set a principal
         return Optional.empty();
@@ -293,6 +295,7 @@ public class PKIAuthenticationPlugin extends AuthenticationPlugin implements Htt
       if (!isSolrThread()) {
         //if this is not running inside a Solr threadpool (as in testcases)
         // then no need to add any header
+        log.debug("generateToken: not a solr (server) thread");
         return Optional.empty();
       }
       //this request seems to have originated from Solr itself
@@ -304,6 +307,7 @@ public class PKIAuthenticationPlugin extends AuthenticationPlugin implements Htt
     byte[] payload = s.getBytes(UTF_8);
     byte[] payloadCipher = publicKeyHandler.keyPair.encrypt(ByteBuffer.wrap(payload));
     String base64Cipher = Base64.byteArrayToBase64(payloadCipher);
+    log.trace("generateToken: usr={} token={}", usr, base64Cipher);
     return Optional.of(base64Cipher);
   }
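A condensed sketch of the token that generateToken() builds above, assuming (as the surrounding code suggests) that the payload is the principal name and a timestamp separated by a space. Here 'encrypt' is a placeholder for Solr's key-pair encryption, and java.util.Base64 stands in for Solr's own Base64 helper:

    static String generateTokenSketch(String usr,
                                      java.util.function.Function<java.nio.ByteBuffer, byte[]> encrypt) {
      String s = usr + " " + System.currentTimeMillis();      // "user timestamp" payload
      byte[] payload = s.getBytes(java.nio.charset.StandardCharsets.UTF_8);
      byte[] payloadCipher = encrypt.apply(java.nio.ByteBuffer.wrap(payload)); // keyPair.encrypt stand-in
      return java.util.Base64.getEncoder().encodeToString(payloadCipher);      // sent as the PKI header
    }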
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DocExpirationUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/DocExpirationUpdateProcessorFactory.java
index 38f6c47be58..a66c8cdebaf 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DocExpirationUpdateProcessorFactory.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DocExpirationUpdateProcessorFactory.java
@@ -45,6 +45,7 @@ import org.apache.solr.request.LocalSolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.SolrRequestInfo;
 import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.security.PKIAuthenticationPlugin;
 import org.apache.solr.update.AddUpdateCommand;
 import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.DeleteUpdateCommand;
@@ -381,9 +382,13 @@ public final class DocExpirationUpdateProcessorFactory
     public void run() {
       // setup the request context early so the logging (including any from
       // shouldWeDoPeriodicDelete() ) includes the core context info
-      final SolrQueryRequest req = new LocalSolrQueryRequest
+      final LocalSolrQueryRequest req = new LocalSolrQueryRequest
         (factory.core, Collections.emptyMap());
       try {
+        // HACK: to indicate to PKI that this is a server initiated request for the purposes
+        // of distributed request/credential forwarding...
+        req.setUserPrincipalName(PKIAuthenticationPlugin.NODE_IS_USER);
+
         final SolrQueryResponse rsp = new SolrQueryResponse();
         rsp.addResponseHeader(new SimpleOrderedMap<>(1));
         SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp));
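For context, the timer task is only partially visible in the hunk above; the overall shape is the usual set/clear pattern around the thread-local request info. A condensed sketch, with 'core' as an assumed SolrCore reference:

    LocalSolrQueryRequest req = new LocalSolrQueryRequest(core, Collections.emptyMap());
    try {
      // mark the request as node-initiated so PKI forwards credentials
      // on the resulting distributed deleteByQuery
      req.setUserPrincipalName(PKIAuthenticationPlugin.NODE_IS_USER);

      SolrQueryResponse rsp = new SolrQueryResponse();
      rsp.addResponseHeader(new SimpleOrderedMap<>(1));
      SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp));
      // ... issue the periodic deleteByQuery against the expiration field ...
    } finally {
      SolrRequestInfo.clearRequestInfo();
      req.close();
    }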
diff --git a/solr/core/src/test/org/apache/solr/cloud/DistribDocExpirationUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/DistribDocExpirationUpdateProcessorTest.java
index 8847ceca637..d99a4069e21 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DistribDocExpirationUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DistribDocExpirationUpdateProcessorTest.java
@@ -18,29 +18,42 @@ package org.apache.solr.cloud;

 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.Objects;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;

+import static java.util.Collections.singletonMap;
+import static java.util.Collections.singletonList;
+
 import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
 import org.apache.solr.handler.ReplicationHandler;
+import org.apache.solr.security.BasicAuthPlugin;
+import org.apache.solr.security.RuleBasedAuthorizationPlugin;
 import org.apache.solr.update.processor.DocExpirationUpdateProcessorFactory;
 import org.apache.solr.util.TimeOut;
-import org.junit.BeforeClass;
-import org.junit.Test;
+
+import static org.apache.solr.security.Sha256AuthenticationProvider.getSaltedHashedValue;
+
+import org.junit.After;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,76 +63,203 @@ public class DistribDocExpirationUpdateProcessorTest extends SolrCloudTestCase {

   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

-  private static final String COLLECTION = "expiry";
+  private String COLLECTION = null;
+  private String USER = null;
+  private String PASS = null;

-  @BeforeClass
-  public static void setupCluster() throws Exception {
-    configureCluster(2)
-        .addConfig("conf", TEST_PATH().resolve("configsets").resolve("doc-expiry").resolve("conf"))
-        .configure();
-
-    CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1)
-        .processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
-    cluster.getSolrClient().waitForState(COLLECTION, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
-        (n, c) -> DocCollection.isFullyActive(n, c, 2, 1));
+  @After
+  public void cleanup() throws Exception {
+    shutdownCluster();
+    COLLECTION = null;
+    USER = null;
+    PASS = null;
   }

-  @Test
-  public void test() throws Exception {
-
-    // some docs with no expiration
-    UpdateRequest req1 = new UpdateRequest();
-    for (int i = 1; i <= 100; i++) {
-      req1.add(sdoc("id", i));
+  /**
+   * Modifies the request to include authentication params if needed, returns the request
+   */
+  private <T extends SolrRequest> T setAuthIfNeeded(T req) {
+    if (null != USER) {
+      assert null != PASS;
+      req.setBasicAuthCredentials(USER, PASS);
     }
-    req1.commit(cluster.getSolrClient(), COLLECTION);
+    return req;
+  }
+
+  public void setupCluster(boolean security) throws Exception {
+    // we want at most one core per node to force lots of network traffic to try and tickle distributed bugs
+    final Builder b = configureCluster(4)
+      .addConfig("conf", TEST_PATH().resolve("configsets").resolve("doc-expiry").resolve("conf"));

-    // this doc better not already exist
-    waitForNoResults(0, params("q","id:999","rows","0","_trace","sanity_check"));
+    COLLECTION = "expiring";
+    if (security) {
+      USER = "solr";
+      PASS = "SolrRocksAgain";
+      COLLECTION += "_secure";
+
+      final String SECURITY_JSON = Utils.toJSONString
+        (Utils.makeMap("authorization",
+                       Utils.makeMap("class", RuleBasedAuthorizationPlugin.class.getName(),
+                                     "user-role", singletonMap(USER,"admin"),
+                                     "permissions", singletonList(Utils.makeMap("name","all",
+                                                                                "role","admin"))),
+                       "authentication",
+                       Utils.makeMap("class", BasicAuthPlugin.class.getName(),
+                                     "blockUnknown",true,
+                                     "credentials", singletonMap(USER, getSaltedHashedValue(PASS)))));
+      b.withSecurityJson(SECURITY_JSON);
+    }
+    b.configure();
+
+    setAuthIfNeeded(CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2))
+      .process(cluster.getSolrClient());
+
+    cluster.getSolrClient().waitForState(COLLECTION, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
+        (n, c) -> DocCollection.isFullyActive(n, c, 2, 2));
+  }
+
+  public void testNoAuth() throws Exception {
+    setupCluster(false);
+    runTest();
+  }
+
+  public void testBasicAuth() throws Exception {
+    setupCluster(true);
+
+    // sanity check that our cluster really does require authentication
+    assertEquals("sanity check of non authenticated request",
+                 401,
+                 expectThrows(SolrException.class, () -> {
+                     final long ignored = cluster.getSolrClient().query
+                       (COLLECTION,
+                        params("q", "*:*",
+                               "rows", "0",
+                               "_trace", "no_auth_sanity_check")).getResults().getNumFound();
+                   }).code());

-    // record the indexversion for each server so we can check later
-    // that it only changes for one shard
-    final Map<String,Long> initIndexVersions = getIndexVersionOfAllReplicas();
-    assertTrue("WTF? no versions?", 0 < initIndexVersions.size());
-
-    // add a doc with a short TTL
-    new UpdateRequest().add(sdoc("id", "999", "tTl_s","+30SECONDS")).commit(cluster.getSolrClient(), COLLECTION);
-
-    // wait for one doc to be deleted
-    waitForNoResults(180, params("q","id:999","rows","0","_trace","did_it_expire_yet"));
-
-    // verify only one shard changed
-    final Map<String,Long> finalIndexVersions = getIndexVersionOfAllReplicas();
-    assertEquals("WTF? not same num versions?",
-                 initIndexVersions.size(),
-                 finalIndexVersions.size());
+    runTest();
+  }
+
+  private void runTest() throws Exception {
+    final int totalNumDocs = atLeast(50);

-    final Set<String> nodesThatChange = new HashSet<String>();
-    final Set<String> shardsThatChange = new HashSet<String>();

+    // Add a bunch of docs; some with extremely short expiration, some with no expiration
+    // these should be randomly distributed to each shard
+    long numDocsThatNeverExpire = 0;
+    {
+      final UpdateRequest req = setAuthIfNeeded(new UpdateRequest());
+      for (int i = 1; i <= totalNumDocs; i++) {
+        final SolrInputDocument doc = sdoc("id", i);
+
+        if (random().nextBoolean()) {
+          doc.addField("should_expire_s","yup");
+          doc.addField("tTl_s","+1SECONDS");
+        } else {
+          numDocsThatNeverExpire++;
+        }
+
+        req.add(doc);
+      }
+      req.commit(cluster.getSolrClient(), COLLECTION);
+    }
+
+    // NOTE: don't assume we can find exactly totalNumDocs right now, some may have already been deleted...
+
+    // it should not take long for us to get to the point where all 'should_expire_s:yup' docs are gone
+    waitForNoResults(30, params("q","should_expire_s:yup","rows","0","_trace","init_batch_check"));
+
+    {
+      // ...*NOW* we can assert that exactly numDocsThatNeverExpire should exist...
+      final QueryRequest req = setAuthIfNeeded(new QueryRequest
+                                               (params("q", "*:*",
+                                                       "rows", "0",
+                                                       "_trace", "count_non_expire_docs")));
+
+      // NOTE: it's possible that replicas could be out of sync but this query may get lucky and
+      // only hit leaders. we'll compare the counts of every replica in every shard later on...
+      assertEquals(numDocsThatNeverExpire,
+                   req.process(cluster.getSolrClient(), COLLECTION).getResults().getNumFound());
+    }
+
+    //
+    // now that we've confirmed the basics work, let's check some fine grained stuff...
+    //
+
+    // first off, sanity check that this special docId doesn't somehow already exist
+    waitForNoResults(0, params("q","id:special99","rows","0","_trace","sanity_check99"));
+
+    {
+      // force a hard commit on all shards (the prior auto-expire would have only done a soft commit)
+      // so we can ensure our indexVersion won't change unnecessarily on the unaffected
+      // shard when we add & (hard) commit our special doc...
+      final UpdateRequest req = setAuthIfNeeded(new UpdateRequest());
+      req.commit(cluster.getSolrClient(), COLLECTION);
+    }
+
+    // record important data for each replica core so we can check later
+    // that it only changes for the replicas of a single shard after we add/expire a single special doc
+    log.info("Fetching ReplicaData BEFORE special doc addition/expiration");
+    final Map<String,ReplicaData> initReplicaData = getTestDataForAllReplicas();
+    assertTrue("WTF? no replica data?", 0 < initReplicaData.size());
+
+    // add & hard commit a special doc with a short TTL
+    setAuthIfNeeded(new UpdateRequest()).add(sdoc("id", "special99", "should_expire_s","yup","tTl_s","+30SECONDS"))
+      .commit(cluster.getSolrClient(), COLLECTION);
+
+    // wait for our special docId to be deleted
+    waitForNoResults(180, params("q","id:special99","rows","0","_trace","did_special_doc_expire_yet"));
+
+    // now check all of the replicas to verify a few things:
+    // - only the replicas of one shard changed -- no unnecessary churn on other shards
+    // - every replica of each single shard should have the same number of docs
+    // - the total number of docs should match numDocsThatNeverExpire
+    log.info("Fetching ReplicaData AFTER special doc addition/expiration");
+    final Map<String,ReplicaData> finalReplicaData = getTestDataForAllReplicas();
+    assertEquals("WTF? not same num replicas?",
+                 initReplicaData.size(),
+                 finalReplicaData.size());
+    final Set<String> coresThatChange = new HashSet<>();
+    final Set<String> shardsThatChange = new HashSet<>();

     int coresCompared = 0;
-    DocCollection collectionState = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);
-    for (Replica replica : collectionState.getReplicas()) {
-      coresCompared++;
-      String name = replica.getName();
-      String core = replica.getCoreName();
-      Long initVersion = initIndexVersions.get(core);
-      Long finalVersion = finalIndexVersions.get(core);
-      assertNotNull(name + ": no init version for core: " + core, initVersion);
-      assertNotNull(name + ": no final version for core: " + core, finalVersion);
+    int totalDocsOnAllShards = 0;
+    final DocCollection collectionState = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);
+    for (Slice shard : collectionState) {
+      boolean firstReplica = true;
+      for (Replica replica : shard) {
+        coresCompared++;
+        assertEquals(shard.getName(), replica.getSlice()); // sanity check
+        final String core = replica.getCoreName();
+        final ReplicaData initData = initReplicaData.get(core);
+        final ReplicaData finalData = finalReplicaData.get(core);
+        assertNotNull(shard.getName() + ": no init data for core: " + core, initData);
+        assertNotNull(shard.getName() + ": no final data for core: " + core, finalData);

-      if (!initVersion.equals(finalVersion)) {
-        nodesThatChange.add(core + "("+name+")");
-        shardsThatChange.add(name);
+        if (!initData.equals(finalData)) {
+          log.error("ReplicaData changed: {} != {}", initData, finalData);
+          coresThatChange.add(core + "("+shard.getName()+")");
+          shardsThatChange.add(shard.getName());
+        }
+
+        if (firstReplica) {
+          totalDocsOnAllShards += finalData.numDocs;
+          firstReplica = false;
+        }
       }
     }

     assertEquals("Exactly one shard should have changed, instead: " + shardsThatChange
-                 + " nodes=(" + nodesThatChange + ")",
+                 + " cores=(" + coresThatChange + ")",
                  1, shardsThatChange.size());
     assertEquals("somehow we missed some cores?",
-                 initIndexVersions.size(), coresCompared);
+                 initReplicaData.size(), coresCompared);
+    assertEquals("Final tally has incorrect numDocsThatNeverExpire",
+                 numDocsThatNeverExpire, totalDocsOnAllShards);

     // TODO: above logic verifies that deleteByQuery happens on all nodes, and ...
     // doesn't affect searcher re-open on shards w/o expired docs ... can we also verify
     // that *only* one node is sending the deletes ?
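For reference, the SECURITY_JSON built in setupCluster(true) above serializes to roughly the following (an illustrative reconstruction; key order may differ, and the credentials hash is computed at runtime by getSaltedHashedValue):

    // approximate contents of security.json for the secure cluster
    String securityJson = String.join("\n",
        "{",
        "  \"authorization\": {",
        "    \"class\": \"org.apache.solr.security.RuleBasedAuthorizationPlugin\",",
        "    \"user-role\": { \"solr\": \"admin\" },",
        "    \"permissions\": [ { \"name\": \"all\", \"role\": \"admin\" } ]",
        "  },",
        "  \"authentication\": {",
        "    \"class\": \"org.apache.solr.security.BasicAuthPlugin\",",
        "    \"blockUnknown\": true,",
        "    \"credentials\": { \"solr\": \"<salted-sha256-of-password>\" }",
        "  }",
        "}");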
@@ -128,11 +268,10 @@ public class DistribDocExpirationUpdateProcessorTest extends SolrCloudTestCase {
   }

   /**
-   * returns a map whose key is the coreNodeName and whose value is what the replication
-   * handler returns for the indexversion
+   * returns a map whose key is the coreNodeName and whose value is data about that core needed for the test
    */
-  private Map<String,Long> getIndexVersionOfAllReplicas() throws IOException, SolrServerException {
-    Map<String,Long> results = new HashMap<String,Long>();
+  private Map<String,ReplicaData> getTestDataForAllReplicas() throws IOException, SolrServerException {
+    Map<String,ReplicaData> results = new HashMap<>();

     DocCollection collectionState = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);

@@ -145,7 +284,7 @@ public class DistribDocExpirationUpdateProcessorTest extends SolrCloudTestCase {
         params.set("command", "indexversion");
         params.set("_trace", "getIndexVersion");
         params.set("qt", ReplicationHandler.PATH);
-        QueryRequest req = new QueryRequest(params);
+        QueryRequest req = setAuthIfNeeded(new QueryRequest(params));

         NamedList<Object> res = client.request(req);
         assertNotNull("null response from server: " + coreName, res);
@@ -153,17 +292,21 @@ public class DistribDocExpirationUpdateProcessorTest extends SolrCloudTestCase {
         Object version = res.get("indexversion");
         assertNotNull("null version from server: " + coreName, version);
         assertTrue("version isn't a long: " + coreName, version instanceof Long);
-        results.put(coreName, (Long) version);

-        long numDocs = client.query(params("q", "*:*", "distrib", "false", "rows", "0", "_trace", "counting_docs"))
-          .getResults().getNumFound();
-        log.info("core=" + coreName + "; ver=" + version +
-                 "; numDocs=" + numDocs);
+        long numDocs =
+          setAuthIfNeeded(new QueryRequest
+                          (params("q", "*:*",
+                                  "distrib", "false",
+                                  "rows", "0",
+                                  "_trace", "counting_docs"))).process(client).getResults().getNumFound();
+
+        final ReplicaData data = new ReplicaData(replica.getSlice(),coreName,(Long)version,numDocs);
+        log.info("{}", data);
+        results.put(coreName, data);
       }
     }
-
     return results;
   }

@@ -176,14 +319,61 @@ public class DistribDocExpirationUpdateProcessorTest extends SolrCloudTestCase {
                                 SolrParams params)
     throws SolrServerException, InterruptedException, IOException {

+    final QueryRequest req = setAuthIfNeeded(new QueryRequest(params));
     final TimeOut timeout = new TimeOut(maxTimeLimitSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME);
-    long numFound = cluster.getSolrClient().query(COLLECTION, params).getResults().getNumFound();
+
+    long numFound = req.process(cluster.getSolrClient(), COLLECTION).getResults().getNumFound();
     while (0L < numFound && ! timeout.hasTimedOut()) {
       Thread.sleep(Math.max(1, Math.min(5000, timeout.timeLeft(TimeUnit.MILLISECONDS))));
-      numFound = cluster.getSolrClient().query(COLLECTION, params).getResults().getNumFound();
+
+      numFound = req.process(cluster.getSolrClient(), COLLECTION).getResults().getNumFound();
     }
+
+    assertEquals("Give up waiting for no results: " + params, 0L, numFound);
   }
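In isolation, the per-core indexversion probe used by getTestDataForAllReplicas() above looks like this (a sketch; 'client' is assumed to be an HttpSolrClient pointed at one specific core URL):

    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("command", "indexversion");
    params.set("qt", ReplicationHandler.PATH);           // the core's /replication handler
    QueryRequest req = setAuthIfNeeded(new QueryRequest(params));
    NamedList<Object> res = client.request(req);         // raw response, not a QueryResponse
    Long version = (Long) res.get("indexversion");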
+  private static class ReplicaData {
+    public final String shardName;
+    public final String coreName;
+    public final long indexVersion;
+    public final long numDocs;
+
+    public ReplicaData(final String shardName,
+                       final String coreName,
+                       final long indexVersion,
+                       final long numDocs) {
+      assert null != shardName;
+      assert null != coreName;
+
+      this.shardName = shardName;
+      this.coreName = coreName;
+      this.indexVersion = indexVersion;
+      this.numDocs = numDocs;
+    }
+
+    @Override
+    public String toString() {
+      return "ReplicaData(shard="+shardName+",core="+coreName+
+        ",indexVer="+indexVersion+",numDocs="+numDocs+")";
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof ReplicaData) {
+        ReplicaData that = (ReplicaData)other;
+        return
+          this.shardName.equals(that.shardName) &&
+          this.coreName.equals(that.coreName) &&
+          (this.indexVersion == that.indexVersion) &&
+          (this.numDocs == that.numDocs);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(this.shardName, this.coreName, this.indexVersion, this.numDocs);
+    }
+  }
 }
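To round out the picture, a hypothetical sketch of the receiving node's side of the PKI exchange (Solr's actual implementation lives in PKIAuthenticationPlugin and is not part of this patch): decrypt the header with the sender's public key, split the "user timestamp" payload, and reject stale tokens. A recovered principal equal to NODE_IS_USER is treated as the node itself, which is what lets the expiration task's node-to-node deleteByQuery through BasicAuth's blockUnknown:

    static String decipherPrincipalSketch(byte[] decrypted, long maxAgeMs) {
      String s = new String(decrypted, java.nio.charset.StandardCharsets.UTF_8);
      int sp = s.lastIndexOf(' ');
      String user = s.substring(0, sp);                   // e.g. PKIAuthenticationPlugin.NODE_IS_USER
      long ts = Long.parseLong(s.substring(sp + 1));
      if (System.currentTimeMillis() - ts > maxAgeMs) {
        throw new IllegalStateException("stale PKI token"); // basic replay protection
      }
      return user;
    }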