mirror of https://github.com/apache/lucene.git
SOLR-12859: Fixed DocExpirationUpdateProcessorFactory to work with BasicAuth and other auth plugins that delegate to PKI for server initiated node-to-node communication.
(cherry picked from commit 95dfddc7d4
)
This commit is contained in:
parent
74936c32c3
commit
475b8cafc5
|
@ -139,6 +139,9 @@ Bug Fixes
|
||||||
|
|
||||||
* SOLR-14192: Race condition between SchemaManager and ZkIndexSchemaReader. (ab)
|
* SOLR-14192: Race condition between SchemaManager and ZkIndexSchemaReader. (ab)
|
||||||
|
|
||||||
|
* SOLR-12859: Fixed DocExpirationUpdateProcessorFactory to work with BasicAuth and other auth plugins
|
||||||
|
that delegate to PKI for server initiated node-to-node communication. (hossman)
|
||||||
|
|
||||||
Other Changes
|
Other Changes
|
||||||
---------------------
|
---------------------
|
||||||
|
|
||||||
|
|
|
@ -16,9 +16,11 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.solr.request;
|
package org.apache.solr.request;
|
||||||
|
|
||||||
|
import java.security.Principal;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
import org.apache.solr.common.params.CommonParams;
|
import org.apache.solr.common.params.CommonParams;
|
||||||
import org.apache.solr.common.params.MultiMapSolrParams;
|
import org.apache.solr.common.params.MultiMapSolrParams;
|
||||||
|
@ -34,7 +36,9 @@ import org.apache.solr.core.SolrCore;
|
||||||
*/
|
*/
|
||||||
public class LocalSolrQueryRequest extends SolrQueryRequestBase {
|
public class LocalSolrQueryRequest extends SolrQueryRequestBase {
|
||||||
public final static Map emptyArgs = new HashMap(0,1);
|
public final static Map emptyArgs = new HashMap(0,1);
|
||||||
|
|
||||||
|
public String userPrincipalName = null;
|
||||||
|
|
||||||
protected static SolrParams makeParams(String query, String qtype, int start, int limit, Map args) {
|
protected static SolrParams makeParams(String query, String qtype, int start, int limit, Map args) {
|
||||||
Map<String,String[]> map = new HashMap<>();
|
Map<String,String[]> map = new HashMap<>();
|
||||||
for (Iterator iter = args.entrySet().iterator(); iter.hasNext();) {
|
for (Iterator iter = args.entrySet().iterator(); iter.hasNext();) {
|
||||||
|
@ -66,6 +70,38 @@ public class LocalSolrQueryRequest extends SolrQueryRequestBase {
|
||||||
public LocalSolrQueryRequest(SolrCore core, SolrParams args) {
|
public LocalSolrQueryRequest(SolrCore core, SolrParams args) {
|
||||||
super(core, args);
|
super(core, args);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public Principal getUserPrincipal() {
|
||||||
|
return new LocalPrincipal(this.userPrincipalName);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allows setting the 'name' of the User Principal for the purposes of creating local requests
|
||||||
|
* in a solr node when security is enabled. It is experiemental and subject to removal
|
||||||
|
*
|
||||||
|
* @see org.apache.solr.security.PKIAuthenticationPlugin#NODE_IS_USER
|
||||||
|
* @see #getUserPrincipal
|
||||||
|
* @lucene.internal
|
||||||
|
* @lucene.experimental
|
||||||
|
*/
|
||||||
|
public void setUserPrincipalName(String s) {
|
||||||
|
this.userPrincipalName = s;
|
||||||
|
}
|
||||||
|
private final class LocalPrincipal implements Principal {
|
||||||
|
private final String user;
|
||||||
|
public LocalPrincipal(String user) {
|
||||||
|
this.user = user;
|
||||||
|
}
|
||||||
|
public String getName() {
|
||||||
|
return user;
|
||||||
|
}
|
||||||
|
@Override public int hashCode() {
|
||||||
|
return Objects.hashCode(user);
|
||||||
|
}
|
||||||
|
@Override public boolean equals(Object other) {
|
||||||
|
return Objects.equals(this.getClass(), other.getClass())
|
||||||
|
&& Objects.equals(this.getName(), ((LocalPrincipal)other).getName() );
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -231,7 +231,9 @@ public class PKIAuthenticationPlugin extends AuthenticationPlugin implements Htt
|
||||||
final HttpListenerFactory.RequestResponseListener listener = new HttpListenerFactory.RequestResponseListener() {
|
final HttpListenerFactory.RequestResponseListener listener = new HttpListenerFactory.RequestResponseListener() {
|
||||||
@Override
|
@Override
|
||||||
public void onQueued(Request request) {
|
public void onQueued(Request request) {
|
||||||
|
log.trace("onQueued: {}", request);
|
||||||
if (cores.getAuthenticationPlugin() == null) {
|
if (cores.getAuthenticationPlugin() == null) {
|
||||||
|
log.trace("no authentication plugin, skipping");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (!cores.getAuthenticationPlugin().interceptInternodeRequest(request)) {
|
if (!cores.getAuthenticationPlugin().interceptInternodeRequest(request)) {
|
||||||
|
@ -282,7 +284,7 @@ public class PKIAuthenticationPlugin extends AuthenticationPlugin implements Htt
|
||||||
if (reqInfo != null) {
|
if (reqInfo != null) {
|
||||||
Principal principal = reqInfo.getUserPrincipal();
|
Principal principal = reqInfo.getUserPrincipal();
|
||||||
if (principal == null) {
|
if (principal == null) {
|
||||||
log.debug("principal is null");
|
log.debug("generateToken: principal is null");
|
||||||
//this had a request but not authenticated
|
//this had a request but not authenticated
|
||||||
//so we don't not need to set a principal
|
//so we don't not need to set a principal
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
|
@ -293,6 +295,7 @@ public class PKIAuthenticationPlugin extends AuthenticationPlugin implements Htt
|
||||||
if (!isSolrThread()) {
|
if (!isSolrThread()) {
|
||||||
//if this is not running inside a Solr threadpool (as in testcases)
|
//if this is not running inside a Solr threadpool (as in testcases)
|
||||||
// then no need to add any header
|
// then no need to add any header
|
||||||
|
log.debug("generateToken: not a solr (server) thread");
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
//this request seems to be originated from Solr itself
|
//this request seems to be originated from Solr itself
|
||||||
|
@ -304,6 +307,7 @@ public class PKIAuthenticationPlugin extends AuthenticationPlugin implements Htt
|
||||||
byte[] payload = s.getBytes(UTF_8);
|
byte[] payload = s.getBytes(UTF_8);
|
||||||
byte[] payloadCipher = publicKeyHandler.keyPair.encrypt(ByteBuffer.wrap(payload));
|
byte[] payloadCipher = publicKeyHandler.keyPair.encrypt(ByteBuffer.wrap(payload));
|
||||||
String base64Cipher = Base64.byteArrayToBase64(payloadCipher);
|
String base64Cipher = Base64.byteArrayToBase64(payloadCipher);
|
||||||
|
log.trace("generateToken: usr={} token={}", usr, base64Cipher);
|
||||||
return Optional.of(base64Cipher);
|
return Optional.of(base64Cipher);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.solr.request.LocalSolrQueryRequest;
|
||||||
import org.apache.solr.request.SolrQueryRequest;
|
import org.apache.solr.request.SolrQueryRequest;
|
||||||
import org.apache.solr.request.SolrRequestInfo;
|
import org.apache.solr.request.SolrRequestInfo;
|
||||||
import org.apache.solr.response.SolrQueryResponse;
|
import org.apache.solr.response.SolrQueryResponse;
|
||||||
|
import org.apache.solr.security.PKIAuthenticationPlugin;
|
||||||
import org.apache.solr.update.AddUpdateCommand;
|
import org.apache.solr.update.AddUpdateCommand;
|
||||||
import org.apache.solr.update.CommitUpdateCommand;
|
import org.apache.solr.update.CommitUpdateCommand;
|
||||||
import org.apache.solr.update.DeleteUpdateCommand;
|
import org.apache.solr.update.DeleteUpdateCommand;
|
||||||
|
@ -381,9 +382,13 @@ public final class DocExpirationUpdateProcessorFactory
|
||||||
public void run() {
|
public void run() {
|
||||||
// setup the request context early so the logging (including any from
|
// setup the request context early so the logging (including any from
|
||||||
// shouldWeDoPeriodicDelete() ) includes the core context info
|
// shouldWeDoPeriodicDelete() ) includes the core context info
|
||||||
final SolrQueryRequest req = new LocalSolrQueryRequest
|
final LocalSolrQueryRequest req = new LocalSolrQueryRequest
|
||||||
(factory.core, Collections.<String,String[]>emptyMap());
|
(factory.core, Collections.<String,String[]>emptyMap());
|
||||||
try {
|
try {
|
||||||
|
// HACK: to indicate to PKI that this is a server initiated request for the purposes
|
||||||
|
// of distributed requet/credential forwarding...
|
||||||
|
req.setUserPrincipalName(PKIAuthenticationPlugin.NODE_IS_USER);
|
||||||
|
|
||||||
final SolrQueryResponse rsp = new SolrQueryResponse();
|
final SolrQueryResponse rsp = new SolrQueryResponse();
|
||||||
rsp.addResponseHeader(new SimpleOrderedMap<>(1));
|
rsp.addResponseHeader(new SimpleOrderedMap<>(1));
|
||||||
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp));
|
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp));
|
||||||
|
|
|
@ -18,29 +18,42 @@ package org.apache.solr.cloud;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static java.util.Collections.singletonMap;
|
||||||
|
import static java.util.Collections.singletonList;
|
||||||
|
|
||||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||||
|
import org.apache.solr.client.solrj.SolrRequest;
|
||||||
import org.apache.solr.client.solrj.SolrServerException;
|
import org.apache.solr.client.solrj.SolrServerException;
|
||||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||||
|
import org.apache.solr.common.SolrException;
|
||||||
|
import org.apache.solr.common.SolrInputDocument;
|
||||||
import org.apache.solr.common.cloud.DocCollection;
|
import org.apache.solr.common.cloud.DocCollection;
|
||||||
import org.apache.solr.common.cloud.Replica;
|
import org.apache.solr.common.cloud.Replica;
|
||||||
|
import org.apache.solr.common.cloud.Slice;
|
||||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
import org.apache.solr.common.params.SolrParams;
|
import org.apache.solr.common.params.SolrParams;
|
||||||
import org.apache.solr.common.util.NamedList;
|
import org.apache.solr.common.util.NamedList;
|
||||||
import org.apache.solr.common.util.TimeSource;
|
import org.apache.solr.common.util.TimeSource;
|
||||||
|
import org.apache.solr.common.util.Utils;
|
||||||
import org.apache.solr.handler.ReplicationHandler;
|
import org.apache.solr.handler.ReplicationHandler;
|
||||||
|
import org.apache.solr.security.BasicAuthPlugin;
|
||||||
|
import org.apache.solr.security.RuleBasedAuthorizationPlugin;
|
||||||
import org.apache.solr.update.processor.DocExpirationUpdateProcessorFactory;
|
import org.apache.solr.update.processor.DocExpirationUpdateProcessorFactory;
|
||||||
import org.apache.solr.util.TimeOut;
|
import org.apache.solr.util.TimeOut;
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Test;
|
import static org.apache.solr.security.Sha256AuthenticationProvider.getSaltedHashedValue;
|
||||||
|
|
||||||
|
import org.junit.After;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -50,76 +63,203 @@ public class DistribDocExpirationUpdateProcessorTest extends SolrCloudTestCase {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
private static final String COLLECTION = "expiry";
|
private String COLLECTION = null;
|
||||||
|
private String USER = null;
|
||||||
|
private String PASS = null;
|
||||||
|
|
||||||
@BeforeClass
|
@After
|
||||||
public static void setupCluster() throws Exception {
|
public void cleanup() throws Exception {
|
||||||
configureCluster(2)
|
shutdownCluster();
|
||||||
.addConfig("conf", TEST_PATH().resolve("configsets").resolve("doc-expiry").resolve("conf"))
|
COLLECTION = null;
|
||||||
.configure();
|
USER = null;
|
||||||
|
PASS = null;
|
||||||
CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1)
|
|
||||||
.processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
|
|
||||||
cluster.getSolrClient().waitForState(COLLECTION, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
|
|
||||||
(n, c) -> DocCollection.isFullyActive(n, c, 2, 1));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
/**
|
||||||
public void test() throws Exception {
|
* Modifies the request to inlcude authentication params if needed, returns the request
|
||||||
|
*/
|
||||||
// some docs with no expiration
|
private <T extends SolrRequest> T setAuthIfNeeded(T req) {
|
||||||
UpdateRequest req1 = new UpdateRequest();
|
if (null != USER) {
|
||||||
for (int i = 1; i <= 100; i++) {
|
assert null != PASS;
|
||||||
req1.add(sdoc("id", i));
|
req.setBasicAuthCredentials(USER, PASS);
|
||||||
}
|
}
|
||||||
req1.commit(cluster.getSolrClient(), COLLECTION);
|
return req;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setupCluster(boolean security) throws Exception {
|
||||||
|
// we want at most one core per node to force lots of network traffic to try and tickle distributed bugs
|
||||||
|
final Builder b = configureCluster(4)
|
||||||
|
.addConfig("conf", TEST_PATH().resolve("configsets").resolve("doc-expiry").resolve("conf"));
|
||||||
|
|
||||||
// this doc better not already exist
|
COLLECTION = "expiring";
|
||||||
waitForNoResults(0, params("q","id:999","rows","0","_trace","sanity_check"));
|
if (security) {
|
||||||
|
USER = "solr";
|
||||||
|
PASS = "SolrRocksAgain";
|
||||||
|
COLLECTION += "_secure";
|
||||||
|
|
||||||
|
final String SECURITY_JSON = Utils.toJSONString
|
||||||
|
(Utils.makeMap("authorization",
|
||||||
|
Utils.makeMap("class", RuleBasedAuthorizationPlugin.class.getName(),
|
||||||
|
"user-role", singletonMap(USER,"admin"),
|
||||||
|
"permissions", singletonList(Utils.makeMap("name","all",
|
||||||
|
"role","admin"))),
|
||||||
|
"authentication",
|
||||||
|
Utils.makeMap("class", BasicAuthPlugin.class.getName(),
|
||||||
|
"blockUnknown",true,
|
||||||
|
"credentials", singletonMap(USER, getSaltedHashedValue(PASS)))));
|
||||||
|
b.withSecurityJson(SECURITY_JSON);
|
||||||
|
}
|
||||||
|
b.configure();
|
||||||
|
|
||||||
|
setAuthIfNeeded(CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2))
|
||||||
|
.process(cluster.getSolrClient());
|
||||||
|
|
||||||
|
cluster.getSolrClient().waitForState(COLLECTION, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
|
||||||
|
(n, c) -> DocCollection.isFullyActive(n, c, 2, 2));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testNoAuth() throws Exception {
|
||||||
|
setupCluster(false);
|
||||||
|
runTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testBasicAuth() throws Exception {
|
||||||
|
setupCluster(true);
|
||||||
|
|
||||||
|
// sanity check that our cluster really does require authentication
|
||||||
|
assertEquals("sanity check of non authenticated request",
|
||||||
|
401,
|
||||||
|
expectThrows(SolrException.class, () -> {
|
||||||
|
final long ignored = cluster.getSolrClient().query
|
||||||
|
(COLLECTION,
|
||||||
|
params("q", "*:*",
|
||||||
|
"rows", "0",
|
||||||
|
"_trace", "no_auth_sanity_check")).getResults().getNumFound();
|
||||||
|
}).code());
|
||||||
|
|
||||||
// record the indexversion for each server so we can check later
|
runTest();
|
||||||
// that it only changes for one shard
|
}
|
||||||
final Map<String,Long> initIndexVersions = getIndexVersionOfAllReplicas();
|
|
||||||
assertTrue("WTF? no versions?", 0 < initIndexVersions.size());
|
private void runTest() throws Exception {
|
||||||
|
final int totalNumDocs = atLeast(50);
|
||||||
// add a doc with a short TTL
|
|
||||||
new UpdateRequest().add(sdoc("id", "999", "tTl_s","+30SECONDS")).commit(cluster.getSolrClient(), COLLECTION);
|
|
||||||
|
|
||||||
// wait for one doc to be deleted
|
|
||||||
waitForNoResults(180, params("q","id:999","rows","0","_trace","did_it_expire_yet"));
|
|
||||||
|
|
||||||
// verify only one shard changed
|
|
||||||
final Map<String,Long> finalIndexVersions = getIndexVersionOfAllReplicas();
|
|
||||||
assertEquals("WTF? not same num versions?",
|
|
||||||
initIndexVersions.size(),
|
|
||||||
finalIndexVersions.size());
|
|
||||||
|
|
||||||
final Set<String> nodesThatChange = new HashSet<String>();
|
// Add a bunch of docs; some with extremely short expiration, some with no expiration
|
||||||
final Set<String> shardsThatChange = new HashSet<String>();
|
// these should be randomly distributed to each shard
|
||||||
|
long numDocsThatNeverExpire = 0;
|
||||||
|
{
|
||||||
|
final UpdateRequest req = setAuthIfNeeded(new UpdateRequest());
|
||||||
|
for (int i = 1; i <= totalNumDocs; i++) {
|
||||||
|
final SolrInputDocument doc = sdoc("id", i);
|
||||||
|
|
||||||
|
if (random().nextBoolean()) {
|
||||||
|
doc.addField("should_expire_s","yup");
|
||||||
|
doc.addField("tTl_s","+1SECONDS");
|
||||||
|
} else {
|
||||||
|
numDocsThatNeverExpire++;
|
||||||
|
}
|
||||||
|
|
||||||
|
req.add(doc);
|
||||||
|
}
|
||||||
|
req.commit(cluster.getSolrClient(), COLLECTION);
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE: don't assume we can find exactly totalNumDocs right now, some may have already been deleted...
|
||||||
|
|
||||||
|
// it should not take long for us to get to the point where all 'should_expire_s:yup' docs are gone
|
||||||
|
waitForNoResults(30, params("q","should_expire_s:yup","rows","0","_trace","init_batch_check"));
|
||||||
|
|
||||||
|
{
|
||||||
|
// ...*NOW* we can assert that exactly numDocsThatNeverExpire should exist...
|
||||||
|
final QueryRequest req = setAuthIfNeeded(new QueryRequest
|
||||||
|
(params("q", "*:*",
|
||||||
|
"rows", "0",
|
||||||
|
"_trace", "count_non_expire_docs")));
|
||||||
|
|
||||||
|
// NOTE: it's possible that replicas could be out of sync but this query may get lucky and
|
||||||
|
// only hit leaders. we'll compare the counts of every replica in every shard later on...
|
||||||
|
assertEquals(numDocsThatNeverExpire,
|
||||||
|
req.process(cluster.getSolrClient(), COLLECTION).getResults().getNumFound());
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// now that we've confrmed the basics work, let's check some fine grain stuff...
|
||||||
|
//
|
||||||
|
|
||||||
|
// first off, sanity check that this special docId doesn't some how already exist
|
||||||
|
waitForNoResults(0, params("q","id:special99","rows","0","_trace","sanity_check99"));
|
||||||
|
|
||||||
|
{
|
||||||
|
// force a hard commit on all shards (the prior auto-expire would have only done a soft commit)
|
||||||
|
// so we can ensure our indexVersion won't change uncessisarily on the un-affected
|
||||||
|
// shard when we add & (hard) commit our special doc...
|
||||||
|
final UpdateRequest req = setAuthIfNeeded(new UpdateRequest());
|
||||||
|
req.commit(cluster.getSolrClient(), COLLECTION);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// record important data for each replica core so we can check later
|
||||||
|
// that it only changes for the replicas of a single shard after we add/expire a single special doc
|
||||||
|
log.info("Fetching ReplicaData BEFORE special doc addition/expiration");
|
||||||
|
final Map<String,ReplicaData> initReplicaData = getTestDataForAllReplicas();
|
||||||
|
assertTrue("WTF? no replica data?", 0 < initReplicaData.size());
|
||||||
|
|
||||||
|
// add & hard commit a special doc with a short TTL
|
||||||
|
setAuthIfNeeded(new UpdateRequest()).add(sdoc("id", "special99", "should_expire_s","yup","tTl_s","+30SECONDS"))
|
||||||
|
.commit(cluster.getSolrClient(), COLLECTION);
|
||||||
|
|
||||||
|
// wait for our special docId to be deleted
|
||||||
|
waitForNoResults(180, params("q","id:special99","rows","0","_trace","did_special_doc_expire_yet"));
|
||||||
|
|
||||||
|
// now check all of the replicas to verify a few things:
|
||||||
|
// - only the replicas of one shard changed -- no unneccessary churn on other shards
|
||||||
|
// - every replica of each single shard should have the same number of docs
|
||||||
|
// - the total number of docs should match numDocsThatNeverExpire
|
||||||
|
log.info("Fetching ReplicaData AFTER special doc addition/expiration");
|
||||||
|
final Map<String,ReplicaData> finalReplicaData = getTestDataForAllReplicas();
|
||||||
|
assertEquals("WTF? not same num replicas?",
|
||||||
|
initReplicaData.size(),
|
||||||
|
finalReplicaData.size());
|
||||||
|
|
||||||
|
final Set<String> coresThatChange = new HashSet<>();
|
||||||
|
final Set<String> shardsThatChange = new HashSet<>();
|
||||||
|
|
||||||
int coresCompared = 0;
|
int coresCompared = 0;
|
||||||
DocCollection collectionState = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);
|
int totalDocsOnAllShards = 0;
|
||||||
for (Replica replica : collectionState.getReplicas()) {
|
final DocCollection collectionState = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);
|
||||||
coresCompared++;
|
for (Slice shard : collectionState) {
|
||||||
String name = replica.getName();
|
boolean firstReplica = true;
|
||||||
String core = replica.getCoreName();
|
for (Replica replica : shard) {
|
||||||
Long initVersion = initIndexVersions.get(core);
|
coresCompared++;
|
||||||
Long finalVersion = finalIndexVersions.get(core);
|
assertEquals(shard.getName(), replica.getSlice()); // sanity check
|
||||||
assertNotNull(name + ": no init version for core: " + core, initVersion);
|
final String core = replica.getCoreName();
|
||||||
assertNotNull(name + ": no final version for core: " + core, finalVersion);
|
final ReplicaData initData = initReplicaData.get(core);
|
||||||
|
final ReplicaData finalData = finalReplicaData.get(core);
|
||||||
|
assertNotNull(shard.getName() + ": no init data for core: " + core, initData);
|
||||||
|
assertNotNull(shard.getName() + ": no final data for core: " + core, finalData);
|
||||||
|
|
||||||
if (!initVersion.equals(finalVersion)) {
|
if (!initData.equals(finalData)) {
|
||||||
nodesThatChange.add(core + "("+name+")");
|
log.error("ReplicaData changed: {} != {}", initData, finalData);
|
||||||
shardsThatChange.add(name);
|
coresThatChange.add(core + "("+shard.getName()+")");
|
||||||
|
shardsThatChange.add(shard.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (firstReplica) {
|
||||||
|
totalDocsOnAllShards += finalData.numDocs;
|
||||||
|
firstReplica = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assertEquals("Exactly one shard should have changed, instead: " + shardsThatChange
|
assertEquals("Exactly one shard should have changed, instead: " + shardsThatChange
|
||||||
+ " nodes=(" + nodesThatChange + ")",
|
+ " cores=(" + coresThatChange + ")",
|
||||||
1, shardsThatChange.size());
|
1, shardsThatChange.size());
|
||||||
assertEquals("somehow we missed some cores?",
|
assertEquals("somehow we missed some cores?",
|
||||||
initIndexVersions.size(), coresCompared);
|
initReplicaData.size(), coresCompared);
|
||||||
|
|
||||||
|
assertEquals("Final tally has incorrect numDocsThatNeverExpire",
|
||||||
|
numDocsThatNeverExpire, totalDocsOnAllShards);
|
||||||
|
|
||||||
// TODO: above logic verifies that deleteByQuery happens on all nodes, and ...
|
// TODO: above logic verifies that deleteByQuery happens on all nodes, and ...
|
||||||
// doesn't affect searcher re-open on shards w/o expired docs ... can we also verify
|
// doesn't affect searcher re-open on shards w/o expired docs ... can we also verify
|
||||||
// that *only* one node is sending the deletes ?
|
// that *only* one node is sending the deletes ?
|
||||||
|
@ -128,11 +268,10 @@ public class DistribDocExpirationUpdateProcessorTest extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* returns a map whose key is the coreNodeName and whose value is what the replication
|
* returns a map whose key is the coreNodeName and whose value is data about that core needed for the test
|
||||||
* handler returns for the indexversion
|
|
||||||
*/
|
*/
|
||||||
private Map<String,Long> getIndexVersionOfAllReplicas() throws IOException, SolrServerException {
|
private Map<String,ReplicaData> getTestDataForAllReplicas() throws IOException, SolrServerException {
|
||||||
Map<String,Long> results = new HashMap<String,Long>();
|
Map<String,ReplicaData> results = new HashMap<>();
|
||||||
|
|
||||||
DocCollection collectionState = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);
|
DocCollection collectionState = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);
|
||||||
|
|
||||||
|
@ -145,7 +284,7 @@ public class DistribDocExpirationUpdateProcessorTest extends SolrCloudTestCase {
|
||||||
params.set("command", "indexversion");
|
params.set("command", "indexversion");
|
||||||
params.set("_trace", "getIndexVersion");
|
params.set("_trace", "getIndexVersion");
|
||||||
params.set("qt", ReplicationHandler.PATH);
|
params.set("qt", ReplicationHandler.PATH);
|
||||||
QueryRequest req = new QueryRequest(params);
|
QueryRequest req = setAuthIfNeeded(new QueryRequest(params));
|
||||||
|
|
||||||
NamedList<Object> res = client.request(req);
|
NamedList<Object> res = client.request(req);
|
||||||
assertNotNull("null response from server: " + coreName, res);
|
assertNotNull("null response from server: " + coreName, res);
|
||||||
|
@ -153,17 +292,21 @@ public class DistribDocExpirationUpdateProcessorTest extends SolrCloudTestCase {
|
||||||
Object version = res.get("indexversion");
|
Object version = res.get("indexversion");
|
||||||
assertNotNull("null version from server: " + coreName, version);
|
assertNotNull("null version from server: " + coreName, version);
|
||||||
assertTrue("version isn't a long: " + coreName, version instanceof Long);
|
assertTrue("version isn't a long: " + coreName, version instanceof Long);
|
||||||
results.put(coreName, (Long) version);
|
|
||||||
|
|
||||||
long numDocs = client.query(params("q", "*:*", "distrib", "false", "rows", "0", "_trace", "counting_docs"))
|
long numDocs =
|
||||||
.getResults().getNumFound();
|
setAuthIfNeeded(new QueryRequest
|
||||||
log.info("core=" + coreName + "; ver=" + version +
|
(params("q", "*:*",
|
||||||
"; numDocs=" + numDocs);
|
"distrib", "false",
|
||||||
|
"rows", "0",
|
||||||
|
"_trace", "counting_docs"))).process(client).getResults().getNumFound();
|
||||||
|
|
||||||
|
final ReplicaData data = new ReplicaData(replica.getSlice(),coreName,(Long)version,numDocs);
|
||||||
|
log.info("{}", data);
|
||||||
|
results.put(coreName, data);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -176,14 +319,61 @@ public class DistribDocExpirationUpdateProcessorTest extends SolrCloudTestCase {
|
||||||
SolrParams params)
|
SolrParams params)
|
||||||
throws SolrServerException, InterruptedException, IOException {
|
throws SolrServerException, InterruptedException, IOException {
|
||||||
|
|
||||||
|
final QueryRequest req = setAuthIfNeeded(new QueryRequest(params));
|
||||||
final TimeOut timeout = new TimeOut(maxTimeLimitSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME);
|
final TimeOut timeout = new TimeOut(maxTimeLimitSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME);
|
||||||
long numFound = cluster.getSolrClient().query(COLLECTION, params).getResults().getNumFound();
|
|
||||||
|
long numFound = req.process(cluster.getSolrClient(), COLLECTION).getResults().getNumFound();
|
||||||
while (0L < numFound && ! timeout.hasTimedOut()) {
|
while (0L < numFound && ! timeout.hasTimedOut()) {
|
||||||
Thread.sleep(Math.max(1, Math.min(5000, timeout.timeLeft(TimeUnit.MILLISECONDS))));
|
Thread.sleep(Math.max(1, Math.min(5000, timeout.timeLeft(TimeUnit.MILLISECONDS))));
|
||||||
numFound = cluster.getSolrClient().query(COLLECTION, params).getResults().getNumFound();
|
|
||||||
|
numFound = req.process(cluster.getSolrClient(), COLLECTION).getResults().getNumFound();
|
||||||
}
|
}
|
||||||
|
|
||||||
assertEquals("Give up waiting for no results: " + params,
|
assertEquals("Give up waiting for no results: " + params,
|
||||||
0L, numFound);
|
0L, numFound);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class ReplicaData {
|
||||||
|
public final String shardName;
|
||||||
|
public final String coreName;
|
||||||
|
public final long indexVersion;
|
||||||
|
public final long numDocs;
|
||||||
|
public ReplicaData(final String shardName,
|
||||||
|
final String coreName,
|
||||||
|
final long indexVersion,
|
||||||
|
final long numDocs) {
|
||||||
|
assert null != shardName;
|
||||||
|
assert null != coreName;
|
||||||
|
|
||||||
|
this.shardName = shardName;
|
||||||
|
this.coreName = coreName;
|
||||||
|
this.indexVersion = indexVersion;
|
||||||
|
this.numDocs = numDocs;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ReplicaData(shard="+shardName+",core="+coreName+
|
||||||
|
",indexVer="+indexVersion+",numDocs="+numDocs+")";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object other) {
|
||||||
|
if (other instanceof ReplicaData) {
|
||||||
|
ReplicaData that = (ReplicaData)other;
|
||||||
|
return
|
||||||
|
this.shardName.equals(that.shardName) &&
|
||||||
|
this.coreName.equals(that.coreName) &&
|
||||||
|
(this.indexVersion == that.indexVersion) &&
|
||||||
|
(this.numDocs == that.numDocs);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(this.shardName, this.coreName, this.indexVersion, this.numDocs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue