ARTEMIS-2019 - Seperate ServerPlugin Interfaces
Seperate plugin interface by area, all extending a base interface. Update code to check and call only plugins implementing specific interfaces. Existing interface extends all the new interfaces for back compatibility or those who want simplicity and don't care about perf.
This commit is contained in:
parent
bf073f19ac
commit
19e1bbeb49
|
@ -23,6 +23,16 @@ import java.util.Map;
|
|||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBridgePlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConnectionPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
|
||||
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
|
||||
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
|
||||
|
@ -33,7 +43,6 @@ import org.apache.activemq.artemis.core.security.Role;
|
|||
import org.apache.activemq.artemis.core.server.JournalType;
|
||||
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
|
||||
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
|
||||
|
||||
|
@ -1129,20 +1138,65 @@ public interface Configuration {
|
|||
/**
|
||||
* @param plugins
|
||||
*/
|
||||
void registerBrokerPlugins(List<ActiveMQServerPlugin> plugins);
|
||||
void registerBrokerPlugins(List<ActiveMQServerBasePlugin> plugins);
|
||||
|
||||
/**
|
||||
* @param plugin
|
||||
*/
|
||||
void registerBrokerPlugin(ActiveMQServerPlugin plugin);
|
||||
void registerBrokerPlugin(ActiveMQServerBasePlugin plugin);
|
||||
|
||||
/**
|
||||
* @param plugin
|
||||
*/
|
||||
void unRegisterBrokerPlugin(ActiveMQServerPlugin plugin);
|
||||
void unRegisterBrokerPlugin(ActiveMQServerBasePlugin plugin);
|
||||
|
||||
/**
|
||||
* @return
|
||||
*/
|
||||
List<ActiveMQServerPlugin> getBrokerPlugins();
|
||||
List<ActiveMQServerBasePlugin> getBrokerPlugins();
|
||||
|
||||
/**
|
||||
* @return
|
||||
*/
|
||||
List<ActiveMQServerConnectionPlugin> getBrokerConnectionPlugins();
|
||||
|
||||
/**
|
||||
* @return
|
||||
*/
|
||||
List<ActiveMQServerSessionPlugin> getBrokerSessionPlugins();
|
||||
|
||||
/**
|
||||
* @return
|
||||
*/
|
||||
List<ActiveMQServerConsumerPlugin> getBrokerConsumerPlugins();
|
||||
|
||||
/**
|
||||
* @return
|
||||
*/
|
||||
List<ActiveMQServerAddressPlugin> getBrokerAddressPlugins();
|
||||
|
||||
/**
|
||||
* @return
|
||||
*/
|
||||
List<ActiveMQServerQueuePlugin> getBrokerQueuePlugins();
|
||||
|
||||
/**
|
||||
* @return
|
||||
*/
|
||||
List<ActiveMQServerBindingPlugin> getBrokerBindingPlugins();
|
||||
|
||||
/**
|
||||
* @return
|
||||
*/
|
||||
List<ActiveMQServerMessagePlugin> getBrokerMessagePlugins();
|
||||
|
||||
/**
|
||||
* @return
|
||||
*/
|
||||
List<ActiveMQServerBridgePlugin> getBrokerBridgePlugins();
|
||||
|
||||
/**
|
||||
* @return
|
||||
*/
|
||||
List<ActiveMQServerCriticalPlugin> getBrokerCriticalPlugins();
|
||||
}
|
||||
|
|
|
@ -42,6 +42,16 @@ import java.util.concurrent.CopyOnWriteArrayList;
|
|||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBridgePlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConnectionPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
|
||||
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
|
||||
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
|
||||
|
@ -66,7 +76,6 @@ import org.apache.activemq.artemis.core.server.JournalType;
|
|||
import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
|
||||
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
|
||||
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
|
||||
import org.apache.activemq.artemis.utils.Env;
|
||||
|
@ -244,7 +253,16 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
|||
|
||||
private List<SecuritySettingPlugin> securitySettingPlugins = new ArrayList<>();
|
||||
|
||||
private final List<ActiveMQServerPlugin> brokerPlugins = new CopyOnWriteArrayList<>();
|
||||
private final List<ActiveMQServerBasePlugin> brokerPlugins = new CopyOnWriteArrayList<>();
|
||||
private final List<ActiveMQServerConnectionPlugin> brokerConnectionPlugins = new CopyOnWriteArrayList<>();
|
||||
private final List<ActiveMQServerSessionPlugin> brokerSessionPlugins = new CopyOnWriteArrayList<>();
|
||||
private final List<ActiveMQServerConsumerPlugin> brokerConsumerPlugins = new CopyOnWriteArrayList<>();
|
||||
private final List<ActiveMQServerAddressPlugin> brokerAddressPlugins = new CopyOnWriteArrayList<>();
|
||||
private final List<ActiveMQServerQueuePlugin> brokerQueuePlugins = new CopyOnWriteArrayList<>();
|
||||
private final List<ActiveMQServerBindingPlugin> brokerBindingPlugins = new CopyOnWriteArrayList<>();
|
||||
private final List<ActiveMQServerMessagePlugin> brokerMessagePlugins = new CopyOnWriteArrayList<>();
|
||||
private final List<ActiveMQServerBridgePlugin> brokerBridgePlugins = new CopyOnWriteArrayList<>();
|
||||
private final List<ActiveMQServerCriticalPlugin> brokerCriticalPlugins = new CopyOnWriteArrayList<>();
|
||||
|
||||
private Map<String, Set<String>> securityRoleNameMappings = new HashMap<>();
|
||||
|
||||
|
@ -1371,25 +1389,124 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void registerBrokerPlugins(final List<ActiveMQServerPlugin> plugins) {
|
||||
brokerPlugins.addAll(plugins);
|
||||
public void registerBrokerPlugins(final List<ActiveMQServerBasePlugin> plugins) {
|
||||
plugins.forEach(plugin -> registerBrokerPlugin(plugin));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) {
|
||||
public void registerBrokerPlugin(final ActiveMQServerBasePlugin plugin) {
|
||||
brokerPlugins.add(plugin);
|
||||
if (plugin instanceof ActiveMQServerConnectionPlugin) {
|
||||
brokerConnectionPlugins.add((ActiveMQServerConnectionPlugin) plugin);
|
||||
}
|
||||
if (plugin instanceof ActiveMQServerSessionPlugin) {
|
||||
brokerSessionPlugins.add((ActiveMQServerSessionPlugin) plugin);
|
||||
}
|
||||
if (plugin instanceof ActiveMQServerConsumerPlugin) {
|
||||
brokerConsumerPlugins.add((ActiveMQServerConsumerPlugin) plugin);
|
||||
}
|
||||
if (plugin instanceof ActiveMQServerAddressPlugin) {
|
||||
brokerAddressPlugins.add((ActiveMQServerAddressPlugin) plugin);
|
||||
}
|
||||
if (plugin instanceof ActiveMQServerQueuePlugin) {
|
||||
brokerQueuePlugins.add((ActiveMQServerQueuePlugin) plugin);
|
||||
}
|
||||
if (plugin instanceof ActiveMQServerBindingPlugin) {
|
||||
brokerBindingPlugins.add((ActiveMQServerBindingPlugin) plugin);
|
||||
}
|
||||
if (plugin instanceof ActiveMQServerMessagePlugin) {
|
||||
brokerMessagePlugins.add((ActiveMQServerMessagePlugin) plugin);
|
||||
}
|
||||
if (plugin instanceof ActiveMQServerBridgePlugin) {
|
||||
brokerBridgePlugins.add((ActiveMQServerBridgePlugin) plugin);
|
||||
}
|
||||
if (plugin instanceof ActiveMQServerCriticalPlugin) {
|
||||
brokerCriticalPlugins.add((ActiveMQServerCriticalPlugin) plugin);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) {
|
||||
public void unRegisterBrokerPlugin(final ActiveMQServerBasePlugin plugin) {
|
||||
brokerPlugins.remove(plugin);
|
||||
if (plugin instanceof ActiveMQServerConnectionPlugin) {
|
||||
brokerConnectionPlugins.remove(plugin);
|
||||
}
|
||||
if (plugin instanceof ActiveMQServerSessionPlugin) {
|
||||
brokerSessionPlugins.remove(plugin);
|
||||
}
|
||||
if (plugin instanceof ActiveMQServerConsumerPlugin) {
|
||||
brokerConsumerPlugins.remove(plugin);
|
||||
}
|
||||
if (plugin instanceof ActiveMQServerAddressPlugin) {
|
||||
brokerAddressPlugins.remove(plugin);
|
||||
}
|
||||
if (plugin instanceof ActiveMQServerQueuePlugin) {
|
||||
brokerQueuePlugins.remove(plugin);
|
||||
}
|
||||
if (plugin instanceof ActiveMQServerBindingPlugin) {
|
||||
brokerBindingPlugins.remove(plugin);
|
||||
}
|
||||
if (plugin instanceof ActiveMQServerMessagePlugin) {
|
||||
brokerMessagePlugins.remove(plugin);
|
||||
}
|
||||
if (plugin instanceof ActiveMQServerBridgePlugin) {
|
||||
brokerBridgePlugins.remove(plugin);
|
||||
}
|
||||
if (plugin instanceof ActiveMQServerCriticalPlugin) {
|
||||
brokerCriticalPlugins.remove(plugin);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActiveMQServerPlugin> getBrokerPlugins() {
|
||||
public List<ActiveMQServerBasePlugin> getBrokerPlugins() {
|
||||
return brokerPlugins;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActiveMQServerConnectionPlugin> getBrokerConnectionPlugins() {
|
||||
return brokerConnectionPlugins;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActiveMQServerSessionPlugin> getBrokerSessionPlugins() {
|
||||
return brokerSessionPlugins;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActiveMQServerConsumerPlugin> getBrokerConsumerPlugins() {
|
||||
return brokerConsumerPlugins;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActiveMQServerAddressPlugin> getBrokerAddressPlugins() {
|
||||
return brokerAddressPlugins;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActiveMQServerQueuePlugin> getBrokerQueuePlugins() {
|
||||
return brokerQueuePlugins;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActiveMQServerBindingPlugin> getBrokerBindingPlugins() {
|
||||
return brokerBindingPlugins;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActiveMQServerMessagePlugin> getBrokerMessagePlugins() {
|
||||
return brokerMessagePlugins;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActiveMQServerBridgePlugin> getBrokerBridgePlugins() {
|
||||
return brokerBridgePlugins;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActiveMQServerCriticalPlugin> getBrokerCriticalPlugins() {
|
||||
return brokerCriticalPlugins;
|
||||
}
|
||||
|
||||
@Override
|
||||
public File getBrokerInstance() {
|
||||
if (artemisInstance != null) {
|
||||
|
|
|
@ -435,8 +435,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
|
||||
private boolean internalAddressInfo(AddressInfo addressInfo, boolean reload) throws Exception {
|
||||
synchronized (addressLock) {
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.beforeAddAddress(addressInfo, reload));
|
||||
if (server.hasBrokerAddressPlugins()) {
|
||||
server.callBrokerAddressPlugins(plugin -> plugin.beforeAddAddress(addressInfo, reload));
|
||||
}
|
||||
|
||||
boolean result;
|
||||
|
@ -451,8 +451,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
if (!addressInfo.isInternal()) {
|
||||
managementService.registerAddress(addressInfo);
|
||||
}
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.afterAddAddress(addressInfo, reload));
|
||||
if (server.hasBrokerAddressPlugins()) {
|
||||
server.callBrokerAddressPlugins(plugin -> plugin.afterAddAddress(addressInfo, reload));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
|
@ -552,13 +552,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
public AddressInfo updateAddressInfo(SimpleString addressName,
|
||||
EnumSet<RoutingType> routingTypes) throws Exception {
|
||||
synchronized (addressLock) {
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.beforeUpdateAddress(addressName, routingTypes));
|
||||
if (server.hasBrokerAddressPlugins()) {
|
||||
server.callBrokerAddressPlugins(plugin -> plugin.beforeUpdateAddress(addressName, routingTypes));
|
||||
}
|
||||
|
||||
final AddressInfo address = addressManager.updateAddressInfo(addressName, routingTypes);
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.afterUpdateAddress(address));
|
||||
if (server.hasBrokerAddressPlugins()) {
|
||||
server.callBrokerAddressPlugins(plugin -> plugin.afterUpdateAddress(address));
|
||||
}
|
||||
|
||||
return address;
|
||||
|
@ -574,8 +574,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
@Override
|
||||
public AddressInfo removeAddressInfo(SimpleString address, boolean force) throws Exception {
|
||||
synchronized (addressLock) {
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.beforeRemoveAddress(address));
|
||||
if (server.hasBrokerAddressPlugins()) {
|
||||
server.callBrokerAddressPlugins(plugin -> plugin.beforeRemoveAddress(address));
|
||||
}
|
||||
|
||||
final Bindings bindingsForAddress = getDirectBindings(address);
|
||||
|
@ -593,8 +593,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
}
|
||||
managementService.unregisterAddress(address);
|
||||
final AddressInfo addressInfo = addressManager.removeAddressInfo(address);
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.afterRemoveAddress(address, addressInfo));
|
||||
if (server.hasBrokerAddressPlugins()) {
|
||||
server.callBrokerAddressPlugins(plugin -> plugin.afterRemoveAddress(address, addressInfo));
|
||||
}
|
||||
|
||||
return addressInfo;
|
||||
|
@ -628,8 +628,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
// even though failover is complete
|
||||
@Override
|
||||
public synchronized void addBinding(final Binding binding) throws Exception {
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.beforeAddBinding(binding));
|
||||
if (server.hasBrokerBindingPlugins()) {
|
||||
server.callBrokerBindingPlugins(plugin -> plugin.beforeAddBinding(binding));
|
||||
}
|
||||
|
||||
addressManager.addBinding(binding);
|
||||
|
@ -662,8 +662,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
|
||||
managementService.sendNotification(new Notification(uid, CoreNotificationType.BINDING_ADDED, props));
|
||||
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.afterAddBinding(binding));
|
||||
if (server.hasBrokerBindingPlugins()) {
|
||||
server.callBrokerBindingPlugins(plugin -> plugin.afterAddBinding(binding));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -673,8 +673,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
Transaction tx,
|
||||
boolean deleteData) throws Exception {
|
||||
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.beforeRemoveBinding(uniqueName, tx, deleteData));
|
||||
if (server.hasBrokerBindingPlugins()) {
|
||||
server.callBrokerBindingPlugins(plugin -> plugin.beforeRemoveBinding(uniqueName, tx, deleteData));
|
||||
}
|
||||
|
||||
addressSettingsRepository.clearCache();
|
||||
|
@ -722,8 +722,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
|
||||
binding.close();
|
||||
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.afterRemoveBinding(binding, tx, deleteData) );
|
||||
if (server.hasBrokerBindingPlugins()) {
|
||||
server.callBrokerBindingPlugins(plugin -> plugin.afterRemoveBinding(binding, tx, deleteData) );
|
||||
}
|
||||
|
||||
return binding;
|
||||
|
@ -869,8 +869,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
}
|
||||
}
|
||||
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates));
|
||||
if (server.hasBrokerMessagePlugins()) {
|
||||
server.callBrokerMessagePlugins(plugin -> plugin.beforeMessageRoute(message, context, direct, rejectDuplicates));
|
||||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
|
@ -935,8 +935,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
context.getTransaction().commit();
|
||||
}
|
||||
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, result));
|
||||
if (server.hasBrokerMessagePlugins()) {
|
||||
server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, result));
|
||||
}
|
||||
|
||||
return result;
|
||||
|
|
|
@ -515,8 +515,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
|
|||
|
||||
ConnectionEntry entry = protocol.createConnectionEntry((Acceptor) component, connection);
|
||||
try {
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.afterCreateConnection(entry.connection));
|
||||
if (server.hasBrokerConnectionPlugins()) {
|
||||
server.callBrokerConnectionPlugins(plugin -> plugin.afterCreateConnection(entry.connection));
|
||||
}
|
||||
} catch (ActiveMQException t) {
|
||||
logger.warn("Error executing afterCreateConnection plugin method: {}", t.getMessage(), t);
|
||||
|
@ -549,8 +549,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
|
|||
RemotingConnection removedConnection = removeConnection(connectionID);
|
||||
if (removedConnection != null) {
|
||||
try {
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.afterDestroyConnection(removedConnection));
|
||||
if (server.hasBrokerConnectionPlugins()) {
|
||||
server.callBrokerConnectionPlugins(plugin -> plugin.afterDestroyConnection(removedConnection));
|
||||
}
|
||||
} catch (ActiveMQException t) {
|
||||
logger.warn("Error executing afterDestroyConnection plugin method: {}", t.getMessage(), t);
|
||||
|
|
|
@ -52,7 +52,16 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
|||
import org.apache.activemq.artemis.core.server.impl.ConnectorsService;
|
||||
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBridgePlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConnectionPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
|
||||
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
|
||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
|
@ -225,18 +234,72 @@ public interface ActiveMQServer extends ServiceComponent {
|
|||
*/
|
||||
void callPostQueueDeletionCallbacks(SimpleString address, SimpleString queueName) throws Exception;
|
||||
|
||||
void registerBrokerPlugin(ActiveMQServerPlugin plugin);
|
||||
void registerBrokerPlugin(ActiveMQServerBasePlugin plugin);
|
||||
|
||||
void unRegisterBrokerPlugin(ActiveMQServerPlugin plugin);
|
||||
void unRegisterBrokerPlugin(ActiveMQServerBasePlugin plugin);
|
||||
|
||||
void registerBrokerPlugins(List<ActiveMQServerPlugin> plugins);
|
||||
void registerBrokerPlugins(List<ActiveMQServerBasePlugin> plugins);
|
||||
|
||||
List<ActiveMQServerPlugin> getBrokerPlugins();
|
||||
List<ActiveMQServerBasePlugin> getBrokerPlugins();
|
||||
|
||||
List<ActiveMQServerConnectionPlugin> getBrokerConnectionPlugins();
|
||||
|
||||
List<ActiveMQServerSessionPlugin> getBrokerSessionPlugins();
|
||||
|
||||
List<ActiveMQServerConsumerPlugin> getBrokerConsumerPlugins();
|
||||
|
||||
List<ActiveMQServerAddressPlugin> getBrokerAddressPlugins();
|
||||
|
||||
List<ActiveMQServerQueuePlugin> getBrokerQueuePlugins();
|
||||
|
||||
List<ActiveMQServerBindingPlugin> getBrokerBindingPlugins();
|
||||
|
||||
List<ActiveMQServerMessagePlugin> getBrokerMessagePlugins();
|
||||
|
||||
List<ActiveMQServerBridgePlugin> getBrokerBridgePlugins();
|
||||
|
||||
List<ActiveMQServerCriticalPlugin> getBrokerCriticalPlugins();
|
||||
|
||||
void callBrokerPlugins(ActiveMQPluginRunnable pluginRun) throws ActiveMQException;
|
||||
|
||||
void callBrokerConnectionPlugins(ActiveMQPluginRunnable<ActiveMQServerConnectionPlugin> pluginRun) throws ActiveMQException;
|
||||
|
||||
void callBrokerSessionPlugins(ActiveMQPluginRunnable<ActiveMQServerSessionPlugin> pluginRun) throws ActiveMQException;
|
||||
|
||||
void callBrokerConsumerPlugins(ActiveMQPluginRunnable<ActiveMQServerConsumerPlugin> pluginRun) throws ActiveMQException;
|
||||
|
||||
void callBrokerAddressPlugins(ActiveMQPluginRunnable<ActiveMQServerAddressPlugin> pluginRun) throws ActiveMQException;
|
||||
|
||||
void callBrokerQueuePlugins(ActiveMQPluginRunnable<ActiveMQServerQueuePlugin> pluginRun) throws ActiveMQException;
|
||||
|
||||
void callBrokerBindingPlugins(ActiveMQPluginRunnable<ActiveMQServerBindingPlugin> pluginRun) throws ActiveMQException;
|
||||
|
||||
void callBrokerMessagePlugins(ActiveMQPluginRunnable<ActiveMQServerMessagePlugin> pluginRun) throws ActiveMQException;
|
||||
|
||||
void callBrokerBridgePlugins(ActiveMQPluginRunnable<ActiveMQServerBridgePlugin> pluginRun) throws ActiveMQException;
|
||||
|
||||
void callBrokerCriticalPlugins(ActiveMQPluginRunnable<ActiveMQServerCriticalPlugin> pluginRun) throws ActiveMQException;
|
||||
|
||||
boolean hasBrokerPlugins();
|
||||
|
||||
boolean hasBrokerConnectionPlugins();
|
||||
|
||||
boolean hasBrokerSessionPlugins();
|
||||
|
||||
boolean hasBrokerConsumerPlugins();
|
||||
|
||||
boolean hasBrokerAddressPlugins();
|
||||
|
||||
boolean hasBrokerQueuePlugins();
|
||||
|
||||
boolean hasBrokerBindingPlugins();
|
||||
|
||||
boolean hasBrokerMessagePlugins();
|
||||
|
||||
boolean hasBrokerBridgePlugins();
|
||||
|
||||
boolean hasBrokerCriticalPlugins();
|
||||
|
||||
void checkQueueCreationLimit(String username) throws Exception;
|
||||
|
||||
ServerSession createSession(String name,
|
||||
|
|
|
@ -406,8 +406,8 @@ public final class ClusterManager implements ActiveMQComponent {
|
|||
return;
|
||||
}
|
||||
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.beforeDeployBridge(config));
|
||||
if (server.hasBrokerBridgePlugins()) {
|
||||
server.callBrokerBridgePlugins(plugin -> plugin.beforeDeployBridge(config));
|
||||
}
|
||||
|
||||
Queue queue = (Queue) binding.getBindable();
|
||||
|
@ -483,8 +483,8 @@ public final class ClusterManager implements ActiveMQComponent {
|
|||
|
||||
bridge.start();
|
||||
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.afterDeployBridge(bridge));
|
||||
if (server.hasBrokerBridgePlugins()) {
|
||||
server.callBrokerBridgePlugins(plugin -> plugin.afterDeployBridge(bridge));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -522,8 +522,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
pendingAcks.countDown();
|
||||
metrics.incrementMessagesAcknowledged();
|
||||
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.afterAcknowledgeBridge(this, ref));
|
||||
if (server.hasBrokerBridgePlugins()) {
|
||||
server.callBrokerBridgePlugins(plugin -> plugin.afterAcknowledgeBridge(this, ref));
|
||||
}
|
||||
} else {
|
||||
if (logger.isTraceEnabled()) {
|
||||
|
@ -618,8 +618,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
pendingAcks.countUp();
|
||||
|
||||
try {
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.beforeDeliverBridge(this, ref));
|
||||
if (server.hasBrokerBridgePlugins()) {
|
||||
server.callBrokerBridgePlugins(plugin -> plugin.beforeDeliverBridge(this, ref));
|
||||
}
|
||||
|
||||
final HandleStatus status;
|
||||
|
@ -636,8 +636,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
metrics.incrementMessagesPendingAcknowledgement();
|
||||
}
|
||||
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.afterDeliverBridge(this, ref, status));
|
||||
if (server.hasBrokerBridgePlugins()) {
|
||||
server.callBrokerBridgePlugins(plugin -> plugin.afterDeliverBridge(this, ref, status));
|
||||
}
|
||||
|
||||
return status;
|
||||
|
|
|
@ -149,7 +149,16 @@ import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
|
|||
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
||||
import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBridgePlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConnectionPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
|
||||
import org.apache.activemq.artemis.core.server.reload.ReloadCallback;
|
||||
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
|
||||
import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl;
|
||||
|
@ -655,7 +664,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.criticalFailure(criticalComponent) : null);
|
||||
if (hasBrokerCriticalPlugins()) {
|
||||
callBrokerCriticalPlugins(plugin -> plugin.criticalFailure(criticalComponent));
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
|
@ -1412,14 +1423,17 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
|
||||
checkSessionLimit(validatedUser);
|
||||
|
||||
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateSession(name, username, minLargeMessageSize, connection,
|
||||
autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes) : null);
|
||||
|
||||
if (hasBrokerSessionPlugins()) {
|
||||
callBrokerSessionPlugins(plugin -> plugin.beforeCreateSession(name, username, minLargeMessageSize, connection,
|
||||
autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes));
|
||||
}
|
||||
final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes);
|
||||
|
||||
sessions.put(name, session);
|
||||
|
||||
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateSession(session) : null);
|
||||
if (hasBrokerSessionPlugins()) {
|
||||
callBrokerSessionPlugins(plugin -> plugin.afterCreateSession(session));
|
||||
}
|
||||
|
||||
return session;
|
||||
}
|
||||
|
@ -1898,8 +1912,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
return;
|
||||
}
|
||||
|
||||
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeDestroyQueue(queueName, session, checkConsumerCount,
|
||||
removeConsumers, autoDeleteAddress) : null);
|
||||
if (hasBrokerQueuePlugins()) {
|
||||
callBrokerQueuePlugins(plugin -> plugin.beforeDestroyQueue(queueName, session, checkConsumerCount,
|
||||
removeConsumers, autoDeleteAddress));
|
||||
}
|
||||
|
||||
addressSettingsRepository.clearCache();
|
||||
|
||||
|
@ -1930,8 +1946,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
|
||||
queue.deleteQueue(removeConsumers);
|
||||
|
||||
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount,
|
||||
removeConsumers, autoDeleteAddress) : null);
|
||||
if (hasBrokerQueuePlugins()) {
|
||||
callBrokerQueuePlugins(plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount,
|
||||
removeConsumers, autoDeleteAddress));
|
||||
}
|
||||
AddressInfo addressInfo = getAddressInfo(address);
|
||||
|
||||
if (autoDeleteAddress && postOffice != null && addressInfo != null && addressInfo.isAutoCreated()) {
|
||||
|
@ -2008,32 +2026,126 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void registerBrokerPlugins(final List<ActiveMQServerPlugin> plugins) {
|
||||
public void registerBrokerPlugins(final List<ActiveMQServerBasePlugin> plugins) {
|
||||
configuration.registerBrokerPlugins(plugins);
|
||||
plugins.forEach(plugin -> plugin.registered(this));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerBrokerPlugin(final ActiveMQServerPlugin plugin) {
|
||||
public void registerBrokerPlugin(final ActiveMQServerBasePlugin plugin) {
|
||||
configuration.registerBrokerPlugin(plugin);
|
||||
plugin.registered(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unRegisterBrokerPlugin(final ActiveMQServerPlugin plugin) {
|
||||
public void unRegisterBrokerPlugin(final ActiveMQServerBasePlugin plugin) {
|
||||
configuration.unRegisterBrokerPlugin(plugin);
|
||||
plugin.unregistered(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActiveMQServerPlugin> getBrokerPlugins() {
|
||||
public List<ActiveMQServerBasePlugin> getBrokerPlugins() {
|
||||
return configuration.getBrokerPlugins();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActiveMQServerConnectionPlugin> getBrokerConnectionPlugins() {
|
||||
return configuration.getBrokerConnectionPlugins();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActiveMQServerSessionPlugin> getBrokerSessionPlugins() {
|
||||
return configuration.getBrokerSessionPlugins();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActiveMQServerConsumerPlugin> getBrokerConsumerPlugins() {
|
||||
return configuration.getBrokerConsumerPlugins();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActiveMQServerAddressPlugin> getBrokerAddressPlugins() {
|
||||
return configuration.getBrokerAddressPlugins();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActiveMQServerQueuePlugin> getBrokerQueuePlugins() {
|
||||
return configuration.getBrokerQueuePlugins();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActiveMQServerBindingPlugin> getBrokerBindingPlugins() {
|
||||
return configuration.getBrokerBindingPlugins();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActiveMQServerMessagePlugin> getBrokerMessagePlugins() {
|
||||
return configuration.getBrokerMessagePlugins();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActiveMQServerBridgePlugin> getBrokerBridgePlugins() {
|
||||
return configuration.getBrokerBridgePlugins();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActiveMQServerCriticalPlugin> getBrokerCriticalPlugins() {
|
||||
return configuration.getBrokerCriticalPlugins();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void callBrokerPlugins(final ActiveMQPluginRunnable pluginRun) throws ActiveMQException {
|
||||
callBrokerPlugins(getBrokerPlugins(), pluginRun);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void callBrokerConnectionPlugins(final ActiveMQPluginRunnable<ActiveMQServerConnectionPlugin> pluginRun) throws ActiveMQException {
|
||||
callBrokerPlugins(getBrokerConnectionPlugins(), pluginRun);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void callBrokerSessionPlugins(final ActiveMQPluginRunnable<ActiveMQServerSessionPlugin> pluginRun) throws ActiveMQException {
|
||||
callBrokerPlugins(getBrokerSessionPlugins(), pluginRun);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void callBrokerConsumerPlugins(final ActiveMQPluginRunnable<ActiveMQServerConsumerPlugin> pluginRun) throws ActiveMQException {
|
||||
callBrokerPlugins(getBrokerConsumerPlugins(), pluginRun);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void callBrokerAddressPlugins(final ActiveMQPluginRunnable<ActiveMQServerAddressPlugin> pluginRun) throws ActiveMQException {
|
||||
callBrokerPlugins(getBrokerAddressPlugins(), pluginRun);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void callBrokerQueuePlugins(final ActiveMQPluginRunnable<ActiveMQServerQueuePlugin> pluginRun) throws ActiveMQException {
|
||||
callBrokerPlugins(getBrokerQueuePlugins(), pluginRun);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void callBrokerBindingPlugins(final ActiveMQPluginRunnable<ActiveMQServerBindingPlugin> pluginRun) throws ActiveMQException {
|
||||
callBrokerPlugins(getBrokerBindingPlugins(), pluginRun);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void callBrokerMessagePlugins(final ActiveMQPluginRunnable<ActiveMQServerMessagePlugin> pluginRun) throws ActiveMQException {
|
||||
callBrokerPlugins(getBrokerMessagePlugins(), pluginRun);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void callBrokerBridgePlugins(final ActiveMQPluginRunnable<ActiveMQServerBridgePlugin> pluginRun) throws ActiveMQException {
|
||||
callBrokerPlugins(getBrokerBridgePlugins(), pluginRun);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void callBrokerCriticalPlugins(final ActiveMQPluginRunnable<ActiveMQServerCriticalPlugin> pluginRun) throws ActiveMQException {
|
||||
callBrokerPlugins(getBrokerCriticalPlugins(), pluginRun);
|
||||
}
|
||||
|
||||
private <P extends ActiveMQServerBasePlugin> void callBrokerPlugins(final List<P> plugins, final ActiveMQPluginRunnable<P> pluginRun) throws ActiveMQException {
|
||||
if (pluginRun != null) {
|
||||
for (ActiveMQServerPlugin plugin : getBrokerPlugins()) {
|
||||
for (P plugin : plugins) {
|
||||
try {
|
||||
pluginRun.run(plugin);
|
||||
} catch (Throwable e) {
|
||||
|
@ -2053,6 +2165,51 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
return !getBrokerPlugins().isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasBrokerConnectionPlugins() {
|
||||
return !getBrokerConnectionPlugins().isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasBrokerSessionPlugins() {
|
||||
return !getBrokerSessionPlugins().isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasBrokerConsumerPlugins() {
|
||||
return !getBrokerConsumerPlugins().isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasBrokerAddressPlugins() {
|
||||
return !getBrokerAddressPlugins().isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasBrokerQueuePlugins() {
|
||||
return !getBrokerQueuePlugins().isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasBrokerBindingPlugins() {
|
||||
return !getBrokerBindingPlugins().isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasBrokerMessagePlugins() {
|
||||
return !getBrokerMessagePlugins().isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasBrokerBridgePlugins() {
|
||||
return !getBrokerBridgePlugins().isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasBrokerCriticalPlugins() {
|
||||
return !getBrokerCriticalPlugins().isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecutorFactory getExecutorFactory() {
|
||||
return executorFactory;
|
||||
|
@ -2854,7 +3011,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
.delayBeforeDispatch(delayBeforeDispatch)
|
||||
.build();
|
||||
|
||||
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateQueue(queueConfig) : null);
|
||||
if (hasBrokerQueuePlugins()) {
|
||||
callBrokerQueuePlugins(plugin -> plugin.beforeCreateQueue(queueConfig));
|
||||
}
|
||||
|
||||
final Queue queue = queueFactory.createQueueWith(queueConfig);
|
||||
|
||||
|
@ -2898,7 +3057,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
managementService.registerQueue(queue, queue.getAddress(), storageManager);
|
||||
}
|
||||
|
||||
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateQueue(queue) : null);
|
||||
if (hasBrokerQueuePlugins()) {
|
||||
callBrokerQueuePlugins(plugin -> plugin.afterCreateQueue(queue));
|
||||
}
|
||||
|
||||
callPostQueueCreationCallbacks(queue.getName());
|
||||
|
||||
|
@ -2978,7 +3139,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
.delayBeforeDispatch(delayBeforeDispatch)
|
||||
.build();
|
||||
|
||||
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateQueue(queueConfig) : null);
|
||||
if (hasBrokerQueuePlugins()) {
|
||||
callBrokerQueuePlugins(plugin -> plugin.beforeCreateQueue(queueConfig));
|
||||
}
|
||||
|
||||
final Queue queue = queueFactory.createQueueWith(queueConfig);
|
||||
|
||||
|
@ -3020,7 +3183,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
|
||||
managementService.registerQueue(queue, queue.getAddress(), storageManager);
|
||||
|
||||
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateQueue(queue) : null);
|
||||
if (hasBrokerQueuePlugins()) {
|
||||
callBrokerQueuePlugins(plugin -> plugin.afterCreateQueue(queue));
|
||||
}
|
||||
|
||||
callPostQueueCreationCallbacks(queue.getName());
|
||||
|
||||
|
|
|
@ -1384,8 +1384,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
messagesAcknowledged.incrementAndGet();
|
||||
}
|
||||
|
||||
if (server != null && server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
|
||||
if (server != null && server.hasBrokerMessagePlugins()) {
|
||||
server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1422,8 +1422,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
messagesAcknowledged.incrementAndGet();
|
||||
}
|
||||
|
||||
if (server != null && server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
|
||||
if (server != null && server.hasBrokerMessagePlugins()) {
|
||||
server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1514,9 +1514,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
acknowledge(ref, AckReason.EXPIRED, consumer);
|
||||
}
|
||||
|
||||
if (server != null && server.hasBrokerPlugins()) {
|
||||
if (server != null && server.hasBrokerMessagePlugins()) {
|
||||
final SimpleString expiryAddress = messageExpiryAddress;
|
||||
server.callBrokerPlugins(plugin -> plugin.messageExpired(ref, expiryAddress, consumer));
|
||||
server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, expiryAddress, consumer));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -446,8 +446,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
try {
|
||||
Message message = reference.getMessage();
|
||||
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.beforeDeliver(this, reference));
|
||||
if (server.hasBrokerMessagePlugins()) {
|
||||
server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference));
|
||||
}
|
||||
|
||||
if (message.isLargeMessage() && supportLargeMessage) {
|
||||
|
@ -466,8 +466,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
} finally {
|
||||
lockDelivery.readLock().unlock();
|
||||
callback.afterDelivery();
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.afterDeliver(this, reference));
|
||||
if (server.hasBrokerMessagePlugins()) {
|
||||
server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -489,8 +489,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
logger.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace"));
|
||||
}
|
||||
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.beforeCloseConsumer(this, failed));
|
||||
if (server.hasBrokerConsumerPlugins()) {
|
||||
server.callBrokerConsumerPlugins(plugin -> plugin.beforeCloseConsumer(this, failed));
|
||||
}
|
||||
|
||||
setStarted(false);
|
||||
|
@ -550,8 +550,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
managementService.sendNotification(notification);
|
||||
}
|
||||
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.afterCloseConsumer(this, failed));
|
||||
if (server.hasBrokerConsumerPlugins()) {
|
||||
server.callBrokerConsumerPlugins(plugin -> plugin.afterCloseConsumer(this, failed));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -356,8 +356,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
}
|
||||
synchronized (this) {
|
||||
if (!closed) {
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.beforeCloseSession(this, failed));
|
||||
if (server.hasBrokerSessionPlugins()) {
|
||||
server.callBrokerSessionPlugins(plugin -> plugin.beforeCloseSession(this, failed));
|
||||
}
|
||||
}
|
||||
this.setStarted(false);
|
||||
|
@ -412,8 +412,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
|
||||
closed = true;
|
||||
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.afterCloseSession(this, failed));
|
||||
if (server.hasBrokerSessionPlugins()) {
|
||||
server.callBrokerSessionPlugins(plugin -> plugin.afterCloseSession(this, failed));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -470,16 +470,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
|
||||
Filter filter = FilterImpl.createFilter(filterString);
|
||||
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.beforeCreateConsumer(consumerID, (QueueBinding) binding,
|
||||
if (server.hasBrokerConsumerPlugins()) {
|
||||
server.callBrokerConsumerPlugins(plugin -> plugin.beforeCreateConsumer(consumerID, (QueueBinding) binding,
|
||||
filterString, browseOnly, supportLargeMessage));
|
||||
}
|
||||
|
||||
ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
|
||||
consumers.put(consumer.getID(), consumer);
|
||||
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.afterCreateConsumer(consumer));
|
||||
if (server.hasBrokerConsumerPlugins()) {
|
||||
server.callBrokerConsumerPlugins(plugin -> plugin.afterCreateConsumer(consumer));
|
||||
}
|
||||
|
||||
if (!browseOnly) {
|
||||
|
@ -1422,8 +1422,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
message = msg;
|
||||
}
|
||||
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.beforeSend(this, tx, message, direct, noAutoCreateQueue));
|
||||
if (server.hasBrokerMessagePlugins()) {
|
||||
server.callBrokerMessagePlugins(plugin -> plugin.beforeSend(this, tx, message, direct, noAutoCreateQueue));
|
||||
}
|
||||
|
||||
// If the protocol doesn't support flow control, we have no choice other than fail the communication
|
||||
|
@ -1470,8 +1470,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
result = doSend(tx, message, address, direct, noAutoCreateQueue);
|
||||
}
|
||||
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.afterSend(this, tx, message, direct, noAutoCreateQueue, result));
|
||||
if (server.hasBrokerMessagePlugins()) {
|
||||
server.callBrokerMessagePlugins(plugin -> plugin.afterSend(this, tx, message, direct, noAutoCreateQueue, result));
|
||||
}
|
||||
|
||||
return result;
|
||||
|
@ -1504,8 +1504,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
|
||||
@Override
|
||||
public void addMetaData(String key, String data) throws Exception {
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.beforeSessionMetadataAdded(this, key, data));
|
||||
if (server.hasBrokerSessionPlugins()) {
|
||||
server.callBrokerSessionPlugins(plugin -> plugin.beforeSessionMetadataAdded(this, key, data));
|
||||
}
|
||||
|
||||
if (metaData == null) {
|
||||
|
@ -1513,8 +1513,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
}
|
||||
metaData.put(key, data);
|
||||
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.afterSessionMetadataAdded(this, key, data));
|
||||
if (server.hasBrokerSessionPlugins()) {
|
||||
server.callBrokerSessionPlugins(plugin -> plugin.afterSessionMetadataAdded(this, key, data));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1523,8 +1523,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
ServerSession sessionWithMetaData = server.lookupSession(key, data);
|
||||
if (sessionWithMetaData != null && sessionWithMetaData != this) {
|
||||
// There is a duplication of this property
|
||||
if (server.hasBrokerPlugins()) {
|
||||
server.callBrokerPlugins(plugin -> plugin.duplicateSessionMetadataFailure(this, key, data));
|
||||
if (server.hasBrokerSessionPlugins()) {
|
||||
server.callBrokerSessionPlugins(plugin -> plugin.duplicateSessionMetadataFailure(this, key, data));
|
||||
}
|
||||
return false;
|
||||
} else {
|
||||
|
|
|
@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.server.plugin;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
|
||||
public interface ActiveMQPluginRunnable {
|
||||
public interface ActiveMQPluginRunnable<P extends ActiveMQServerBasePlugin> {
|
||||
|
||||
void run(ActiveMQServerPlugin plugin) throws ActiveMQException;
|
||||
void run(P plugin) throws ActiveMQException;
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,95 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.server.plugin;
|
||||
|
||||
import java.util.EnumSet;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public interface ActiveMQServerAddressPlugin extends ActiveMQServerBasePlugin {
|
||||
|
||||
/**
|
||||
* Before an address is added tot he broker
|
||||
*
|
||||
* @param addressInfo The addressInfo that will be added
|
||||
* @param reload If the address is being reloaded
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After an address has been added tot he broker
|
||||
*
|
||||
* @param addressInfo The newly added address
|
||||
* @param reload If the address is being reloaded
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Before an address is updated
|
||||
*
|
||||
* @param address The existing address info that is about to be updated
|
||||
* @param routingTypes The new routing types that the address will be updated with
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeUpdateAddress(SimpleString address, EnumSet<RoutingType> routingTypes) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After an address has been updated
|
||||
*
|
||||
* @param addressInfo The newly updated address info
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterUpdateAddress(AddressInfo addressInfo) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before an address is removed
|
||||
*
|
||||
* @param address The address that will be removed
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeRemoveAddress(SimpleString address) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After an address has been removed
|
||||
*
|
||||
* @param address The address that has been removed
|
||||
* @param addressInfo The address info that has been removed or null if not removed
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterRemoveAddress(SimpleString address, AddressInfo addressInfo) throws ActiveMQException {
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.server.plugin;
|
||||
|
||||
import java.util.Map;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public interface ActiveMQServerBasePlugin {
|
||||
|
||||
/**
|
||||
* used to pass configured properties to Plugin
|
||||
*
|
||||
* @param properties
|
||||
*/
|
||||
default void init(Map<String, String> properties) {
|
||||
}
|
||||
|
||||
/**
|
||||
* The plugin has been registered with the server
|
||||
*
|
||||
* @param server The ActiveMQServer the plugin has been registered to
|
||||
*/
|
||||
default void registered(ActiveMQServer server) {
|
||||
}
|
||||
|
||||
/**
|
||||
* The plugin has been unregistered with the server
|
||||
*
|
||||
* @param server The ActiveMQServer the plugin has been unregistered to
|
||||
*/
|
||||
default void unregistered(ActiveMQServer server) {
|
||||
}
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.server.plugin;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public interface ActiveMQServerBindingPlugin extends ActiveMQServerBasePlugin {
|
||||
|
||||
/**
|
||||
* Before a binding is added
|
||||
*
|
||||
* @param binding
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeAddBinding(Binding binding) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a binding has been added
|
||||
*
|
||||
* @param binding The newly added binding
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterAddBinding(Binding binding) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a binding is removed
|
||||
*
|
||||
* @param uniqueName
|
||||
* @param tx
|
||||
* @param deleteData
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeRemoveBinding(SimpleString uniqueName, Transaction tx, boolean deleteData) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a binding is removed
|
||||
*
|
||||
* @param binding
|
||||
* @param tx
|
||||
* @param deleteData
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterRemoveBinding(Binding binding, Transaction tx, boolean deleteData) throws ActiveMQException {
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,85 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.server.plugin;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.HandleStatus;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Bridge;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public interface ActiveMQServerBridgePlugin extends ActiveMQServerBasePlugin {
|
||||
|
||||
/**
|
||||
* Before a bridge is deployed
|
||||
*
|
||||
* @param config The bridge configuration
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeDeployBridge(BridgeConfiguration config) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a bridge has been deployed
|
||||
*
|
||||
* @param bridge The newly deployed bridge
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterDeployBridge(Bridge bridge) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called immediately before a bridge delivers a message
|
||||
*
|
||||
* @param bridge
|
||||
* @param ref
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeDeliverBridge(Bridge bridge, MessageReference ref) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called immediately after a bridge delivers a message but before the message
|
||||
* is acknowledged
|
||||
*
|
||||
* @param bridge
|
||||
* @param ref
|
||||
* @param status
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterDeliverBridge(Bridge bridge, MessageReference ref, HandleStatus status) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after delivered message over this bridge has been acknowledged by the remote broker
|
||||
*
|
||||
* @param bridge
|
||||
* @param ref
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterAcknowledgeBridge(Bridge bridge, MessageReference ref) throws ActiveMQException {
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.server.plugin;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public interface ActiveMQServerConnectionPlugin extends ActiveMQServerBasePlugin {
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* A connection has been created.
|
||||
*
|
||||
* @param connection The newly created connection
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterCreateConnection(RemotingConnection connection) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A connection has been destroyed.
|
||||
*
|
||||
* @param connection
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException {
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,97 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.server.plugin;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public interface ActiveMQServerConsumerPlugin extends ActiveMQServerBasePlugin {
|
||||
|
||||
/**
|
||||
* Before a consumer is created
|
||||
*
|
||||
* @param consumerID
|
||||
* @param queueName
|
||||
* @param filterString
|
||||
* @param browseOnly
|
||||
* @param supportLargeMessage
|
||||
* @throws ActiveMQException
|
||||
*
|
||||
* @deprecated use {@link #beforeCreateConsumer(long, QueueBinding, SimpleString, boolean, boolean)
|
||||
*/
|
||||
@Deprecated
|
||||
default void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString,
|
||||
boolean browseOnly, boolean supportLargeMessage) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* Before a consumer is created
|
||||
*
|
||||
* @param consumerID
|
||||
* @param QueueBinding
|
||||
* @param filterString
|
||||
* @param browseOnly
|
||||
* @param supportLargeMessage
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeCreateConsumer(long consumerID, QueueBinding queueBinding, SimpleString filterString,
|
||||
boolean browseOnly, boolean supportLargeMessage) throws ActiveMQException {
|
||||
//by default call the old method for backwards compatibility
|
||||
this.beforeCreateConsumer(consumerID, queueBinding.getQueue().getName(), filterString, browseOnly, supportLargeMessage);
|
||||
}
|
||||
|
||||
/**
|
||||
* After a consumer has been created
|
||||
*
|
||||
* @param consumer the created consumer
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a consumer is closed
|
||||
*
|
||||
* @param consumer
|
||||
* @param failed
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a consumer is closed
|
||||
*
|
||||
* @param consumer
|
||||
* @param failed
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException {
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.server.plugin;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.utils.critical.CriticalComponent;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public interface ActiveMQServerCriticalPlugin extends ActiveMQServerBasePlugin {
|
||||
|
||||
/**
|
||||
* A Critical failure has been detected.
|
||||
* This will be called before the broker is stopped
|
||||
* @param components
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void criticalFailure(CriticalComponent components) throws ActiveMQException {
|
||||
}
|
||||
}
|
|
@ -0,0 +1,236 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.server.plugin;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||
import org.apache.activemq.artemis.core.server.impl.AckReason;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public interface ActiveMQServerMessagePlugin extends ActiveMQServerBasePlugin {
|
||||
|
||||
/**
|
||||
* Before a message is sent
|
||||
*
|
||||
* @param session the session that sends the message
|
||||
* @param tx
|
||||
* @param message
|
||||
* @param direct
|
||||
* @param noAutoCreateQueue
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException {
|
||||
//by default call the old method for backwards compatibility
|
||||
this.beforeSend(tx, message, direct, noAutoCreateQueue);
|
||||
}
|
||||
|
||||
/**
|
||||
* After a message is sent
|
||||
*
|
||||
* @param session the session that sends the message
|
||||
* @param tx
|
||||
* @param message
|
||||
* @param direct
|
||||
* @param noAutoCreateQueue
|
||||
* @param result
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
|
||||
RoutingStatus result) throws ActiveMQException {
|
||||
//by default call the old method for backwards compatibility
|
||||
this.afterSend(tx, message, direct, noAutoCreateQueue, result);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Before a message is sent
|
||||
*
|
||||
* @param tx
|
||||
* @param message
|
||||
* @param direct
|
||||
* @param noAutoCreateQueue
|
||||
* @throws ActiveMQException
|
||||
*
|
||||
* @deprecated use {@link #beforeSend(ServerSession, Transaction, Message, boolean, boolean)}
|
||||
*/
|
||||
@Deprecated
|
||||
default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a message is sent
|
||||
*
|
||||
* @param tx
|
||||
* @param message
|
||||
* @param direct
|
||||
* @param noAutoCreateQueue
|
||||
* @param result
|
||||
* @throws ActiveMQException
|
||||
*
|
||||
* @deprecated use {@link #afterSend(ServerSession, Transaction, Message, boolean, boolean, RoutingStatus)}
|
||||
*/
|
||||
@Deprecated
|
||||
default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
|
||||
RoutingStatus result) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a message is routed
|
||||
*
|
||||
* @param message
|
||||
* @param context
|
||||
* @param direct
|
||||
* @param rejectDuplicates
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a message is routed
|
||||
*
|
||||
* @param message
|
||||
* @param context
|
||||
* @param direct
|
||||
* @param rejectDuplicates
|
||||
* @param result
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
|
||||
RoutingStatus result) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a message is delivered to a client consumer
|
||||
*
|
||||
* @param consumer the consumer the message will be delivered to
|
||||
* @param reference message reference
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException {
|
||||
//by default call the old method for backwards compatibility
|
||||
this.beforeDeliver(reference);
|
||||
}
|
||||
|
||||
/**
|
||||
* After a message is delivered to a client consumer
|
||||
*
|
||||
* @param consumer the consumer the message was delivered to
|
||||
* @param reference message reference
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException {
|
||||
//by default call the old method for backwards compatibility
|
||||
this.afterDeliver(reference);
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a message is delivered to a client consumer
|
||||
*
|
||||
* @param reference
|
||||
* @throws ActiveMQException
|
||||
*
|
||||
* @deprecated use throws ActiveMQException {@link #beforeDeliver(ServerConsumer, MessageReference)}
|
||||
*/
|
||||
@Deprecated
|
||||
default void beforeDeliver(MessageReference reference) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a message is delivered to a client consumer
|
||||
*
|
||||
* @param reference
|
||||
* @throws ActiveMQException
|
||||
*
|
||||
* @deprecated use {@link #afterDeliver(ServerConsumer, MessageReference)}
|
||||
*/
|
||||
@Deprecated
|
||||
default void afterDeliver(MessageReference reference) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A message has been expired
|
||||
*
|
||||
* @param message The expired message
|
||||
* @param messageExpiryAddress The message expiry address if exists
|
||||
* @throws ActiveMQException
|
||||
*
|
||||
* @deprecated use {@link #messageExpired(MessageReference, SimpleString, ServerConsumer)}
|
||||
*/
|
||||
@Deprecated
|
||||
default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A message has been expired
|
||||
*
|
||||
* @param message The expired message
|
||||
* @param messageExpiryAddress The message expiry address if exists
|
||||
* @param consumer the Consumer that acknowledged the message - this field is optional
|
||||
* and can be null
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void messageExpired(MessageReference message, SimpleString messageExpiryAddress, ServerConsumer consumer) throws ActiveMQException {
|
||||
messageExpired(message, messageExpiryAddress);
|
||||
}
|
||||
|
||||
/**
|
||||
* A message has been acknowledged
|
||||
*
|
||||
* @param ref The acked message
|
||||
* @param reason The ack reason
|
||||
* @throws ActiveMQException
|
||||
*
|
||||
* @deprecated use {@link #messageAcknowledged(MessageReference, AckReason, ServerConsumer)}
|
||||
*/
|
||||
@Deprecated
|
||||
default void messageAcknowledged(MessageReference ref, AckReason reason) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A message has been acknowledged
|
||||
*
|
||||
* @param ref The acked message
|
||||
* @param reason The ack reason
|
||||
* @param consumer the Consumer that acknowledged the message - this field is optional
|
||||
* and can be null
|
||||
* @throws ActiveMQException
|
||||
*
|
||||
*/
|
||||
default void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) throws ActiveMQException {
|
||||
//by default call the old method for backwards compatibility
|
||||
this.messageAcknowledged(ref, reason);
|
||||
}
|
||||
}
|
|
@ -17,669 +17,18 @@
|
|||
|
||||
package org.apache.activemq.artemis.core.server.plugin;
|
||||
|
||||
import java.util.EnumSet;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
|
||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
|
||||
import org.apache.activemq.artemis.core.security.SecurityAuth;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.HandleStatus;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.QueueConfig;
|
||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Bridge;
|
||||
import org.apache.activemq.artemis.core.server.impl.AckReason;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
|
||||
import org.apache.activemq.artemis.utils.critical.CriticalComponent;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public interface ActiveMQServerPlugin {
|
||||
|
||||
/**
|
||||
* The plugin has been registered with the server
|
||||
*
|
||||
* @param server The ActiveMQServer the plugin has been registered to
|
||||
*/
|
||||
default void registered(ActiveMQServer server) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* The plugin has been unregistered with the server
|
||||
*
|
||||
* @param server The ActiveMQServer the plugin has been unregistered to
|
||||
*/
|
||||
default void unregistered(ActiveMQServer server) {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A connection has been created.
|
||||
*
|
||||
* @param connection The newly created connection
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterCreateConnection(RemotingConnection connection) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A connection has been destroyed.
|
||||
*
|
||||
* @param connection
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a session is created.
|
||||
*
|
||||
* @param name
|
||||
* @param username
|
||||
* @param minLargeMessageSize
|
||||
* @param connection
|
||||
* @param autoCommitSends
|
||||
* @param autoCommitAcks
|
||||
* @param preAcknowledge
|
||||
* @param xa
|
||||
* @param defaultAddress
|
||||
* @param callback
|
||||
* @param autoCreateQueues
|
||||
* @param context
|
||||
* @param prefixes
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeCreateSession(String name, String username, int minLargeMessageSize,
|
||||
RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge,
|
||||
boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, OperationContext context,
|
||||
Map<SimpleString, RoutingType> prefixes) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a session has been created.
|
||||
*
|
||||
* @param session The newly created session
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterCreateSession(ServerSession session) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a session is closed
|
||||
*
|
||||
* @param session
|
||||
* @param failed
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeCloseSession(ServerSession session, boolean failed) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a session is closed
|
||||
*
|
||||
* @param session
|
||||
* @param failed
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterCloseSession(ServerSession session, boolean failed) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before session metadata is added to the session
|
||||
*
|
||||
* @param session
|
||||
* @param key
|
||||
* @param data
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeSessionMetadataAdded(ServerSession session, String key, String data) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when adding session metadata fails because the metadata is a duplicate
|
||||
*
|
||||
* @param session
|
||||
* @param key
|
||||
* @param data
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void duplicateSessionMetadataFailure(ServerSession session, String key, String data) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After session metadata is added to the session
|
||||
*
|
||||
* @param session
|
||||
* @param key
|
||||
* @param data
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterSessionMetadataAdded(ServerSession session, String key, String data) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a consumer is created
|
||||
*
|
||||
* @param consumerID
|
||||
* @param queueName
|
||||
* @param filterString
|
||||
* @param browseOnly
|
||||
* @param supportLargeMessage
|
||||
* @throws ActiveMQException
|
||||
*
|
||||
* @deprecated use {@link #beforeCreateConsumer(long, QueueBinding, SimpleString, boolean, boolean)
|
||||
*/
|
||||
@Deprecated
|
||||
default void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString,
|
||||
boolean browseOnly, boolean supportLargeMessage) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* Before a consumer is created
|
||||
*
|
||||
* @param consumerID
|
||||
* @param QueueBinding
|
||||
* @param filterString
|
||||
* @param browseOnly
|
||||
* @param supportLargeMessage
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeCreateConsumer(long consumerID, QueueBinding queueBinding, SimpleString filterString,
|
||||
boolean browseOnly, boolean supportLargeMessage) throws ActiveMQException {
|
||||
//by default call the old method for backwards compatibility
|
||||
this.beforeCreateConsumer(consumerID, queueBinding.getQueue().getName(), filterString, browseOnly, supportLargeMessage);
|
||||
}
|
||||
|
||||
/**
|
||||
* After a consumer has been created
|
||||
*
|
||||
* @param consumer the created consumer
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a consumer is closed
|
||||
*
|
||||
* @param consumer
|
||||
* @param failed
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a consumer is closed
|
||||
*
|
||||
* @param consumer
|
||||
* @param failed
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before an address is added tot he broker
|
||||
*
|
||||
* @param addressInfo The addressInfo that will be added
|
||||
* @param reload If the address is being reloaded
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After an address has been added tot he broker
|
||||
*
|
||||
* @param addressInfo The newly added address
|
||||
* @param reload If the address is being reloaded
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Before an address is updated
|
||||
*
|
||||
* @param address The existing address info that is about to be updated
|
||||
* @param routingTypes The new routing types that the address will be updated with
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeUpdateAddress(SimpleString address, EnumSet<RoutingType> routingTypes) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After an address has been updated
|
||||
*
|
||||
* @param addressInfo The newly updated address info
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterUpdateAddress(AddressInfo addressInfo) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before an address is removed
|
||||
*
|
||||
* @param address The address that will be removed
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeRemoveAddress(SimpleString address) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After an address has been removed
|
||||
*
|
||||
* @param address The address that has been removed
|
||||
* @param addressInfo The address info that has been removed or null if not removed
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterRemoveAddress(SimpleString address, AddressInfo addressInfo) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a queue is created
|
||||
*
|
||||
* @param queueConfig
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeCreateQueue(QueueConfig queueConfig) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a queue has been created
|
||||
*
|
||||
* @param queue The newly created queue
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterCreateQueue(Queue queue) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a queue is destroyed
|
||||
*
|
||||
* @param queueName
|
||||
* @param session
|
||||
* @param checkConsumerCount
|
||||
* @param removeConsumers
|
||||
* @param autoDeleteAddress
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeDestroyQueue(SimpleString queueName, final SecurityAuth session, boolean checkConsumerCount,
|
||||
boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a queue has been destroyed
|
||||
*
|
||||
* @param queue
|
||||
* @param address
|
||||
* @param session
|
||||
* @param checkConsumerCount
|
||||
* @param removeConsumers
|
||||
* @param autoDeleteAddress
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterDestroyQueue(Queue queue, SimpleString address, final SecurityAuth session, boolean checkConsumerCount,
|
||||
boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a binding is added
|
||||
*
|
||||
* @param binding
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeAddBinding(Binding binding) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a binding has been added
|
||||
*
|
||||
* @param binding The newly added binding
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterAddBinding(Binding binding) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a binding is removed
|
||||
*
|
||||
* @param uniqueName
|
||||
* @param tx
|
||||
* @param deleteData
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeRemoveBinding(SimpleString uniqueName, Transaction tx, boolean deleteData) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a binding is removed
|
||||
*
|
||||
* @param binding
|
||||
* @param tx
|
||||
* @param deleteData
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterRemoveBinding(Binding binding, Transaction tx, boolean deleteData) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a message is sent
|
||||
*
|
||||
* @param session the session that sends the message
|
||||
* @param tx
|
||||
* @param message
|
||||
* @param direct
|
||||
* @param noAutoCreateQueue
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException {
|
||||
//by default call the old method for backwards compatibility
|
||||
this.beforeSend(tx, message, direct, noAutoCreateQueue);
|
||||
}
|
||||
|
||||
/**
|
||||
* After a message is sent
|
||||
*
|
||||
* @param session the session that sends the message
|
||||
* @param tx
|
||||
* @param message
|
||||
* @param direct
|
||||
* @param noAutoCreateQueue
|
||||
* @param result
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
|
||||
RoutingStatus result) throws ActiveMQException {
|
||||
//by default call the old method for backwards compatibility
|
||||
this.afterSend(tx, message, direct, noAutoCreateQueue, result);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Before a message is sent
|
||||
*
|
||||
* @param tx
|
||||
* @param message
|
||||
* @param direct
|
||||
* @param noAutoCreateQueue
|
||||
* @throws ActiveMQException
|
||||
*
|
||||
* @deprecated use {@link #beforeSend(ServerSession, Transaction, Message, boolean, boolean)}
|
||||
*/
|
||||
@Deprecated
|
||||
default void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a message is sent
|
||||
*
|
||||
* @param tx
|
||||
* @param message
|
||||
* @param direct
|
||||
* @param noAutoCreateQueue
|
||||
* @param result
|
||||
* @throws ActiveMQException
|
||||
*
|
||||
* @deprecated use {@link #afterSend(ServerSession, Transaction, Message, boolean, boolean, RoutingStatus)}
|
||||
*/
|
||||
@Deprecated
|
||||
default void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue,
|
||||
RoutingStatus result) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a message is routed
|
||||
*
|
||||
* @param message
|
||||
* @param context
|
||||
* @param direct
|
||||
* @param rejectDuplicates
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a message is routed
|
||||
*
|
||||
* @param message
|
||||
* @param context
|
||||
* @param direct
|
||||
* @param rejectDuplicates
|
||||
* @param result
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates,
|
||||
RoutingStatus result) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a message is delivered to a client consumer
|
||||
*
|
||||
* @param consumer the consumer the message will be delivered to
|
||||
* @param reference message reference
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException {
|
||||
//by default call the old method for backwards compatibility
|
||||
this.beforeDeliver(reference);
|
||||
}
|
||||
|
||||
/**
|
||||
* After a message is delivered to a client consumer
|
||||
*
|
||||
* @param consumer the consumer the message was delivered to
|
||||
* @param reference message reference
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException {
|
||||
//by default call the old method for backwards compatibility
|
||||
this.afterDeliver(reference);
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a message is delivered to a client consumer
|
||||
*
|
||||
* @param reference
|
||||
* @throws ActiveMQException
|
||||
*
|
||||
* @deprecated use throws ActiveMQException {@link #beforeDeliver(ServerConsumer, MessageReference)}
|
||||
*/
|
||||
@Deprecated
|
||||
default void beforeDeliver(MessageReference reference) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a message is delivered to a client consumer
|
||||
*
|
||||
* @param reference
|
||||
* @throws ActiveMQException
|
||||
*
|
||||
* @deprecated use {@link #afterDeliver(ServerConsumer, MessageReference)}
|
||||
*/
|
||||
@Deprecated
|
||||
default void afterDeliver(MessageReference reference) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A message has been expired
|
||||
*
|
||||
* @param message The expired message
|
||||
* @param messageExpiryAddress The message expiry address if exists
|
||||
* @throws ActiveMQException
|
||||
*
|
||||
* @deprecated use {@link #messageExpired(MessageReference, SimpleString, ServerConsumer)}
|
||||
*/
|
||||
@Deprecated
|
||||
default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A message has been expired
|
||||
*
|
||||
* @param message The expired message
|
||||
* @param messageExpiryAddress The message expiry address if exists
|
||||
* @param consumer the Consumer that acknowledged the message - this field is optional
|
||||
* and can be null
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void messageExpired(MessageReference message, SimpleString messageExpiryAddress, ServerConsumer consumer) throws ActiveMQException {
|
||||
messageExpired(message, messageExpiryAddress);
|
||||
}
|
||||
|
||||
/**
|
||||
* A message has been acknowledged
|
||||
*
|
||||
* @param ref The acked message
|
||||
* @param reason The ack reason
|
||||
* @throws ActiveMQException
|
||||
*
|
||||
* @deprecated use {@link #messageAcknowledged(MessageReference, AckReason, ServerConsumer)}
|
||||
*/
|
||||
@Deprecated
|
||||
default void messageAcknowledged(MessageReference ref, AckReason reason) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A message has been acknowledged
|
||||
*
|
||||
* @param ref The acked message
|
||||
* @param reason The ack reason
|
||||
* @param consumer the Consumer that acknowledged the message - this field is optional
|
||||
* and can be null
|
||||
* @throws ActiveMQException
|
||||
*
|
||||
*/
|
||||
default void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) throws ActiveMQException {
|
||||
//by default call the old method for backwards compatibility
|
||||
this.messageAcknowledged(ref, reason);
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a bridge is deployed
|
||||
*
|
||||
* @param config The bridge configuration
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeDeployBridge(BridgeConfiguration config) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a bridge has been deployed
|
||||
*
|
||||
* @param bridge The newly deployed bridge
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterDeployBridge(Bridge bridge) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called immediately before a bridge delivers a message
|
||||
*
|
||||
* @param bridge
|
||||
* @param ref
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeDeliverBridge(Bridge bridge, MessageReference ref) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called immediately after a bridge delivers a message but before the message
|
||||
* is acknowledged
|
||||
*
|
||||
* @param bridge
|
||||
* @param ref
|
||||
* @param status
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterDeliverBridge(Bridge bridge, MessageReference ref, HandleStatus status) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after delivered message over this bridge has been acknowledged by the remote broker
|
||||
*
|
||||
* @param bridge
|
||||
* @param ref
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterAcknowledgeBridge(Bridge bridge, MessageReference ref) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A Critical failure has been detected.
|
||||
* This will be called before the broker is stopped
|
||||
* @param components
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void criticalFailure(CriticalComponent components) throws ActiveMQException {
|
||||
}
|
||||
|
||||
/**
|
||||
* used to pass configured properties to Plugin
|
||||
*
|
||||
* @param properties
|
||||
*/
|
||||
default void init(Map<String, String> properties) {
|
||||
}
|
||||
public interface ActiveMQServerPlugin extends
|
||||
ActiveMQServerBasePlugin,
|
||||
ActiveMQServerConnectionPlugin,
|
||||
ActiveMQServerSessionPlugin,
|
||||
ActiveMQServerConsumerPlugin,
|
||||
ActiveMQServerAddressPlugin,
|
||||
ActiveMQServerQueuePlugin,
|
||||
ActiveMQServerBindingPlugin,
|
||||
ActiveMQServerMessagePlugin,
|
||||
ActiveMQServerBridgePlugin,
|
||||
ActiveMQServerCriticalPlugin {
|
||||
}
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.server.plugin;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.security.SecurityAuth;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.QueueConfig;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public interface ActiveMQServerQueuePlugin extends ActiveMQServerBasePlugin {
|
||||
|
||||
/**
|
||||
* Before a queue is created
|
||||
*
|
||||
* @param queueConfig
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeCreateQueue(QueueConfig queueConfig) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a queue has been created
|
||||
*
|
||||
* @param queue The newly created queue
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterCreateQueue(Queue queue) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a queue is destroyed
|
||||
*
|
||||
* @param queueName
|
||||
* @param session
|
||||
* @param checkConsumerCount
|
||||
* @param removeConsumers
|
||||
* @param autoDeleteAddress
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeDestroyQueue(SimpleString queueName, final SecurityAuth session, boolean checkConsumerCount,
|
||||
boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a queue has been destroyed
|
||||
*
|
||||
* @param queue
|
||||
* @param address
|
||||
* @param session
|
||||
* @param checkConsumerCount
|
||||
* @param removeConsumers
|
||||
* @param autoDeleteAddress
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterDestroyQueue(Queue queue, SimpleString address, final SecurityAuth session, boolean checkConsumerCount,
|
||||
boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException {
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,126 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.server.plugin;
|
||||
|
||||
import java.util.Map;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public interface ActiveMQServerSessionPlugin extends ActiveMQServerBasePlugin {
|
||||
|
||||
/**
|
||||
* Before a session is created.
|
||||
*
|
||||
* @param name
|
||||
* @param username
|
||||
* @param minLargeMessageSize
|
||||
* @param connection
|
||||
* @param autoCommitSends
|
||||
* @param autoCommitAcks
|
||||
* @param preAcknowledge
|
||||
* @param xa
|
||||
* @param defaultAddress
|
||||
* @param callback
|
||||
* @param autoCreateQueues
|
||||
* @param context
|
||||
* @param prefixes
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeCreateSession(String name, String username, int minLargeMessageSize,
|
||||
RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge,
|
||||
boolean xa, String defaultAddress, SessionCallback callback, boolean autoCreateQueues, OperationContext context,
|
||||
Map<SimpleString, RoutingType> prefixes) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a session has been created.
|
||||
*
|
||||
* @param session The newly created session
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterCreateSession(ServerSession session) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a session is closed
|
||||
*
|
||||
* @param session
|
||||
* @param failed
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeCloseSession(ServerSession session, boolean failed) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After a session is closed
|
||||
*
|
||||
* @param session
|
||||
* @param failed
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterCloseSession(ServerSession session, boolean failed) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before session metadata is added to the session
|
||||
*
|
||||
* @param session
|
||||
* @param key
|
||||
* @param data
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void beforeSessionMetadataAdded(ServerSession session, String key, String data) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when adding session metadata fails because the metadata is a duplicate
|
||||
*
|
||||
* @param session
|
||||
* @param key
|
||||
* @param data
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void duplicateSessionMetadataFailure(ServerSession session, String key, String data) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* After session metadata is added to the session
|
||||
*
|
||||
* @param session
|
||||
* @param key
|
||||
* @param data
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default void afterSessionMetadataAdded(ServerSession session, String key, String data) throws ActiveMQException {
|
||||
|
||||
}
|
||||
}
|
|
@ -29,6 +29,7 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
|
||||
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
|
||||
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
|
||||
|
@ -649,7 +650,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
deploymentManager.addDeployable(fc);
|
||||
deploymentManager.readConfiguration();
|
||||
|
||||
List<ActiveMQServerPlugin> brokerPlugins = fc.getBrokerPlugins();
|
||||
List<ActiveMQServerBasePlugin> brokerPlugins = fc.getBrokerPlugins();
|
||||
assertEquals(2, brokerPlugins.size());
|
||||
assertTrue(brokerPlugins.get(0) instanceof EmptyPlugin1);
|
||||
assertTrue(brokerPlugins.get(1) instanceof EmptyPlugin2);
|
||||
|
|
Loading…
Reference in New Issue