SOLR-14749: Provide a clean API for cluster-level event processing.

This commit is contained in:
Andrzej Bialecki 2020-11-05 12:16:35 +01:00
parent bcd9711ab6
commit bdc6e8247f
26 changed files with 2073 additions and 31 deletions

View File

@ -16,7 +16,8 @@ New Features
* SOLR-13528 Rate Limiting in Solr (Atri Sharma, Mike Drob)
* SOLR-14749: Improve support for arbitrary container-level plugins. Add ClusterSingleton
* SOLR-14749: Provide a clean API for cluster-level event processing.
Improve support for arbitrary container-level plugins. Add ClusterSingleton
support for plugins that require only one active instance in the cluster. (ab, noble)
Improvements

View File

@ -39,6 +39,7 @@ import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.annotation.JsonProperty;
import org.apache.solr.common.cloud.ClusterPropertiesListener;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.PathTrie;
import org.apache.solr.common.util.ReflectMapWriter;
import org.apache.solr.common.util.StrUtils;
@ -65,7 +66,7 @@ import static org.apache.solr.common.util.Utils.makeMap;
* for additional functionality by {@link PluginRegistryListener}-s registered with
* this class.</p>
*/
public class CustomContainerPlugins implements ClusterPropertiesListener, MapWriter {
public class ContainerPluginsRegistry implements ClusterPropertiesListener, MapWriter, Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper();
@ -90,16 +91,25 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
listeners.remove(listener);
}
public CustomContainerPlugins(CoreContainer coreContainer, ApiBag apiBag) {
public ContainerPluginsRegistry(CoreContainer coreContainer, ApiBag apiBag) {
this.coreContainer = coreContainer;
this.containerApiBag = apiBag;
}
@Override
public void writeMap(EntryWriter ew) throws IOException {
public synchronized void writeMap(EntryWriter ew) throws IOException {
currentPlugins.forEach(ew.getBiConsumer());
}
@Override
public synchronized void close() throws IOException {
currentPlugins.values().forEach(apiInfo -> {
if (apiInfo.instance instanceof Closeable) {
IOUtils.closeQuietly((Closeable) apiInfo.instance);
}
});
}
public synchronized ApiInfo getPlugin(String name) {
return currentPlugins.get(name);
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.events;
import java.time.Instant;
/**
* Cluster-level event.
*/
public interface ClusterEvent {
enum EventType {
/** One or more nodes went down. */
NODES_DOWN,
/** One or more nodes went up. */
NODES_UP,
/** One or more collections have been added. */
COLLECTIONS_ADDED,
/** One or more collections have been removed. */
COLLECTIONS_REMOVED,
/** Cluster properties have changed. */
CLUSTER_PROPERTIES_CHANGED
// other types? eg. Overseer leader change, shard leader change,
// node overload (eg. CPU / MEM circuit breakers tripped)?
}
/** Get event type. */
EventType getType();
/** Get event timestamp. This is the instant when the event was generated (not necessarily when
* the underlying condition first occurred). */
Instant getTimestamp();
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.events;
import java.io.Closeable;
/**
* Components that want to be notified of cluster-wide events should use this.
*
* XXX should this work only for ClusterSingleton-s? some types of events may be
* XXX difficult (or pointless) to propagate to every node.
*/
public interface ClusterEventListener extends Closeable {
/**
* Handle the event. Implementations should be non-blocking - if any long
* processing is needed it should be performed asynchronously.
* @param event cluster event
*/
void onEvent(ClusterEvent event);
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.events;
import org.apache.solr.cloud.ClusterSingleton;
import java.io.Closeable;
/**
* Component that produces {@link ClusterEvent} instances.
*/
public interface ClusterEventProducer extends ClusterSingleton, Closeable {
/** Unique name for the registration of a plugin-based implementation. */
String PLUGIN_NAME = "cluster-event-producer";
@Override
default String getName() {
return PLUGIN_NAME;
}
/**
* Register an event listener for processing the specified event types.
* @param listener non-null listener. If the same instance of the listener is
* already registered for some event types then it will be also registered
* for additional event types specified in this call.
* @param eventTypes event types that this listener is being registered for.
* If this is null or empty then all types will be used.
*/
void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes);
/**
* Unregister an event listener for all event types.
* @param listener non-null listener.
*/
default void unregisterListener(ClusterEventListener listener) {
unregisterListener(listener, ClusterEvent.EventType.values());
}
/**
* Unregister an event listener for specified event types.
* @param listener non-null listener.
* @param eventTypes event types from which the listener will be unregistered. If this
* is null or empty then all event types will be used
*/
void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes);
}

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.events;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.core.CoreContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* Base class for implementing {@link ClusterEventProducer}.
*/
public abstract class ClusterEventProducerBase implements ClusterEventProducer {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final Map<ClusterEvent.EventType, Set<ClusterEventListener>> listeners = new ConcurrentHashMap<>();
protected volatile State state = State.STOPPED;
protected final CoreContainer cc;
protected ClusterEventProducerBase(CoreContainer cc) {
this.cc = cc;
}
@Override
public void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
if (eventTypes == null || eventTypes.length == 0) {
eventTypes = ClusterEvent.EventType.values();
}
for (ClusterEvent.EventType type : eventTypes) {
if (!getSupportedEventTypes().contains(type)) {
log.warn("event type {} not supported yet.", type);
continue;
}
// to avoid removing no-longer empty set on race in unregister
synchronized (listeners) {
listeners.computeIfAbsent(type, t -> ConcurrentHashMap.newKeySet())
.add(listener);
}
}
}
@Override
public void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
if (eventTypes == null || eventTypes.length == 0) {
eventTypes = ClusterEvent.EventType.values();
}
synchronized (listeners) {
for (ClusterEvent.EventType type : eventTypes) {
Set<ClusterEventListener> perType = listeners.get(type);
if (perType != null) {
perType.remove(listener);
if (perType.isEmpty()) {
listeners.remove(type);
}
}
}
}
}
@Override
public State getState() {
return state;
}
@Override
public void close() throws IOException {
synchronized (listeners) {
listeners.values().forEach(listenerSet ->
listenerSet.forEach(listener -> IOUtils.closeQuietly(listener)));
}
}
public abstract Set<ClusterEvent.EventType> getSupportedEventTypes();
protected void fireEvent(ClusterEvent event) {
synchronized (listeners) {
listeners.getOrDefault(event.getType(), Collections.emptySet())
.forEach(listener -> {
if (log.isDebugEnabled()) {
log.debug("--- firing event {} to {}", event, listener);
}
listener.onEvent(event);
});
}
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.events;
import java.util.Map;
/**
* Event generated when {@link org.apache.solr.common.cloud.ZkStateReader#CLUSTER_PROPS} is modified.
*/
public interface ClusterPropertiesChangedEvent extends ClusterEvent {
@Override
default EventType getType() {
return EventType.CLUSTER_PROPERTIES_CHANGED;
}
Map<String, Object> getNewClusterProperties();
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.events;
import java.util.Iterator;
/**
* Event generated when some collections have been added.
*/
public interface CollectionsAddedEvent extends ClusterEvent {
@Override
default EventType getType() {
return EventType.COLLECTIONS_ADDED;
}
Iterator<String> getCollectionNames();
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.events;
import java.util.Iterator;
/**
* Event generated when some collections have been removed.
*/
public interface CollectionsRemovedEvent extends ClusterEvent {
@Override
default EventType getType() {
return EventType.COLLECTIONS_REMOVED;
}
Iterator<String> getCollectionNames();
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.events;
import org.apache.solr.core.CoreContainer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
/**
* No-op implementation of {@link ClusterEventProducer}. This implementation doesn't
* generate any events.
*/
public final class NoOpProducer extends ClusterEventProducerBase {
public static final Set<ClusterEvent.EventType> ALL_EVENT_TYPES = new HashSet<>(Arrays.asList(ClusterEvent.EventType.values()));
public NoOpProducer(CoreContainer cc) {
super(cc);
}
@Override
public Set<ClusterEvent.EventType> getSupportedEventTypes() {
return ALL_EVENT_TYPES;
}
@Override
public void start() throws Exception {
state = State.RUNNING;
}
@Override
public void stop() {
state = State.STOPPED;
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.events;
import java.util.Iterator;
/**
* Event generated when some nodes went down.
*/
public interface NodesDownEvent extends ClusterEvent {
@Override
default EventType getType() {
return EventType.NODES_DOWN;
}
Iterator<String> getNodeNames();
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.events;
import java.util.Iterator;
/**
* Event generated when some nodes went up.
*/
public interface NodesUpEvent extends ClusterEvent {
@Override
default EventType getType() {
return EventType.NODES_UP;
}
Iterator<String> getNodeNames();
}

View File

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.events.impl;
import org.apache.solr.api.ContainerPluginsRegistry;
import org.apache.solr.cluster.events.ClusterEvent;
import org.apache.solr.cluster.events.ClusterEventListener;
import org.apache.solr.cluster.events.ClusterEventProducer;
import org.apache.solr.cluster.events.ClusterEventProducerBase;
import org.apache.solr.cluster.events.NoOpProducer;
import org.apache.solr.core.CoreContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.Set;
/**
* This class helps in handling the initial registration of plugin-based listeners,
* when both the final {@link ClusterEventProducer} implementation and listeners
* are configured using plugins.
*/
public class ClusterEventProducerFactory extends ClusterEventProducerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private ContainerPluginsRegistry.PluginRegistryListener initialPluginListener;
private boolean created = false;
public ClusterEventProducerFactory(CoreContainer cc) {
super(cc);
// this initial listener is used only for capturing plugin registrations
// done by other nodes while this CoreContainer is still loading
initialPluginListener = new ContainerPluginsRegistry.PluginRegistryListener() {
@Override
public void added(ContainerPluginsRegistry.ApiInfo plugin) {
if (plugin == null || plugin.getInstance() == null) {
return;
}
Object instance = plugin.getInstance();
if (instance instanceof ClusterEventListener) {
registerListener((ClusterEventListener) instance);
}
}
@Override
public void deleted(ContainerPluginsRegistry.ApiInfo plugin) {
if (plugin == null || plugin.getInstance() == null) {
return;
}
Object instance = plugin.getInstance();
if (instance instanceof ClusterEventListener) {
unregisterListener((ClusterEventListener) instance);
}
}
@Override
public void modified(ContainerPluginsRegistry.ApiInfo old, ContainerPluginsRegistry.ApiInfo replacement) {
added(replacement);
deleted(old);
}
};
}
@Override
public Set<ClusterEvent.EventType> getSupportedEventTypes() {
return NoOpProducer.ALL_EVENT_TYPES;
}
/**
* This method returns an initial plugin registry listener that helps to capture the
* freshly loaded listener plugins before the final cluster event producer is created.
* @return initial listener
*/
public ContainerPluginsRegistry.PluginRegistryListener getPluginRegistryListener() {
return initialPluginListener;
}
/**
* Create a {@link ClusterEventProducer} based on the current plugin configurations.
* <p>NOTE: this method can only be called once because it has side-effects, such as
* transferring the initially collected listeners to the resulting producer's instance, and
* installing a {@link org.apache.solr.api.ContainerPluginsRegistry.PluginRegistryListener}.
* Calling this method more than once will result in an exception.</p>
* @param plugins current plugin configurations
* @return configured instance of cluster event producer (with side-effects, see above)
*/
public DelegatingClusterEventProducer create(ContainerPluginsRegistry plugins) {
if (created) {
throw new RuntimeException("this factory can be called only once!");
}
final DelegatingClusterEventProducer clusterEventProducer = new DelegatingClusterEventProducer(cc);
// since this is a ClusterSingleton, register it as such, under unique name
cc.getClusterSingletons().getSingletons().put(ClusterEventProducer.PLUGIN_NAME +"_delegate", clusterEventProducer);
ContainerPluginsRegistry.ApiInfo clusterEventProducerInfo = plugins.getPlugin(ClusterEventProducer.PLUGIN_NAME);
if (clusterEventProducerInfo != null) {
// the listener in ClusterSingletons already registered this instance
clusterEventProducer.setDelegate((ClusterEventProducer) clusterEventProducerInfo.getInstance());
} else {
// use the default NoOp impl
}
// transfer those listeners that were already registered to the initial impl
transferListeners(clusterEventProducer, plugins);
// install plugin registry listener that maintains plugin-based listeners in
// the event producer impl
ContainerPluginsRegistry.PluginRegistryListener pluginListener = new ContainerPluginsRegistry.PluginRegistryListener() {
@Override
public void added(ContainerPluginsRegistry.ApiInfo plugin) {
if (plugin == null || plugin.getInstance() == null) {
return;
}
Object instance = plugin.getInstance();
if (instance instanceof ClusterEventListener) {
ClusterEventListener listener = (ClusterEventListener) instance;
clusterEventProducer.registerListener(listener);
} else if (instance instanceof ClusterEventProducer) {
// replace the existing impl
if (cc.getClusterEventProducer() instanceof DelegatingClusterEventProducer) {
((DelegatingClusterEventProducer) cc.getClusterEventProducer())
.setDelegate((ClusterEventProducer) instance);
} else {
log.warn("Can't configure plugin-based ClusterEventProducer while CoreContainer is still loading - " +
" using existing implementation {}", cc.getClusterEventProducer().getClass().getName());
}
}
}
@Override
public void deleted(ContainerPluginsRegistry.ApiInfo plugin) {
if (plugin == null || plugin.getInstance() == null) {
return;
}
Object instance = plugin.getInstance();
if (instance instanceof ClusterEventListener) {
ClusterEventListener listener = (ClusterEventListener) instance;
clusterEventProducer.unregisterListener(listener);
} else if (instance instanceof ClusterEventProducer) {
// replace the existing impl with NoOp
if (cc.getClusterEventProducer() instanceof DelegatingClusterEventProducer) {
((DelegatingClusterEventProducer) cc.getClusterEventProducer())
.setDelegate(new NoOpProducer(cc));
} else {
log.warn("Can't configure plugin-based ClusterEventProducer while CoreContainer is still loading - " +
" using existing implementation {}", cc.getClusterEventProducer().getClass().getName());
}
}
}
@Override
public void modified(ContainerPluginsRegistry.ApiInfo old, ContainerPluginsRegistry.ApiInfo replacement) {
added(replacement);
deleted(old);
}
};
plugins.registerListener(pluginListener);
created = true;
return clusterEventProducer;
}
private void transferListeners(ClusterEventProducer target, ContainerPluginsRegistry plugins) {
synchronized (listeners) {
// stop capturing listener plugins
plugins.unregisterListener(initialPluginListener);
// transfer listeners that are already registered
listeners.forEach((type, listenersSet) -> {
listenersSet.forEach(listener -> target.registerListener(listener, type));
});
listeners.clear();
}
}
@Override
public void start() throws Exception {
state = State.RUNNING;
}
@Override
public void stop() {
state = State.STOPPED;
}
}

View File

@ -0,0 +1,264 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.events.impl;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.ClusterSingleton;
import org.apache.solr.cloud.api.collections.Assign;
import org.apache.solr.cluster.events.ClusterEvent;
import org.apache.solr.cluster.events.ClusterEventListener;
import org.apache.solr.cluster.events.NodesDownEvent;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.core.CoreContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is an illustration how to re-implement the combination of Solr 8x
* NodeLostTrigger and AutoAddReplicasPlanAction to maintain the collection's replicas when
* nodes are lost.
* <p>The notion of <code>waitFor</code> delay between detection and repair action is
* implemented as a scheduled execution of the repair method, which is called every 1 sec
* to check whether there are any lost nodes that exceeded their <code>waitFor</code> period.</p>
* <p>NOTE: this functionality would be probably more reliable when executed also as a
* periodically scheduled check - both as a reactive (listener) and proactive (scheduled) measure.</p>
*/
public class CollectionsRepairEventListener implements ClusterEventListener, ClusterSingleton, Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String PLUGIN_NAME = "collectionsRepairListener";
public static final int DEFAULT_WAIT_FOR_SEC = 30;
private static final String ASYNC_ID_PREFIX = "_async_" + PLUGIN_NAME;
private static final AtomicInteger counter = new AtomicInteger();
private final SolrClient solrClient;
private final SolrCloudManager solrCloudManager;
private State state = State.STOPPED;
private int waitForSecond = DEFAULT_WAIT_FOR_SEC;
private ScheduledThreadPoolExecutor waitForExecutor;
public CollectionsRepairEventListener(CoreContainer cc) {
this.solrClient = cc.getSolrClientCache().getCloudSolrClient(cc.getZkController().getZkClient().getZkServerAddress());
this.solrCloudManager = cc.getZkController().getSolrCloudManager();
}
@VisibleForTesting
public void setWaitForSecond(int waitForSecond) {
if (log.isDebugEnabled()) {
log.debug("-- setting waitFor={}", waitForSecond);
}
this.waitForSecond = waitForSecond;
}
@Override
public String getName() {
return PLUGIN_NAME;
}
@Override
public void onEvent(ClusterEvent event) {
if (state != State.RUNNING) {
// ignore the event
return;
}
switch (event.getType()) {
case NODES_DOWN:
handleNodesDown((NodesDownEvent) event);
break;
default:
log.warn("Unsupported event {}, ignoring...", event);
}
}
private Map<String, Long> nodeNameVsTimeRemoved = new ConcurrentHashMap<>();
private void handleNodesDown(NodesDownEvent event) {
// tracking for the purpose of "waitFor" delay
// have any nodes that we were tracking been added to the cluster?
// if so, remove them from the tracking map
Set<String> trackingKeySet = nodeNameVsTimeRemoved.keySet();
trackingKeySet.removeAll(solrCloudManager.getClusterStateProvider().getLiveNodes());
// add any new lost nodes (old lost nodes are skipped)
event.getNodeNames().forEachRemaining(lostNode -> {
nodeNameVsTimeRemoved.computeIfAbsent(lostNode, n -> solrCloudManager.getTimeSource().getTimeNs());
});
}
private void runRepair() {
if (nodeNameVsTimeRemoved.isEmpty()) {
// nothing to do
return;
}
if (log.isDebugEnabled()) {
log.debug("-- runRepair for {} lost nodes", nodeNameVsTimeRemoved.size());
}
Set<String> reallyLostNodes = new HashSet<>();
nodeNameVsTimeRemoved.forEach((lostNode, timeRemoved) -> {
long now = solrCloudManager.getTimeSource().getTimeNs();
long te = TimeUnit.SECONDS.convert(now - timeRemoved, TimeUnit.NANOSECONDS);
if (te >= waitForSecond) {
reallyLostNodes.add(lostNode);
}
});
if (reallyLostNodes.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("--- skipping repair, {} nodes are still in waitFor period", nodeNameVsTimeRemoved.size());
}
return;
} else {
if (log.isDebugEnabled()) {
log.debug("--- running repair for nodes that are still lost after waitFor: {}", reallyLostNodes);
}
}
// collect all lost replicas
// collection / positions
Map<String, List<ReplicaPosition>> newPositions = new HashMap<>();
try {
ClusterState clusterState = solrCloudManager.getClusterStateProvider().getClusterState();
clusterState.forEachCollection(coll -> {
// shard / type / count
Map<String, Map<Replica.Type, AtomicInteger>> lostReplicas = new HashMap<>();
coll.forEachReplica((shard, replica) -> {
if (reallyLostNodes.contains(replica.getNodeName())) {
lostReplicas.computeIfAbsent(shard, s -> new HashMap<>())
.computeIfAbsent(replica.type, t -> new AtomicInteger())
.incrementAndGet();
}
});
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(solrCloudManager, clusterState, coll);
lostReplicas.forEach((shard, types) -> {
Assign.AssignRequestBuilder assignRequestBuilder = new Assign.AssignRequestBuilder()
.forCollection(coll.getName())
.forShard(Collections.singletonList(shard));
types.forEach((type, count) -> {
switch (type) {
case NRT:
assignRequestBuilder.assignNrtReplicas(count.get());
break;
case PULL:
assignRequestBuilder.assignPullReplicas(count.get());
break;
case TLOG:
assignRequestBuilder.assignTlogReplicas(count.get());
break;
}
});
Assign.AssignRequest assignRequest = assignRequestBuilder.build();
try {
List<ReplicaPosition> positions = assignStrategy.assign(solrCloudManager, assignRequest);
newPositions.put(coll.getName(), positions);
} catch (Exception e) {
log.warn("Exception computing positions for {}/{}: {}", coll.getName(), shard, e);
return;
}
});
});
} catch (IOException e) {
log.warn("Exception getting cluster state", e);
return;
}
// remove all nodes with expired waitFor from the tracking set
nodeNameVsTimeRemoved.keySet().removeAll(reallyLostNodes);
// send ADDREPLICA admin requests for each lost replica
// XXX should we use 'async' for that, to avoid blocking here?
List<CollectionAdminRequest.AddReplica> addReplicas = new ArrayList<>();
newPositions.forEach((collection, positions) -> {
positions.forEach(position -> {
CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest
.addReplicaToShard(collection, position.shard, position.type);
addReplica.setNode(position.node);
addReplica.setAsyncId(ASYNC_ID_PREFIX + counter.incrementAndGet());
addReplicas.add(addReplica);
});
});
addReplicas.forEach(addReplica -> {
try {
solrClient.request(addReplica);
} catch (Exception e) {
log.warn("Exception calling ADDREPLICA {}: {}", addReplica.getParams().toQueryString(), e);
}
});
}
@Override
public void start() throws Exception {
state = State.STARTING;
waitForExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1,
new SolrNamedThreadFactory("collectionsRepair_waitFor"));
waitForExecutor.setRemoveOnCancelPolicy(true);
waitForExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
waitForExecutor.scheduleAtFixedRate(() -> runRepair(), 0, waitForSecond, TimeUnit.SECONDS);
state = State.RUNNING;
}
@Override
public State getState() {
return state;
}
@Override
public void stop() {
state = State.STOPPING;
waitForExecutor.shutdownNow();
try {
waitForExecutor.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.warn("Failed to shut down the waitFor executor - interrupted...");
Thread.currentThread().interrupt();
}
waitForExecutor = null;
state = State.STOPPED;
}
@Override
public void close() throws IOException {
if (log.isDebugEnabled()) {
log.debug("-- close() called");
}
stop();
}
}

View File

@ -0,0 +1,231 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.events.impl;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cluster.events.ClusterEventProducerBase;
import org.apache.solr.cluster.events.ClusterPropertiesChangedEvent;
import org.apache.solr.cluster.events.ClusterEvent;
import org.apache.solr.cluster.events.ClusterEventProducer;
import org.apache.solr.cluster.events.CollectionsAddedEvent;
import org.apache.solr.cluster.events.CollectionsRemovedEvent;
import org.apache.solr.cluster.events.NodesDownEvent;
import org.apache.solr.cluster.events.NodesUpEvent;
import org.apache.solr.common.cloud.CloudCollectionsListener;
import org.apache.solr.common.cloud.ClusterPropertiesListener;
import org.apache.solr.common.cloud.LiveNodesListener;
import org.apache.solr.core.CoreContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of {@link ClusterEventProducer}.
* <h3>Implementation notes</h3>
* <p>For each cluster event relevant listeners are always invoked sequentially
* (not in parallel) and in arbitrary order. This means that if any listener blocks the
* processing other listeners may be invoked much later or not at all.</p>
*/
public class DefaultClusterEventProducer extends ClusterEventProducerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private LiveNodesListener liveNodesListener;
private CloudCollectionsListener cloudCollectionsListener;
private ClusterPropertiesListener clusterPropertiesListener;
private ZkController zkController;
private final Set<ClusterEvent.EventType> supportedEvents =
new HashSet<>(Arrays.asList(
ClusterEvent.EventType.NODES_DOWN,
ClusterEvent.EventType.NODES_UP,
ClusterEvent.EventType.COLLECTIONS_ADDED,
ClusterEvent.EventType.COLLECTIONS_REMOVED,
ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED
));
public DefaultClusterEventProducer(CoreContainer cc) {
super(cc);
}
// ClusterSingleton lifecycle methods
@Override
public synchronized void start() {
if (cc == null) {
liveNodesListener = null;
cloudCollectionsListener = null;
clusterPropertiesListener = null;
state = State.STOPPED;
return;
}
if (state == State.RUNNING) {
log.warn("Double start() invoked on {}, ignoring", this);
return;
}
state = State.STARTING;
this.zkController = this.cc.getZkController();
// clean up any previous instances
doStop();
// register liveNodesListener
liveNodesListener = (oldNodes, newNodes) -> {
// already closed but still registered
if (state == State.STOPPING || state == State.STOPPED) {
// remove the listener
return true;
}
// spurious event, ignore but keep listening
if (oldNodes.equals(newNodes)) {
return false;
}
final Instant now = Instant.now();
final Set<String> downNodes = new HashSet<>(oldNodes);
downNodes.removeAll(newNodes);
if (!downNodes.isEmpty()) {
fireEvent(new NodesDownEvent() {
@Override
public Iterator<String> getNodeNames() {
return downNodes.iterator();
}
@Override
public Instant getTimestamp() {
return now;
}
});
}
final Set<String> upNodes = new HashSet<>(newNodes);
upNodes.removeAll(oldNodes);
if (!upNodes.isEmpty()) {
fireEvent(new NodesUpEvent() {
@Override
public Iterator<String> getNodeNames() {
return upNodes.iterator();
}
@Override
public Instant getTimestamp() {
return now;
}
});
}
return false;
};
zkController.zkStateReader.registerLiveNodesListener(liveNodesListener);
cloudCollectionsListener = ((oldCollections, newCollections) -> {
if (oldCollections.equals(newCollections)) {
return;
}
final Instant now = Instant.now();
final Set<String> removed = new HashSet<>(oldCollections);
removed.removeAll(newCollections);
if (!removed.isEmpty()) {
fireEvent(new CollectionsRemovedEvent() {
@Override
public Iterator<String> getCollectionNames() {
return removed.iterator();
}
@Override
public Instant getTimestamp() {
return now;
}
});
}
final Set<String> added = new HashSet<>(newCollections);
added.removeAll(oldCollections);
if (!added.isEmpty()) {
fireEvent(new CollectionsAddedEvent() {
@Override
public Iterator<String> getCollectionNames() {
return added.iterator();
}
@Override
public Instant getTimestamp() {
return now;
}
});
}
});
zkController.zkStateReader.registerCloudCollectionsListener(cloudCollectionsListener);
clusterPropertiesListener = (newProperties) -> {
fireEvent(new ClusterPropertiesChangedEvent() {
final Instant now = Instant.now();
@Override
public Map<String, Object> getNewClusterProperties() {
return newProperties;
}
@Override
public Instant getTimestamp() {
return now;
}
});
return false;
};
zkController.zkStateReader.registerClusterPropertiesListener(clusterPropertiesListener);
// XXX register collection state listener?
// XXX not sure how to efficiently monitor for REPLICA_DOWN events
state = State.RUNNING;
}
@Override
public Set<ClusterEvent.EventType> getSupportedEventTypes() {
return supportedEvents;
}
@Override
public synchronized void stop() {
state = State.STOPPING;
doStop();
state = State.STOPPED;
}
private void doStop() {
if (liveNodesListener != null) {
zkController.zkStateReader.removeLiveNodesListener(liveNodesListener);
}
if (cloudCollectionsListener != null) {
zkController.zkStateReader.removeCloudCollectionsListener(cloudCollectionsListener);
}
if (clusterPropertiesListener != null) {
zkController.zkStateReader.removeClusterPropertiesListener(clusterPropertiesListener);
}
liveNodesListener = null;
cloudCollectionsListener = null;
clusterPropertiesListener = null;
}
@Override
public void close() throws IOException {
stop();
super.close();
}
}

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.events.impl;
import org.apache.solr.cluster.events.ClusterEvent;
import org.apache.solr.cluster.events.ClusterEventListener;
import org.apache.solr.cluster.events.ClusterEventProducer;
import org.apache.solr.cluster.events.NoOpProducer;
import org.apache.solr.cluster.events.ClusterEventProducerBase;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.core.CoreContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Set;
/**
* This implementation allows Solr to dynamically change the underlying implementation
* of {@link ClusterEventProducer} in response to the changed plugin configuration.
*/
public final class DelegatingClusterEventProducer extends ClusterEventProducerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private ClusterEventProducer delegate;
public DelegatingClusterEventProducer(CoreContainer cc) {
super(cc);
delegate = new NoOpProducer(cc);
}
@Override
public void close() throws IOException {
if (log.isDebugEnabled()) {
log.debug("--closing delegate for CC-{}: {}", Integer.toHexString(cc.hashCode()), delegate);
}
IOUtils.closeQuietly(delegate);
super.close();
}
public void setDelegate(ClusterEventProducer newDelegate) {
if (log.isDebugEnabled()) {
log.debug("--setting new delegate for CC-{}: {}", Integer.toHexString(cc.hashCode()), newDelegate);
}
this.delegate = newDelegate;
// transfer all listeners to the new delegate
listeners.forEach((type, listenerSet) -> {
listenerSet.forEach(listener -> {
try {
delegate.registerListener(listener, type);
} catch (Exception e) {
log.warn("Exception registering listener with the new event producer", e);
// make sure it's not registered
delegate.unregisterListener(listener, type);
// unregister it here, too
super.unregisterListener(listener, type);
}
});
});
if ((state == State.RUNNING || state == State.STARTING) &&
!(delegate.getState() == State.RUNNING || delegate.getState() == State.STARTING)) {
try {
delegate.start();
if (log.isDebugEnabled()) {
log.debug("--- started delegate {}", delegate);
}
} catch (Exception e) {
log.warn("Unable to start the new delegate {}: {}", delegate.getClass().getName(), e);
}
} else {
if (log.isDebugEnabled()) {
log.debug("--- delegate {} already in state {}", delegate, delegate.getState());
}
}
}
@Override
public void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
super.registerListener(listener, eventTypes);
delegate.registerListener(listener, eventTypes);
}
@Override
public void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
super.unregisterListener(listener, eventTypes);
delegate.unregisterListener(listener, eventTypes);
}
@Override
public synchronized void start() throws Exception {
if (log.isDebugEnabled()) {
log.debug("-- starting CC-{}, Delegating {}, delegate {}",
Integer.toHexString(cc.hashCode()), Integer.toHexString(hashCode()), delegate);
}
state = State.STARTING;
if (!(delegate.getState() == State.RUNNING || delegate.getState() == State.STARTING)) {
try {
delegate.start();
if (log.isDebugEnabled()) {
log.debug("--- started delegate {}", delegate);
}
} finally {
state = delegate.getState();
}
} else {
if (log.isDebugEnabled()) {
log.debug("--- delegate {} already in state {}", delegate, delegate.getState());
}
}
}
@Override
public Set<ClusterEvent.EventType> getSupportedEventTypes() {
return NoOpProducer.ALL_EVENT_TYPES;
}
@Override
public synchronized void stop() {
if (log.isDebugEnabled()) {
log.debug("-- stopping Delegating {}, delegate {}", Integer.toHexString(hashCode()), delegate);
}
state = State.STOPPING;
delegate.stop();
state = delegate.getState();
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Default implementation of {@link org.apache.solr.cluster.events.ClusterEventProducer}.
*/
package org.apache.solr.cluster.events.impl;

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Interfaces defining support for cluster-level event generation and processing.
*/
package org.apache.solr.cluster.events;

View File

@ -17,7 +17,7 @@
package org.apache.solr.core;
import org.apache.solr.api.CustomContainerPlugins;
import org.apache.solr.api.ContainerPluginsRegistry;
import org.apache.solr.cloud.ClusterSingleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -33,7 +33,7 @@ import java.util.function.Supplier;
/**
* Helper class to manage the initial registration of {@link ClusterSingleton} plugins and
* to track the changes in loaded plugins in {@link org.apache.solr.api.CustomContainerPlugins}.
* to track the changes in loaded plugins in {@link ContainerPluginsRegistry}.
*/
public class ClusterSingletons {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -41,7 +41,7 @@ public class ClusterSingletons {
private final Map<String, ClusterSingleton> singletonMap = new ConcurrentHashMap<>();
private final Supplier<Boolean> runSingletons;
private final Consumer<Runnable> asyncRunner;
private final CustomContainerPlugins.PluginRegistryListener pluginListener;
private final ContainerPluginsRegistry.PluginRegistryListener pluginListener;
public static final int DEFAULT_WAIT_TIMEOUT_SEC = 60;
@ -60,9 +60,9 @@ public class ClusterSingletons {
this.runSingletons = runSingletons;
this.asyncRunner = asyncRunner;
// create plugin registry listener
pluginListener = new CustomContainerPlugins.PluginRegistryListener() {
pluginListener = new ContainerPluginsRegistry.PluginRegistryListener() {
@Override
public void added(CustomContainerPlugins.ApiInfo plugin) {
public void added(ContainerPluginsRegistry.ApiInfo plugin) {
if (plugin == null || plugin.getInstance() == null) {
return;
}
@ -83,7 +83,7 @@ public class ClusterSingletons {
}
@Override
public void deleted(CustomContainerPlugins.ApiInfo plugin) {
public void deleted(ContainerPluginsRegistry.ApiInfo plugin) {
if (plugin == null || plugin.getInstance() == null) {
return;
}
@ -96,14 +96,14 @@ public class ClusterSingletons {
}
@Override
public void modified(CustomContainerPlugins.ApiInfo old, CustomContainerPlugins.ApiInfo replacement) {
public void modified(ContainerPluginsRegistry.ApiInfo old, ContainerPluginsRegistry.ApiInfo replacement) {
added(replacement);
deleted(old);
}
};
}
public CustomContainerPlugins.PluginRegistryListener getPluginRegistryListener() {
public ContainerPluginsRegistry.PluginRegistryListener getPluginRegistryListener() {
return pluginListener;
}
@ -151,9 +151,6 @@ public class ClusterSingletons {
*/
public void startClusterSingletons() {
final Runnable initializer = () -> {
if (!runSingletons.get()) {
return;
}
try {
waitUntilReady(DEFAULT_WAIT_TIMEOUT_SEC, TimeUnit.SECONDS);
} catch (InterruptedException e) {
@ -163,6 +160,9 @@ public class ClusterSingletons {
log.warn("Timed out during initialization of ClusterSingleton-s (waited {} sec)", DEFAULT_WAIT_TIMEOUT_SEC);
return;
}
if (!runSingletons.get()) {
return;
}
singletonMap.forEach((name, singleton) -> {
if (!runSingletons.get()) {
return;

View File

@ -57,7 +57,7 @@ import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.solr.api.CustomContainerPlugins;
import org.apache.solr.api.ContainerPluginsRegistry;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
@ -73,6 +73,8 @@ import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ClusterSingleton;
import org.apache.solr.cloud.OverseerTaskQueue;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cluster.events.ClusterEventProducer;
import org.apache.solr.cluster.events.impl.ClusterEventProducerFactory;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@ -178,7 +180,7 @@ public class CoreContainer {
*/
public final Supplier<SolrZkClient> zkClientSupplier = () -> getZkController().getZkClient();
private final CustomContainerPlugins customContainerPlugins = new CustomContainerPlugins(this, containerHandlers.getApiBag());
private final ContainerPluginsRegistry containerPluginsRegistry = new ContainerPluginsRegistry(this, containerHandlers.getApiBag());
protected final Map<String, CoreLoadFailure> coreInitFailures = new ConcurrentHashMap<>();
@ -253,6 +255,10 @@ public class CoreContainer {
getZkController().getOverseer() != null &&
!getZkController().getOverseer().isClosed(),
(r) -> this.runAsync(r));
// initially these are the same to collect the plugin-based listeners during init
private ClusterEventProducer clusterEventProducer;
private PackageStoreAPI packageStoreAPI;
private PackageLoader packageLoader;
@ -671,7 +677,11 @@ public class CoreContainer {
loader.reloadLuceneSPI();
}
customContainerPlugins.registerListener(clusterSingletons.getPluginRegistryListener());
ClusterEventProducerFactory clusterEventProducerFactory = new ClusterEventProducerFactory(this);
clusterEventProducer = clusterEventProducerFactory;
containerPluginsRegistry.registerListener(clusterSingletons.getPluginRegistryListener());
containerPluginsRegistry.registerListener(clusterEventProducerFactory.getPluginRegistryListener());
packageStoreAPI = new PackageStoreAPI(this);
containerHandlers.getApiBag().registerObject(packageStoreAPI.readAPI);
@ -881,12 +891,15 @@ public class CoreContainer {
}
if (isZooKeeperAware()) {
customContainerPlugins.refresh();
getZkController().zkStateReader.registerClusterPropertiesListener(customContainerPlugins);
containerPluginsRegistry.refresh();
getZkController().zkStateReader.registerClusterPropertiesListener(containerPluginsRegistry);
ContainerPluginsApi containerPluginsApi = new ContainerPluginsApi(this);
containerHandlers.getApiBag().registerObject(containerPluginsApi.readAPI);
containerHandlers.getApiBag().registerObject(containerPluginsApi.editAPI);
// create target ClusterEventProducer (possibly from plugins)
clusterEventProducer = clusterEventProducerFactory.create(containerPluginsRegistry);
// init ClusterSingleton-s
// register the handlers that are also ClusterSingleton
@ -1117,6 +1130,9 @@ public class CoreContainer {
if (solrClientCache != null) {
solrClientCache.close();
}
if (containerPluginsRegistry != null) {
IOUtils.closeQuietly(containerPluginsRegistry);
}
} finally {
try {
@ -2137,14 +2153,18 @@ public class CoreContainer {
return tragicException != null;
}
public CustomContainerPlugins getCustomContainerPlugins(){
return customContainerPlugins;
public ContainerPluginsRegistry getContainerPluginsRegistry() {
return containerPluginsRegistry;
}
public ClusterSingletons getClusterSingletons() {
return clusterSingletons;
}
public ClusterEventProducer getClusterEventProducer() {
return clusterEventProducer;
}
static {
ExecutorUtil.addThreadLocalProvider(SolrRequestInfo.getInheritableThreadLocalProvider());
}

View File

@ -28,7 +28,7 @@ import java.util.function.Supplier;
import org.apache.solr.api.AnnotatedApi;
import org.apache.solr.api.Command;
import org.apache.solr.api.CustomContainerPlugins;
import org.apache.solr.api.ContainerPluginsRegistry;
import org.apache.solr.api.EndPoint;
import org.apache.solr.api.PayloadObj;
import org.apache.solr.client.solrj.SolrRequest.METHOD;
@ -137,7 +137,7 @@ public class ContainerPluginsApi {
}
}
List<String> errs = new ArrayList<>();
CustomContainerPlugins.ApiInfo apiInfo = coreContainer.getCustomContainerPlugins().createInfo(info, errs);
ContainerPluginsRegistry.ApiInfo apiInfo = coreContainer.getContainerPluginsRegistry().createInfo(info, errs);
if (!errs.isEmpty()) {
for (String err : errs) payload.addError(err);
return;

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.events;
import org.junit.Assert;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
*
*/
public class AllEventsListener implements ClusterEventListener {
CountDownLatch eventLatch = new CountDownLatch(1);
ClusterEvent.EventType expectedType;
Map<ClusterEvent.EventType, List<ClusterEvent>> events = new HashMap<>();
@Override
public void onEvent(ClusterEvent event) {
events.computeIfAbsent(event.getType(), type -> new ArrayList<>()).add(event);
if (event.getType() == expectedType) {
eventLatch.countDown();
}
}
public void setExpectedType(ClusterEvent.EventType expectedType) {
this.expectedType = expectedType;
eventLatch = new CountDownLatch(1);
}
public void waitForExpectedEvent(int timeoutSeconds) throws InterruptedException {
boolean await = eventLatch.await(timeoutSeconds, TimeUnit.SECONDS);
if (!await) {
Assert.fail("Timed out waiting for expected event " + expectedType);
}
}
public void close() throws IOException {
}
}

View File

@ -0,0 +1,370 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.events;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.request.beans.PluginMeta;
import org.apache.solr.client.solrj.response.V2Response;
import org.apache.solr.cloud.ClusterSingleton;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cluster.events.impl.DefaultClusterEventProducer;
import org.apache.solr.common.cloud.ClusterProperties;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.LogLevel;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.singletonMap;
import static org.apache.solr.client.solrj.SolrRequest.METHOD.GET;
import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
/**
*
*/
@LogLevel("org.apache.solr.cluster.events=DEBUG")
public class ClusterEventProducerTest extends SolrCloudTestCase {
private AllEventsListener eventsListener;
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(3)
.addConfig("conf", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
}
@Before
public void setUp() throws Exception {
System.setProperty("enable.packages", "true");
super.setUp();
cluster.deleteAllCollections();
eventsListener = new AllEventsListener();
cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().registerListener(eventsListener);
}
@After
public void teardown() throws Exception {
System.clearProperty("enable.packages");
if (eventsListener != null) {
cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().unregisterListener(eventsListener);
eventsListener.events.clear();
}
V2Request readPluginState = new V2Request.Builder("/cluster/plugin")
.forceV2(true)
.withMethod(GET)
.build();
V2Response rsp = readPluginState.process(cluster.getSolrClient());
if (rsp._getStr("/plugin/" + ClusterEventProducer.PLUGIN_NAME + "/class", null) != null) {
V2Request req = new V2Request.Builder("/cluster/plugin")
.withMethod(POST)
.withPayload(Collections.singletonMap("remove", ClusterEventProducer.PLUGIN_NAME))
.build();
req.process(cluster.getSolrClient());
}
}
@Test
public void testEvents() throws Exception {
PluginMeta plugin = new PluginMeta();
plugin.klass = DefaultClusterEventProducer.class.getName();
plugin.name = ClusterEventProducer.PLUGIN_NAME;
V2Request req = new V2Request.Builder("/cluster/plugin")
.withMethod(POST)
.withPayload(Collections.singletonMap("add", plugin))
.build();
V2Response rsp = req.process(cluster.getSolrClient());
assertEquals(0, rsp.getStatus());
// NODES_DOWN
eventsListener.setExpectedType(ClusterEvent.EventType.NODES_DOWN);
// don't kill Overseer
JettySolrRunner nonOverseerJetty = null;
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
if (cluster.getOpenOverseer().getCoreContainer().getZkController().getNodeName().equals(jetty.getNodeName())) {
continue;
}
nonOverseerJetty = jetty;
break;
}
String nodeName = nonOverseerJetty.getNodeName();
cluster.stopJettySolrRunner(nonOverseerJetty);
cluster.waitForJettyToStop(nonOverseerJetty);
eventsListener.waitForExpectedEvent(30);
assertNotNull("should be NODES_DOWN events", eventsListener.events.get(ClusterEvent.EventType.NODES_DOWN));
List<ClusterEvent> events = eventsListener.events.get(ClusterEvent.EventType.NODES_DOWN);
assertEquals("should be one NODES_DOWN event", 1, events.size());
ClusterEvent event = events.get(0);
assertEquals("should be NODES_DOWN event type", ClusterEvent.EventType.NODES_DOWN, event.getType());
NodesDownEvent nodesDown = (NodesDownEvent) event;
assertEquals("should be node " + nodeName, nodeName, nodesDown.getNodeNames().next());
// NODES_UP
eventsListener.setExpectedType(ClusterEvent.EventType.NODES_UP);
JettySolrRunner newNode = cluster.startJettySolrRunner();
cluster.waitForNode(newNode, 60);
eventsListener.waitForExpectedEvent(30);
assertNotNull("should be NODES_UP events", eventsListener.events.get(ClusterEvent.EventType.NODES_UP));
events = eventsListener.events.get(ClusterEvent.EventType.NODES_UP);
assertEquals("should be one NODES_UP event", 1, events.size());
event = events.get(0);
assertEquals("should be NODES_UP event type", ClusterEvent.EventType.NODES_UP, event.getType());
NodesUpEvent nodesUp = (NodesUpEvent) event;
assertEquals("should be node " + newNode.getNodeName(), newNode.getNodeName(), nodesUp.getNodeNames().next());
// COLLECTIONS_ADDED
eventsListener.setExpectedType(ClusterEvent.EventType.COLLECTIONS_ADDED);
String collection = "testNodesEvent_collection";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
cluster.getSolrClient().request(create);
cluster.waitForActiveCollection(collection, 1, 1);
eventsListener.waitForExpectedEvent(30);
assertNotNull("should be COLLECTIONS_ADDED events", eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_ADDED));
events = eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_ADDED);
assertEquals("should be one COLLECTIONS_ADDED event", 1, events.size());
event = events.get(0);
assertEquals("should be COLLECTIONS_ADDED event type", ClusterEvent.EventType.COLLECTIONS_ADDED, event.getType());
CollectionsAddedEvent collectionsAdded = (CollectionsAddedEvent) event;
assertEquals("should be collection " + collection, collection, collectionsAdded.getCollectionNames().next());
// COLLECTIONS_REMOVED
eventsListener.setExpectedType(ClusterEvent.EventType.COLLECTIONS_REMOVED);
CollectionAdminRequest.Delete delete = CollectionAdminRequest.deleteCollection(collection);
cluster.getSolrClient().request(delete);
eventsListener.waitForExpectedEvent(30);
assertNotNull("should be COLLECTIONS_REMOVED events", eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_REMOVED));
events = eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_REMOVED);
assertEquals("should be one COLLECTIONS_REMOVED event", 1, events.size());
event = events.get(0);
assertEquals("should be COLLECTIONS_REMOVED event type", ClusterEvent.EventType.COLLECTIONS_REMOVED, event.getType());
CollectionsRemovedEvent collectionsRemoved = (CollectionsRemovedEvent) event;
assertEquals("should be collection " + collection, collection, collectionsRemoved.getCollectionNames().next());
// CLUSTER_CONFIG_CHANGED
eventsListener.events.clear();
eventsListener.setExpectedType(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
ClusterProperties clusterProperties = new ClusterProperties(cluster.getZkClient());
Map<String, Object> oldProps = new HashMap<>(clusterProperties.getClusterProperties());
clusterProperties.setClusterProperty("ext.foo", "bar");
eventsListener.waitForExpectedEvent(30);
assertNotNull("should be CLUSTER_CONFIG_CHANGED events", eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED));
events = eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
assertEquals("should be one CLUSTER_CONFIG_CHANGED event", 1, events.size());
event = events.get(0);
assertEquals("should be CLUSTER_CONFIG_CHANGED event type", ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED, event.getType());
ClusterPropertiesChangedEvent propertiesChanged = (ClusterPropertiesChangedEvent) event;
Map<String, Object> newProps = propertiesChanged.getNewClusterProperties();
assertEquals("new properties wrong value of the 'ext.foo' property: " + newProps,
"bar", newProps.get("ext.foo"));
// unset the property
eventsListener.events.clear();
eventsListener.setExpectedType(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
clusterProperties.setClusterProperty("ext.foo", null);
eventsListener.waitForExpectedEvent(30);
assertNotNull("should be CLUSTER_CONFIG_CHANGED events", eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED));
events = eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
assertEquals("should be one CLUSTER_CONFIG_CHANGED event", 1, events.size());
event = events.get(0);
assertEquals("should be CLUSTER_CONFIG_CHANGED event type", ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED, event.getType());
propertiesChanged = (ClusterPropertiesChangedEvent) event;
assertEquals("new properties should not have 'ext.foo' property: " + propertiesChanged.getNewClusterProperties(),
null, propertiesChanged.getNewClusterProperties().get("ext.foo"));
}
private static CountDownLatch dummyEventLatch = new CountDownLatch(1);
private static ClusterEvent lastEvent = null;
public static class DummyEventListener implements ClusterEventListener, ClusterSingleton {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
State state = State.STOPPED;
@Override
public void onEvent(ClusterEvent event) {
if (state != State.RUNNING) {
if (log.isDebugEnabled()) {
log.debug("skipped event, not running: {}", event);
}
return;
}
if (event.getType() == ClusterEvent.EventType.COLLECTIONS_ADDED ||
event.getType() == ClusterEvent.EventType.COLLECTIONS_REMOVED) {
if (log.isDebugEnabled()) {
log.debug("recorded event {}", Utils.toJSONString(event));
}
lastEvent = event;
dummyEventLatch.countDown();
} else {
if (log.isDebugEnabled()) {
log.debug("skipped event, wrong type: {}", event.getType());
}
}
}
@Override
public String getName() {
return "dummy";
}
@Override
public void start() throws Exception {
if (log.isDebugEnabled()) {
log.debug("starting {}", Integer.toHexString(hashCode()));
}
state = State.RUNNING;
}
@Override
public State getState() {
return state;
}
@Override
public void stop() {
if (log.isDebugEnabled()) {
log.debug("stopping {}", Integer.toHexString(hashCode()));
}
state = State.STOPPED;
}
@Override
public void close() throws IOException {
if (log.isDebugEnabled()) {
log.debug("closing {}", Integer.toHexString(hashCode()));
}
}
}
@Test
public void testListenerPlugins() throws Exception {
PluginMeta plugin = new PluginMeta();
plugin.klass = DefaultClusterEventProducer.class.getName();
plugin.name = ClusterEventProducer.PLUGIN_NAME;
V2Request req = new V2Request.Builder("/cluster/plugin")
.withMethod(POST)
.withPayload(Collections.singletonMap("add", plugin))
.build();
V2Response rsp = req.process(cluster.getSolrClient());
assertEquals(0, rsp.getStatus());
plugin = new PluginMeta();
plugin.name = "testplugin";
plugin.klass = DummyEventListener.class.getName();
req = new V2Request.Builder("/cluster/plugin")
.forceV2(true)
.withMethod(POST)
.withPayload(singletonMap("add", plugin))
.build();
rsp = req.process(cluster.getSolrClient());
//just check if the plugin is indeed registered
V2Request readPluginState = new V2Request.Builder("/cluster/plugin")
.forceV2(true)
.withMethod(GET)
.build();
rsp = readPluginState.process(cluster.getSolrClient());
assertEquals(DummyEventListener.class.getName(), rsp._getStr("/plugin/testplugin/class", null));
String collection = "testListenerPlugins_collection";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
cluster.getSolrClient().request(create);
cluster.waitForActiveCollection(collection, 1, 1);
boolean await = dummyEventLatch.await(30, TimeUnit.SECONDS);
if (!await) {
fail("Timed out waiting for COLLECTIONS_ADDED event, " + collection);
}
assertNotNull("lastEvent should be COLLECTIONS_ADDED", lastEvent);
assertEquals("lastEvent should be COLLECTIONS_ADDED", ClusterEvent.EventType.COLLECTIONS_ADDED, lastEvent.getType());
// verify timestamp
Instant now = Instant.now();
assertTrue("timestamp of the event is in the future", now.isAfter(lastEvent.getTimestamp()));
assertEquals(collection, ((CollectionsAddedEvent)lastEvent).getCollectionNames().next());
dummyEventLatch = new CountDownLatch(1);
lastEvent = null;
CollectionAdminRequest.Delete delete = CollectionAdminRequest.deleteCollection(collection);
cluster.getSolrClient().request(delete);
await = dummyEventLatch.await(30, TimeUnit.SECONDS);
if (!await) {
fail("Timed out waiting for COLLECTIONS_REMOVED event, " + collection);
}
assertNotNull("lastEvent should be COLLECTIONS_REMOVED", lastEvent);
assertEquals("lastEvent should be COLLECTIONS_REMOVED", ClusterEvent.EventType.COLLECTIONS_REMOVED, lastEvent.getType());
// verify timestamp
now = Instant.now();
assertTrue("timestamp of the event is in the future", now.isAfter(lastEvent.getTimestamp()));
assertEquals(collection, ((CollectionsRemovedEvent)lastEvent).getCollectionNames().next());
// test changing the ClusterEventProducer plugin dynamically
// remove the plugin (a NoOpProducer will be used instead)
req = new V2Request.Builder("/cluster/plugin")
.withMethod(POST)
.withPayload(Collections.singletonMap("remove", ClusterEventProducer.PLUGIN_NAME))
.build();
req.process(cluster.getSolrClient());
dummyEventLatch = new CountDownLatch(1);
lastEvent = null;
// should not receive any events now
cluster.getSolrClient().request(create);
cluster.waitForActiveCollection(collection, 1, 1);
await = dummyEventLatch.await(5, TimeUnit.SECONDS);
if (await) {
fail("should not receive any events but got " + lastEvent);
}
// reinstall the plugin
plugin = new PluginMeta();
plugin.klass = DefaultClusterEventProducer.class.getName();
plugin.name = ClusterEventProducer.PLUGIN_NAME;
req = new V2Request.Builder("/cluster/plugin")
.withMethod(POST)
.withPayload(Collections.singletonMap("add", plugin))
.build();
rsp = req.process(cluster.getSolrClient());
assertEquals(0, rsp.getStatus());
dummyEventLatch = new CountDownLatch(1);
lastEvent = null;
cluster.getSolrClient().request(delete);
await = dummyEventLatch.await(30, TimeUnit.SECONDS);
if (!await) {
fail("Timed out waiting for COLLECTIONS_REMOVED event, " + collection);
}
assertNotNull("lastEvent should be COLLECTIONS_REMOVED", lastEvent);
assertEquals("lastEvent should be COLLECTIONS_REMOVED", ClusterEvent.EventType.COLLECTIONS_REMOVED, lastEvent.getType());
}
}

View File

@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.events.impl;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.request.beans.PluginMeta;
import org.apache.solr.client.solrj.response.V2Response;
import org.apache.solr.cloud.ClusterSingleton;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cluster.events.AllEventsListener;
import org.apache.solr.cluster.events.ClusterEvent;
import org.apache.solr.cluster.events.ClusterEventListener;
import org.apache.solr.cluster.events.ClusterEventProducer;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.LogLevel;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
/**
*
*/
@LogLevel("org.apache.solr.cluster.events=DEBUG")
public class CollectionsRepairEventListenerTest extends SolrCloudTestCase {
public static class CollectionsRepairWrapperListener implements ClusterEventListener, ClusterSingleton {
final CollectionsRepairEventListener delegate;
CountDownLatch completed = new CountDownLatch(1);
CollectionsRepairWrapperListener(CoreContainer cc, int waitFor) throws Exception {
delegate = new CollectionsRepairEventListener(cc);
delegate.setWaitForSecond(waitFor);
}
@Override
public void onEvent(ClusterEvent event) {
delegate.onEvent(event);
completed.countDown();
}
@Override
public String getName() {
return "wrapperListener";
}
@Override
public void start() throws Exception {
delegate.start();
}
@Override
public State getState() {
return delegate.getState();
}
@Override
public void stop() {
delegate.stop();
}
@Override
public void close() throws IOException {
delegate.close();
}
}
private static AllEventsListener eventsListener = new AllEventsListener();
private static CollectionsRepairWrapperListener repairListener;
private static int NUM_NODES = 3;
private static int waitFor;
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(NUM_NODES)
.addConfig("conf", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
PluginMeta plugin = new PluginMeta();
plugin.klass = DefaultClusterEventProducer.class.getName();
plugin.name = ClusterEventProducer.PLUGIN_NAME;
V2Request req = new V2Request.Builder("/cluster/plugin")
.withMethod(POST)
.withPayload(Collections.singletonMap("add", plugin))
.build();
V2Response rsp = req.process(cluster.getSolrClient());
assertNotNull(rsp);
waitFor = 1 + random().nextInt(9);
CoreContainer cc = cluster.getOpenOverseer().getCoreContainer();
cc.getClusterEventProducer()
.registerListener(eventsListener, ClusterEvent.EventType.values());
repairListener = new CollectionsRepairWrapperListener(cc, waitFor);
cc.getClusterEventProducer()
.registerListener(repairListener, ClusterEvent.EventType.NODES_DOWN);
repairListener.start();
}
@Before
public void setUp() throws Exception {
super.setUp();
cluster.deleteAllCollections();
}
@Test
public void testCollectionRepair() throws Exception {
eventsListener.setExpectedType(ClusterEvent.EventType.COLLECTIONS_ADDED);
String collection = "testCollectionRepair_collection";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf", 1, 3);
cluster.getSolrClient().request(create);
cluster.waitForActiveCollection(collection, 1, 3);
eventsListener.waitForExpectedEvent(10);
eventsListener.setExpectedType(ClusterEvent.EventType.NODES_DOWN);
// don't kill Overseer
JettySolrRunner nonOverseerJetty = null;
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
if (cluster.getOpenOverseer().getCoreContainer().getZkController().getNodeName().equals(jetty.getNodeName())) {
continue;
}
nonOverseerJetty = jetty;
break;
}
String nodeName = nonOverseerJetty.getNodeName();
cluster.stopJettySolrRunner(nonOverseerJetty);
cluster.waitForJettyToStop(nonOverseerJetty);
eventsListener.waitForExpectedEvent(10);
cluster.waitForActiveCollection(collection, 1, 2);
Thread.sleep(TimeUnit.MILLISECONDS.convert(waitFor, TimeUnit.SECONDS));
// wait for completed processing in the repair listener
boolean await = repairListener.completed.await(60, TimeUnit.SECONDS);
if (!await) {
fail("Timeout waiting for the processing to complete");
}
cluster.waitForActiveCollection(collection, 1, 3);
}
}

View File

@ -93,13 +93,6 @@ public class TestContainerPlugin extends SolrCloudTestCase {
.build();
expectError(req, cluster.getSolrClient(), errPath, "No method with @Command in class");
//test with an invalid class
// XXX (ab) in order to support ClusterSingleton we allow adding
// plugins without Api EndPoints
// plugin.klass = C1.class.getName();
// expectError(req, cluster.getSolrClient(), errPath, "No @EndPoints");
//test with a valid class. This should succeed now
plugin.klass = C3.class.getName();
req.process(cluster.getSolrClient());

View File

@ -0,0 +1,81 @@
= Container Plugins
:toc: macro
:toclevels: 3
|===
| Initial version released| 2020, November 2|Andrzej Białecki
|===
toc::[]
== Container plugins subsystem
Container plugins are pluggable components that are defined and instantiated at the
`CoreContainer` (node) level. The components usually provide an admin-level API for
additional functionality at the Solr node level.
=== Plugin configurations
Plugin configurations are maintained in ZooKeeper in the `/clusterprops.json` file, under
the `plugin` entry. The configuration is a JSON map where keys are the plugin names, and
values are serialized `org.apache.solr.client.solrj.request.beans.PluginMeta` beans.
== Types of container plugins
=== Plugin life-cycle
Plugin instances are loaded and initialized when Solr's `CoreContainer` is first created.
Then on each update of the plugin configurations the existing plugin configs are compared
with the new configs, and plugin instances are respectively created, removed, or
replaced (i.e. removed and added using the new configuration).
If a plugin implementation class has a constructor that accepts an instance of
`CoreContainer` then it is instantiated using this constructor, and the current instance
of `CoreContainer` is passed as argument.
=== PluginRegistryListener
Components that need to be aware of changes in plugin configurations or registration can
implement `org.apache.solr.api.ContainerPluginsRegistry.PluginRegistryListener` and register
it with the instance of registry available from `coreContainer.getContainerPluginsRegistry()`.
=== ClusterSingleton plugins
Plugins that implement `ClusterSingleton` interface are instantiated on each
Solr node. However, their start/stop life-cycle, as defined in the interface,
is controlled in such a way that only a single running instance of the plugin
is present in the cluster at any time.
(Currently this is implemented by re-using the Overseer leader election, so all
`ClusterSingleton`-s that are in the RUNNING state execute on the Overseer leader).
Any plugin type can implement this interface to indicate to Solr that
it requires this cluster singleton behavior.
// explain plugins that register Api-s
// explain plugins that don't implement any Api
=== ClusterEventProducer plugin
In order to support the generation of cluster-level events an implementation of
`ClusterEventProducer` is created on each Solr node. This component is also a
`ClusterSingleton`, which means that only one active instance is present in the
cluster at any time.
If no plugin configuration is specified then the default implementation
`org.apache.solr.cluster.events.impl.DefaultClusterEventProducer` is used. A no-op
implementation is also available in `org.apache.solr.cluster.events.NoOpProducer`.
=== ClusterEventListener plugins
Plugins that implement the `ClusterEventListener` interface will be automatically
registered with the instance of `ClusterEventProducer`.
== Plugin management API
== Predefined plugin names
Plugins with these names are used in specific parts of Solr. Their names are reserved
and cannot be used for other plugin types:
// XXX uncomment when we move the config to plugins
//`placement-plugin`::
//plugin that implements `PlacementPlugin` interface. This type of plugin
//determines the replica placement strategy in the cluster.
`cluster-event-producer`::
plugin that implements `ClusterEventProducer` interface. This type of plugin
is used for generating cluster-level events.