ARTEMIS-2749 - Broadcast Group Control Improvements

https://issues.apache.org/jira/browse/ARTEMIS-2749
This commit is contained in:
Andy Taylor 2020-04-30 10:44:42 +01:00
parent 0fe11c7562
commit 2ab381565c
13 changed files with 713 additions and 161 deletions

View File

@ -2390,6 +2390,30 @@ public interface AuditLogger extends BasicLogger {
void clearDuplicateIdCache(String user, Object source, Object... args); void clearDuplicateIdCache(String user, Object source, Object... args);
static void getChannelName(Object source) {
LOGGER.getChannelName(getCaller(), source);
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601511, value = "User {0} is getting channelName property on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void getChannelName(String user, Object source, Object... args);
static void getFileContents(Object source) {
LOGGER.getFileContents(getCaller(), source);
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601512, value = "User {0} is getting fileContents property on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void getFileContents(String user, Object source, Object... args);
static void getFile(Object source) {
LOGGER.getFile(getCaller(), source);
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601513, value = "User {0} is getting file property on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void getFile(String user, Object source, Object... args);
/* /*
* This logger is for message production and consumption and is on the hot path so enabled independently * This logger is for message production and consumption and is on the hot path so enabled independently
* *

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core.management;
/**
* A BroadcastGroupControl is used to manage a broadcast group.
*/
public interface BaseBroadcastGroupControl extends ActiveMQComponentControl {
/**
* Returns the configuration name of this broadcast group.
*/
@Attribute(desc = "name of this broadcast group")
String getName();
/**
* Returns the period used by this broadcast group.
*/
@Attribute(desc = "period used by this broadcast group")
long getBroadcastPeriod();
/**
* Returns the pairs of live-backup connectors that are broadcasted by this broadcast group.
*/
@Attribute(desc = "pairs of live-backup connectors that are broadcasted by this broadcast group")
Object[] getConnectorPairs();
/**
* Returns the pairs of live-backup connectors that are broadcasted by this broadcast group
* using JSON serialization.
*/
@Attribute(desc = "pairs of live-backup connectors that are broadcasted by this broadcast group using JSON serialization")
String getConnectorPairsAsJSON() throws Exception;
}

View File

@ -19,13 +19,7 @@ package org.apache.activemq.artemis.api.core.management;
/** /**
* A BroadcastGroupControl is used to manage a broadcast group. * A BroadcastGroupControl is used to manage a broadcast group.
*/ */
public interface BroadcastGroupControl extends ActiveMQComponentControl { public interface BroadcastGroupControl extends BaseBroadcastGroupControl {
/**
* Returns the configuration name of this broadcast group.
*/
@Attribute(desc = "name of this broadcast group")
String getName();
/** /**
* Returns the local port this broadcast group is bound to. * Returns the local port this broadcast group is bound to.
@ -44,23 +38,4 @@ public interface BroadcastGroupControl extends ActiveMQComponentControl {
*/ */
@Attribute(desc = "port this broadcast group is broadcasting to") @Attribute(desc = "port this broadcast group is broadcasting to")
int getGroupPort() throws Exception; int getGroupPort() throws Exception;
/**
* Returns the period used by this broadcast group.
*/
@Attribute(desc = "period used by this broadcast group")
long getBroadcastPeriod();
/**
* Returns the pairs of live-backup connectors that are broadcasted by this broadcast group.
*/
@Attribute(desc = "pairs of live-backup connectors that are broadcasted by this broadcast group")
Object[] getConnectorPairs();
/**
* Returns the pairs of live-backup connectors that are broadcasted by this broadcast group
* using JSON serialization.
*/
@Attribute(desc = "pairs of live-backup connectors that are broadcasted by this broadcast group using JSON serialization")
String getConnectorPairsAsJSON() throws Exception;
} }

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core.management;
/**
* A BroadcastGroupControl is used to manage a broadcast group.
*/
public interface JGroupsChannelBroadcastGroupControl extends BaseBroadcastGroupControl {
/**
* Returns the JGroups channel name
*/
@Attribute(desc = "Returns the JGroups channel name")
String getChannelName() throws Exception;
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core.management;
/**
* A BroadcastGroupControl is used to manage a broadcast group.
*/
public interface JGroupsFileBroadcastGroupControl extends BaseBroadcastGroupControl {
/**
* Returns jgroups channel name
*/
@Attribute(desc = "Returns jgroups channel name")
String getChannelName();
/**
* Returns the jgroups file name
*/
@Attribute(desc = "Returns the jgroups file name")
String getFile();
/**
* Returns the contents of the jgroups file
*/
@Attribute(desc = "Returns the contents of the jgroups file")
String getFileContents() throws Exception;
}

View File

@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.management.BaseBroadcastGroupControl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
import org.apache.activemq.artemis.logs.AuditLogger;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
public class BaseBroadcastGroupControlImpl extends AbstractControl implements BaseBroadcastGroupControl {
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
private Class broadcastGroupControlClass;
private final BroadcastGroup broadcastGroup;
private final BroadcastGroupConfiguration configuration;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public BaseBroadcastGroupControlImpl(final Class broadcastGroupControlClass,
final BroadcastGroup broadcastGroup,
final StorageManager storageManager,
final BroadcastGroupConfiguration configuration) throws Exception {
super(broadcastGroupControlClass, storageManager);
this.broadcastGroupControlClass = broadcastGroupControlClass;
this.broadcastGroup = broadcastGroup;
this.configuration = configuration;
}
// BroadcastGroupControlMBean implementation ---------------------
@Override
public String getName() {
if (AuditLogger.isEnabled()) {
AuditLogger.getName(this.broadcastGroup);
}
clearIO();
try {
return configuration.getName();
} finally {
blockOnIO();
}
}
@Override
public long getBroadcastPeriod() {
if (AuditLogger.isEnabled()) {
AuditLogger.getBroadcastPeriod(this.broadcastGroup);
}
clearIO();
try {
return configuration.getBroadcastPeriod();
} finally {
blockOnIO();
}
}
@Override
public Object[] getConnectorPairs() {
if (AuditLogger.isEnabled()) {
AuditLogger.getConnectorPairs(this.broadcastGroup);
}
clearIO();
try {
Object[] ret = new Object[configuration.getConnectorInfos().size()];
int i = 0;
for (String connector : configuration.getConnectorInfos()) {
ret[i++] = connector;
}
return ret;
} finally {
blockOnIO();
}
}
@Override
public String getConnectorPairsAsJSON() throws Exception {
if (AuditLogger.isEnabled()) {
AuditLogger.getConnectorPairsAsJSON(this.broadcastGroup);
}
clearIO();
try {
return JsonUtil.toJsonArray(configuration.getConnectorInfos()).toString();
} finally {
blockOnIO();
}
}
// MessagingComponentControlMBean implementation -----------------
@Override
public boolean isStarted() {
if (AuditLogger.isEnabled()) {
AuditLogger.isStarted(this.broadcastGroup);
}
clearIO();
try {
return broadcastGroup.isStarted();
} finally {
blockOnIO();
}
}
@Override
public void start() throws Exception {
if (AuditLogger.isEnabled()) {
AuditLogger.startBroadcastGroup(this.broadcastGroup);
}
clearIO();
try {
broadcastGroup.start();
} finally {
blockOnIO();
}
}
@Override
public void stop() throws Exception {
if (AuditLogger.isEnabled()) {
AuditLogger.stopBroadcastGroup(this.broadcastGroup);
}
clearIO();
try {
broadcastGroup.stop();
} finally {
blockOnIO();
}
}
@Override
protected MBeanOperationInfo[] fillMBeanOperationInfo() {
return MBeanInfoHelper.getMBeanOperationsInfo(broadcastGroupControlClass);
}
@Override
protected MBeanAttributeInfo[] fillMBeanAttributeInfo() {
return MBeanInfoHelper.getMBeanAttributesInfo(broadcastGroupControlClass);
}
protected BroadcastGroup getBroadcastGroup() {
return broadcastGroup;
}
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
}

View File

@ -16,26 +16,20 @@
*/ */
package org.apache.activemq.artemis.core.management.impl; package org.apache.activemq.artemis.core.management.impl;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.management.BroadcastGroupControl; import org.apache.activemq.artemis.api.core.management.BroadcastGroupControl;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup; import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
import org.apache.activemq.artemis.logs.AuditLogger; import org.apache.activemq.artemis.logs.AuditLogger;
public class BroadcastGroupControlImpl extends AbstractControl implements BroadcastGroupControl { public class BroadcastGroupControlImpl extends BaseBroadcastGroupControlImpl implements BroadcastGroupControl {
// Constants ----------------------------------------------------- // Constants -----------------------------------------------------
// Attributes ---------------------------------------------------- // Attributes ----------------------------------------------------
private final BroadcastGroup broadcastGroup; private UDPBroadcastEndpointFactory endpointFactory;
private final BroadcastGroupConfiguration configuration;
// Static -------------------------------------------------------- // Static --------------------------------------------------------
@ -43,84 +37,23 @@ public class BroadcastGroupControlImpl extends AbstractControl implements Broadc
public BroadcastGroupControlImpl(final BroadcastGroup broadcastGroup, public BroadcastGroupControlImpl(final BroadcastGroup broadcastGroup,
final StorageManager storageManager, final StorageManager storageManager,
final BroadcastGroupConfiguration configuration) throws Exception { final BroadcastGroupConfiguration configuration,
super(BroadcastGroupControl.class, storageManager); final UDPBroadcastEndpointFactory endpointFactory) throws Exception {
this.broadcastGroup = broadcastGroup; super(BroadcastGroupControl.class, broadcastGroup, storageManager, configuration);
this.configuration = configuration; this.endpointFactory = endpointFactory;
} }
// BroadcastGroupControlMBean implementation --------------------- // BroadcastGroupControlMBean implementation ---------------------
@Override
public String getName() {
if (AuditLogger.isEnabled()) {
AuditLogger.getName(this.broadcastGroup);
}
clearIO();
try {
return configuration.getName();
} finally {
blockOnIO();
}
}
@Override
public long getBroadcastPeriod() {
if (AuditLogger.isEnabled()) {
AuditLogger.getBroadcastPeriod(this.broadcastGroup);
}
clearIO();
try {
return configuration.getBroadcastPeriod();
} finally {
blockOnIO();
}
}
@Override
public Object[] getConnectorPairs() {
if (AuditLogger.isEnabled()) {
AuditLogger.getConnectorPairs(this.broadcastGroup);
}
clearIO();
try {
Object[] ret = new Object[configuration.getConnectorInfos().size()];
int i = 0;
for (String connector : configuration.getConnectorInfos()) {
ret[i++] = connector;
}
return ret;
} finally {
blockOnIO();
}
}
@Override
public String getConnectorPairsAsJSON() throws Exception {
if (AuditLogger.isEnabled()) {
AuditLogger.getConnectorPairsAsJSON(this.broadcastGroup);
}
clearIO();
try {
return JsonUtil.toJsonArray(configuration.getConnectorInfos()).toString();
} finally {
blockOnIO();
}
}
//todo ghoward we should deal with this properly //todo ghoward we should deal with this properly
@Override @Override
public String getGroupAddress() throws Exception { public String getGroupAddress() throws Exception {
if (AuditLogger.isEnabled()) { if (AuditLogger.isEnabled()) {
AuditLogger.getGroupAddress(this.broadcastGroup); AuditLogger.getGroupAddress(this.getBroadcastGroup());
} }
clearIO(); clearIO();
try { try {
if (configuration.getEndpointFactory() instanceof UDPBroadcastEndpointFactory) { return endpointFactory.getGroupAddress();
return ((UDPBroadcastEndpointFactory) configuration.getEndpointFactory()).getGroupAddress();
}
throw new Exception("Invalid request because this is not a UDP Broadcast configuration.");
} finally { } finally {
blockOnIO(); blockOnIO();
} }
@ -129,14 +62,11 @@ public class BroadcastGroupControlImpl extends AbstractControl implements Broadc
@Override @Override
public int getGroupPort() throws Exception { public int getGroupPort() throws Exception {
if (AuditLogger.isEnabled()) { if (AuditLogger.isEnabled()) {
AuditLogger.getGroupPort(this.broadcastGroup); AuditLogger.getGroupPort(this.getBroadcastGroup());
} }
clearIO(); clearIO();
try { try {
if (configuration.getEndpointFactory() instanceof UDPBroadcastEndpointFactory) { return endpointFactory.getGroupPort();
return ((UDPBroadcastEndpointFactory) configuration.getEndpointFactory()).getGroupPort();
}
throw new Exception("Invalid request because this is not a UDP Broadcast configuration.");
} finally { } finally {
blockOnIO(); blockOnIO();
} }
@ -145,14 +75,11 @@ public class BroadcastGroupControlImpl extends AbstractControl implements Broadc
@Override @Override
public int getLocalBindPort() throws Exception { public int getLocalBindPort() throws Exception {
if (AuditLogger.isEnabled()) { if (AuditLogger.isEnabled()) {
AuditLogger.getLocalBindPort(this.broadcastGroup); AuditLogger.getLocalBindPort(this.getBroadcastGroup());
} }
clearIO(); clearIO();
try { try {
if (configuration.getEndpointFactory() instanceof UDPBroadcastEndpointFactory) { return endpointFactory.getLocalBindPort();
return ((UDPBroadcastEndpointFactory) configuration.getEndpointFactory()).getLocalBindPort();
}
throw new Exception("Invalid request because this is not a UDP Broadcast configuration.");
} finally { } finally {
blockOnIO(); blockOnIO();
} }
@ -160,54 +87,6 @@ public class BroadcastGroupControlImpl extends AbstractControl implements Broadc
// MessagingComponentControlMBean implementation ----------------- // MessagingComponentControlMBean implementation -----------------
@Override
public boolean isStarted() {
if (AuditLogger.isEnabled()) {
AuditLogger.isStarted(this.broadcastGroup);
}
clearIO();
try {
return broadcastGroup.isStarted();
} finally {
blockOnIO();
}
}
@Override
public void start() throws Exception {
if (AuditLogger.isEnabled()) {
AuditLogger.startBroadcastGroup(this.broadcastGroup);
}
clearIO();
try {
broadcastGroup.start();
} finally {
blockOnIO();
}
}
@Override
public void stop() throws Exception {
if (AuditLogger.isEnabled()) {
AuditLogger.stopBroadcastGroup(this.broadcastGroup);
}
clearIO();
try {
broadcastGroup.stop();
} finally {
blockOnIO();
}
}
@Override
protected MBeanOperationInfo[] fillMBeanOperationInfo() {
return MBeanInfoHelper.getMBeanOperationsInfo(BroadcastGroupControl.class);
}
@Override
protected MBeanAttributeInfo[] fillMBeanAttributeInfo() {
return MBeanInfoHelper.getMBeanAttributesInfo(BroadcastGroupControl.class);
}
// Public -------------------------------------------------------- // Public --------------------------------------------------------

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.management.JGroupsChannelBroadcastGroupControl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
import org.apache.activemq.artemis.logs.AuditLogger;
public class JGroupsChannelBroadcastGroupControlImpl extends BaseBroadcastGroupControlImpl implements JGroupsChannelBroadcastGroupControl {
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
private ChannelBroadcastEndpointFactory endpointFactory;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public JGroupsChannelBroadcastGroupControlImpl(final BroadcastGroup broadcastGroup,
final StorageManager storageManager,
final BroadcastGroupConfiguration configuration,
final ChannelBroadcastEndpointFactory endpointFactory) throws Exception {
super(JGroupsChannelBroadcastGroupControl.class, broadcastGroup, storageManager, configuration);
this.endpointFactory = endpointFactory;
}
// BroadcastGroupControlMBean implementation ---------------------
@Override
public String getChannelName() throws Exception {
if (AuditLogger.isEnabled()) {
AuditLogger.getChannelName(this.endpointFactory.getChannelName());
}
return endpointFactory.getChannelName();
}
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.management.JGroupsFileBroadcastGroupControl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
import org.apache.activemq.artemis.logs.AuditLogger;
import java.io.File;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
public class JGroupsFileBroadcastGroupControlImpl extends BaseBroadcastGroupControlImpl implements JGroupsFileBroadcastGroupControl {
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
private JGroupsFileBroadcastEndpointFactory endpointFactory;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public JGroupsFileBroadcastGroupControlImpl(final BroadcastGroup broadcastGroup,
final StorageManager storageManager,
final BroadcastGroupConfiguration configuration,
final JGroupsFileBroadcastEndpointFactory endpointFactory) throws Exception {
super(JGroupsFileBroadcastGroupControl.class, broadcastGroup, storageManager, configuration);
this.endpointFactory = endpointFactory;
}
// BroadcastGroupControlMBean implementation ---------------------
@Override
public String getFileContents() throws Exception {
if (AuditLogger.isEnabled()) {
AuditLogger.getFileContents(this.getBroadcastGroup());
}
URL resource = this.getClass().getClassLoader().getResource(this.getFile());
File file = new File(resource.getFile());
return new String(Files.readAllBytes(Paths.get(file.getPath())));
}
@Override
public String getChannelName() {
if (AuditLogger.isEnabled()) {
AuditLogger.getChannelName(this.getBroadcastGroup());
}
return endpointFactory.getChannelName();
}
@Override
public String getFile() {
if (AuditLogger.isEnabled()) {
AuditLogger.getFile(this.getBroadcastGroup());
}
return endpointFactory.getFile();
}
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
}

View File

@ -34,16 +34,20 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.management.AcceptorControl; import org.apache.activemq.artemis.api.core.management.AcceptorControl;
import org.apache.activemq.artemis.api.core.management.BaseBroadcastGroupControl;
import org.apache.activemq.artemis.api.core.management.BridgeControl; import org.apache.activemq.artemis.api.core.management.BridgeControl;
import org.apache.activemq.artemis.api.core.management.BroadcastGroupControl;
import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl; import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl;
import org.apache.activemq.artemis.api.core.management.DivertControl; import org.apache.activemq.artemis.api.core.management.DivertControl;
import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ManagementHelper;
@ -60,6 +64,8 @@ import org.apache.activemq.artemis.core.management.impl.BridgeControlImpl;
import org.apache.activemq.artemis.core.management.impl.BroadcastGroupControlImpl; import org.apache.activemq.artemis.core.management.impl.BroadcastGroupControlImpl;
import org.apache.activemq.artemis.core.management.impl.ClusterConnectionControlImpl; import org.apache.activemq.artemis.core.management.impl.ClusterConnectionControlImpl;
import org.apache.activemq.artemis.core.management.impl.DivertControlImpl; import org.apache.activemq.artemis.core.management.impl.DivertControlImpl;
import org.apache.activemq.artemis.core.management.impl.JGroupsChannelBroadcastGroupControlImpl;
import org.apache.activemq.artemis.core.management.impl.JGroupsFileBroadcastGroupControlImpl;
import org.apache.activemq.artemis.core.management.impl.QueueControlImpl; import org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.messagecounter.MessageCounter; import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
@ -339,7 +345,15 @@ public class ManagementServiceImpl implements ManagementService {
final BroadcastGroupConfiguration configuration) throws Exception { final BroadcastGroupConfiguration configuration) throws Exception {
broadcastGroup.setNotificationService(this); broadcastGroup.setNotificationService(this);
ObjectName objectName = objectNameBuilder.getBroadcastGroupObjectName(configuration.getName()); ObjectName objectName = objectNameBuilder.getBroadcastGroupObjectName(configuration.getName());
BroadcastGroupControl control = new BroadcastGroupControlImpl(broadcastGroup, storageManager, configuration); BroadcastEndpointFactory endpointFactory = configuration.getEndpointFactory();
BaseBroadcastGroupControl control = null;
if (endpointFactory instanceof UDPBroadcastEndpointFactory) {
control = new BroadcastGroupControlImpl(broadcastGroup, storageManager, configuration, (UDPBroadcastEndpointFactory) endpointFactory);
} else if (endpointFactory instanceof JGroupsFileBroadcastEndpointFactory) {
control = new JGroupsFileBroadcastGroupControlImpl(broadcastGroup, storageManager, configuration, (JGroupsFileBroadcastEndpointFactory) endpointFactory);
} else if (endpointFactory instanceof ChannelBroadcastEndpointFactory) {
control = new JGroupsChannelBroadcastGroupControlImpl(broadcastGroup, storageManager, configuration, (ChannelBroadcastEndpointFactory) endpointFactory);
}
registerInJMX(objectName, control); registerInJMX(objectName, control);
registerInRegistry(ResourceNames.BROADCAST_GROUP + configuration.getName(), control); registerInRegistry(ResourceNames.BROADCAST_GROUP + configuration.getName(), control);
} }

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.management;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
import org.apache.activemq.artemis.api.core.management.JGroupsChannelBroadcastGroupControl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.jgroups.JChannel;
import org.jgroups.conf.PlainConfigurator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import javax.json.JsonArray;
import java.util.ArrayList;
import java.util.List;
public class JGroupsChannelBroadcastGroupControlTest extends ManagementTestBase {
private ActiveMQServer server;
BroadcastGroupConfiguration broadcastGroupConfig;
JGroupsChannelBroadcastGroupControl broadcastGroupControl;
@After
public void cleanupJChannel() {
JChannelManager.getInstance().clear();
}
@Before
public void prepareJChannel() {
JChannelManager.getInstance().setLoopbackMessages(true);
}
@Rule
public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule();
private final String jgroupsConfigString = "UDP(oob_thread_pool.max_threads=300;" + "bind_addr=127.0.0.1;oob_thread_pool.keep_alive_time=1000;" + "max_bundle_size=31k;mcast_send_buf_size=640000;" + "internal_thread_pool.keep_alive_time=60000;" + "internal_thread_pool.rejection_policy=discard;" + "mcast_recv_buf_size=25000000;bind_port=55200;" + "internal_thread_pool.queue_max_size=100;" + "mcast_port=45688;thread_pool.min_threads=20;" + "oob_thread_pool.rejection_policy=discard;" + "thread_pool.max_threads=300;enable_diagnostics=false;" + "thread_pool.enabled=true;internal_thread_pool.queue_enabled=true;" + "ucast_recv_buf_size=20000000;ucast_send_buf_size=640000;" + "internal_thread_pool.enabled=true;oob_thread_pool.enabled=true;" + "ip_ttl=2;thread_pool.rejection_policy=discard;thread_pool.keep_alive_time=5000;" + "internal_thread_pool.max_threads=10;thread_pool.queue_enabled=true;" + "mcast_addr=230.0.0.4;singleton_name=udp;max_bundle_timeout=30;" + "oob_thread_pool.queue_enabled=false;internal_thread_pool.min_threads=1;" + "bundler_type=old;oob_thread_pool.min_threads=20;" + "thread_pool.queue_max_size=1000):PING(num_initial_members=3;" + "timeout=2000):MERGE3(min_interval=20000;max_interval=100000)" + ":FD_SOCK(bind_addr=127.0.0.1;start_port=54200):FD_ALL(interval=3000;" + "timeout=15000):VERIFY_SUSPECT(bind_addr=127.0.0.1;" + "timeout=1500):pbcast.NAKACK2(max_msg_batch_size=100;" + "xmit_table_msgs_per_row=10000;xmit_table_max_compaction_time=10000;" + "xmit_table_num_rows=100;xmit_interval=1000):UNICAST3(xmit_table_msgs_per_row=10000;" + "xmit_table_max_compaction_time=10000;xmit_table_num_rows=20)" + ":pbcast.STABLE(desired_avg_gossip=50000;max_bytes=400000;" + "stability_delay=1000):pbcast.GMS(print_local_addr=true;" + "view_bundling=true;join_timeout=3000;view_ack_collection_timeout=5000;" + "resume_task_timeout=7500):UFC(max_credits=1m;min_threshold=0.40)" + ":MFC(max_credits=1m;min_threshold=0.40):FRAG2(frag_size=30k)" + ":RSVP(resend_interval=500;ack_on_delivery=false;timeout=60000)";
@Test
public void testAttributes() throws Exception {
ChannelBroadcastEndpointFactory udpCfg = (ChannelBroadcastEndpointFactory) broadcastGroupConfig.getEndpointFactory();
Assert.assertEquals(broadcastGroupConfig.getName(), broadcastGroupControl.getName());
Assert.assertEquals(udpCfg.getChannelName(), broadcastGroupControl.getChannelName());
Assert.assertEquals(broadcastGroupConfig.getBroadcastPeriod(), broadcastGroupControl.getBroadcastPeriod());
Object[] connectorPairs = broadcastGroupControl.getConnectorPairs();
Assert.assertEquals(1, connectorPairs.length);
String connectorPairData = (String) connectorPairs[0];
Assert.assertEquals(broadcastGroupConfig.getConnectorInfos().get(0), connectorPairData);
String jsonString = broadcastGroupControl.getConnectorPairsAsJSON();
Assert.assertNotNull(jsonString);
JsonArray array = JsonUtil.readJsonArray(jsonString);
Assert.assertEquals(1, array.size());
Assert.assertEquals(broadcastGroupConfig.getConnectorInfos().get(0), array.getString(0));
Assert.assertTrue(broadcastGroupControl.isStarted());
}
protected JGroupsChannelBroadcastGroupControl createManagementControl(final String name) throws Exception {
return ManagementControlHelper.createJgroupsChannelBroadcastGroupControl(name, mbeanServer);
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
TransportConfiguration connectorConfiguration = new TransportConfiguration(NETTY_CONNECTOR_FACTORY);
List<String> connectorInfos = new ArrayList<>();
connectorInfos.add(connectorConfiguration.getName());
PlainConfigurator configurator = new PlainConfigurator(jgroupsConfigString);
JChannel channel = new JChannel(configurator);
String channelName1 = "channel1";
ChannelBroadcastEndpointFactory endpointFactory = new ChannelBroadcastEndpointFactory(channel, channelName1);
broadcastGroupConfig = new BroadcastGroupConfiguration().setName(RandomUtil.randomString()).setBroadcastPeriod(RandomUtil.randomPositiveInt()).setConnectorInfos(connectorInfos).setEndpointFactory(endpointFactory);
Configuration config = createDefaultInVMConfig().setJMXManagementEnabled(true).addConnectorConfiguration(connectorConfiguration.getName(), connectorConfiguration).addBroadcastGroupConfiguration(broadcastGroupConfig);
server = addServer(ActiveMQServers.newActiveMQServer(config, mbeanServer, false));
server.start();
broadcastGroupControl = createManagementControl(broadcastGroupConfig.getName());
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.management;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.management.JGroupsFileBroadcastGroupControl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.json.JsonArray;
import java.util.ArrayList;
import java.util.List;
public class JGroupsFileBroadcastGroupControlTest extends ManagementTestBase {
private ActiveMQServer server;
BroadcastGroupConfiguration broadcastGroupConfig;
JGroupsFileBroadcastGroupControl broadcastGroupControl;
@Test
public void testAttributes() throws Exception {
JGroupsFileBroadcastEndpointFactory udpCfg = (JGroupsFileBroadcastEndpointFactory) broadcastGroupConfig.getEndpointFactory();
Assert.assertEquals(broadcastGroupConfig.getName(), broadcastGroupControl.getName());
Assert.assertEquals(udpCfg.getChannelName(), broadcastGroupControl.getChannelName());
Assert.assertEquals(udpCfg.getFile(), broadcastGroupControl.getFile());
Assert.assertEquals(broadcastGroupConfig.getBroadcastPeriod(), broadcastGroupControl.getBroadcastPeriod());
Object[] connectorPairs = broadcastGroupControl.getConnectorPairs();
Assert.assertEquals(1, connectorPairs.length);
String connectorPairData = (String) connectorPairs[0];
Assert.assertEquals(broadcastGroupConfig.getConnectorInfos().get(0), connectorPairData);
String jsonString = broadcastGroupControl.getConnectorPairsAsJSON();
Assert.assertNotNull(jsonString);
JsonArray array = JsonUtil.readJsonArray(jsonString);
Assert.assertEquals(1, array.size());
Assert.assertEquals(broadcastGroupConfig.getConnectorInfos().get(0), array.getString(0));
Assert.assertTrue(broadcastGroupControl.isStarted());
}
protected JGroupsFileBroadcastGroupControl createManagementControl(final String name) throws Exception {
return ManagementControlHelper.createJgroupsFileBroadcastGroupControl(name, mbeanServer);
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
TransportConfiguration connectorConfiguration = new TransportConfiguration(NETTY_CONNECTOR_FACTORY);
List<String> connectorInfos = new ArrayList<>();
connectorInfos.add(connectorConfiguration.getName());
broadcastGroupConfig = new BroadcastGroupConfiguration().setName(RandomUtil.randomString()).setBroadcastPeriod(RandomUtil.randomPositiveInt()).setConnectorInfos(connectorInfos).setEndpointFactory(new JGroupsFileBroadcastEndpointFactory().setChannelName("myChannel").setFile("test-jgroups-file_ping.xml"));
Configuration config = createDefaultInVMConfig().setJMXManagementEnabled(true).addConnectorConfiguration(connectorConfiguration.getName(), connectorConfiguration).addBroadcastGroupConfiguration(broadcastGroupConfig);
server = addServer(ActiveMQServers.newActiveMQServer(config, mbeanServer, false));
server.start();
broadcastGroupControl = createManagementControl(broadcastGroupConfig.getName());
}
}

View File

@ -29,6 +29,8 @@ import org.apache.activemq.artemis.api.core.management.BridgeControl;
import org.apache.activemq.artemis.api.core.management.BroadcastGroupControl; import org.apache.activemq.artemis.api.core.management.BroadcastGroupControl;
import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl; import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl;
import org.apache.activemq.artemis.api.core.management.DivertControl; import org.apache.activemq.artemis.api.core.management.DivertControl;
import org.apache.activemq.artemis.api.core.management.JGroupsChannelBroadcastGroupControl;
import org.apache.activemq.artemis.api.core.management.JGroupsFileBroadcastGroupControl;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
@ -51,6 +53,16 @@ public class ManagementControlHelper {
return (BroadcastGroupControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getBroadcastGroupObjectName(name), BroadcastGroupControl.class, mbeanServer); return (BroadcastGroupControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getBroadcastGroupObjectName(name), BroadcastGroupControl.class, mbeanServer);
} }
public static JGroupsFileBroadcastGroupControl createJgroupsFileBroadcastGroupControl(final String name,
final MBeanServer mbeanServer) throws Exception {
return (JGroupsFileBroadcastGroupControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getBroadcastGroupObjectName(name), JGroupsFileBroadcastGroupControl.class, mbeanServer);
}
public static JGroupsChannelBroadcastGroupControl createJgroupsChannelBroadcastGroupControl(final String name,
final MBeanServer mbeanServer) throws Exception {
return (JGroupsChannelBroadcastGroupControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getBroadcastGroupObjectName(name), JGroupsChannelBroadcastGroupControl.class, mbeanServer);
}
public static BridgeControl createBridgeControl(final String name, final MBeanServer mbeanServer) throws Exception { public static BridgeControl createBridgeControl(final String name, final MBeanServer mbeanServer) throws Exception {
return (BridgeControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getBridgeObjectName(name), BridgeControl.class, mbeanServer); return (BridgeControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getBridgeObjectName(name), BridgeControl.class, mbeanServer);
} }