mirror of https://github.com/apache/lucene.git
SOLR-14749: Improve support for arbitrary container-level plugins. Add ClusterSingleton
support for plugins that require only one active instance in the cluster.
This commit is contained in:
parent
c587bf89e8
commit
67ecd8ff9a
|
@ -16,6 +16,9 @@ New Features
|
|||
|
||||
* SOLR-13528 Rate Limiting in Solr (Atri Sharma, Mike Drob)
|
||||
|
||||
* SOLR-14749: Improve support for arbitrary container-level plugins. Add ClusterSingleton
|
||||
support for plugins that require only one active instance in the cluster. (ab, noble)
|
||||
|
||||
Improvements
|
||||
----------------------
|
||||
* LUCENE-8984: MoreLikeThis MLT is biased for uncommon fields (Andy Hind via Anshum Gupta)
|
||||
|
|
|
@ -85,9 +85,18 @@ public class AnnotatedApi extends Api implements PermissionNameProvider , Closea
|
|||
}
|
||||
|
||||
public static List<Api> getApis(Object obj) {
|
||||
return getApis(obj.getClass(), obj);
|
||||
return getApis(obj.getClass(), obj, true);
|
||||
}
|
||||
public static List<Api> getApis(Class<? extends Object> theClass , Object obj) {
|
||||
|
||||
/**
|
||||
* Get a list of Api-s supported by this class.
|
||||
* @param theClass class
|
||||
* @param obj object of this class (may be null)
|
||||
* @param allowEmpty if false then an exception is thrown if no Api-s can be retrieved, if true
|
||||
* then absence of Api-s is silently ignored.
|
||||
* @return list of discovered Api-s
|
||||
*/
|
||||
public static List<Api> getApis(Class<? extends Object> theClass , Object obj, boolean allowEmpty) {
|
||||
Class<?> klas = null;
|
||||
try {
|
||||
klas = MethodHandles.publicLookup().accessClass(theClass);
|
||||
|
@ -122,7 +131,7 @@ public class AnnotatedApi extends Api implements PermissionNameProvider , Closea
|
|||
SpecProvider specProvider = readSpec(endPoint, Collections.singletonList(m));
|
||||
apis.add(new AnnotatedApi(specProvider, endPoint, Collections.singletonMap("", cmd), null));
|
||||
}
|
||||
if (apis.isEmpty()) {
|
||||
if (!allowEmpty && apis.isEmpty()) {
|
||||
throw new RuntimeException("Invalid Class : " + klas.getName() + " No @EndPoints");
|
||||
}
|
||||
|
||||
|
|
|
@ -28,6 +28,8 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.lucene.util.ResourceLoaderAware;
|
||||
|
@ -54,12 +56,24 @@ import org.slf4j.LoggerFactory;
|
|||
import static org.apache.lucene.util.IOUtils.closeWhileHandlingException;
|
||||
import static org.apache.solr.common.util.Utils.makeMap;
|
||||
|
||||
/**
|
||||
* This class manages the container-level plugins and their Api-s. It is
|
||||
* responsible for adding / removing / replacing the plugins according to the updated
|
||||
* configuration obtained from {@link ContainerPluginsApi#plugins(Supplier)}.
|
||||
* <p>Plugins instantiated by this class may implement zero or more {@link Api}-s, which
|
||||
* are then registered in the CoreContainer {@link ApiBag}. They may be also post-processed
|
||||
* for additional functionality by {@link PluginRegistryListener}-s registered with
|
||||
* this class.</p>
|
||||
*/
|
||||
public class CustomContainerPlugins implements ClusterPropertiesListener, MapWriter {
|
||||
private final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper();
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
final CoreContainer coreContainer;
|
||||
final ApiBag containerApiBag;
|
||||
private final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper();
|
||||
|
||||
private final List<PluginRegistryListener> listeners = new CopyOnWriteArrayList<>();
|
||||
|
||||
private final CoreContainer coreContainer;
|
||||
private final ApiBag containerApiBag;
|
||||
|
||||
private final Map<String, ApiInfo> currentPlugins = new HashMap<>();
|
||||
|
||||
|
@ -68,6 +82,14 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
|
|||
refresh();
|
||||
return false;
|
||||
}
|
||||
|
||||
public void registerListener(PluginRegistryListener listener) {
|
||||
listeners.add(listener);
|
||||
}
|
||||
public void unregisterListener(PluginRegistryListener listener) {
|
||||
listeners.remove(listener);
|
||||
}
|
||||
|
||||
public CustomContainerPlugins(CoreContainer coreContainer, ApiBag apiBag) {
|
||||
this.coreContainer = coreContainer;
|
||||
this.containerApiBag = apiBag;
|
||||
|
@ -78,6 +100,10 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
|
|||
currentPlugins.forEach(ew.getBiConsumer());
|
||||
}
|
||||
|
||||
public synchronized ApiInfo getPlugin(String name) {
|
||||
return currentPlugins.get(name);
|
||||
}
|
||||
|
||||
public synchronized void refresh() {
|
||||
Map<String, Object> pluginInfos = null;
|
||||
try {
|
||||
|
@ -107,6 +133,7 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
|
|||
if (e.getValue() == Diff.REMOVED) {
|
||||
ApiInfo apiInfo = currentPlugins.remove(e.getKey());
|
||||
if (apiInfo == null) continue;
|
||||
listeners.forEach(listener -> listener.deleted(apiInfo));
|
||||
for (ApiHolder holder : apiInfo.holders) {
|
||||
Api old = containerApiBag.unregister(holder.api.getEndPoint().method()[0],
|
||||
getActualPath(apiInfo, holder.api.getEndPoint().path()[0]));
|
||||
|
@ -136,6 +163,8 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
|
|||
containerApiBag.register(holder, getTemplateVars(apiInfo.info));
|
||||
}
|
||||
currentPlugins.put(e.getKey(), apiInfo);
|
||||
final ApiInfo apiInfoFinal = apiInfo;
|
||||
listeners.forEach(listener -> listener.added(apiInfoFinal));
|
||||
} else {
|
||||
//this plugin is being updated
|
||||
ApiInfo old = currentPlugins.put(e.getKey(), apiInfo);
|
||||
|
@ -143,6 +172,8 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
|
|||
//register all new paths
|
||||
containerApiBag.register(holder, getTemplateVars(apiInfo.info));
|
||||
}
|
||||
final ApiInfo apiInfoFinal = apiInfo;
|
||||
listeners.forEach(listener -> listener.modified(old, apiInfoFinal));
|
||||
if (old != null) {
|
||||
//this is an update of the plugin. But, it is possible that
|
||||
// some paths are remved in the newer version of the plugin
|
||||
|
@ -201,6 +232,7 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
|
|||
@SuppressWarnings({"rawtypes"})
|
||||
public class ApiInfo implements ReflectMapWriter {
|
||||
List<ApiHolder> holders;
|
||||
|
||||
@JsonProperty
|
||||
private final PluginMeta info;
|
||||
|
||||
|
@ -222,7 +254,13 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
|
|||
return null;
|
||||
}
|
||||
|
||||
public Object getInstance() {
|
||||
return instance;
|
||||
}
|
||||
|
||||
public PluginMeta getInfo() {
|
||||
return info.copy();
|
||||
}
|
||||
@SuppressWarnings({"unchecked","rawtypes"})
|
||||
public ApiInfo(PluginMeta info, List<String> errs) {
|
||||
this.info = info;
|
||||
|
@ -268,14 +306,14 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
|
|||
}
|
||||
|
||||
try {
|
||||
List<Api> apis = AnnotatedApi.getApis(klas, null);
|
||||
List<Api> apis = AnnotatedApi.getApis(klas, null, true);
|
||||
for (Object api : apis) {
|
||||
EndPoint endPoint = ((AnnotatedApi) api).getEndPoint();
|
||||
if (endPoint.path().length > 1 || endPoint.method().length > 1) {
|
||||
errs.add("Only one HTTP method and url supported for each API");
|
||||
}
|
||||
if (endPoint.method().length != 1 || endPoint.path().length != 1) {
|
||||
errs.add("The @EndPint must have exactly one method and path attributes");
|
||||
errs.add("The @EndPoint must have exactly one method and path attributes");
|
||||
}
|
||||
List<String> pathSegments = StrUtils.splitSmart(endPoint.path()[0], '/', true);
|
||||
PathTrie.replaceTemplates(pathSegments, getTemplateVars(info));
|
||||
|
@ -320,7 +358,7 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
|
|||
}
|
||||
}
|
||||
this.holders = new ArrayList<>();
|
||||
for (Api api : AnnotatedApi.getApis(instance)) {
|
||||
for (Api api : AnnotatedApi.getApis(instance.getClass(), instance, true)) {
|
||||
holders.add(new ApiHolder((AnnotatedApi) api));
|
||||
}
|
||||
}
|
||||
|
@ -359,4 +397,20 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
|
|||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Listener for notifications about added / deleted / modified plugins.
|
||||
*/
|
||||
public interface PluginRegistryListener {
|
||||
|
||||
/** Called when a new plugin is added. */
|
||||
void added(ApiInfo plugin);
|
||||
|
||||
/** Called when an existing plugin is deleted. */
|
||||
void deleted(ApiInfo plugin);
|
||||
|
||||
/** Called when an existing plugin is replaced. */
|
||||
void modified(ApiInfo old, ApiInfo replacement);
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
/**
|
||||
* Intended for components that should be enabled only one instance per cluster.
|
||||
* <p>Components that implement this interface are always in one of these states:
|
||||
* <ul>
|
||||
* <li>STOPPED - the default state. The component is idle and does not perform
|
||||
* any functions. It should also avoid holding any resources.</li>
|
||||
* <li>STARTING - transitional state, which leads either to RUNNING or STOPPING in
|
||||
* case of startup failures.</li>
|
||||
* <li>STOPPING - transitional state, which leads to STOPPED state.</li>
|
||||
* <li>RUNNING - the component is active.</li>
|
||||
* </ul>
|
||||
* <p>Components must be prepared to change these states multiple times in their
|
||||
* life-cycle.</p>
|
||||
* <p>Implementation detail: currently these components are instantiated on all nodes
|
||||
* but they are started only on the Overseer leader, and stopped when the current
|
||||
* node loses its Overseer leadership.</p>
|
||||
*/
|
||||
public interface ClusterSingleton {
|
||||
|
||||
enum State {
|
||||
/** Component is idle. */
|
||||
STOPPED,
|
||||
/** Component is starting. */
|
||||
STARTING,
|
||||
/** Component is active. */
|
||||
RUNNING,
|
||||
/** Component is stopping. */
|
||||
STOPPING
|
||||
}
|
||||
|
||||
/**
|
||||
* Unique name of this singleton. Used for registration.
|
||||
*/
|
||||
String getName();
|
||||
|
||||
/**
|
||||
* Start the operation of the component. Initially this method should set
|
||||
* the state to STARTING, and on success it should set the state to RUNNING.
|
||||
* @throws Exception on startup errors. The component should revert to the
|
||||
* STOPPED state.
|
||||
*/
|
||||
void start() throws Exception;
|
||||
|
||||
/**
|
||||
* Returns the current state of the component.
|
||||
*/
|
||||
State getState();
|
||||
|
||||
/**
|
||||
* Stop the operation of the component. Initially this method should set
|
||||
* the state to STOPPING, and on return it should set the state to STOPPED.
|
||||
* Components should also avoid holding any resource when in STOPPED state.
|
||||
*/
|
||||
void stop();
|
||||
}
|
|
@ -655,6 +655,8 @@ public class Overseer implements SolrCloseable {
|
|||
}
|
||||
});
|
||||
|
||||
getCoreContainer().getClusterSingletons().startClusterSingletons();
|
||||
|
||||
assert ObjectReleaseTracker.track(this);
|
||||
}
|
||||
|
||||
|
@ -774,6 +776,13 @@ public class Overseer implements SolrCloseable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start {@link ClusterSingleton} plugins when we become the leader.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Stop {@link ClusterSingleton} plugins when we lose leadership.
|
||||
*/
|
||||
public Stats getStats() {
|
||||
return stats;
|
||||
}
|
||||
|
@ -813,9 +822,14 @@ public class Overseer implements SolrCloseable {
|
|||
if (this.id != null) {
|
||||
log.info("Overseer (id={}) closing", id);
|
||||
}
|
||||
// stop singletons only on the leader
|
||||
if (!this.closed) {
|
||||
getCoreContainer().getClusterSingletons().stopClusterSingletons();
|
||||
}
|
||||
this.closed = true;
|
||||
doClose();
|
||||
|
||||
|
||||
assert ObjectReleaseTracker.release(this);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,193 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.solr.core;
|
||||
|
||||
import org.apache.solr.api.CustomContainerPlugins;
|
||||
import org.apache.solr.cloud.ClusterSingleton;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* Helper class to manage the initial registration of {@link ClusterSingleton} plugins and
|
||||
* to track the changes in loaded plugins in {@link org.apache.solr.api.CustomContainerPlugins}.
|
||||
*/
|
||||
public class ClusterSingletons {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
private final Map<String, ClusterSingleton> singletonMap = new ConcurrentHashMap<>();
|
||||
private final Supplier<Boolean> runSingletons;
|
||||
private final Consumer<Runnable> asyncRunner;
|
||||
private final CustomContainerPlugins.PluginRegistryListener pluginListener;
|
||||
|
||||
public static final int DEFAULT_WAIT_TIMEOUT_SEC = 60;
|
||||
|
||||
// we use this latch to delay the initial startup of singletons, due to
|
||||
// the leader election occurring in parallel with the rest of the load() method.
|
||||
private final CountDownLatch readyLatch = new CountDownLatch(1);
|
||||
|
||||
/**
|
||||
* Create a helper to manage singletons.
|
||||
* @param runSingletons this function returns true when singletons should be running. It's
|
||||
* Used when adding or modifying existing plugins, and when invoking
|
||||
* {@link #startClusterSingletons()}.
|
||||
* @param asyncRunner async runner that will be used for starting up each singleton.
|
||||
*/
|
||||
public ClusterSingletons(Supplier<Boolean> runSingletons, Consumer<Runnable> asyncRunner) {
|
||||
this.runSingletons = runSingletons;
|
||||
this.asyncRunner = asyncRunner;
|
||||
// create plugin registry listener
|
||||
pluginListener = new CustomContainerPlugins.PluginRegistryListener() {
|
||||
@Override
|
||||
public void added(CustomContainerPlugins.ApiInfo plugin) {
|
||||
if (plugin == null || plugin.getInstance() == null) {
|
||||
return;
|
||||
}
|
||||
// register new api
|
||||
Object instance = plugin.getInstance();
|
||||
if (instance instanceof ClusterSingleton) {
|
||||
ClusterSingleton singleton = (ClusterSingleton) instance;
|
||||
singletonMap.put(singleton.getName(), singleton);
|
||||
// check to see if we should immediately start this singleton
|
||||
if (isReady() && runSingletons.get()) {
|
||||
try {
|
||||
singleton.start();
|
||||
} catch (Exception exc) {
|
||||
log.warn("Exception starting ClusterSingleton {}: {}", plugin, exc);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleted(CustomContainerPlugins.ApiInfo plugin) {
|
||||
if (plugin == null || plugin.getInstance() == null) {
|
||||
return;
|
||||
}
|
||||
Object instance = plugin.getInstance();
|
||||
if (instance instanceof ClusterSingleton) {
|
||||
ClusterSingleton singleton = (ClusterSingleton) instance;
|
||||
singleton.stop();
|
||||
singletonMap.remove(singleton.getName());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void modified(CustomContainerPlugins.ApiInfo old, CustomContainerPlugins.ApiInfo replacement) {
|
||||
added(replacement);
|
||||
deleted(old);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public CustomContainerPlugins.PluginRegistryListener getPluginRegistryListener() {
|
||||
return pluginListener;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return modifiable registry of name / {@link ClusterSingleton}.
|
||||
*/
|
||||
public Map<String, ClusterSingleton> getSingletons() {
|
||||
return singletonMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true when this helper is ready to be used for singleton management.
|
||||
*/
|
||||
public boolean isReady() {
|
||||
return readyLatch.getCount() == 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark this helper as ready to be used for singleton management.
|
||||
*/
|
||||
public void setReady() {
|
||||
readyLatch.countDown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for this helper to become ready.
|
||||
* @param timeout timeout value.
|
||||
* @param timeUnit timeout unit.
|
||||
* @throws InterruptedException on this thread being interrupted.
|
||||
* @throws TimeoutException when specified timeout has elapsed but the helper is not ready.
|
||||
*/
|
||||
public void waitUntilReady(long timeout, TimeUnit timeUnit)
|
||||
throws InterruptedException, TimeoutException {
|
||||
boolean await = readyLatch.await(timeout, timeUnit);
|
||||
if (!await) {
|
||||
throw new TimeoutException("Timed out waiting for ClusterSingletons to become ready.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start singletons when the helper is ready and when it's supposed to start
|
||||
* (as determined by {@link #runSingletons} function). If the helper is not ready this
|
||||
* method will use {@link #asyncRunner} to wait in another thread for
|
||||
* {@link #DEFAULT_WAIT_TIMEOUT_SEC} seconds.
|
||||
*/
|
||||
public void startClusterSingletons() {
|
||||
final Runnable initializer = () -> {
|
||||
if (!runSingletons.get()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
waitUntilReady(DEFAULT_WAIT_TIMEOUT_SEC, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
log.warn("Interrupted initialization of ClusterSingleton-s");
|
||||
return;
|
||||
} catch (TimeoutException te) {
|
||||
log.warn("Timed out during initialization of ClusterSingleton-s (waited {} sec)", DEFAULT_WAIT_TIMEOUT_SEC);
|
||||
return;
|
||||
}
|
||||
singletonMap.forEach((name, singleton) -> {
|
||||
if (!runSingletons.get()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
singleton.start();
|
||||
} catch (Exception e) {
|
||||
log.warn("Exception starting ClusterSingleton {}: {}", singleton, e);
|
||||
}
|
||||
});
|
||||
};
|
||||
if (!isReady()) {
|
||||
// wait until all singleton-s are ready for the first startup
|
||||
asyncRunner.accept(initializer);
|
||||
} else {
|
||||
initializer.run();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop all registered singletons.
|
||||
*/
|
||||
public void stopClusterSingletons() {
|
||||
singletonMap.forEach((name, singleton) -> {
|
||||
singleton.stop();
|
||||
});
|
||||
}
|
||||
}
|
|
@ -68,6 +68,7 @@ import org.apache.solr.client.solrj.impl.SolrHttpClientContextBuilder.Credential
|
|||
import org.apache.solr.client.solrj.io.SolrClientCache;
|
||||
import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
|
||||
import org.apache.solr.cloud.CloudDescriptor;
|
||||
import org.apache.solr.cloud.ClusterSingleton;
|
||||
import org.apache.solr.cloud.OverseerTaskQueue;
|
||||
import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.common.AlreadyClosedException;
|
||||
|
@ -245,6 +246,11 @@ public class CoreContainer {
|
|||
|
||||
private final ObjectCache objectCache = new ObjectCache();
|
||||
|
||||
private final ClusterSingletons clusterSingletons = new ClusterSingletons(
|
||||
() -> getZkController() != null &&
|
||||
getZkController().getOverseer() != null &&
|
||||
!getZkController().getOverseer().isClosed(),
|
||||
(r) -> this.runAsync(r));
|
||||
private PackageStoreAPI packageStoreAPI;
|
||||
private PackageLoader packageLoader;
|
||||
|
||||
|
@ -663,6 +669,8 @@ public class CoreContainer {
|
|||
loader.reloadLuceneSPI();
|
||||
}
|
||||
|
||||
customContainerPlugins.registerListener(clusterSingletons.getPluginRegistryListener());
|
||||
|
||||
packageStoreAPI = new PackageStoreAPI(this);
|
||||
containerHandlers.getApiBag().registerObject(packageStoreAPI.readAPI);
|
||||
containerHandlers.getApiBag().registerObject(packageStoreAPI.writeAPI);
|
||||
|
@ -876,7 +884,21 @@ public class CoreContainer {
|
|||
ContainerPluginsApi containerPluginsApi = new ContainerPluginsApi(this);
|
||||
containerHandlers.getApiBag().registerObject(containerPluginsApi.readAPI);
|
||||
containerHandlers.getApiBag().registerObject(containerPluginsApi.editAPI);
|
||||
|
||||
// init ClusterSingleton-s
|
||||
|
||||
// register the handlers that are also ClusterSingleton
|
||||
containerHandlers.keySet().forEach(handlerName -> {
|
||||
SolrRequestHandler handler = containerHandlers.get(handlerName);
|
||||
if (handler instanceof ClusterSingleton) {
|
||||
ClusterSingleton singleton = (ClusterSingleton) handler;
|
||||
clusterSingletons.getSingletons().put(singleton.getName(), singleton);
|
||||
}
|
||||
});
|
||||
|
||||
clusterSingletons.setReady();
|
||||
zkSys.getZkController().checkOverseerDesignate();
|
||||
|
||||
}
|
||||
// This is a bit redundant but these are two distinct concepts for all they're accomplished at the same time.
|
||||
status |= LOAD_COMPLETE | INITIAL_CORE_LOAD_COMPLETE;
|
||||
|
@ -2078,6 +2100,10 @@ public class CoreContainer {
|
|||
return customContainerPlugins;
|
||||
}
|
||||
|
||||
public ClusterSingletons getClusterSingletons() {
|
||||
return clusterSingletons;
|
||||
}
|
||||
|
||||
static {
|
||||
ExecutorUtil.addThreadLocalProvider(SolrRequestInfo.getInheritableThreadLocalProvider());
|
||||
}
|
||||
|
|
|
@ -47,7 +47,9 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import static org.apache.lucene.util.IOUtils.closeWhileHandlingException;
|
||||
|
||||
|
||||
/**
|
||||
* API to maintain container-level plugin configurations.
|
||||
*/
|
||||
public class ContainerPluginsApi {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
|
@ -62,6 +64,9 @@ public class ContainerPluginsApi {
|
|||
this.coreContainer = coreContainer;
|
||||
}
|
||||
|
||||
/**
|
||||
* API for reading the current plugin configurations.
|
||||
*/
|
||||
public class Read {
|
||||
@EndPoint(method = METHOD.GET,
|
||||
path = "/cluster/plugin",
|
||||
|
@ -71,6 +76,9 @@ public class ContainerPluginsApi {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* API for editing the plugin configurations.
|
||||
*/
|
||||
@EndPoint(method = METHOD.POST,
|
||||
path = "/cluster/plugin",
|
||||
permission = PermissionNameProvider.Name.COLL_EDIT_PERM)
|
||||
|
@ -146,6 +154,13 @@ public class ContainerPluginsApi {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve the current plugin configurations.
|
||||
* @param zkClientSupplier supplier of {@link SolrZkClient}
|
||||
* @return current plugin configurations, where keys are plugin names and values
|
||||
* are {@link PluginMeta} plugin metadata.
|
||||
* @throws IOException on IO errors
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static Map<String, Object> plugins(Supplier<SolrZkClient> zkClientSupplier) throws IOException {
|
||||
SolrZkClient zkClient = zkClientSupplier.get();
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
|
|||
import org.apache.solr.common.util.Pair;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.filestore.DistribPackageStore;
|
||||
import org.apache.solr.handler.admin.ContainerPluginsApi;
|
||||
import org.apache.solr.packagemanager.SolrPackage.Command;
|
||||
import org.apache.solr.packagemanager.SolrPackage.Manifest;
|
||||
import org.apache.solr.packagemanager.SolrPackage.Plugin;
|
||||
|
@ -231,7 +232,7 @@ public class PackageManager implements Closeable {
|
|||
}
|
||||
}
|
||||
@SuppressWarnings({"unchecked"})
|
||||
Map<String, Object> clusterPlugins = (Map<String, Object>) result.getOrDefault("plugin", Collections.emptyMap());
|
||||
Map<String, Object> clusterPlugins = (Map<String, Object>) result.getOrDefault(ContainerPluginsApi.PLUGIN, Collections.emptyMap());
|
||||
for (String key : clusterPlugins.keySet()) {
|
||||
// Map<String, String> pluginMeta = (Map<String, String>) clusterPlugins.get(key);
|
||||
PluginMeta pluginMeta;
|
||||
|
|
|
@ -17,13 +17,7 @@
|
|||
package org.apache.solr.cloud;
|
||||
|
||||
import static org.apache.solr.cloud.AbstractDistribZkTestBase.verifyReplicaStatus;
|
||||
import static org.mockito.Mockito.anyBoolean;
|
||||
import static org.mockito.Mockito.anyInt;
|
||||
import static org.mockito.Mockito.anyString;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
|
@ -73,6 +67,7 @@ import org.apache.solr.common.util.SolrNamedThreadFactory;
|
|||
import org.apache.solr.common.util.TimeSource;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.core.CloudConfig;
|
||||
import org.apache.solr.core.ClusterSingletons;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.PluginInfo;
|
||||
import org.apache.solr.core.SolrResourceLoader;
|
||||
|
@ -117,7 +112,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
|
||||
private static SolrZkClient zkClient;
|
||||
|
||||
|
||||
private volatile boolean testDone = false;
|
||||
|
||||
private final List<ZkController> zkControllers = Collections.synchronizedList(new ArrayList<>());
|
||||
|
@ -127,7 +121,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
private final List<HttpShardHandlerFactory> httpShardHandlerFactorys = Collections.synchronizedList(new ArrayList<>());
|
||||
private final List<UpdateShardHandler> updateShardHandlers = Collections.synchronizedList(new ArrayList<>());
|
||||
private final List<CloudSolrClient> solrClients = Collections.synchronizedList(new ArrayList<>());
|
||||
|
||||
private static final String COLLECTION = SolrTestCaseJ4.DEFAULT_TEST_COLLECTION_NAME;
|
||||
|
||||
public static class MockZKController{
|
||||
|
@ -306,6 +299,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
@Before
|
||||
public void setUp() throws Exception {
|
||||
testDone = false;
|
||||
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
|
@ -322,6 +316,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
}
|
||||
|
||||
server = null;
|
||||
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -1428,11 +1423,22 @@ public class OverseerTest extends SolrTestCaseJ4 {
|
|||
Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS));
|
||||
when(mockAlwaysUpCoreContainer.isShutDown()).thenReturn(testDone); // Allow retry on session expiry
|
||||
when(mockAlwaysUpCoreContainer.getResourceLoader()).thenReturn(new SolrResourceLoader());
|
||||
ClusterSingletons singletons = new ClusterSingletons(() -> true, r -> r.run());
|
||||
// don't wait for all singletons
|
||||
singletons.setReady();
|
||||
FieldSetter.setField(mockAlwaysUpCoreContainer, CoreContainer.class.getDeclaredField("clusterSingletons"), singletons);
|
||||
FieldSetter.setField(zkController, ZkController.class.getDeclaredField("zkClient"), zkClient);
|
||||
FieldSetter.setField(zkController, ZkController.class.getDeclaredField("cc"), mockAlwaysUpCoreContainer);
|
||||
when(zkController.getCoreContainer()).thenReturn(mockAlwaysUpCoreContainer);
|
||||
when(zkController.getZkClient()).thenReturn(zkClient);
|
||||
when(zkController.getZkStateReader()).thenReturn(reader);
|
||||
// primitive support for CC.runAsync
|
||||
doAnswer(invocable -> {
|
||||
Runnable r = invocable.getArgument(0);
|
||||
Thread t = new Thread(r);
|
||||
t.start();
|
||||
return null;
|
||||
}).when(mockAlwaysUpCoreContainer).runAsync(any(Runnable.class));
|
||||
|
||||
when(zkController.getLeaderProps(anyString(), anyString(), anyInt())).thenCallRealMethod();
|
||||
when(zkController.getLeaderProps(anyString(), anyString(), anyInt(), anyBoolean())).thenCallRealMethod();
|
||||
|
|
|
@ -37,10 +37,12 @@ import org.apache.solr.client.solrj.request.V2Request;
|
|||
import org.apache.solr.client.solrj.request.beans.Package;
|
||||
import org.apache.solr.client.solrj.request.beans.PluginMeta;
|
||||
import org.apache.solr.client.solrj.response.V2Response;
|
||||
import org.apache.solr.cloud.ClusterSingleton;
|
||||
import org.apache.solr.cloud.MiniSolrCloudCluster;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.common.NavigableObject;
|
||||
import org.apache.solr.common.util.Utils;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.SolrResourceLoader;
|
||||
import org.apache.solr.filestore.PackageStoreAPI;
|
||||
import org.apache.solr.filestore.TestDistribPackageStore;
|
||||
|
@ -92,8 +94,11 @@ public class TestContainerPlugin extends SolrCloudTestCase {
|
|||
expectError(req, cluster.getSolrClient(), errPath, "No method with @Command in class");
|
||||
|
||||
//test with an invalid class
|
||||
plugin.klass = C1.class.getName();
|
||||
expectError(req, cluster.getSolrClient(), errPath, "No @EndPoints");
|
||||
// XXX (ab) in order to support ClusterSingleton we allow adding
|
||||
// plugins without Api EndPoints
|
||||
|
||||
// plugin.klass = C1.class.getName();
|
||||
// expectError(req, cluster.getSolrClient(), errPath, "No @EndPoints");
|
||||
|
||||
//test with a valid class. This should succeed now
|
||||
plugin.klass = C3.class.getName();
|
||||
|
@ -170,6 +175,31 @@ public class TestContainerPlugin extends SolrCloudTestCase {
|
|||
.withMethod(GET)
|
||||
.build()
|
||||
.process(cluster.getSolrClient()));
|
||||
|
||||
// test ClusterSingleton plugin
|
||||
plugin.name = "clusterSingleton";
|
||||
plugin.klass = C6.class.getName();
|
||||
req.process(cluster.getSolrClient());
|
||||
|
||||
//just check if the plugin is indeed registered
|
||||
readPluginState = new V2Request.Builder("/cluster/plugin")
|
||||
.forceV2(true)
|
||||
.withMethod(GET)
|
||||
.build();
|
||||
rsp = readPluginState.process(cluster.getSolrClient());
|
||||
assertEquals(C6.class.getName(), rsp._getStr("/plugin/clusterSingleton/class", null));
|
||||
|
||||
assertTrue("ccProvided", C6.ccProvided);
|
||||
assertTrue("startCalled", C6.startCalled);
|
||||
assertFalse("stopCalled", C6.stopCalled);
|
||||
// kill the Overseer leader
|
||||
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
|
||||
if (!jetty.getCoreContainer().getZkController().getOverseer().isClosed()) {
|
||||
cluster.stopJettySolrRunner(jetty);
|
||||
cluster.waitForJettyToStop(jetty);
|
||||
}
|
||||
}
|
||||
assertTrue("stopCalled", C6.stopCalled);
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
@ -289,6 +319,45 @@ public class TestContainerPlugin extends SolrCloudTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public static class C6 implements ClusterSingleton {
|
||||
static boolean startCalled = false;
|
||||
static boolean stopCalled = false;
|
||||
static boolean ccProvided = false;
|
||||
|
||||
private State state = State.STOPPED;
|
||||
|
||||
public C6(CoreContainer cc) {
|
||||
if (cc != null) {
|
||||
ccProvided = true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "C6";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
state = State.STARTING;
|
||||
startCalled = true;
|
||||
state = State.RUNNING;
|
||||
}
|
||||
|
||||
@Override
|
||||
public State getState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
state = State.STOPPING;
|
||||
stopCalled = true;
|
||||
state = State.STOPPED;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static class C5 implements ResourceLoaderAware {
|
||||
static ByteBuffer classData;
|
||||
private SolrResourceLoader resourceLoader;
|
||||
|
|
Loading…
Reference in New Issue