From dd4fa8f2f87d1dc7a10d72febc9241520b6294d6 Mon Sep 17 00:00:00 2001 From: Andrzej Bialecki Date: Wed, 13 May 2020 19:36:16 +0200 Subject: [PATCH] SOLR-14423: Additional fixes for object caching and incorrect test assumptions. --- .../org/apache/solr/core/CoreContainer.java | 4 +++- .../apache/solr/handler/StreamHandler.java | 18 +++++++------- .../metrics/reporters/solr/SolrReporter.java | 2 +- .../solrj/io/stream/ExecutorStream.java | 1 + .../client/solrj/io/stream/FetchStream.java | 1 + .../client/solrj/io/stream/TopicStream.java | 1 + .../solrj/io/stream/MathExpressionTest.java | 24 +++++++++++++++++-- .../solrj/io/stream/StreamDecoratorTest.java | 17 ++++++++++++- 8 files changed, 53 insertions(+), 15 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index 308fbf336f5..b0ebe91f829 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -233,7 +233,7 @@ public class CoreContainer { private volatile SolrClientCache solrClientCache; - private volatile ObjectCache objectCache = new ObjectCache(); + private final ObjectCache objectCache = new ObjectCache(); private PackageStoreAPI packageStoreAPI; private PackageLoader packageLoader; @@ -996,6 +996,8 @@ public class CoreContainer { // Now clear all the cores that are being operated upon. solrCores.close(); + objectCache.clear(); + // It's still possible that one of the pending dynamic load operation is waiting, so wake it up if so. // Since all the pending operations queues have been drained, there should be nothing to do. synchronized (solrCores.getModifyLock()) { diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index 8c6af432e5f..13e40db0513 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -87,8 +87,8 @@ import static org.apache.solr.common.params.CommonParams.ID; */ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, PermissionNameProvider { - private ModelCache modelCache = null; - private ConcurrentMap objectCache = new ConcurrentHashMap(); + private ModelCache modelCache; + private ConcurrentMap objectCache; private SolrDefaultStreamFactory streamFactory = new SolrDefaultStreamFactory(); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private String coreName; @@ -100,25 +100,23 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, return PermissionNameProvider.Name.READ_PERM; } - public SolrClientCache getClientCache() { - return solrClientCache; - } - public void inform(SolrCore core) { String defaultCollection; String defaultZkhost; CoreContainer coreContainer = core.getCoreContainer(); this.solrClientCache = coreContainer.getSolrClientCache(); this.coreName = core.getName(); - + String cacheKey = this.getClass().getName() + "_" + coreName + "_"; + this.objectCache = coreContainer.getObjectCache().computeIfAbsent(cacheKey + "objectCache", + ConcurrentHashMap.class, k-> new ConcurrentHashMap()); if (coreContainer.isZooKeeperAware()) { defaultCollection = core.getCoreDescriptor().getCollectionName(); defaultZkhost = core.getCoreContainer().getZkController().getZkServerAddress(); streamFactory.withCollectionZkHost(defaultCollection, defaultZkhost); streamFactory.withDefaultZkHost(defaultZkhost); - modelCache = new ModelCache(250, - defaultZkhost, - solrClientCache); + modelCache = coreContainer.getObjectCache().computeIfAbsent(cacheKey + "modelCache", + ModelCache.class, + k -> new ModelCache(250, defaultZkhost, solrClientCache)); } streamFactory.withSolrResourceLoader(core.getResourceLoader()); diff --git a/solr/core/src/java/org/apache/solr/metrics/reporters/solr/SolrReporter.java b/solr/core/src/java/org/apache/solr/metrics/reporters/solr/SolrReporter.java index c126e73e006..81c74de1c1e 100644 --- a/solr/core/src/java/org/apache/solr/metrics/reporters/solr/SolrReporter.java +++ b/solr/core/src/java/org/apache/solr/metrics/reporters/solr/SolrReporter.java @@ -341,7 +341,7 @@ public class SolrReporter extends ScheduledReporter { * * @deprecated use {@link SolrReporter#SolrReporter(SolrClientCache, boolean, Supplier, SolrMetricManager, List, String, String, TimeUnit, TimeUnit, SolrParams, boolean, boolean, boolean, boolean)} instead. */ - @Deprecated(since = "8.6.0") + @Deprecated public SolrReporter(HttpClient httpClient, Supplier urlProvider, SolrMetricManager metricManager, List metrics, String handler, String reporterId, TimeUnit rateUnit, TimeUnit durationUnit, diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExecutorStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExecutorStream.java index 10b6873856f..29581fe6f00 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExecutorStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ExecutorStream.java @@ -187,6 +187,7 @@ public class ExecutorStream extends TupleStream implements Expressible { this.queue = queue; this.streamFactory = streamFactory; this.streamContext = new StreamContext(); + this.streamContext.setObjectCache(streamContext.getObjectCache()); this.streamContext.setSolrClientCache(streamContext.getSolrClientCache()); this.streamContext.setModelCache(streamContext.getModelCache()); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FetchStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FetchStream.java index fbdba168fa9..e587ff20a28 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FetchStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FetchStream.java @@ -239,6 +239,7 @@ public class FetchStream extends TupleStream implements Expressible { CloudSolrStream cloudSolrStream = new CloudSolrStream(zkHost, collection, params); StreamContext newContext = new StreamContext(); newContext.setSolrClientCache(streamContext.getSolrClientCache()); + newContext.setObjectCache(streamContext.getObjectCache()); cloudSolrStream.setStreamContext(newContext); Map fetched = new HashMap<>(); try { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java index 58280a061fd..26cbff30cd3 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java @@ -430,6 +430,7 @@ public class TopicStream extends CloudSolrStream implements Expressible { if(streamContext != null) { StreamContext localContext = new StreamContext(); localContext.setSolrClientCache(streamContext.getSolrClientCache()); + localContext.setObjectCache(streamContext.getObjectCache()); solrStream.setStreamContext(localContext); } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java index 25de9e1b194..da71685a7ec 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java @@ -26,12 +26,15 @@ import java.util.Set; import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase.Slow; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.cloud.AbstractDistribZkTestBase; import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.params.ModifiableSolrParams; import org.junit.Before; import org.junit.BeforeClass; @@ -77,7 +80,7 @@ public class MathExpressionTest extends SolrCloudTestCase { .commit(cluster.getSolrClient(), COLLECTIONORALIAS); } - @Test + @Test public void testAnalyzeEvaluator() throws Exception { UpdateRequest updateRequest = new UpdateRequest(); @@ -3922,13 +3925,30 @@ public class MathExpressionTest extends SolrCloudTestCase { assertTrue(stddev.doubleValue() == 0); } + // NOTE: cache evaluators work only locally, on + // the same node where the replica that executes + // the stream is located @Test public void testCache() throws Exception { String cexpr = "putCache(\"space1\", \"key1\", dotProduct(array(2,4,6,8,10,12),array(1,2,3,4,5,6)))"; ModifiableSolrParams paramsLoc = new ModifiableSolrParams(); paramsLoc.set("expr", cexpr); paramsLoc.set("qt", "/stream"); - String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString()+"/"+COLLECTIONORALIAS; + // find a node with a replica + ClusterState clusterState = cluster.getSolrClient().getClusterStateProvider().getClusterState(); + String collection = useAlias ? COLLECTIONORALIAS + "_collection" : COLLECTIONORALIAS; + DocCollection coll = clusterState.getCollection(collection); + String node = coll.getReplicas().iterator().next().getNodeName(); + String url = null; + for (JettySolrRunner jetty : cluster.getJettySolrRunners()) { + if (jetty.getNodeName().equals(node)) { + url = jetty.getBaseUrl().toString()+"/"+COLLECTIONORALIAS; + break; + } + } + if (url == null) { + fail("unable to find a node with replica"); + } TupleStream solrStream = new SolrStream(url, paramsLoc); StreamContext context = new StreamContext(); solrStream.setStreamContext(context); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java index 52aa3780aa5..73f6f9d3235 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java @@ -61,6 +61,8 @@ import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.cloud.AbstractDistribZkTestBase; import org.apache.solr.cloud.SolrCloudTestCase; import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.junit.Assume; @@ -3654,7 +3656,20 @@ public class StreamDecoratorTest extends SolrCloudTestCase { updateRequest.add(id, String.valueOf(1), "text_s", "a b e e f"); updateRequest.commit(cluster.getSolrClient(), "uknownCollection"); - String url = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTIONORALIAS; + // find a node with a replica + ClusterState clusterState = cluster.getSolrClient().getClusterStateProvider().getClusterState(); + DocCollection coll = clusterState.getCollection(COLLECTIONORALIAS); + String node = coll.getReplicas().iterator().next().getNodeName(); + String url = null; + for (JettySolrRunner jetty : cluster.getJettySolrRunners()) { + if (jetty.getNodeName().equals(node)) { + url = jetty.getBaseUrl().toString()+"/"+COLLECTIONORALIAS; + break; + } + } + if (url == null) { + fail("unable to find a node with replica"); + } TupleStream updateTrainModelStream; ModifiableSolrParams paramsLoc;