SOLR-14423: Move static SolrClientCache from StreamHandler to CoreContainer for wider reuse and better life-cycle management.

This commit is contained in:
Andrzej Bialecki 2020-05-12 21:44:00 +02:00
parent 6971244134
commit 4680e9245f
17 changed files with 189 additions and 76 deletions

View File

@ -108,6 +108,8 @@ Improvements
* SOLR-14433: Metrics: SolrShardReporter's default metrics list now includes TLOG and UPDATE./update (David Smiley) * SOLR-14433: Metrics: SolrShardReporter's default metrics list now includes TLOG and UPDATE./update (David Smiley)
* SOLR-14423: Move static SolrClientCache from StreamHandler to CoreContainer for wider reuse and better life-cycle management. (ab)
Optimizations Optimizations
--------------------- ---------------------
* SOLR-8306: Do not collect expand documents when expand.rows=0 (Marshall Sanders, Amelia Henderson) * SOLR-8306: Do not collect expand documents when expand.rows=0 (Marshall Sanders, Amelia Henderson)

View File

@ -28,8 +28,6 @@ import org.apache.solr.analytics.AnalyticsRequestParser;
import org.apache.solr.analytics.ExpressionFactory; import org.apache.solr.analytics.ExpressionFactory;
import org.apache.solr.analytics.TimeExceededStubException; import org.apache.solr.analytics.TimeExceededStubException;
import org.apache.solr.analytics.stream.AnalyticsShardResponseParser; import org.apache.solr.analytics.stream.AnalyticsShardResponseParser;
import org.apache.solr.client.solrj.io.ModelCache;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.SolrParams;
@ -61,9 +59,6 @@ public class AnalyticsHandler extends RequestHandlerBase implements SolrCoreAwar
public static final String NAME = "/analytics"; public static final String NAME = "/analytics";
private IndexSchema indexSchema; private IndexSchema indexSchema;
static SolrClientCache clientCache = new SolrClientCache();
static ModelCache modelCache = null;
@Override @Override
public PermissionNameProvider.Name getPermissionName(AuthorizationContext request) { public PermissionNameProvider.Name getPermissionName(AuthorizationContext request) {
return PermissionNameProvider.Name.READ_PERM; return PermissionNameProvider.Name.READ_PERM;
@ -72,7 +67,6 @@ public class AnalyticsHandler extends RequestHandlerBase implements SolrCoreAwar
@Override @Override
public void inform(SolrCore core) { public void inform(SolrCore core) {
core.registerResponseWriter(AnalyticsShardResponseWriter.NAME, new AnalyticsShardResponseWriter()); core.registerResponseWriter(AnalyticsShardResponseWriter.NAME, new AnalyticsShardResponseWriter());
indexSchema = core.getLatestSchema(); indexSchema = core.getLatestSchema();
AnalyticsRequestParser.init(); AnalyticsRequestParser.init();
} }

View File

@ -755,7 +755,10 @@ public class ZkController implements Closeable {
cloudSolrClient = new CloudSolrClient.Builder(new ZkClientClusterStateProvider(zkStateReader)).withSocketTimeout(30000).withConnectionTimeout(15000) cloudSolrClient = new CloudSolrClient.Builder(new ZkClientClusterStateProvider(zkStateReader)).withSocketTimeout(30000).withConnectionTimeout(15000)
.withHttpClient(cc.getUpdateShardHandler().getDefaultHttpClient()) .withHttpClient(cc.getUpdateShardHandler().getDefaultHttpClient())
.withConnectionTimeout(15000).withSocketTimeout(30000).build(); .withConnectionTimeout(15000).withSocketTimeout(30000).build();
cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cloudSolrClient); cloudManager = new SolrClientCloudManager(
new ZkDistributedQueueFactory(zkClient),
cloudSolrClient,
cc.getObjectCache());
cloudManager.getClusterStateProvider().connect(); cloudManager.getClusterStateProvider().connect();
} }
return cloudManager; return cloudManager;

View File

