ARTEMIS-130 and ARTEMIS-132 connection-factory constructors / Adding inVM serialization

https://issues.apache.org/jira/browse/ARTEMIS-130 connection-factory constructors
https://issues.apache.org/jira/browse/ARTEMIS-132 adding vm as an URL on connection factory serialization

Adding new constructors into connection factory
This will make examples easier to understand, less stuff to be written before instantiating connection factories
This commit is contained in:
Clebert Suconic 2015-06-03 09:22:51 -04:00
parent 0cbde582fb
commit cd205f6b9d
16 changed files with 367 additions and 123 deletions

View File

@ -141,7 +141,10 @@ public abstract class URISchema<T, P>
}
else
{
rc.put(parameters[i], null);
if (!parameters[i].trim().isEmpty())
{
rc.put(parameters[i], null);
}
}
}
}

View File

@ -739,6 +739,12 @@ public interface ServerLocator extends AutoCloseable
*/
boolean isHA();
/**
* Verify if all of the transports are using inVM.
* @return {@code true} if the locator has all inVM transports.
*/
boolean allInVM();
/**
* Whether to compress large messages.
*

View File

@ -1358,6 +1358,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
transportConnection = null;
connector = null;
throw new RuntimeException(cause.getMessage(), cause);
}
return transportConnection;

View File

@ -533,6 +533,22 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
}
/*
* I'm not using isAllInVM here otherwsie BeanProperties would translate this as a property for the URL
*/
public boolean allInVM()
{
for (TransportConfiguration config: getStaticTransportConfigurations())
{
if (!config.getFactoryClassName().contains("InVMConnectorFactory"))
{
return false;
}
}
return true;
}
private ServerLocatorImpl(ServerLocatorImpl locator)
{
ha = locator.ha;

View File

@ -23,6 +23,7 @@ public class ServerLocatorParser extends URIFactory<ServerLocator, String>
{
public ServerLocatorParser()
{
registerSchema(new InVMServerLocatorSchema());
registerSchema(new TCPServerLocatorSchema());
registerSchema(new UDPServerLocatorSchema());
registerSchema(new JGroupsServerLocatorSchema());

View File

@ -16,11 +16,6 @@
*/
package org.apache.activemq.artemis.uri;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.utils.uri.SchemaConstants;
import org.apache.activemq.artemis.utils.uri.URISchema;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@ -29,6 +24,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.utils.uri.SchemaConstants;
import org.apache.activemq.artemis.utils.uri.URISchema;
public class TCPTransportConfigurationSchema extends AbstractTransportConfigurationSchema
{
private final Set<String> allowableProperties;
@ -68,7 +68,7 @@ public class TCPTransportConfigurationSchema extends AbstractTransportConfigurat
name));
String connectors = uri.getFragment();
if (connectors != null)
if (connectors != null && !connectors.trim().isEmpty())
{
String[] split = connectors.split(",");
for (String s : split)

View File

@ -43,8 +43,8 @@ import java.net.URI;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.jms.referenceable.ConnectionFactoryObjectFactory;
@ -53,10 +53,13 @@ import org.apache.activemq.artemis.uri.ConnectionFactoryParser;
import org.apache.activemq.artemis.uri.ServerLocatorParser;
/**
* ActiveMQ Artemis implementation of a JMS ConnectionFactory.
* <p>ActiveMQ Artemis implementation of a JMS ConnectionFactory.</p>
* <p>This connection factory will use defaults defined by {@link DefaultConnectionProperties}.
*/
public class ActiveMQConnectionFactory implements Externalizable, Referenceable, ConnectionFactory, XAConnectionFactory
public class ActiveMQConnectionFactory implements Externalizable, Referenceable, ConnectionFactory,
XAConnectionFactory, AutoCloseable
{
private ServerLocator serverLocator;
private String clientID;
@ -67,7 +70,29 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable,
private boolean readOnly;
private String user;
private String password;
public void writeExternal(ObjectOutput out) throws IOException
{
URI uri = toURI();
try
{
out.writeUTF(uri.toASCIIString());
}
catch (Exception e)
{
if (e instanceof IOException)
{
throw (IOException) e;
}
throw new IOException(e);
}
}
public URI toURI() throws IOException
{
ConnectionFactoryParser parser = new ConnectionFactoryParser();
String scheme;
@ -84,12 +109,21 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable,
}
else
{
scheme = "tcp";
if (serverLocator.allInVM())
{
scheme = "vm";
}
else
{
scheme = "tcp";
}
}
URI uri;
try
{
URI uri = parser.createSchema(scheme, this);
out.writeUTF(uri.toASCIIString());
uri = parser.createSchema(scheme, this);
}
catch (Exception e)
{
@ -99,6 +133,7 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable,
}
throw new IOException(e);
}
return uri;
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
@ -118,9 +153,43 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable,
}
}
/** This will use a default URI from {@link DefaultConnectionProperties} */
public ActiveMQConnectionFactory()
{
serverLocator = null;
this(DefaultConnectionProperties.DEFAULT_BROKER_URL);
}
public ActiveMQConnectionFactory(String url)
{
ConnectionFactoryParser cfParser = new ConnectionFactoryParser();
ServerLocatorParser locatorParser = new ServerLocatorParser();
try
{
URI uri = new URI(url);
serverLocator = locatorParser.newObject(uri, null);
cfParser.populateObject(uri, this);
}
catch (Exception e)
{
throw new RuntimeException(e.getMessage(), e);
}
if (getUser() == null)
{
setUser(DefaultConnectionProperties.DEFAULT_USER);
}
if (getPassword() == null)
{
setPassword(DefaultConnectionProperties.DEFAULT_PASSWORD);
}
}
/** For compatibility and users used to this kind of constructor */
public ActiveMQConnectionFactory(String url, String user, String password)
{
this(url);
setUser(user).setPassword(password);
}
public ActiveMQConnectionFactory(final ServerLocator serverLocator)
@ -158,11 +227,9 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable,
serverLocator.disableFinalizeCheck();
}
// ConnectionFactory implementation -------------------------------------------------------------
public Connection createConnection() throws JMSException
{
return createConnection(null, null);
return createConnection(user, password);
}
public Connection createConnection(final String username, final String password) throws JMSException
@ -173,13 +240,13 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable,
@Override
public JMSContext createContext()
{
return createContext(null, null);
return createContext(user, password);
}
@Override
public JMSContext createContext(final int sessionMode)
{
return createContext(null, null, sessionMode);
return createContext(user, password, sessionMode);
}
@Override
@ -208,9 +275,6 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable,
}
}
/**
* @param mode
*/
private static void validateSessionMode(int mode)
{
switch (mode)
@ -227,8 +291,6 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable,
}
}
// QueueConnectionFactory implementation --------------------------------------------------------
public QueueConnection createQueueConnection() throws JMSException
{
return createQueueConnection(null, null);
@ -321,8 +383,6 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable,
null);
}
// Public ---------------------------------------------------------------------------------------
public boolean isHA()
{
return serverLocator.isHA();
@ -671,6 +731,30 @@ public class ActiveMQConnectionFactory implements Externalizable, Referenceable,
serverLocator.setInitialMessagePacketSize(size);
}
public ActiveMQConnectionFactory setUser(String user)
{
checkWrite();
this.user = user;
return this;
}
public String getUser()
{
return user;
}
public String getPassword()
{
return password;
}
public ActiveMQConnectionFactory setPassword(String password)
{
checkWrite();
this.password = password;
return this;
}
public void setGroupID(final String groupID)
{
serverLocator.setGroupID(groupID);

View File

@ -25,42 +25,38 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
/**
* A class that represents a ConnectionFactory.
* {@inheritDoc}
*/
public class ActiveMQJMSConnectionFactory extends ActiveMQConnectionFactory implements TopicConnectionFactory, QueueConnectionFactory
{
private static final long serialVersionUID = -2810634789345348326L;
/**
*
*/
public ActiveMQJMSConnectionFactory()
{
super();
}
/**
* @param serverLocator
*/
public ActiveMQJMSConnectionFactory(String uri)
{
super(uri);
}
public ActiveMQJMSConnectionFactory(String uri, String user, String password)
{
super(uri, user, password);
}
public ActiveMQJMSConnectionFactory(ServerLocator serverLocator)
{
super(serverLocator);
}
/**
* @param ha
* @param groupConfiguration
*/
public ActiveMQJMSConnectionFactory(boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
{
super(ha, groupConfiguration);
}
/**
* @param ha
* @param initialConnectors
*/
public ActiveMQJMSConnectionFactory(boolean ha, TransportConfiguration... initialConnectors)
{
super(ha, initialConnectors);

View File

@ -24,41 +24,37 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
/**
* A class that represents a QueueConnectionFactory.
* {@inheritDoc}
*/
public class ActiveMQQueueConnectionFactory extends ActiveMQConnectionFactory implements QueueConnectionFactory
{
private static final long serialVersionUID = 5312455021322463546L;
/**
*
*/
public ActiveMQQueueConnectionFactory()
{
super();
}
/**
* @param serverLocator
*/
public ActiveMQQueueConnectionFactory(String url)
{
super(url);
}
public ActiveMQQueueConnectionFactory(ServerLocator serverLocator)
{
super(serverLocator);
}
/**
* @param ha
* @param groupConfiguration
*/
public ActiveMQQueueConnectionFactory(boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
{
super(ha, groupConfiguration);
}
/**
* @param ha
* @param initialConnectors
*/
public ActiveMQQueueConnectionFactory(String url, String user, String password)
{
super(url, user, password);
}
public ActiveMQQueueConnectionFactory(boolean ha, TransportConfiguration... initialConnectors)
{
super(ha, initialConnectors);

View File

@ -24,42 +24,37 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
/**
* A class that represents a TopicConnectionFactory.
* {@inheritDoc}
*/
public class ActiveMQTopicConnectionFactory extends ActiveMQConnectionFactory implements TopicConnectionFactory
{
private static final long serialVersionUID = 7317051989866548455L;
/**
*
*/
public ActiveMQTopicConnectionFactory()
{
super();
}
/**
* @param serverLocator
*/
public ActiveMQTopicConnectionFactory(String url)
{
super(url);
}
public ActiveMQTopicConnectionFactory(String url, String user, String password)
{
super(url, user, password);
}
public ActiveMQTopicConnectionFactory(ServerLocator serverLocator)
{
super(serverLocator);
}
/**
* @param ha
* @param groupConfiguration
*/
public ActiveMQTopicConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
{
super(ha, groupConfiguration);
}
/**
* @param ha
* @param initialConnectors
*/
public ActiveMQTopicConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors)
{
super(ha, initialConnectors);

View File

@ -34,35 +34,31 @@ public class ActiveMQXAConnectionFactory extends ActiveMQConnectionFactory imple
{
private static final long serialVersionUID = 743611571839154115L;
/**
*
*/
public ActiveMQXAConnectionFactory()
{
super();
}
/**
* @param serverLocator
*/
public ActiveMQXAConnectionFactory(String uri)
{
super(uri);
}
public ActiveMQXAConnectionFactory(String url, String user, String password)
{
super(url, user, password);
}
public ActiveMQXAConnectionFactory(ServerLocator serverLocator)
{
super(serverLocator);
}
/**
* @param ha
* @param groupConfiguration
*/
public ActiveMQXAConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
{
super(ha, groupConfiguration);
}
/**
* @param ha
* @param initialConnectors
*/
public ActiveMQXAConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors)
{
super(ha, initialConnectors);

View File

@ -24,41 +24,37 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
/**
* A class that represents a XAQueueConnectionFactory.
* {@inheritDoc}
*/
public class ActiveMQXAQueueConnectionFactory extends ActiveMQConnectionFactory implements XAQueueConnectionFactory
{
private static final long serialVersionUID = 8612457847251087454L;
/**
*
*/
public ActiveMQXAQueueConnectionFactory()
{
super();
}
/**
* @param serverLocator
*/
public ActiveMQXAQueueConnectionFactory(String uri)
{
super(uri);
}
public ActiveMQXAQueueConnectionFactory(String url, String user, String password)
{
super(url, user, password);
}
public ActiveMQXAQueueConnectionFactory(ServerLocator serverLocator)
{
super(serverLocator);
}
/**
* @param ha
* @param groupConfiguration
*/
public ActiveMQXAQueueConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
{
super(ha, groupConfiguration);
}
/**
* @param ha
* @param initialConnectors
*/
public ActiveMQXAQueueConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors)
{
super(ha, initialConnectors);

View File

@ -24,41 +24,37 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
/**
* A class that represents a XATopicConnectionFactory.
* {@inheritDoc}
*/
public class ActiveMQXATopicConnectionFactory extends ActiveMQConnectionFactory implements XATopicConnectionFactory
{
private static final long serialVersionUID = -7018290426884419693L;
/**
*
*/
public ActiveMQXATopicConnectionFactory()
{
super();
}
/**
* @param serverLocator
*/
public ActiveMQXATopicConnectionFactory(String uri)
{
super(uri);
}
public ActiveMQXATopicConnectionFactory(String url, String user, String password)
{
super(url, user, password);
}
public ActiveMQXATopicConnectionFactory(final ServerLocator serverLocator)
{
super(serverLocator);
}
/**
* @param ha
* @param groupConfiguration
*/
public ActiveMQXATopicConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration)
{
super(ha, groupConfiguration);
}
/**
* @param ha
* @param initialConnectors
*/
public ActiveMQXATopicConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors)
{
super(ha, initialConnectors);

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.client;
import java.security.AccessController;
import java.security.PrivilegedAction;
/**
* <p>This class will provide default properties for constructors</p>
*
* <table border='1'>
* <tr> <td>Name</td> <td>Default Value</td></tr>
* <tr> <td>AMQ_HOST or org.apache.activemq.AMQ_HOST</td> <td>localhost</td></tr>
* <tr><td>AMQ_PORT or org.apache.activemq.AMQ_PORT</td> <td>61616</td></tr>
* <tr><td>BROKER_BIND_URL or org.apache.activemq.BROKER_BIND_URL</td> <td>tcp://${AMQ_HOST}:${AMQ_PORT}</td></tr>
* <tr><td>AMQ_USER or org.apache.activemq.AMQ_USER</td> <td>null</td></tr>
* <tr><td>AMQ_PASSWORD or org.apache.activemq.AMQ_PASSWORD</td> <td>null</td></tr>
* </table>
*/
public class DefaultConnectionProperties
{
public static final String DEFAULT_BROKER_HOST;
public static final int DEFAULT_BROKER_PORT;
public static final String DEFAULT_BROKER_BIND_URL;
public static final String DEFAULT_BROKER_URL;
public static final String DEFAULT_USER;
public static final String DEFAULT_PASSWORD;
static String getProperty(final String defaultValue, final String... propertyNames)
{
return AccessController.doPrivileged(new PrivilegedAction<String>()
{
@Override
public String run()
{
for (String name : propertyNames)
{
String property = System.getProperty(name);
if (property != null && !property.isEmpty())
{
return property;
}
}
return defaultValue;
}
});
}
static
{
String host = getProperty("localhost", "AMQ_HOST", "org.apache.activemq.AMQ_HOST");
String port = getProperty("61616", "AMQ_PORT", "org.apache.activemq.AMQ_PORT");
DEFAULT_BROKER_HOST = host;
DEFAULT_BROKER_PORT = Integer.parseInt(port);
String url = getProperty("tcp://" + host + ":" + port, "org.apache.activemq.BROKER_BIND_URL", "BROKER_BIND_URL");
DEFAULT_USER = getProperty(null, "AMQ_USER", "org.apache.activemq.AMQ_USER");
DEFAULT_PASSWORD = getProperty(null, "AMQ_PASSWORD", "org.apache.activemq.AMQ_PASSWORD");
if (DEFAULT_USER != null && DEFAULT_PASSWORD != null)
{
url += "?user=" + DEFAULT_USER + "&password=" + DEFAULT_PASSWORD;
}
DEFAULT_BROKER_BIND_URL = url;
// TODO: improve this once we implement failover:// as ActiveMQ5 does
DEFAULT_BROKER_URL = DEFAULT_BROKER_BIND_URL;
}
}

View File

@ -16,19 +16,21 @@
*/
package org.apache.activemq.artemis.tests.integration.client;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.uri.ServerLocatorParser;
import org.junit.Before;
import org.junit.Test;
@ -46,6 +48,37 @@ public class ServerLocatorConnectTest extends ActiveMQTestBase
server.start();
}
@Test
public void testURL() throws Exception
{
ServerLocatorParser parser = new ServerLocatorParser();
// This URL was failing in some ConnectionFactoryTests.
// The issue seemed to be the # to be creating extra spaces on the parsing
// Added some treatment to fix that, and I kept the test here.
URI uri = new URI("tcp://localhost:61616?&blockOnNonDurableSend=true&" +
"retryIntervalMultiplier=1.0&maxRetryInterval=2000&producerMaxRate=-1&" +
"blockOnDurableSend=true&connectionTTL=60000&compressLargeMessage=false&reconnectAttempts=0&" +
"cacheLargeMessagesClient=false&scheduledThreadPoolMaxSize=5&useGlobalPools=true&" +
"callFailoverTimeout=-1&initialConnectAttempts=1&clientFailureCheckPeriod=30000&" +
"blockOnAcknowledge=true&consumerWindowSize=1048576&minLargeMessageSize=102400&" +
"autoGroup=false&threadPoolMaxSize=-1&confirmationWindowSize=-1&" +
"transactionBatchSize=1048576&callTimeout=30000&preAcknowledge=false&" +
"connectionLoadBalancingPolicyClassName=org.apache.activemq.artemis.api.core.client.loadbalance." +
"RoundRobinConnectionLoadBalancingPolicy&dupsOKBatchSize=1048576&initialMessagePacketSize=1500&" +
"consumerMaxRate=-1&retryInterval=2000&failoverOnInitialConnection=false&producerWindowSize=65536&" +
"port=61616&host=localhost#");
// try it a few times to make sure it fails if it's broken
for (int i = 0; i < 10; i++)
{
ServerLocator locator = parser.newObject(uri, null);
ClientSessionFactory csf = createSessionFactory(locator);
csf.close();
locator.close();
}
}
@Test
public void testSingleConnectorSingleServer() throws Exception
{
@ -150,7 +183,7 @@ public class ServerLocatorConnectTest extends ActiveMQTestBase
public boolean isNetty()
{
return false;
return true;
}
static class Connector implements Runnable

View File

@ -16,14 +16,10 @@
*/
package org.apache.activemq.artemis.tests.integration.jms.client;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSContext;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;
@ -36,11 +32,55 @@ import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class ConnectionTest extends JMSTestBase
{
private Connection conn2;
@Test
public void testThroughNewConnectionFactory() throws Exception
{
testThroughNewConnectionFactory(new ActiveMQConnectionFactory("vm://0"));
testThroughNewConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61616?&blockOnNonDurableSend=true&" +
"retryIntervalMultiplier=1.0&maxRetryInterval=2000&producerMaxRate=-1&" +
"blockOnDurableSend=true&connectionTTL=60000&compressLargeMessage=false&reconnectAttempts=0&" +
"cacheLargeMessagesClient=false&scheduledThreadPoolMaxSize=5&useGlobalPools=true&" +
"callFailoverTimeout=-1&initialConnectAttempts=1&clientFailureCheckPeriod=30000&" +
"blockOnAcknowledge=true&consumerWindowSize=1048576&minLargeMessageSize=102400&" +
"autoGroup=false&threadPoolMaxSize=-1&confirmationWindowSize=-1&" +
"transactionBatchSize=1048576&callTimeout=30000&preAcknowledge=false&" +
"connectionLoadBalancingPolicyClassName=org.apache.activemq.artemis.api.core.client.loadbalance." +
"RoundRobinConnectionLoadBalancingPolicy&dupsOKBatchSize=1048576&initialMessagePacketSize=1500&" +
"consumerMaxRate=-1&retryInterval=2000&failoverOnInitialConnection=false&producerWindowSize=65536&" +
"port=61616&host=localhost#"));
}
private void testThroughNewConnectionFactory(ActiveMQConnectionFactory factory) throws Exception
{
Connection conn = factory.createConnection();
conn.close();
try (JMSContext ctx = factory.createContext())
{
ctx.createProducer().send(ctx.createQueue("queue"),"Test");
}
try (JMSContext ctx = factory.createContext())
{
Assert.assertNotNull(ctx.createConsumer(ctx.createQueue("queue")).receiveNoWait());
Assert.assertNull(ctx.createConsumer(ctx.createQueue("queue")).receiveNoWait());
}
factory.close();
}
@Test
public void testSetSameIdToDifferentConnections() throws Exception
{