SOLR-14423: Additional fixes for object caching and incorrect test assumptions.

This commit is contained in:
Andrzej Bialecki 2020-05-13 19:36:16 +02:00
parent e4dc9e9401
commit dd4fa8f2f8
8 changed files with 53 additions and 15 deletions

View File

@ -233,7 +233,7 @@ public class CoreContainer {
private volatile SolrClientCache solrClientCache;
private volatile ObjectCache objectCache = new ObjectCache();
private final ObjectCache objectCache = new ObjectCache();
private PackageStoreAPI packageStoreAPI;
private PackageLoader packageLoader;
@ -996,6 +996,8 @@ public class CoreContainer {
// Now clear all the cores that are being operated upon.
solrCores.close();
objectCache.clear();
// It's still possible that one of the pending dynamic load operation is waiting, so wake it up if so.
// Since all the pending operations queues have been drained, there should be nothing to do.
synchronized (solrCores.getModifyLock()) {

View File

@ -87,8 +87,8 @@ import static org.apache.solr.common.params.CommonParams.ID;
*/
public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, PermissionNameProvider {
private ModelCache modelCache = null;
private ConcurrentMap objectCache = new ConcurrentHashMap();
private ModelCache modelCache;
private ConcurrentMap objectCache;
private SolrDefaultStreamFactory streamFactory = new SolrDefaultStreamFactory();
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String coreName;
@ -100,25 +100,23 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
return PermissionNameProvider.Name.READ_PERM;
}
public SolrClientCache getClientCache() {
return solrClientCache;
}
public void inform(SolrCore core) {
String defaultCollection;
String defaultZkhost;
CoreContainer coreContainer = core.getCoreContainer();
this.solrClientCache = coreContainer.getSolrClientCache();
this.coreName = core.getName();
String cacheKey = this.getClass().getName() + "_" + coreName + "_";
this.objectCache = coreContainer.getObjectCache().computeIfAbsent(cacheKey + "objectCache",
ConcurrentHashMap.class, k-> new ConcurrentHashMap());
if (coreContainer.isZooKeeperAware()) {
defaultCollection = core.getCoreDescriptor().getCollectionName();
defaultZkhost = core.getCoreContainer().getZkController().getZkServerAddress();
streamFactory.withCollectionZkHost(defaultCollection, defaultZkhost);
streamFactory.withDefaultZkHost(defaultZkhost);
modelCache = new ModelCache(250,
defaultZkhost,
solrClientCache);
modelCache = coreContainer.getObjectCache().computeIfAbsent(cacheKey + "modelCache",
ModelCache.class,
k -> new ModelCache(250, defaultZkhost, solrClientCache));
}
streamFactory.withSolrResourceLoader(core.getResourceLoader());

View File

@ -341,7 +341,7 @@ public class SolrReporter extends ScheduledReporter {
*
* @deprecated use {@link SolrReporter#SolrReporter(SolrClientCache, boolean, Supplier, SolrMetricManager, List, String, String, TimeUnit, TimeUnit, SolrParams, boolean, boolean, boolean, boolean)} instead.
*/
@Deprecated(since = "8.6.0")
@Deprecated
public SolrReporter(HttpClient httpClient, Supplier<String> urlProvider, SolrMetricManager metricManager,
List<Report> metrics, String handler,
String reporterId, TimeUnit rateUnit, TimeUnit durationUnit,

View File

@ -187,6 +187,7 @@ public class ExecutorStream extends TupleStream implements Expressible {
this.queue = queue;
this.streamFactory = streamFactory;
this.streamContext = new StreamContext();
this.streamContext.setObjectCache(streamContext.getObjectCache());
this.streamContext.setSolrClientCache(streamContext.getSolrClientCache());
this.streamContext.setModelCache(streamContext.getModelCache());
}

View File

@ -239,6 +239,7 @@ public class FetchStream extends TupleStream implements Expressible {
CloudSolrStream cloudSolrStream = new CloudSolrStream(zkHost, collection, params);
StreamContext newContext = new StreamContext();
newContext.setSolrClientCache(streamContext.getSolrClientCache());
newContext.setObjectCache(streamContext.getObjectCache());
cloudSolrStream.setStreamContext(newContext);
Map<String, Tuple> fetched = new HashMap<>();
try {

View File

@ -430,6 +430,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
if(streamContext != null) {
StreamContext localContext = new StreamContext();
localContext.setSolrClientCache(streamContext.getSolrClientCache());
localContext.setObjectCache(streamContext.getObjectCache());
solrStream.setStreamContext(localContext);
}

View File

@ -26,12 +26,15 @@ import java.util.Set;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.junit.Before;
import org.junit.BeforeClass;
@ -3922,13 +3925,30 @@ public class MathExpressionTest extends SolrCloudTestCase {
assertTrue(stddev.doubleValue() == 0);
}
// NOTE: cache evaluators work only locally, on
// the same node where the replica that executes
// the stream is located
@Test
public void testCache() throws Exception {
String cexpr = "putCache(\"space1\", \"key1\", dotProduct(array(2,4,6,8,10,12),array(1,2,3,4,5,6)))";
ModifiableSolrParams paramsLoc = new ModifiableSolrParams();
paramsLoc.set("expr", cexpr);
paramsLoc.set("qt", "/stream");
String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS;
// find a node with a replica
ClusterState clusterState = cluster.getSolrClient().getClusterStateProvider().getClusterState();
String collection = useAlias ? COLLECTIONORALIAS + "_collection" : COLLECTIONORALIAS;
DocCollection coll = clusterState.getCollection(collection);
String node = coll.getReplicas().iterator().next().getNodeName();
String url = null;
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
if (jetty.getNodeName().equals(node)) {
url = jetty.getBaseUrl().toString()+"/"+COLLECTIONORALIAS;
break;
}
}
if (url == null) {
fail("unable to find a node with replica");
}
TupleStream solrStream = new SolrStream(url, paramsLoc);
StreamContext context = new StreamContext();
solrStream.setStreamContext(context);

View File

@ -61,6 +61,8 @@ import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.junit.Assume;
@ -3654,7 +3656,20 @@ public class StreamDecoratorTest extends SolrCloudTestCase {
updateRequest.add(id, String.valueOf(1), "text_s", "a b e e f");
updateRequest.commit(cluster.getSolrClient(), "uknownCollection");
String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS;
// find a node with a replica
ClusterState clusterState = cluster.getSolrClient().getClusterStateProvider().getClusterState();
DocCollection coll = clusterState.getCollection(COLLECTIONORALIAS);
String node = coll.getReplicas().iterator().next().getNodeName();
String url = null;
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
if (jetty.getNodeName().equals(node)) {
url = jetty.getBaseUrl().toString()+"/"+COLLECTIONORALIAS;
break;
}
}
if (url == null) {
fail("unable to find a node with replica");
}
TupleStream updateTrainModelStream;
ModifiableSolrParams paramsLoc;