@ -38,7 +38,6 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy; import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.client.solrj.response.QueryResponse;
@ -163,7 +162,6 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
Stream.of(Cmd.values()).collect(Collectors.toMap(Cmd::toLower, Function.identity()))); Stream.of(Cmd.values()).collect(Collectors.toMap(Cmd::toLower, Function.identity())));
} }
private SolrClientCache solrClientCache;
private String zkHost; private String zkHost;
public ReindexCollectionCmd(OverseerCollectionMessageHandler ocmh) { public ReindexCollectionCmd(OverseerCollectionMessageHandler ocmh) {
@ -268,7 +266,6 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
Exception exc = null; Exception exc = null;
boolean createdTarget = false; boolean createdTarget = false;
try { try {
solrClientCache = new SolrClientCache(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
zkHost = ocmh.zkStateReader.getZkClient().getZkServerAddress(); zkHost = ocmh.zkStateReader.getZkClient().getZkServerAddress();
// set the running flag // set the running flag
reindexingState.clear(); reindexingState.clear();
@ -504,7 +501,6 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
exc = e; exc = e;
aborted = true; aborted = true;
} finally { } finally {
solrClientCache.close();
if (aborted) { if (aborted) {
cleanup(collection, targetCollection, chkCollection, daemonUrl, targetCollection, createdTarget); cleanup(collection, targetCollection, chkCollection, daemonUrl, targetCollection, createdTarget);
if (exc != null) { if (exc != null) {
@ -550,7 +546,7 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
} }
private long getNumberOfDocs(String collection) { private long getNumberOfDocs(String collection) {
CloudSolrClient solrClient = solrClientCache.getCloudSolrClient(zkHost); CloudSolrClient solrClient = ocmh.overseer.getCoreContainer().getSolrClientCache().getCloudSolrClient(zkHost);
try { try {
ModifiableSolrParams params = new ModifiableSolrParams(); ModifiableSolrParams params = new ModifiableSolrParams();
params.add(CommonParams.Q, "*:*"); params.add(CommonParams.Q, "*:*");

View File

@ -61,6 +61,7 @@ import org.apache.solr.client.solrj.impl.SolrHttpClientBuilder;
import org.apache.solr.client.solrj.impl.SolrHttpClientContextBuilder; import org.apache.solr.client.solrj.impl.SolrHttpClientContextBuilder;
import org.apache.solr.client.solrj.impl.SolrHttpClientContextBuilder.AuthSchemeRegistryProvider; import org.apache.solr.client.solrj.impl.SolrHttpClientContextBuilder.AuthSchemeRegistryProvider;
import org.apache.solr.client.solrj.impl.SolrHttpClientContextBuilder.CredentialsProviderProvider; import org.apache.solr.client.solrj.impl.SolrHttpClientContextBuilder.CredentialsProviderProvider;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.util.SolrIdentifierValidator; import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
import org.apache.solr.cloud.CloudDescriptor; import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.Overseer; import org.apache.solr.cloud.Overseer;
@ -76,6 +77,7 @@ import org.apache.solr.common.cloud.Replica.State;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils; import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.ObjectCache;
import org.apache.solr.common.util.SolrNamedThreadFactory; import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.Utils; import org.apache.solr.common.util.Utils;
import org.apache.solr.core.DirectoryFactory.DirContext; import org.apache.solr.core.DirectoryFactory.DirContext;
@ -100,6 +102,7 @@ import org.apache.solr.handler.admin.ZookeeperInfoHandler;
import org.apache.solr.handler.admin.ZookeeperReadAPI; import org.apache.solr.handler.admin.ZookeeperReadAPI;
import org.apache.solr.handler.admin.ZookeeperStatusHandler; import org.apache.solr.handler.admin.ZookeeperStatusHandler;
import org.apache.solr.handler.component.ShardHandlerFactory; import org.apache.solr.handler.component.ShardHandlerFactory;
import org.apache.solr.handler.sql.CalciteSolrDriver;
import org.apache.solr.logging.LogWatcher; import org.apache.solr.logging.LogWatcher;
import org.apache.solr.logging.MDCLoggingContext; import org.apache.solr.logging.MDCLoggingContext;
import org.apache.solr.metrics.SolrCoreMetricManager; import org.apache.solr.metrics.SolrCoreMetricManager;
@ -228,6 +231,10 @@ public class CoreContainer {
protected volatile AutoscalingHistoryHandler autoscalingHistoryHandler; protected volatile AutoscalingHistoryHandler autoscalingHistoryHandler;
private volatile SolrClientCache solrClientCache;
private volatile ObjectCache objectCache = new ObjectCache();
private PackageStoreAPI packageStoreAPI; private PackageStoreAPI packageStoreAPI;
private PackageLoader packageLoader; private PackageLoader packageLoader;
@ -576,6 +583,15 @@ public class CoreContainer {
public PackageStoreAPI getPackageStoreAPI() { public PackageStoreAPI getPackageStoreAPI() {
return packageStoreAPI; return packageStoreAPI;
} }
public SolrClientCache getSolrClientCache() {
return solrClientCache;
}
public ObjectCache getObjectCache() {
return objectCache;
}
//------------------------------------------------------------------- //-------------------------------------------------------------------
// Initialization / Cleanup // Initialization / Cleanup
//------------------------------------------------------------------- //-------------------------------------------------------------------
@ -636,6 +652,11 @@ public class CoreContainer {
updateShardHandler = new UpdateShardHandler(cfg.getUpdateShardHandlerConfig()); updateShardHandler = new UpdateShardHandler(cfg.getUpdateShardHandlerConfig());
updateShardHandler.initializeMetrics(solrMetricsContext, "updateShardHandler"); updateShardHandler.initializeMetrics(solrMetricsContext, "updateShardHandler");
solrClientCache = new SolrClientCache(updateShardHandler.getDefaultHttpClient());
// initialize CalciteSolrDriver instance to use this solrClientCache
CalciteSolrDriver.INSTANCE.setSolrClientCache(solrClientCache);
solrCores.load(loader); solrCores.load(loader);
@ -1017,6 +1038,9 @@ public class CoreContainer {
} catch (Exception e) { } catch (Exception e) {
log.warn("Error shutting down CoreAdminHandler. Continuing to close CoreContainer.", e); log.warn("Error shutting down CoreAdminHandler. Continuing to close CoreContainer.", e);
} }
if (solrClientCache != null) {
solrClientCache.close();
}
} finally { } finally {
try { try {

View File

@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.graph.Traversal; import org.apache.solr.client.solrj.io.graph.Traversal;
@ -83,6 +84,7 @@ public class GraphHandler extends RequestHandlerBase implements SolrCoreAware, P
private StreamFactory streamFactory = new DefaultStreamFactory(); private StreamFactory streamFactory = new DefaultStreamFactory();
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String coreName; private String coreName;
private SolrClientCache solrClientCache;
@Override @Override
public PermissionNameProvider.Name getPermissionName(AuthorizationContext request) { public PermissionNameProvider.Name getPermissionName(AuthorizationContext request) {
@ -94,6 +96,7 @@ public class GraphHandler extends RequestHandlerBase implements SolrCoreAware, P
String defaultZkhost; String defaultZkhost;
CoreContainer coreContainer = core.getCoreContainer(); CoreContainer coreContainer = core.getCoreContainer();
this.coreName = core.getName(); this.coreName = core.getName();
this.solrClientCache = coreContainer.getSolrClientCache();
if(coreContainer.isZooKeeperAware()) { if(coreContainer.isZooKeeperAware()) {
defaultCollection = core.getCoreDescriptor().getCollectionName(); defaultCollection = core.getCoreDescriptor().getCollectionName();
@ -147,7 +150,7 @@ public class GraphHandler extends RequestHandlerBase implements SolrCoreAware, P
} }
StreamContext context = new StreamContext(); StreamContext context = new StreamContext();
context.setSolrClientCache(StreamHandler.clientCache); context.setSolrClientCache(solrClientCache);
context.put("core", this.coreName); context.put("core", this.coreName);
Traversal traversal = new Traversal(); Traversal traversal = new Traversal();
context.put("traversal", traversal); context.put("traversal", traversal);

View File

@ -53,7 +53,6 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.CloseHook;
import org.apache.solr.core.CoreContainer; import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.PluginInfo; import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrConfig; import org.apache.solr.core.SolrConfig;
@ -88,12 +87,12 @@ import static org.apache.solr.common.params.CommonParams.ID;
*/ */
public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, PermissionNameProvider { public class StreamHandler extends RequestHandlerBase implements SolrCoreAware, PermissionNameProvider {
static SolrClientCache clientCache = new SolrClientCache(); private ModelCache modelCache = null;
static ModelCache modelCache = null; private ConcurrentMap objectCache = new ConcurrentHashMap();
static ConcurrentMap objectCache = new ConcurrentHashMap();
private SolrDefaultStreamFactory streamFactory = new SolrDefaultStreamFactory(); private SolrDefaultStreamFactory streamFactory = new SolrDefaultStreamFactory();
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String coreName; private String coreName;
private SolrClientCache solrClientCache;
private Map<String, DaemonStream> daemons = Collections.synchronizedMap(new HashMap()); private Map<String, DaemonStream> daemons = Collections.synchronizedMap(new HashMap());
@Override @Override
@ -101,14 +100,15 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
return PermissionNameProvider.Name.READ_PERM; return PermissionNameProvider.Name.READ_PERM;
} }
public static SolrClientCache getClientCache() { public SolrClientCache getClientCache() {
return clientCache; return solrClientCache;
} }
public void inform(SolrCore core) { public void inform(SolrCore core) {
String defaultCollection; String defaultCollection;
String defaultZkhost; String defaultZkhost;
CoreContainer coreContainer = core.getCoreContainer(); CoreContainer coreContainer = core.getCoreContainer();
this.solrClientCache = coreContainer.getSolrClientCache();
this.coreName = core.getName(); this.coreName = core.getName();
if (coreContainer.isZooKeeperAware()) { if (coreContainer.isZooKeeperAware()) {
@ -118,24 +118,12 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
streamFactory.withDefaultZkHost(defaultZkhost); streamFactory.withDefaultZkHost(defaultZkhost);
modelCache = new ModelCache(250, modelCache = new ModelCache(250,
defaultZkhost, defaultZkhost,
clientCache); solrClientCache);
} }
streamFactory.withSolrResourceLoader(core.getResourceLoader()); streamFactory.withSolrResourceLoader(core.getResourceLoader());
// This pulls all the overrides and additions from the config // This pulls all the overrides and additions from the config
addExpressiblePlugins(streamFactory, core); addExpressiblePlugins(streamFactory, core);
core.addCloseHook(new CloseHook() {
@Override
public void preClose(SolrCore core) {
// To change body of implemented methods use File | Settings | File Templates.
}
@Override
public void postClose(SolrCore core) {
clientCache.close();
}
});
} }
public static void addExpressiblePlugins(StreamFactory streamFactory, SolrCore core) { public static void addExpressiblePlugins(StreamFactory streamFactory, SolrCore core) {
@ -226,7 +214,7 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
context.put("shards", getCollectionShards(params)); context.put("shards", getCollectionShards(params));
context.workerID = worker; context.workerID = worker;
context.numWorkers = numWorkers; context.numWorkers = numWorkers;
context.setSolrClientCache(clientCache); context.setSolrClientCache(solrClientCache);
context.setModelCache(modelCache); context.setModelCache(modelCache);
context.setObjectCache(objectCache); context.setObjectCache(objectCache);
context.put("core", this.coreName); context.put("core", this.coreName);

View File

@ -27,7 +27,6 @@ import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.SolrClientCache;
@ -66,9 +65,9 @@ public class ColStatus {
public static final String RAW_SIZE_SAMPLING_PERCENT_PROP = SegmentsInfoRequestHandler.RAW_SIZE_SAMPLING_PERCENT_PARAM; public static final String RAW_SIZE_SAMPLING_PERCENT_PROP = SegmentsInfoRequestHandler.RAW_SIZE_SAMPLING_PERCENT_PARAM;
public static final String SEGMENTS_PROP = "segments"; public static final String SEGMENTS_PROP = "segments";
public ColStatus(HttpClient httpClient, ClusterState clusterState, ZkNodeProps props) { public ColStatus(SolrClientCache solrClientCache, ClusterState clusterState, ZkNodeProps props) {
this.props = props; this.props = props;
this.solrClientCache = new SolrClientCache(httpClient); this.solrClientCache = solrClientCache;
this.clusterState = clusterState; this.clusterState = clusterState;
} }

View File

@ -529,7 +529,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
if (props.containsKey(CoreAdminParams.NAME) && !props.containsKey(COLLECTION_PROP)) { if (props.containsKey(CoreAdminParams.NAME) && !props.containsKey(COLLECTION_PROP)) {
props.put(COLLECTION_PROP, props.get(CoreAdminParams.NAME)); props.put(COLLECTION_PROP, props.get(CoreAdminParams.NAME));
} }
new ColStatus(h.coreContainer.getUpdateShardHandler().getDefaultHttpClient(), new ColStatus(h.coreContainer.getSolrClientCache(),
h.coreContainer.getZkController().getZkStateReader().getClusterState(), new ZkNodeProps(props)) h.coreContainer.getZkController().getZkStateReader().getClusterState(), new ZkNodeProps(props))
.getColStatus(rsp.getValues()); .getColStatus(rsp.getValues());
return null; return null;

View File

@ -19,6 +19,7 @@ package org.apache.solr.handler.sql;
import org.apache.calcite.jdbc.CalciteConnection; import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.jdbc.Driver; import org.apache.calcite.jdbc.Driver;
import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.SchemaPlus;
import org.apache.solr.client.solrj.io.SolrClientCache;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
@ -32,12 +33,17 @@ import java.util.Properties;
public class CalciteSolrDriver extends Driver { public class CalciteSolrDriver extends Driver {
public final static String CONNECT_STRING_PREFIX = "jdbc:calcitesolr:"; public final static String CONNECT_STRING_PREFIX = "jdbc:calcitesolr:";
public static CalciteSolrDriver INSTANCE = new CalciteSolrDriver();
private SolrClientCache solrClientCache;
private CalciteSolrDriver() { private CalciteSolrDriver() {
super(); super();
} }
static { static {
new CalciteSolrDriver().register(); INSTANCE.register();
} }
@Override @Override
@ -59,11 +65,15 @@ public class CalciteSolrDriver extends Driver {
if(schemaName == null) { if(schemaName == null) {
throw new SQLException("zk must be set"); throw new SQLException("zk must be set");
} }
rootSchema.add(schemaName, new SolrSchema(info)); final SolrSchema solrSchema = new SolrSchema(info, solrClientCache);
rootSchema.add(schemaName, solrSchema);
// Set the default schema // Set the default schema
calciteConnection.setSchema(schemaName); calciteConnection.setSchema(schemaName);
return calciteConnection;
}
return connection; public void setSolrClientCache(SolrClientCache solrClientCache) {
this.solrClientCache = solrClientCache;
} }
} }

View File

@ -16,10 +16,9 @@
*/ */
package org.apache.solr.handler.sql; package org.apache.solr.handler.sql;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
@ -33,6 +32,7 @@ import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl; import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.request.LukeRequest; import org.apache.solr.client.solrj.request.LukeRequest;
import org.apache.solr.client.solrj.response.LukeResponse; import org.apache.solr.client.solrj.response.LukeResponse;
import org.apache.solr.common.cloud.Aliases; import org.apache.solr.common.cloud.Aliases;
@ -41,19 +41,34 @@ import org.apache.solr.common.cloud.ZkStateReader;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
class SolrSchema extends AbstractSchema { class SolrSchema extends AbstractSchema implements Closeable {
final Properties properties; final Properties properties;
final SolrClientCache solrClientCache;
private volatile boolean isClosed = false;
SolrSchema(Properties properties) { SolrSchema(Properties properties, SolrClientCache solrClientCache) {
super(); super();
this.properties = properties; this.properties = properties;
this.solrClientCache = solrClientCache;
}
public SolrClientCache getSolrClientCache() {
return solrClientCache;
}
@Override
public void close() {
isClosed = true;
}
public boolean isClosed() {
return isClosed;
} }
@Override @Override
protected Map<String, Table> getTableMap() { protected Map<String, Table> getTableMap() {
String zk = this.properties.getProperty("zk"); String zk = this.properties.getProperty("zk");
try(CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(zk), Optional.empty()).withSocketTimeout(30000).withConnectionTimeout(15000).build()) { CloudSolrClient cloudSolrClient = solrClientCache.getCloudSolrClient(zk);
cloudSolrClient.connect();
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader(); ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState(); ClusterState clusterState = zkStateReader.getClusterState();
@ -73,15 +88,12 @@ class SolrSchema extends AbstractSchema {
} }
return builder.build(); return builder.build();
} catch (IOException e) {
throw new RuntimeException(e);
}
} }
private Map<String, LukeResponse.FieldInfo> getFieldInfo(String collection) { private Map<String, LukeResponse.FieldInfo> getFieldInfo(String collection) {
String zk = this.properties.getProperty("zk"); String zk = this.properties.getProperty("zk");
try(CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(zk), Optional.empty()).withSocketTimeout(30000).withConnectionTimeout(15000).build()) { CloudSolrClient cloudSolrClient = solrClientCache.getCloudSolrClient(zk);
cloudSolrClient.connect(); try {
LukeRequest lukeRequest = new LukeRequest(); LukeRequest lukeRequest = new LukeRequest();
lukeRequest.setNumTerms(0); lukeRequest.setNumTerms(0);
LukeResponse lukeResponse = lukeRequest.process(cloudSolrClient, collection); LukeResponse lukeResponse = lukeRequest.process(cloudSolrClient, collection);

View File

@ -51,7 +51,6 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.*; import org.apache.solr.client.solrj.io.stream.metrics.*;
import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.handler.StreamHandler;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
@ -160,7 +159,7 @@ class SolrTable extends AbstractQueryableTable implements TranslatableTable {
} }
StreamContext streamContext = new StreamContext(); StreamContext streamContext = new StreamContext();
streamContext.setSolrClientCache(StreamHandler.getClientCache()); streamContext.setSolrClientCache(schema.getSolrClientCache());
tupleStream.setStreamContext(streamContext); tupleStream.setStreamContext(streamContext);
final TupleStream finalStream = tupleStream; final TupleStream finalStream = tupleStream;

View File

@ -257,18 +257,33 @@ public class SolrReporter extends ScheduledReporter {
* null to indicate that reporting should be skipped. Note: this * null to indicate that reporting should be skipped. Note: this
* function will be called every time just before report is sent. * function will be called every time just before report is sent.
* @return configured instance of reporter * @return configured instance of reporter
* @deprecated use {@link #build(SolrClientCache, Supplier)} instead.
*/ */
public SolrReporter build(HttpClient client, Supplier<String> urlProvider) { public SolrReporter build(HttpClient client, Supplier<String> urlProvider) {
return new SolrReporter(client, urlProvider, metricManager, reports, handler, reporterId, rateUnit, durationUnit, return new SolrReporter(client, urlProvider, metricManager, reports, handler, reporterId, rateUnit, durationUnit,
params, skipHistograms, skipAggregateValues, cloudClient, compact); params, skipHistograms, skipAggregateValues, cloudClient, compact);
} }
/**
* Build it.
* @param solrClientCache an instance of {@link SolrClientCache} to be used for making calls.
* @param urlProvider function that returns the base URL of Solr instance to target. May return
* null to indicate that reporting should be skipped. Note: this
* function will be called every time just before report is sent.
* @return configured instance of reporter
*/
public SolrReporter build(SolrClientCache solrClientCache, Supplier<String> urlProvider) {
return new SolrReporter(solrClientCache, false, urlProvider, metricManager, reports, handler, reporterId, rateUnit, durationUnit,
params, skipHistograms, skipAggregateValues, cloudClient, compact);
}
} }
private String reporterId; private String reporterId;
private String handler; private String handler;
private Supplier<String> urlProvider; private Supplier<String> urlProvider;
private SolrClientCache clientCache; private SolrClientCache clientCache;
private boolean closeClientCache;
private List<CompiledReport> compiledReports; private List<CompiledReport> compiledReports;
private SolrMetricManager metricManager; private SolrMetricManager metricManager;
private boolean skipHistograms; private boolean skipHistograms;
@ -306,11 +321,59 @@ public class SolrReporter extends ScheduledReporter {
// We delegate to registries anyway, so having a dummy registry is harmless. // We delegate to registries anyway, so having a dummy registry is harmless.
private static final MetricRegistry dummyRegistry = new MetricRegistry(); private static final MetricRegistry dummyRegistry = new MetricRegistry();
// back-compat constructor
/**
* Create a SolrReporter instance.
* @param httpClient HttpClient to use for constructing SolrClient instances.
* @param urlProvider what URL to send to.
* @param metricManager metric manager
* @param metrics metric specifications to report
* @param handler handler name to report to
* @param reporterId my reporter id
* @param rateUnit rate unit
* @param durationUnit duration unit
* @param params request parameters
* @param skipHistograms if true then don't send histogram metrics
* @param skipAggregateValues if true then don't send aggregate metrics' individual values
* @param cloudClient if true then use CloudSolrClient, plain HttpSolrClient otherwise.
* @param compact if true then use compact representation.
*
* @deprecated use {@link SolrReporter#SolrReporter(SolrClientCache, boolean, Supplier, SolrMetricManager, List, String, String, TimeUnit, TimeUnit, SolrParams, boolean, boolean, boolean, boolean)} instead.
*/
@Deprecated(since = "8.6.0")
public SolrReporter(HttpClient httpClient, Supplier<String> urlProvider, SolrMetricManager metricManager, public SolrReporter(HttpClient httpClient, Supplier<String> urlProvider, SolrMetricManager metricManager,
List<Report> metrics, String handler, List<Report> metrics, String handler,
String reporterId, TimeUnit rateUnit, TimeUnit durationUnit, String reporterId, TimeUnit rateUnit, TimeUnit durationUnit,
SolrParams params, boolean skipHistograms, boolean skipAggregateValues, SolrParams params, boolean skipHistograms, boolean skipAggregateValues,
boolean cloudClient, boolean compact) { boolean cloudClient, boolean compact) {
this (new SolrClientCache(httpClient), true, urlProvider, metricManager,
metrics, handler, reporterId, rateUnit, durationUnit,
params, skipHistograms, skipAggregateValues, cloudClient, compact);
}
/**
* Create a SolrReporter instance.
* @param solrClientCache client cache to use for constructing SolrClient instances.
* @param urlProvider what URL to send to.
* @param metricManager metric manager
* @param metrics metric specifications to report
* @param handler handler name to report to
* @param reporterId my reporter id
* @param rateUnit rate unit
* @param durationUnit duration unit
* @param params request parameters
* @param skipHistograms if true then don't send histogram metrics
* @param skipAggregateValues if true then don't send aggregate metrics' individual values
* @param cloudClient if true then use CloudSolrClient, plain HttpSolrClient otherwise.
* @param compact if true then use compact representation.
*/
public SolrReporter(SolrClientCache solrClientCache, boolean closeClientCache,
Supplier<String> urlProvider, SolrMetricManager metricManager,
List<Report> metrics, String handler,
String reporterId, TimeUnit rateUnit, TimeUnit durationUnit,
SolrParams params, boolean skipHistograms, boolean skipAggregateValues,
boolean cloudClient, boolean compact) {
super(dummyRegistry, "solr-reporter", MetricFilter.ALL, rateUnit, durationUnit, null, true); super(dummyRegistry, "solr-reporter", MetricFilter.ALL, rateUnit, durationUnit, null, true);
this.metricManager = metricManager; this.metricManager = metricManager;
@ -320,7 +383,8 @@ public class SolrReporter extends ScheduledReporter {
handler = MetricsCollectorHandler.HANDLER_PATH; handler = MetricsCollectorHandler.HANDLER_PATH;
} }
this.handler = handler; this.handler = handler;
this.clientCache = new SolrClientCache(httpClient); this.clientCache = solrClientCache;
this.closeClientCache = closeClientCache;
this.compiledReports = new ArrayList<>(); this.compiledReports = new ArrayList<>();
metrics.forEach(report -> { metrics.forEach(report -> {
MetricFilter filter = new SolrMetricManager.RegexFilter(report.metricFilters); MetricFilter filter = new SolrMetricManager.RegexFilter(report.metricFilters);
@ -347,7 +411,9 @@ public class SolrReporter extends ScheduledReporter {
@Override @Override
public void close() { public void close() {
if (closeClientCache) {
clientCache.close(); clientCache.close();
}
super.close(); super.close();
} }

View File

@ -154,7 +154,7 @@ public class SolrShardReporter extends SolrCoreReporter {
.cloudClient(false) // we want to send reports specifically to a selected leader instance .cloudClient(false) // we want to send reports specifically to a selected leader instance
.skipAggregateValues(true) // we don't want to transport details of aggregates .skipAggregateValues(true) // we don't want to transport details of aggregates
.skipHistograms(true) // we don't want to transport histograms .skipHistograms(true) // we don't want to transport histograms
.build(core.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient(), new LeaderUrlSupplier(core)); .build(core.getCoreContainer().getSolrClientCache(), new LeaderUrlSupplier(core));
reporter.start(period, TimeUnit.SECONDS); reporter.start(period, TimeUnit.SECONDS);
} }

View File

@ -261,7 +261,7 @@ public class XCJFQuery extends Query {
} }
private DocSet getDocSet() throws IOException { private DocSet getDocSet() throws IOException {
SolrClientCache solrClientCache = new SolrClientCache(); SolrClientCache solrClientCache = searcher.getCore().getCoreContainer().getSolrClientCache();
TupleStream solrStream; TupleStream solrStream;
if (zkHost != null || solrUrl == null) { if (zkHost != null || solrUrl == null) {
solrStream = createCloudSolrStream(solrClientCache); solrStream = createCloudSolrStream(solrClientCache);
@ -299,7 +299,6 @@ public class XCJFQuery extends Query {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} finally { } finally {
solrStream.close(); solrStream.close();
solrClientCache.close();
} }
return collector.getDocSet(); return collector.getDocSet();

View File

@ -61,23 +61,37 @@ public class SolrClientCloudManager implements SolrCloudManager {
private final ZkStateReader zkStateReader; private final ZkStateReader zkStateReader;
private final SolrZkClient zkClient; private final SolrZkClient zkClient;
private final ObjectCache objectCache; private final ObjectCache objectCache;
private final boolean closeObjectCache;
private volatile boolean isClosed; private volatile boolean isClosed;
public SolrClientCloudManager(DistributedQueueFactory queueFactory, CloudSolrClient solrClient) { public SolrClientCloudManager(DistributedQueueFactory queueFactory, CloudSolrClient solrClient) {
this(queueFactory, solrClient, null);
}
public SolrClientCloudManager(DistributedQueueFactory queueFactory, CloudSolrClient solrClient,
ObjectCache objectCache) {
this.queueFactory = queueFactory; this.queueFactory = queueFactory;
this.solrClient = solrClient; this.solrClient = solrClient;
this.zkStateReader = solrClient.getZkStateReader(); this.zkStateReader = solrClient.getZkStateReader();
this.zkClient = zkStateReader.getZkClient(); this.zkClient = zkStateReader.getZkClient();
this.stateManager = new ZkDistribStateManager(zkClient); this.stateManager = new ZkDistribStateManager(zkClient);
this.isClosed = false; this.isClosed = false;
if (objectCache == null) {
this.objectCache = new ObjectCache(); this.objectCache = new ObjectCache();
closeObjectCache = true;
} else {
this.objectCache = objectCache;
this.closeObjectCache = false;
}
} }
@Override @Override
public void close() { public void close() {
isClosed = true; isClosed = true;
if (closeObjectCache) {
IOUtils.closeQuietly(objectCache); IOUtils.closeQuietly(objectCache);
} }
}
@Override @Override
public boolean isClosed() { public boolean isClosed() {

View File

@ -30,6 +30,10 @@ public class MapBackedCache<K, V> implements Cache<K, V> {
this.map = map; this.map = map;
} }
public Map<K, V> asMap() {
return map;
}
@Override @Override
public V put(K key, V val) { public V put(K key, V val) {
return map.put(key, val); return map.put(key, val);