ARTEMIS-2254 support useTopologyForLoadBalancing on JMS cf config

This commit is contained in:
Justin Bertram 2019-02-18 15:32:31 -06:00 committed by Clebert Suconic
parent d78cd81cc6
commit 0cffe03d2e
4 changed files with 62 additions and 2 deletions

View File

@ -197,4 +197,8 @@ public interface ConnectionFactoryConfiguration extends EncodingSupport {
boolean isEnableSharedClientID(); boolean isEnableSharedClientID();
ConnectionFactoryConfiguration setEnableSharedClientID(boolean enabled); ConnectionFactoryConfiguration setEnableSharedClientID(boolean enabled);
boolean getUseTopologyForLoadBalancing();
ConnectionFactoryConfiguration setUseTopologyForLoadBalancing(boolean useTopologyForLoadBalancing);
} }

View File

@ -129,6 +129,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
private boolean enableSharedClientID = ActiveMQClient.DEFAULT_ENABLED_SHARED_CLIENT_ID; private boolean enableSharedClientID = ActiveMQClient.DEFAULT_ENABLED_SHARED_CLIENT_ID;
private boolean useTopologyForLoadBalancing = ActiveMQClient.DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING;
// Static -------------------------------------------------------- // Static --------------------------------------------------------
@ -643,8 +645,9 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
enable1xPrefixes = buffer.readableBytes() > 0 ? buffer.readBoolean() : ActiveMQJMSClient.DEFAULT_ENABLE_1X_PREFIXES; enable1xPrefixes = buffer.readableBytes() > 0 ? buffer.readBoolean() : ActiveMQJMSClient.DEFAULT_ENABLE_1X_PREFIXES;
enableSharedClientID = buffer.readableBytes() > 0 ? buffer.readBoolean() : ActiveMQClient.DEFAULT_ENABLED_SHARED_CLIENT_ID; enableSharedClientID = buffer.readableBytes() > 0 ? BufferHelper.readNullableBoolean(buffer) : ActiveMQClient.DEFAULT_ENABLED_SHARED_CLIENT_ID;
useTopologyForLoadBalancing = buffer.readableBytes() > 0 ? BufferHelper.readNullableBoolean(buffer) : ActiveMQClient.DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING;
} }
@Override @Override
@ -738,6 +741,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
buffer.writeBoolean(enable1xPrefixes); buffer.writeBoolean(enable1xPrefixes);
BufferHelper.writeNullableBoolean(buffer, enableSharedClientID); BufferHelper.writeNullableBoolean(buffer, enableSharedClientID);
BufferHelper.writeNullableBoolean(buffer, useTopologyForLoadBalancing);
} }
@Override @Override
@ -856,7 +861,9 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_BOOLEAN +
// enable1xPrefixes; // enable1xPrefixes;
BufferHelper.sizeOfNullableBoolean(enableSharedClientID); BufferHelper.sizeOfNullableBoolean(enableSharedClientID) +
BufferHelper.sizeOfNullableBoolean(useTopologyForLoadBalancing);
return size; return size;
} }
@ -936,6 +943,17 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf
return enableSharedClientID; return enableSharedClientID;
} }
@Override
public ConnectionFactoryConfiguration setUseTopologyForLoadBalancing(boolean useTopologyForLoadBalancing) {
this.useTopologyForLoadBalancing = useTopologyForLoadBalancing;
return this;
}
@Override
public boolean getUseTopologyForLoadBalancing() {
return useTopologyForLoadBalancing;
}
// Public -------------------------------------------------------- // Public --------------------------------------------------------
// Package protected --------------------------------------------- // Package protected ---------------------------------------------

View File

@ -1222,6 +1222,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
cf.setInitialMessagePacketSize(cfConfig.getInitialMessagePacketSize()); cf.setInitialMessagePacketSize(cfConfig.getInitialMessagePacketSize());
cf.setEnable1xPrefixes(cfConfig.isEnable1xPrefixes()); cf.setEnable1xPrefixes(cfConfig.isEnable1xPrefixes());
cf.setEnableSharedClientID(cfConfig.isEnableSharedClientID()); cf.setEnableSharedClientID(cfConfig.isEnableSharedClientID());
cf.setUseTopologyForLoadBalancing(cfConfig.getUseTopologyForLoadBalancing());
return cf; return cf;
} }

View File

@ -32,6 +32,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.JGroupsPropertiesBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.JGroupsPropertiesBroadcastEndpointFactory;
@ -151,6 +153,41 @@ public class ConnectionFactorySerializationTest extends JMSTestBase {
} }
} }
@Test
public void testConnectionFactoryEncodeDecode() throws Exception {
jmsServer.getActiveMQServer().getConfiguration().addConnectorConfiguration("foo", "tcp://localhost:1234");
ArrayList<String> connectorNames = new ArrayList<>();
connectorNames.add("foo");
ConnectionFactoryConfiguration cfc1 = new ConnectionFactoryConfigurationImpl()
.setName("MyConnectionFactory")
.setConnectorNames(connectorNames)
.setUseTopologyForLoadBalancing(false)
.setEnableSharedClientID(true);
ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024);
cfc1.encode(buffer);
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
outputStream.write(bytes);
ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
buffer = ActiveMQBuffers.dynamicBuffer(1024);
while (true) {
int byteRead = inputStream.read();
if (byteRead < 0) {
break;
}
buffer.writeByte((byte)byteRead);
}
ConnectionFactoryConfigurationImpl cfc2 = new ConnectionFactoryConfigurationImpl();
cfc2.decode(buffer);
assertEquals(cfc1.getUseTopologyForLoadBalancing(), cfc2.getUseTopologyForLoadBalancing());
assertEquals(cfc1.isEnableSharedClientID(), cfc2.isEnableSharedClientID());
}
private void createDiscoveryFactoryUDP() throws Exception { private void createDiscoveryFactoryUDP() throws Exception {
// Deploy a connection factory with discovery // Deploy a connection factory with discovery
List<String> bindings = new ArrayList<>(); List<String> bindings = new ArrayList<>();