mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-02-20 17:05:51 +00:00
ARTEMIS-2413 upgrade JGroups
JGroups 3.x hasn't been updated in some time now. The last release was in April 2020 almost 2 years ago. Lots of protocols have been updated and added and users are wanting to use them. There is also increasing concern about using older components triggered mainly by other recently-discovered high-profile vulnerabilities in the wider Open Source Java community. This commit bumps JGroups up to the latest release - 5.2.0.Final. However, there is a cost associated with upgrading. The old-style properties configuration is no longer supported. I think it's unlikely that end-users are leveraging this because it is not exposed via broker.xml. The JGroups XML configuration has been around for a long time, is widely adopted, and is still supported. I expect most (if not all) users are using this. However, a handful of tests needed to be updated and/or removed to deal with this absence. Some protocols and/or protocol properties are no longer supported. This means that users may have to change their JGroups stack configurations when they upgrade. For example, our own clustered-jgroups example had to be updated or it wouldn't run properly.
This commit is contained in:
parent
263b723726
commit
9c459eb313
@ -53,9 +53,9 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
|
||||
if (logger.isTraceEnabled())
|
||||
logger.trace("Broadcasting: BroadCastOpened=" + broadcastOpened + ", channelOPen=" + channel.getChannel().isOpen());
|
||||
if (broadcastOpened) {
|
||||
org.jgroups.Message msg = new org.jgroups.Message();
|
||||
org.jgroups.Message msg = new org.jgroups.BytesMessage();
|
||||
|
||||
msg.setBuffer(data);
|
||||
msg.setArray(data);
|
||||
|
||||
channel.send(msg);
|
||||
}
|
||||
|
@ -43,6 +43,6 @@ public final class JGroupsFileBroadcastEndpoint extends JGroupsBroadcastEndpoint
|
||||
throw new RuntimeException("couldn't find JGroups configuration " + file);
|
||||
}
|
||||
|
||||
return new JChannel(configURL);
|
||||
return new JChannel(configURL.openStream());
|
||||
}
|
||||
}
|
||||
|
@ -1,43 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.api.core;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
|
||||
import org.jgroups.JChannel;
|
||||
import org.jgroups.conf.PlainConfigurator;
|
||||
|
||||
/**
|
||||
* This class is the implementation of ActiveMQ Artemis members discovery that will use JGroups.
|
||||
*/
|
||||
public final class JGroupsPropertiesBroadcastEndpoint extends JGroupsBroadcastEndpoint {
|
||||
|
||||
private String properties;
|
||||
|
||||
public JGroupsPropertiesBroadcastEndpoint(final JChannelManager manager,
|
||||
final String properties,
|
||||
final String channelName) throws Exception {
|
||||
super(manager, channelName);
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
public JChannel createChannel() throws Exception {
|
||||
PlainConfigurator configurator = new PlainConfigurator(properties);
|
||||
return new JChannel(configurator);
|
||||
}
|
||||
}
|
||||
|
@ -1,51 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.api.core;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
|
||||
|
||||
public class JGroupsPropertiesBroadcastEndpointFactory implements BroadcastEndpointFactory {
|
||||
|
||||
private String properties;
|
||||
|
||||
private String channelName;
|
||||
|
||||
private final JChannelManager manager = JChannelManager.getInstance();
|
||||
|
||||
@Override
|
||||
public BroadcastEndpoint createBroadcastEndpoint() throws Exception {
|
||||
return new JGroupsPropertiesBroadcastEndpoint(manager, properties, channelName).initChannel();
|
||||
}
|
||||
|
||||
public String getProperties() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
public JGroupsPropertiesBroadcastEndpointFactory setProperties(String properties) {
|
||||
this.properties = properties;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getChannelName() {
|
||||
return channelName;
|
||||
}
|
||||
|
||||
public JGroupsPropertiesBroadcastEndpointFactory setChannelName(String channelName) {
|
||||
this.channelName = channelName;
|
||||
return this;
|
||||
}
|
||||
}
|
@ -23,7 +23,7 @@ import java.util.List;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.jgroups.JChannel;
|
||||
import org.jgroups.Message;
|
||||
import org.jgroups.ReceiverAdapter;
|
||||
import org.jgroups.Receiver;
|
||||
|
||||
/**
|
||||
* This class wraps a JChannel with a reference counter. The reference counter
|
||||
@ -52,7 +52,7 @@ public class JChannelWrapper {
|
||||
}
|
||||
|
||||
//we always add this for the first ref count
|
||||
channel.setReceiver(new ReceiverAdapter() {
|
||||
channel.setReceiver(new Receiver() {
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
@ -139,7 +139,7 @@ public class JChannelWrapper {
|
||||
if (logger.isTraceEnabled())
|
||||
logger.trace(this + "::Sending JGroups Message: Open=" + channel.isOpen() + " on channel " + channelName + " msg=" + msg);
|
||||
if (!manager.isLoopbackMessages()) {
|
||||
msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
|
||||
msg.setFlag(Message.TransientFlag.DONT_LOOPBACK);
|
||||
}
|
||||
channel.send(msg);
|
||||
}
|
||||
|
@ -22,13 +22,13 @@ import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.jboss.logging.Logger;
|
||||
import org.jgroups.ReceiverAdapter;
|
||||
import org.jgroups.Receiver;
|
||||
|
||||
/**
|
||||
* This class is used to receive messages from a JGroups channel.
|
||||
* Incoming messages are put into a queue.
|
||||
*/
|
||||
public class JGroupsReceiver extends ReceiverAdapter {
|
||||
public class JGroupsReceiver implements Receiver {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(JGroupsReceiver.class);
|
||||
|
||||
@ -38,7 +38,7 @@ public class JGroupsReceiver extends ReceiverAdapter {
|
||||
public void receive(org.jgroups.Message msg) {
|
||||
if (logger.isTraceEnabled())
|
||||
logger.trace("sending message " + msg);
|
||||
dequeue.add(msg.getBuffer());
|
||||
dequeue.add(msg.getArray());
|
||||
}
|
||||
|
||||
public byte[] receiveBroadcast() throws Exception {
|
||||
|
@ -23,7 +23,6 @@ import java.util.Map;
|
||||
import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.core.JGroupsPropertiesBroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.utils.uri.BeanSupport;
|
||||
@ -56,8 +55,6 @@ public class JGroupsServerLocatorSchema extends AbstractServerLocatorSchema {
|
||||
String auth;
|
||||
if (endpoint instanceof JGroupsFileBroadcastEndpointFactory) {
|
||||
auth = ((JGroupsFileBroadcastEndpointFactory) endpoint).getChannelName();
|
||||
} else if (endpoint instanceof JGroupsPropertiesBroadcastEndpointFactory) {
|
||||
auth = ((JGroupsPropertiesBroadcastEndpointFactory) endpoint).getChannelName();
|
||||
} else {
|
||||
throw new NotSerializableException(endpoint + "not serializable");
|
||||
}
|
||||
@ -70,12 +67,7 @@ public class JGroupsServerLocatorSchema extends AbstractServerLocatorSchema {
|
||||
Map<String, String> query,
|
||||
String name) throws Exception {
|
||||
BroadcastEndpointFactory endpointFactory;
|
||||
if (query.containsKey("file")) {
|
||||
endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(uri.getAuthority());
|
||||
} else {
|
||||
endpointFactory = new JGroupsPropertiesBroadcastEndpointFactory().setChannelName(uri.getAuthority());
|
||||
}
|
||||
|
||||
endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(uri.getAuthority());
|
||||
BeanSupport.setData(uri, endpointFactory, query);
|
||||
|
||||
DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration().setName(name).setBroadcastEndpointFactory(endpointFactory);
|
||||
|
@ -47,13 +47,14 @@
|
||||
</feature>
|
||||
|
||||
<feature name="artemis-common" version="${pom.version}" description="ActiveMQ Artemis Common libraries">
|
||||
<feature prerequisite="true">wrap</feature>
|
||||
<feature>transaction</feature>
|
||||
<feature>netty-core</feature>
|
||||
<bundle dependency="true">mvn:commons-beanutils/commons-beanutils/${commons.beanutils.version}</bundle>
|
||||
<bundle dependency="true">mvn:commons-collections/commons-collections/${commons.collections.version}</bundle>
|
||||
|
||||
<bundle dependency="true">mvn:org.jboss.logging/jboss-logging/${jboss.logging.version}</bundle>
|
||||
<bundle dependency="true">mvn:org.jgroups/jgroups/${jgroups.version}</bundle>
|
||||
<bundle dependency="true">wrap:mvn:org.jgroups/jgroups/${jgroups.version}</bundle>
|
||||
</feature>
|
||||
|
||||
<feature name="artemis-core" version="${pom.version}" description="ActiveMQ Artemis broker libraries">
|
||||
|
@ -24,7 +24,6 @@ import java.util.Map;
|
||||
import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.core.JGroupsPropertiesBroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.uri.schema.serverLocator.JGroupsServerLocatorSchema;
|
||||
@ -62,8 +61,6 @@ public class JGroupsSchema extends AbstractCFSchema {
|
||||
String auth;
|
||||
if (endpoint instanceof JGroupsFileBroadcastEndpointFactory) {
|
||||
auth = ((JGroupsFileBroadcastEndpointFactory) endpoint).getChannelName();
|
||||
} else if (endpoint instanceof JGroupsPropertiesBroadcastEndpointFactory) {
|
||||
auth = ((JGroupsPropertiesBroadcastEndpointFactory) endpoint).getChannelName();
|
||||
} else {
|
||||
throw new NotSerializableException(endpoint + "not serializable");
|
||||
}
|
||||
|
@ -33,7 +33,6 @@ import java.util.Set;
|
||||
import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.core.JGroupsPropertiesBroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
||||
@ -353,16 +352,6 @@ public class ConnectionFactoryURITest {
|
||||
Assert.assertEquals(broadcastEndpointFactory.getChannelName(), "channel-name");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJGroupsKeyValue() throws Exception {
|
||||
ActiveMQConnectionFactory factory = parser.newObject(new URI("jgroups://channel-name?properties=param=value;param2=value2&test=33"), null);
|
||||
|
||||
Assert.assertTrue(ActiveMQJMSConnectionFactory.class.getName().equals(factory.getClass().getName()));
|
||||
JGroupsPropertiesBroadcastEndpointFactory broadcastEndpointFactory = (JGroupsPropertiesBroadcastEndpointFactory) factory.getDiscoveryGroupConfiguration().getBroadcastEndpointFactory();
|
||||
Assert.assertEquals(broadcastEndpointFactory.getProperties(), "param=value;param2=value2");
|
||||
Assert.assertEquals(broadcastEndpointFactory.getChannelName(), "channel-name");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJGroupsAllProperties() throws Exception {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
@ -398,30 +387,6 @@ public class ConnectionFactoryURITest {
|
||||
checkEquals(bean, connectionFactoryWithHA, factory);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJGroupsPropertiesURI() throws Exception {
|
||||
DiscoveryGroupConfiguration discoveryGroupConfiguration = new DiscoveryGroupConfiguration();
|
||||
JGroupsPropertiesBroadcastEndpointFactory endpointFactory = new JGroupsPropertiesBroadcastEndpointFactory().setChannelName("channel-name").setProperties("param=val,param2-val2");
|
||||
discoveryGroupConfiguration.setName("foo").setRefreshTimeout(12345).setDiscoveryInitialWaitTimeout(5678).setBroadcastEndpointFactory(endpointFactory);
|
||||
ActiveMQConnectionFactory connectionFactoryWithHA = ActiveMQJMSClient.createConnectionFactoryWithHA(discoveryGroupConfiguration, JMSFactoryType.CF);
|
||||
URI tcp = parser.createSchema("jgroups", connectionFactoryWithHA);
|
||||
ActiveMQConnectionFactory factory = parser.newObject(tcp, null);
|
||||
DiscoveryGroupConfiguration dgc = factory.getDiscoveryGroupConfiguration();
|
||||
Assert.assertNotNull(dgc);
|
||||
BroadcastEndpointFactory broadcastEndpointFactory = dgc.getBroadcastEndpointFactory();
|
||||
Assert.assertNotNull(broadcastEndpointFactory);
|
||||
Assert.assertTrue(broadcastEndpointFactory instanceof JGroupsPropertiesBroadcastEndpointFactory);
|
||||
Assert.assertEquals(dgc.getName(), "foo");
|
||||
Assert.assertEquals(dgc.getDiscoveryInitialWaitTimeout(), 5678);
|
||||
Assert.assertEquals(dgc.getRefreshTimeout(), 12345);
|
||||
JGroupsPropertiesBroadcastEndpointFactory propertiesBroadcastEndpointFactory = (JGroupsPropertiesBroadcastEndpointFactory) broadcastEndpointFactory;
|
||||
Assert.assertEquals(propertiesBroadcastEndpointFactory.getProperties(), "param=val,param2-val2");
|
||||
Assert.assertEquals(propertiesBroadcastEndpointFactory.getChannelName(), "channel-name");
|
||||
|
||||
BeanUtilsBean bean = new BeanUtilsBean();
|
||||
checkEquals(bean, connectionFactoryWithHA, factory);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCacheDestinations() throws Exception {
|
||||
ActiveMQConnectionFactory factory = parser.newObject(new URI("tcp://localhost:3030"), null);
|
||||
|
@ -1,29 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.management.impl;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.management.Attribute;
|
||||
import org.apache.activemq.artemis.api.core.management.JGroupsChannelBroadcastGroupControl;
|
||||
|
||||
public interface JGroupsPropertiesBroadcastGroupControl extends JGroupsChannelBroadcastGroupControl {
|
||||
|
||||
/**
|
||||
* Returns the JGroups properties
|
||||
*/
|
||||
@Attribute(desc = "Returns the JGroups properties")
|
||||
String getProperties();
|
||||
}
|
@ -1,56 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.management.impl;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.JGroupsPropertiesBroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.core.management.JGroupsChannelBroadcastGroupControl;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
|
||||
import org.apache.activemq.artemis.logs.AuditLogger;
|
||||
|
||||
public class JGroupsPropertiesBroadcastGroupControlImpl extends BaseBroadcastGroupControlImpl implements JGroupsPropertiesBroadcastGroupControl {
|
||||
|
||||
|
||||
private JGroupsPropertiesBroadcastEndpointFactory endpointFactory;
|
||||
|
||||
|
||||
public JGroupsPropertiesBroadcastGroupControlImpl(final BroadcastGroup broadcastGroup,
|
||||
final StorageManager storageManager,
|
||||
final BroadcastGroupConfiguration configuration,
|
||||
final JGroupsPropertiesBroadcastEndpointFactory endpointFactory) throws Exception {
|
||||
super(JGroupsChannelBroadcastGroupControl.class, broadcastGroup, storageManager, configuration);
|
||||
this.endpointFactory = endpointFactory;
|
||||
}
|
||||
|
||||
// BroadcastGroupControlMBean implementation ---------------------
|
||||
|
||||
@Override
|
||||
public String getChannelName() throws Exception {
|
||||
if (AuditLogger.isBaseLoggingEnabled()) {
|
||||
AuditLogger.getChannelName(this.endpointFactory.getChannelName());
|
||||
}
|
||||
return endpointFactory.getChannelName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getProperties() {
|
||||
return endpointFactory.getProperties();
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -39,7 +39,6 @@ import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.core.JGroupsPropertiesBroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.core.JsonUtil;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
@ -72,7 +71,6 @@ import org.apache.activemq.artemis.core.management.impl.ClusterConnectionControl
|
||||
import org.apache.activemq.artemis.core.management.impl.DivertControlImpl;
|
||||
import org.apache.activemq.artemis.core.management.impl.JGroupsChannelBroadcastGroupControlImpl;
|
||||
import org.apache.activemq.artemis.core.management.impl.JGroupsFileBroadcastGroupControlImpl;
|
||||
import org.apache.activemq.artemis.core.management.impl.JGroupsPropertiesBroadcastGroupControlImpl;
|
||||
import org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
|
||||
@ -439,8 +437,6 @@ public class ManagementServiceImpl implements ManagementService {
|
||||
control = new JGroupsFileBroadcastGroupControlImpl(broadcastGroup, storageManager, configuration, (JGroupsFileBroadcastEndpointFactory) endpointFactory);
|
||||
} else if (endpointFactory instanceof ChannelBroadcastEndpointFactory) {
|
||||
control = new JGroupsChannelBroadcastGroupControlImpl(broadcastGroup, storageManager, configuration, (ChannelBroadcastEndpointFactory) endpointFactory);
|
||||
} else if (endpointFactory instanceof JGroupsPropertiesBroadcastEndpointFactory) {
|
||||
control = new JGroupsPropertiesBroadcastGroupControlImpl(broadcastGroup, storageManager, configuration, (JGroupsPropertiesBroadcastEndpointFactory) endpointFactory);
|
||||
} else {
|
||||
control = new BaseBroadcastGroupControlImpl(broadcastGroup, storageManager, configuration);
|
||||
}
|
||||
|
@ -24,7 +24,7 @@ import org.apache.activemq.artemis.service.extensions.xa.ActiveMQXAResourceWrapp
|
||||
import org.apache.activemq.artemis.service.extensions.xa.ActiveMQXAResourceWrapperImpl;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.jgroups.util.Util.assertEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class ActiveMQXAResourceWrapperImplTest {
|
||||
|
||||
|
@ -21,7 +21,7 @@ import java.lang.reflect.Method;
|
||||
import org.apache.activemq.artemis.service.extensions.ServiceUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.jgroups.util.Util.assertTrue;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class ServiceUtilsTest {
|
||||
|
||||
|
@ -167,12 +167,11 @@ The `udp` scheme supports 4 properties:
|
||||
connection. The default value for this parameter is 10000 milliseconds.
|
||||
|
||||
Lastly, the `jgroups` scheme is supported which provides an alternative to the
|
||||
`udp` scheme for server discovery. The URL pattern is either
|
||||
`udp` scheme for server discovery. The URL pattern is
|
||||
`jgroups://channelName?file=jgroups-xml-conf-filename`
|
||||
where`jgroups-xml-conf-filename` refers to an XML file on the classpath that
|
||||
contains the JGroups configuration or it can be
|
||||
`jgroups://channelName?properties=some-jgroups-properties`. In both instance
|
||||
the `channelName` is the name given to the jgroups channel created.
|
||||
contains the JGroups configuration. The `channelName` is the name given to the
|
||||
jgroups channel created.
|
||||
|
||||
The `refreshTimeout` and `discoveryInitialWaitTimeout` properties are supported
|
||||
just like with `udp`.
|
||||
|
@ -15,53 +15,32 @@
|
||||
limitations under the License.
|
||||
-->
|
||||
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns="urn:org:jgroups"
|
||||
xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
|
||||
<TCP loopback="true"
|
||||
recv_buf_size="${tcp.recv_buf_size:5M}"
|
||||
send_buf_size="${tcp.send_buf_size:5M}"
|
||||
max_bundle_size="64K"
|
||||
max_bundle_timeout="30"
|
||||
use_send_queues="true"
|
||||
sock_conn_timeout="300"
|
||||
|
||||
timer_type="new3"
|
||||
timer.min_threads="4"
|
||||
timer.max_threads="10"
|
||||
timer.keep_alive_time="3000"
|
||||
timer.queue_max_size="500"
|
||||
|
||||
thread_pool.enabled="true"
|
||||
thread_pool.min_threads="2"
|
||||
thread_pool.max_threads="8"
|
||||
thread_pool.keep_alive_time="5000"
|
||||
thread_pool.queue_enabled="true"
|
||||
thread_pool.queue_max_size="10000"
|
||||
thread_pool.rejection_policy="discard"
|
||||
|
||||
oob_thread_pool.enabled="true"
|
||||
oob_thread_pool.min_threads="1"
|
||||
oob_thread_pool.max_threads="8"
|
||||
oob_thread_pool.keep_alive_time="5000"
|
||||
oob_thread_pool.queue_enabled="false"
|
||||
oob_thread_pool.queue_max_size="100"
|
||||
oob_thread_pool.rejection_policy="discard"/>
|
||||
xmlns="urn:org:jgroups"
|
||||
xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
|
||||
<TCP bind_addr="${jgroups.bind_addr:site_local}"
|
||||
bind_port="${jgroups.bind_port:7800}"
|
||||
external_addr="${jgroups.external_addr}"
|
||||
external_port="${jgroups.external_port}"
|
||||
thread_pool.min_threads="0"
|
||||
thread_pool.max_threads="200"
|
||||
thread_pool.keep_alive_time="30000"/>
|
||||
<RED/>
|
||||
|
||||
<!-- a location that can be found by both server's running -->
|
||||
<FILE_PING location="../../file.ping.dir"/>
|
||||
<MERGE3 min_interval="10000"
|
||||
max_interval="30000"/>
|
||||
<FD_SOCK/>
|
||||
<FD timeout="3000" max_tries="3" />
|
||||
<VERIFY_SUSPECT timeout="1500" />
|
||||
<FD_SOCK2/>
|
||||
<FD_ALL3 timeout="40000" interval="5000" />
|
||||
<VERIFY_SUSPECT2 timeout="1500" />
|
||||
<BARRIER />
|
||||
<pbcast.NAKACK2 use_mcast_xmit="false"
|
||||
discard_delivered_msgs="true"/>
|
||||
<pbcast.NAKACK2 use_mcast_xmit="false" />
|
||||
<UNICAST3 />
|
||||
<pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
|
||||
<pbcast.STABLE desired_avg_gossip="50000"
|
||||
max_bytes="4M"/>
|
||||
<pbcast.GMS print_local_addr="true" join_timeout="2000"
|
||||
view_bundling="true"/>
|
||||
<pbcast.GMS print_local_addr="true" join_timeout="2000"/>
|
||||
<UFC max_credits="2M"
|
||||
min_threshold="0.4"/>
|
||||
<MFC max_credits="2M"
|
||||
min_threshold="0.4"/>
|
||||
<FRAG2 frag_size="60K" />
|
||||
|
@ -17,51 +17,30 @@
|
||||
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns="urn:org:jgroups"
|
||||
xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
|
||||
<TCP loopback="true"
|
||||
recv_buf_size="${tcp.recv_buf_size:5M}"
|
||||
send_buf_size="${tcp.send_buf_size:5M}"
|
||||
max_bundle_size="64K"
|
||||
max_bundle_timeout="30"
|
||||
use_send_queues="true"
|
||||
sock_conn_timeout="300"
|
||||
|
||||
timer_type="new3"
|
||||
timer.min_threads="4"
|
||||
timer.max_threads="10"
|
||||
timer.keep_alive_time="3000"
|
||||
timer.queue_max_size="500"
|
||||
|
||||
thread_pool.enabled="true"
|
||||
thread_pool.min_threads="2"
|
||||
thread_pool.max_threads="8"
|
||||
thread_pool.keep_alive_time="5000"
|
||||
thread_pool.queue_enabled="true"
|
||||
thread_pool.queue_max_size="10000"
|
||||
thread_pool.rejection_policy="discard"
|
||||
|
||||
oob_thread_pool.enabled="true"
|
||||
oob_thread_pool.min_threads="1"
|
||||
oob_thread_pool.max_threads="8"
|
||||
oob_thread_pool.keep_alive_time="5000"
|
||||
oob_thread_pool.queue_enabled="false"
|
||||
oob_thread_pool.queue_max_size="100"
|
||||
oob_thread_pool.rejection_policy="discard"/>
|
||||
<TCP bind_addr="${jgroups.bind_addr:site_local}"
|
||||
bind_port="${jgroups.bind_port:7800}"
|
||||
external_addr="${jgroups.external_addr}"
|
||||
external_port="${jgroups.external_port}"
|
||||
thread_pool.min_threads="0"
|
||||
thread_pool.max_threads="200"
|
||||
thread_pool.keep_alive_time="30000"/>
|
||||
<RED/>
|
||||
|
||||
<!-- a location that can be found by both server's running -->
|
||||
<FILE_PING location="../../file.ping.dir"/>
|
||||
<MERGE3 min_interval="10000"
|
||||
max_interval="30000"/>
|
||||
<FD_SOCK/>
|
||||
<FD timeout="3000" max_tries="3" />
|
||||
<VERIFY_SUSPECT timeout="1500" />
|
||||
<FD_SOCK2/>
|
||||
<FD_ALL3 timeout="40000" interval="5000" />
|
||||
<VERIFY_SUSPECT2 timeout="1500" />
|
||||
<BARRIER />
|
||||
<pbcast.NAKACK2 use_mcast_xmit="false"
|
||||
discard_delivered_msgs="true"/>
|
||||
<pbcast.NAKACK2 use_mcast_xmit="false" />
|
||||
<UNICAST3 />
|
||||
<pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
|
||||
<pbcast.STABLE desired_avg_gossip="50000"
|
||||
max_bytes="4M"/>
|
||||
<pbcast.GMS print_local_addr="true" join_timeout="2000"
|
||||
view_bundling="true"/>
|
||||
<pbcast.GMS print_local_addr="true" join_timeout="2000"/>
|
||||
<UFC max_credits="2M"
|
||||
min_threshold="0.4"/>
|
||||
<MFC max_credits="2M"
|
||||
min_threshold="0.4"/>
|
||||
<FRAG2 frag_size="60K" />
|
||||
|
2
pom.xml
2
pom.xml
@ -102,7 +102,7 @@
|
||||
<jsr305.version>3.0.2</jsr305.version>
|
||||
<jboss.logging.version>3.4.2.Final</jboss.logging.version>
|
||||
<jetty.version>9.4.44.v20210927</jetty.version>
|
||||
<jgroups.version>3.6.13.Final</jgroups.version>
|
||||
<jgroups.version>5.2.0.Final</jgroups.version>
|
||||
<errorprone.version>2.10.0</errorprone.version>
|
||||
<maven.enforcer.plugin.version>3.0.0-M3</maven.enforcer.plugin.version>
|
||||
<maven.bundle.plugin.version>5.1.2</maven.bundle.plugin.version>
|
||||
|
@ -22,7 +22,6 @@ import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
|
||||
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
|
||||
import org.jgroups.JChannel;
|
||||
import org.jgroups.conf.PlainConfigurator;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@ -44,8 +43,6 @@ public class JGroupsBroadcastTest {
|
||||
@Rule
|
||||
public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule();
|
||||
|
||||
private final String jgroupsConfigString = "UDP(oob_thread_pool.max_threads=300;" + "bind_addr=127.0.0.1;oob_thread_pool.keep_alive_time=1000;" + "max_bundle_size=31k;mcast_send_buf_size=640000;" + "internal_thread_pool.keep_alive_time=60000;" + "internal_thread_pool.rejection_policy=discard;" + "mcast_recv_buf_size=25000000;bind_port=55200;" + "internal_thread_pool.queue_max_size=100;" + "mcast_port=45688;thread_pool.min_threads=20;" + "oob_thread_pool.rejection_policy=discard;" + "thread_pool.max_threads=300;enable_diagnostics=false;" + "thread_pool.enabled=true;internal_thread_pool.queue_enabled=true;" + "ucast_recv_buf_size=20000000;ucast_send_buf_size=640000;" + "internal_thread_pool.enabled=true;oob_thread_pool.enabled=true;" + "ip_ttl=2;thread_pool.rejection_policy=discard;thread_pool.keep_alive_time=5000;" + "internal_thread_pool.max_threads=10;thread_pool.queue_enabled=true;" + "mcast_addr=230.0.0.4;singleton_name=udp;max_bundle_timeout=30;" + "oob_thread_pool.queue_enabled=false;internal_thread_pool.min_threads=1;" + "bundler_type=old;oob_thread_pool.min_threads=20;" + "thread_pool.queue_max_size=1000):PING(num_initial_members=3;" + "timeout=2000):MERGE3(min_interval=20000;max_interval=100000)" + ":FD_SOCK(bind_addr=127.0.0.1;start_port=54200):FD_ALL(interval=3000;" + "timeout=15000):VERIFY_SUSPECT(bind_addr=127.0.0.1;" + "timeout=1500):pbcast.NAKACK2(max_msg_batch_size=100;" + "xmit_table_msgs_per_row=10000;xmit_table_max_compaction_time=10000;" + "xmit_table_num_rows=100;xmit_interval=1000):UNICAST3(xmit_table_msgs_per_row=10000;" + "xmit_table_max_compaction_time=10000;xmit_table_num_rows=20)" + ":pbcast.STABLE(desired_avg_gossip=50000;max_bytes=400000;" + "stability_delay=1000):pbcast.GMS(print_local_addr=true;" + "view_bundling=true;join_timeout=3000;view_ack_collection_timeout=5000;" + "resume_task_timeout=7500):UFC(max_credits=1m;min_threshold=0.40)" + ":MFC(max_credits=1m;min_threshold=0.40):FRAG2(frag_size=30k)" + ":RSVP(resend_interval=500;ack_on_delivery=false;timeout=60000)";
|
||||
|
||||
@Test
|
||||
public void testRefCount() throws Exception {
|
||||
JChannel channel = null;
|
||||
@ -53,8 +50,7 @@ public class JGroupsBroadcastTest {
|
||||
|
||||
try {
|
||||
|
||||
PlainConfigurator configurator = new PlainConfigurator(jgroupsConfigString);
|
||||
channel = new JChannel(configurator);
|
||||
channel = new JChannel("udp.xml");
|
||||
|
||||
String channelName1 = "channel1";
|
||||
|
||||
@ -88,7 +84,7 @@ public class JGroupsBroadcastTest {
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
||||
newChannel = new JChannel(configurator);
|
||||
newChannel = new JChannel("udp.xml");
|
||||
|
||||
jgroupsBroadcastCfg1 = new ChannelBroadcastEndpointFactory(newChannel, channelName1);
|
||||
|
||||
|
@ -39,11 +39,9 @@ import java.util.Set;
|
||||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.BroadcastEndpoint;
|
||||
import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpoint;
|
||||
import org.apache.activemq.artemis.api.core.JGroupsPropertiesBroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
|
||||
@ -259,50 +257,6 @@ public class SimpleJNDIClientTest extends ActiveMQTestBase {
|
||||
broadcastEndpoint.close(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoteCFWithJgroupsWithTransportConfigProps() throws Exception {
|
||||
Hashtable<String, String> props = new Hashtable<>();
|
||||
props.put(Context.INITIAL_CONTEXT_FACTORY, ActiveMQInitialContextFactory.class.getCanonicalName());
|
||||
props.put("connectionFactory.ConnectionFactory", "jgroups://testChannelName?properties=param=value&" +
|
||||
ActiveMQInitialContextFactory.REFRESH_TIMEOUT + "=5000&" +
|
||||
ActiveMQInitialContextFactory.DISCOVERY_INITIAL_WAIT_TIMEOUT + "=6000");
|
||||
Context ctx = new InitialContext(props);
|
||||
|
||||
ActiveMQConnectionFactory cf = (ActiveMQConnectionFactory) ctx.lookup("ConnectionFactory");
|
||||
|
||||
DiscoveryGroupConfiguration discoveryGroupConfiguration = cf.getDiscoveryGroupConfiguration();
|
||||
Assert.assertEquals(5000, discoveryGroupConfiguration.getRefreshTimeout());
|
||||
Assert.assertEquals(6000, discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout());
|
||||
|
||||
BroadcastEndpointFactory broadcastEndpointFactory = cf.getDiscoveryGroupConfiguration().getBroadcastEndpointFactory();
|
||||
Assert.assertTrue(broadcastEndpointFactory instanceof JGroupsPropertiesBroadcastEndpointFactory);
|
||||
JGroupsPropertiesBroadcastEndpointFactory endpointFactory = (JGroupsPropertiesBroadcastEndpointFactory) broadcastEndpointFactory;
|
||||
Assert.assertEquals(endpointFactory.getProperties(), "param=value");
|
||||
Assert.assertEquals(endpointFactory.getChannelName(), "testChannelName");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoteCFWithJgroupsWithTransportConfigNullProps() throws Exception {
|
||||
Hashtable<String, String> props = new Hashtable<>();
|
||||
props.put(Context.INITIAL_CONTEXT_FACTORY, ActiveMQInitialContextFactory.class.getCanonicalName());
|
||||
props.put("connectionFactory.ConnectionFactory", "jgroups://testChannelName?" +
|
||||
ActiveMQInitialContextFactory.REFRESH_TIMEOUT + "=5000&" +
|
||||
ActiveMQInitialContextFactory.DISCOVERY_INITIAL_WAIT_TIMEOUT + "=6000");
|
||||
Context ctx = new InitialContext(props);
|
||||
|
||||
ActiveMQConnectionFactory cf = (ActiveMQConnectionFactory) ctx.lookup("ConnectionFactory");
|
||||
|
||||
DiscoveryGroupConfiguration discoveryGroupConfiguration = cf.getDiscoveryGroupConfiguration();
|
||||
Assert.assertEquals(5000, discoveryGroupConfiguration.getRefreshTimeout());
|
||||
Assert.assertEquals(6000, discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout());
|
||||
|
||||
BroadcastEndpointFactory broadcastEndpointFactory = cf.getDiscoveryGroupConfiguration().getBroadcastEndpointFactory();
|
||||
Assert.assertTrue(broadcastEndpointFactory instanceof JGroupsPropertiesBroadcastEndpointFactory);
|
||||
JGroupsPropertiesBroadcastEndpointFactory endpointFactory = (JGroupsPropertiesBroadcastEndpointFactory) broadcastEndpointFactory;
|
||||
Assert.assertEquals(endpointFactory.getProperties(), null);
|
||||
Assert.assertEquals(endpointFactory.getChannelName(), "testChannelName");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoteCFWithUDP() throws NamingException, JMSException {
|
||||
Hashtable<String, String> props = new Hashtable<>();
|
||||
|
@ -36,7 +36,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.core.JGroupsPropertiesBroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
|
||||
@ -104,26 +103,6 @@ public class ConnectionFactorySerializationTest extends JMSTestBase {
|
||||
Assert.assertEquals("/META-INF/myfile.xml", befc.getFile());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConnectionFactoryJgroupsProperties() throws Exception {
|
||||
createDiscoveryFactoryJGroupsProperties();
|
||||
cf = (ActiveMQConnectionFactory) namingContext.lookup("/MyConnectionFactory");
|
||||
|
||||
// apparently looking up the connection factory with the org.apache.activemq.artemis.jms.tests.tools.container.InVMInitialContextFactory
|
||||
// is not enough to actually serialize it so we serialize it manually
|
||||
byte[] x = serialize(cf);
|
||||
ActiveMQConnectionFactory y = deserialize(x, ActiveMQConnectionFactory.class);
|
||||
checkEquals(cf, y);
|
||||
DiscoveryGroupConfiguration dgc = y.getDiscoveryGroupConfiguration();
|
||||
Assert.assertEquals(dgc.getName(), "dg1");
|
||||
Assert.assertEquals(dgc.getDiscoveryInitialWaitTimeout(), 5000);
|
||||
Assert.assertEquals(dgc.getRefreshTimeout(), 5000);
|
||||
Assert.assertTrue(dgc.getBroadcastEndpointFactory() instanceof JGroupsPropertiesBroadcastEndpointFactory);
|
||||
JGroupsPropertiesBroadcastEndpointFactory befc = (JGroupsPropertiesBroadcastEndpointFactory) dgc.getBroadcastEndpointFactory();
|
||||
Assert.assertEquals("myChannel", befc.getChannelName());
|
||||
Assert.assertEquals("param=1,param2=2", befc.getProperties());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConnectionFactoryStatic1() throws Exception {
|
||||
createStaticFactory(true);
|
||||
@ -213,20 +192,6 @@ public class ConnectionFactorySerializationTest extends JMSTestBase {
|
||||
jmsServer.createConnectionFactory("MyConnectionFactory", false, JMSFactoryType.CF, dcConfig.getName(), "/MyConnectionFactory");
|
||||
}
|
||||
|
||||
private void createDiscoveryFactoryJGroupsProperties() throws Exception {
|
||||
// Deploy a connection factory with discovery
|
||||
List<String> bindings = new ArrayList<>();
|
||||
bindings.add("MyConnectionFactory");
|
||||
|
||||
JGroupsPropertiesBroadcastEndpointFactory config = new JGroupsPropertiesBroadcastEndpointFactory().setChannelName("myChannel").setProperties("param=1,param2=2");
|
||||
|
||||
DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration().setName("dg1").setRefreshTimeout(5000).setDiscoveryInitialWaitTimeout(5000).setBroadcastEndpointFactory(config);
|
||||
|
||||
jmsServer.getActiveMQServer().getConfiguration().getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
|
||||
|
||||
jmsServer.createConnectionFactory("MyConnectionFactory", false, JMSFactoryType.CF, dcConfig.getName(), "/MyConnectionFactory");
|
||||
}
|
||||
|
||||
private void createStaticFactory(boolean b) throws Exception {
|
||||
HashMap<String, Object> params = new HashMap<>();
|
||||
Set<String> allowableConnectorKeys = TransportConstants.ALLOWABLE_CONNECTOR_KEYS;
|
||||
|
@ -34,7 +34,6 @@ import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
||||
import org.jgroups.JChannel;
|
||||
import org.jgroups.conf.PlainConfigurator;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@ -45,8 +44,6 @@ public class ConnectionFactoryWithJGroupsSerializationTest extends JMSTestBase {
|
||||
protected static ActiveMQConnectionFactory jmsCf1;
|
||||
protected static ActiveMQConnectionFactory jmsCf2;
|
||||
|
||||
private final String jgroupsConfigString = "UDP(oob_thread_pool.max_threads=300;" + "bind_addr=127.0.0.1;oob_thread_pool.keep_alive_time=1000;" + "max_bundle_size=31k;mcast_send_buf_size=640000;" + "internal_thread_pool.keep_alive_time=60000;" + "internal_thread_pool.rejection_policy=discard;" + "mcast_recv_buf_size=25000000;bind_port=55200;" + "internal_thread_pool.queue_max_size=100;" + "mcast_port=45688;thread_pool.min_threads=20;" + "oob_thread_pool.rejection_policy=discard;" + "thread_pool.max_threads=300;enable_diagnostics=false;" + "thread_pool.enabled=true;internal_thread_pool.queue_enabled=true;" + "ucast_recv_buf_size=20000000;ucast_send_buf_size=640000;" + "internal_thread_pool.enabled=true;oob_thread_pool.enabled=true;" + "ip_ttl=2;thread_pool.rejection_policy=discard;thread_pool.keep_alive_time=5000;" + "internal_thread_pool.max_threads=10;thread_pool.queue_enabled=true;" + "mcast_addr=230.0.0.4;singleton_name=udp;max_bundle_timeout=30;" + "oob_thread_pool.queue_enabled=false;internal_thread_pool.min_threads=1;" + "bundler_type=old;oob_thread_pool.min_threads=20;" + "thread_pool.queue_max_size=1000):PING(num_initial_members=3;" + "timeout=2000):MERGE3(min_interval=20000;max_interval=100000)" + ":FD_SOCK(bind_addr=127.0.0.1;start_port=54200):FD_ALL(interval=3000;" + "timeout=15000):VERIFY_SUSPECT(bind_addr=127.0.0.1;" + "timeout=1500):pbcast.NAKACK2(max_msg_batch_size=100;" + "xmit_table_msgs_per_row=10000;xmit_table_max_compaction_time=10000;" + "xmit_table_num_rows=100;xmit_interval=1000):UNICAST3(xmit_table_msgs_per_row=10000;" + "xmit_table_max_compaction_time=10000;xmit_table_num_rows=20)" + ":pbcast.STABLE(desired_avg_gossip=50000;max_bytes=400000;" + "stability_delay=1000):pbcast.GMS(print_local_addr=true;" + "view_bundling=true;join_timeout=3000;view_ack_collection_timeout=5000;" + "resume_task_timeout=7500):UFC(max_credits=1m;min_threshold=0.40)" + ":MFC(max_credits=1m;min_threshold=0.40):FRAG2(frag_size=30k)" + ":RSVP(resend_interval=500;ack_on_delivery=false;timeout=60000)";
|
||||
|
||||
JChannel channel = null;
|
||||
Queue testQueue = null;
|
||||
|
||||
@ -56,14 +53,13 @@ public class ConnectionFactoryWithJGroupsSerializationTest extends JMSTestBase {
|
||||
try {
|
||||
super.setUp();
|
||||
|
||||
PlainConfigurator configurator = new PlainConfigurator(jgroupsConfigString);
|
||||
channel = new JChannel(configurator);
|
||||
channel = new JChannel("udp.xml");
|
||||
|
||||
String channelName1 = "channel1";
|
||||
String channelName2 = "channel2";
|
||||
|
||||
BroadcastEndpointFactory jgroupsBroadcastCfg1 = new ChannelBroadcastEndpointFactory(channel, channelName1);
|
||||
BroadcastEndpointFactory jgroupsBroadcastCfg2 = new JGroupsFileBroadcastEndpointFactory().setChannelName(channelName2).setFile(jgroupsConfigString);
|
||||
BroadcastEndpointFactory jgroupsBroadcastCfg2 = new JGroupsFileBroadcastEndpointFactory().setChannelName(channelName2).setFile("udp.xml");
|
||||
|
||||
DiscoveryGroupConfiguration dcConfig1 = new DiscoveryGroupConfiguration().setName("dg1").setRefreshTimeout(5000).setDiscoveryInitialWaitTimeout(5000).setBroadcastEndpointFactory(jgroupsBroadcastCfg1);
|
||||
|
||||
|
@ -28,7 +28,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
|
||||
import org.jgroups.JChannel;
|
||||
import org.jgroups.conf.PlainConfigurator;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@ -59,9 +58,6 @@ public class JGroupsChannelBroadcastGroupControlTest extends ManagementTestBase
|
||||
@Rule
|
||||
public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule();
|
||||
|
||||
private final String jgroupsConfigString = "UDP(oob_thread_pool.max_threads=300;" + "bind_addr=127.0.0.1;oob_thread_pool.keep_alive_time=1000;" + "max_bundle_size=31k;mcast_send_buf_size=640000;" + "internal_thread_pool.keep_alive_time=60000;" + "internal_thread_pool.rejection_policy=discard;" + "mcast_recv_buf_size=25000000;bind_port=55200;" + "internal_thread_pool.queue_max_size=100;" + "mcast_port=45688;thread_pool.min_threads=20;" + "oob_thread_pool.rejection_policy=discard;" + "thread_pool.max_threads=300;enable_diagnostics=false;" + "thread_pool.enabled=true;internal_thread_pool.queue_enabled=true;" + "ucast_recv_buf_size=20000000;ucast_send_buf_size=640000;" + "internal_thread_pool.enabled=true;oob_thread_pool.enabled=true;" + "ip_ttl=2;thread_pool.rejection_policy=discard;thread_pool.keep_alive_time=5000;" + "internal_thread_pool.max_threads=10;thread_pool.queue_enabled=true;" + "mcast_addr=230.0.0.4;singleton_name=udp;max_bundle_timeout=30;" + "oob_thread_pool.queue_enabled=false;internal_thread_pool.min_threads=1;" + "bundler_type=old;oob_thread_pool.min_threads=20;" + "thread_pool.queue_max_size=1000):PING(num_initial_members=3;" + "timeout=2000):MERGE3(min_interval=20000;max_interval=100000)" + ":FD_SOCK(bind_addr=127.0.0.1;start_port=54200):FD_ALL(interval=3000;" + "timeout=15000):VERIFY_SUSPECT(bind_addr=127.0.0.1;" + "timeout=1500):pbcast.NAKACK2(max_msg_batch_size=100;" + "xmit_table_msgs_per_row=10000;xmit_table_max_compaction_time=10000;" + "xmit_table_num_rows=100;xmit_interval=1000):UNICAST3(xmit_table_msgs_per_row=10000;" + "xmit_table_max_compaction_time=10000;xmit_table_num_rows=20)" + ":pbcast.STABLE(desired_avg_gossip=50000;max_bytes=400000;" + "stability_delay=1000):pbcast.GMS(print_local_addr=true;" + "view_bundling=true;join_timeout=3000;view_ack_collection_timeout=5000;" + "resume_task_timeout=7500):UFC(max_credits=1m;min_threshold=0.40)" + ":MFC(max_credits=1m;min_threshold=0.40):FRAG2(frag_size=30k)" + ":RSVP(resend_interval=500;ack_on_delivery=false;timeout=60000)";
|
||||
|
||||
|
||||
@Test
|
||||
public void testAttributes() throws Exception {
|
||||
ChannelBroadcastEndpointFactory udpCfg = (ChannelBroadcastEndpointFactory) broadcastGroupConfig.getEndpointFactory();
|
||||
@ -95,8 +91,7 @@ public class JGroupsChannelBroadcastGroupControlTest extends ManagementTestBase
|
||||
TransportConfiguration connectorConfiguration = new TransportConfiguration(NETTY_CONNECTOR_FACTORY);
|
||||
List<String> connectorInfos = new ArrayList<>();
|
||||
connectorInfos.add(connectorConfiguration.getName());
|
||||
PlainConfigurator configurator = new PlainConfigurator(jgroupsConfigString);
|
||||
JChannel channel = new JChannel(configurator);
|
||||
JChannel channel = new JChannel("udp.xml");
|
||||
|
||||
String channelName1 = "channel1";
|
||||
ChannelBroadcastEndpointFactory endpointFactory = new ChannelBroadcastEndpointFactory(channel, channelName1);
|
||||
|
@ -17,36 +17,13 @@
|
||||
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns="urn:org:jgroups"
|
||||
xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
|
||||
<TCP loopback="true"
|
||||
recv_buf_size="${tcp.recv_buf_size:5M}"
|
||||
send_buf_size="${tcp.send_buf_size:5M}"
|
||||
max_bundle_size="64K"
|
||||
max_bundle_timeout="30"
|
||||
use_send_queues="true"
|
||||
sock_conn_timeout="300"
|
||||
|
||||
timer_type="new3"
|
||||
timer.min_threads="4"
|
||||
timer.max_threads="10"
|
||||
timer.keep_alive_time="3000"
|
||||
timer.queue_max_size="500"
|
||||
|
||||
thread_pool.enabled="true"
|
||||
thread_pool.min_threads="2"
|
||||
thread_pool.max_threads="8"
|
||||
thread_pool.keep_alive_time="5000"
|
||||
thread_pool.queue_enabled="true"
|
||||
thread_pool.queue_max_size="10000"
|
||||
thread_pool.rejection_policy="discard"
|
||||
|
||||
oob_thread_pool.enabled="true"
|
||||
oob_thread_pool.min_threads="1"
|
||||
oob_thread_pool.max_threads="8"
|
||||
oob_thread_pool.keep_alive_time="5000"
|
||||
oob_thread_pool.queue_enabled="false"
|
||||
oob_thread_pool.queue_max_size="100"
|
||||
oob_thread_pool.rejection_policy="discard"/>
|
||||
|
||||
<TCP bind_addr="${jgroups.bind_addr:site_local}"
|
||||
bind_port="${jgroups.bind_port:7800}"
|
||||
external_addr="${jgroups.external_addr}"
|
||||
external_port="${jgroups.external_port}"
|
||||
thread_pool.min_threads="0"
|
||||
thread_pool.max_threads="200"
|
||||
thread_pool.keep_alive_time="30000"/>
|
||||
<FILE_PING location="./target/tmp/amqtest.ping.dir"/>
|
||||
<MERGE3 min_interval="10000"
|
||||
max_interval="30000"/>
|
||||
@ -59,8 +36,7 @@
|
||||
<UNICAST3 />
|
||||
<pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
|
||||
max_bytes="4M"/>
|
||||
<pbcast.GMS print_local_addr="true" join_timeout="2000"
|
||||
view_bundling="true"/>
|
||||
<pbcast.GMS print_local_addr="true" join_timeout="2000"/>
|
||||
<MFC max_credits="2M"
|
||||
min_threshold="0.4"/>
|
||||
<FRAG2 frag_size="60K" />
|
||||
|
Loading…
x
Reference in New Issue
Block a user