This commit is contained in:
Clebert Suconic 2018-01-17 12:31:27 -05:00
commit 14f149d755
8 changed files with 170 additions and 40 deletions

View File

@ -198,6 +198,10 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
return new ActiveMQQueue(address);
}
public static ActiveMQQueue createQueue(final String address, final String name) {
return new ActiveMQQueue(address, name);
}
public static ActiveMQTopic createTopic(final String address) {
return new ActiveMQTopic(address);
}
@ -206,6 +210,10 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
return new ActiveMQTopic(address);
}
public static ActiveMQTopic createTopic(final String address, final String name) {
return new ActiveMQTopic(address, name);
}
public static ActiveMQTemporaryQueue createTemporaryQueue(final String address, final ActiveMQSession session) {
return new ActiveMQTemporaryQueue(address, session);
}
@ -241,6 +249,18 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
*/
private SimpleString simpleAddress;
/**
* Needed for serialization backwards compatibility.
*/
@Deprecated
private String address;
/**
* The "JMS" name of the destination. Needed for serialization backwards compatibility.
*/
@Deprecated
private String name;
private final boolean temporary;
private final boolean queue;
@ -254,15 +274,7 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
protected ActiveMQDestination(final String address,
final TYPE type,
final ActiveMQSession session) {
this.simpleAddress = SimpleString.toSimpleString(address);
this.thetype = type;
this.session = session;
this.temporary = TYPE.isTemporary(type);
this.queue = TYPE.isQueue(type);
this(SimpleString.toSimpleString(address), type, session);
}
protected ActiveMQDestination(final SimpleString address,
@ -279,6 +291,26 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
this.queue = TYPE.isQueue(type);
}
@Deprecated
protected ActiveMQDestination(final String address,
final String name,
final TYPE type,
final ActiveMQSession session) {
this(SimpleString.toSimpleString(address), name, type, session);
}
@Deprecated
protected ActiveMQDestination(final SimpleString address,
final String name,
final TYPE type,
final ActiveMQSession session) {
this(address, type, session);
this.name = name;
this.address = simpleAddress != null ? simpleAddress.toString() : null;
}
public void setAddress(String address) {
setSimpleAddress(SimpleString.toSimpleString(address));
}
@ -287,6 +319,7 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
if (address == null) {
throw new IllegalArgumentException("address cannot be null");
}
this.address = address.toString();
this.simpleAddress = address;
}
@ -319,7 +352,7 @@ public class ActiveMQDestination extends JNDIStorable implements Destination, Se
}
public String getName() {
return simpleAddress.toString();
return name != null ? name : getAddress();
}
public boolean isTemporary() {

View File

@ -47,6 +47,11 @@ public class ActiveMQQueue extends ActiveMQDestination implements Queue {
super(address, TYPE.QUEUE, null);
}
@Deprecated
public ActiveMQQueue(final String address, final String name) {
super(address, name, TYPE.QUEUE, null);
}
public ActiveMQQueue(final String address, boolean temporary) {
super(address, temporary ? TYPE.TEMP_QUEUE : TYPE.QUEUE, null);
}

View File

@ -46,8 +46,13 @@ public class ActiveMQTopic extends ActiveMQDestination implements Topic {
super(address, TYPE.TOPIC, null);
}
@Deprecated
public ActiveMQTopic(final String address, final String name) {
super(address, name, TYPE.TOPIC, null);
}
public ActiveMQTopic(final String address, boolean temporary) {
super(address, TYPE.TOPIC, null);
this(address, temporary, null);
}
/**

View File

@ -60,6 +60,24 @@ public interface JMSServerManager extends ActiveMQComponent {
boolean durable,
String... bindings) throws Exception;
/**
* Creates a JMS Queue.
*
* @param queueName The name of the core queue to create
* @param jmsQueueName the name of this JMS queue
* @param selectorString
* @param durable
* @return true if the queue is created or if it existed and was added to
* the Binding Registry
* @throws Exception if problems were encountered creating the queue.
*/
boolean createQueue(boolean storeConfig,
String queueName,
String jmsQueueName,
String selectorString,
boolean durable,
String... bindings) throws Exception;
boolean addTopicToBindingRegistry(String topicName, String binding) throws Exception;
boolean addQueueToBindingRegistry(String queueName, String binding) throws Exception;
@ -69,23 +87,46 @@ public interface JMSServerManager extends ActiveMQComponent {
/**
* Creates a JMS Topic
*
* @param address the core addres of thetopic
* @param bindings the names of the binding for the Binding Registry or BindingRegistry
* @return true if the topic was created or if it existed and was added to
* the Binding Registry
* @throws Exception if a problem occurred creating the topic
*/
boolean createTopic(boolean storeConfig, String address, String... bindings) throws Exception;
/**
* Creates a JMS Topic
*
* @param address the core addres of thetopic
* @param topicName the name of the topic
* @param bindings the names of the binding for the Binding Registry or BindingRegistry
* @return true if the topic was created or if it existed and was added to
* the Binding Registry
* @throws Exception if a problem occurred creating the topic
*/
boolean createTopic(boolean storeConfig, String topicName, String... bindings) throws Exception;
boolean createTopic(String address, boolean storeConfig, String topicName, String... bindings) throws Exception;
/**
* @param storeConfig
* @param address
* @param autoCreated
* @param bindings
* @return
* @throws Exception
*/
boolean createTopic(boolean storeConfig, String address, boolean autoCreated, String... bindings) throws Exception;
/**
* @param storeConfig
* @param address
* @param topicName
* @param autoCreated
* @param bindings
* @return
* @throws Exception
*/
boolean createTopic(boolean storeConfig, String topicName, boolean autoCreated, String... bindings) throws Exception;
boolean createTopic(boolean storeConfig, String address, String topicName, boolean autoCreated, String... bindings) throws Exception;
/**
* Remove the topic from the Binding Registry or BindingRegistry.

View File

@ -117,8 +117,10 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
private BindingRegistry registry;
// keys are the core addresses of the JMS queues
private final Map<String, ActiveMQQueue> queues = new HashMap<>();
// keys are the core addresses of the topics
private final Map<String, ActiveMQTopic> topics = new HashMap<>();
private final Map<String, ActiveMQConnectionFactory> connectionFactories = new HashMap<>();
@ -465,11 +467,17 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
final String selectorString,
final boolean durable,
final String... bindings) throws Exception {
return internalCreateJMSQueue(storeConfig, queueName, selectorString, durable, false, bindings);
return internalCreateJMSQueue(storeConfig, queueName, queueName, selectorString, durable, false, bindings);
}
@Override
public boolean createQueue(boolean storeConfig, String queueName, String jmsQueueName, String selectorString, boolean durable, String... bindings) throws Exception {
return internalCreateJMSQueue(storeConfig, queueName, jmsQueueName, selectorString, durable, false, bindings);
}
protected boolean internalCreateJMSQueue(final boolean storeConfig,
final String queueName,
final String jmsQueueName,
final String selectorString,
final boolean durable,
final boolean autoCreated,
@ -489,7 +497,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
public void runException() throws Exception {
checkBindings(bindings);
if (internalCreateQueue(queueName, selectorString, durable, autoCreated)) {
if (internalCreateQueue(queueName, jmsQueueName, selectorString, durable, autoCreated)) {
ActiveMQDestination destination = queues.get(queueName);
if (destination == null) {
@ -528,32 +536,46 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
@Override
public synchronized boolean createTopic(final boolean storeConfig,
final String topicName,
final String address,
final String... bindings) throws Exception {
return createTopic(storeConfig, topicName, false, bindings);
return createTopic(storeConfig, address, false, bindings);
}
@Override
public boolean createTopic(String address, boolean storeConfig, String topicName, String... bindings) throws Exception {
return createTopic(storeConfig, address, topicName, false, bindings);
}
@Override
public synchronized boolean createTopic(final boolean storeConfig,
final String topicName,
final String address,
final boolean autoCreated,
final String... bindings) throws Exception {
if (active && topics.get(topicName) != null) {
return createTopic(storeConfig, address, address, autoCreated, bindings);
}
@Override
public boolean createTopic(final boolean storeConfig,
final String address,
final String topicName,
final boolean autoCreated,
final String... bindings) throws Exception {
if (active && topics.get(address) != null) {
return false;
}
runAfterActive(new WrappedRunnable() {
@Override
public String toString() {
return "createTopic for " + topicName;
return "createTopic for " + address;
}
@Override
public void runException() throws Exception {
checkBindings(bindings);
if (internalCreateTopic(topicName, autoCreated)) {
ActiveMQDestination destination = topics.get(topicName);
if (internalCreateTopic(address, topicName, autoCreated)) {
ActiveMQDestination destination = topics.get(address);
if (destination == null) {
// sanity check. internalCreateQueue should already have done this check
@ -571,17 +593,17 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
}
String[] usedBindings = bindingsToAdd.toArray(new String[bindingsToAdd.size()]);
addToBindings(topicBindings, topicName, usedBindings);
addToBindings(topicBindings, address, usedBindings);
if (storeConfig) {
storage.storeDestination(new PersistedDestination(PersistedType.Topic, topicName));
storage.addBindings(PersistedType.Topic, topicName, usedBindings);
storage.storeDestination(new PersistedDestination(PersistedType.Topic, address));
storage.addBindings(PersistedType.Topic, address, usedBindings);
}
}
}
});
sendNotification(JMSNotificationType.TOPIC_CREATED, topicName);
sendNotification(JMSNotificationType.TOPIC_CREATED, address);
return true;
}
@ -1056,10 +1078,11 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
private synchronized boolean internalCreateQueue(final String queueName,
final String selectorString,
final boolean durable) throws Exception {
return internalCreateQueue(queueName, selectorString, durable, false);
return internalCreateQueue(queueName, queueName, selectorString, durable, false);
}
private synchronized boolean internalCreateQueue(final String queueName,
final String jmsQueueName,
final String selectorString,
final boolean durable,
final boolean autoCreated) throws Exception {
@ -1078,7 +1101,8 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
AddressSettings as = server.getAddressSettingsRepository().getMatch(queueName);
server.createQueue(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(coreFilterString), null, durable, false, true, false, false, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isAutoCreateAddresses());
queues.put(queueName, ActiveMQDestination.createQueue(queueName));
// create the JMS queue with the logical name jmsQueueName and keeps queueName for its *core* queue name
queues.put(queueName, ActiveMQDestination.createQueue(queueName, jmsQueueName));
this.recoverregistryBindings(queueName, PersistedType.Queue);
@ -1090,21 +1114,23 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
* Performs the internal creation without activating any storage.
* The storage load will call this method
*
* @param topicName
* @param address
* @return
* @throws Exception
*/
private synchronized boolean internalCreateTopic(final String topicName) throws Exception {
return internalCreateTopic(topicName, false);
private synchronized boolean internalCreateTopic(final String address) throws Exception {
return internalCreateTopic(address, address, false);
}
private synchronized boolean internalCreateTopic(final String topicName,
private synchronized boolean internalCreateTopic(final String address,
final String topicName,
final boolean autoCreated) throws Exception {
if (topics.get(topicName) != null) {
if (topics.get(address) != null) {
return false;
} else {
ActiveMQTopic activeMQTopic = ActiveMQDestination.createTopic(topicName);
// Create the JMS topic with topicName as the logical name of the topic *and* address as its address
ActiveMQTopic activeMQTopic = ActiveMQDestination.createTopic(address, topicName);
server.addOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(activeMQTopic.getAddress()), RoutingType.MULTICAST));
topics.put(topicName, activeMQTopic);

View File

@ -31,6 +31,7 @@ import org.apache.activemq.artemis.jms.client.*
file = arg[0]
method = arg[1]
version = arg[2]
System.out.println("File::" + file);
@ -49,6 +50,11 @@ if (method.equals("write")) {
topic = new ActiveMQTopic("topic")
temporary = ActiveMQDestination.createTemporaryQueue("whatever")
temporaryTopic = ActiveMQDestination.createTemporaryTopic("whatever")
if (version.equals("ARTEMIS-SNAPSHOT")) {
destination = new ActiveMQDestination("address", "name", ActiveMQDestination.TYPE.DESTINATION, null)
} else if (version.equals("ARTEMIS-155")) {
destination = new ActiveMQDestination("address", "name", false, true, null)
}
Marshaller marshaller = factory.createMarshaller(configuration)
FileOutputStream fileOutputStream = new FileOutputStream(file)
@ -58,6 +64,7 @@ if (method.equals("write")) {
marshaller.writeObject(topic)
marshaller.writeObject(temporary)
marshaller.writeObject(temporaryTopic)
marshaller.writeObject(destination)
marshaller.finish()
fileOutputStream.close();
} else {
@ -69,10 +76,13 @@ if (method.equals("write")) {
topic = unmarshaller.readObject()
temporary = unmarshaller.readObject()
temporaryTopic = unmarshaller.readObject()
destination = unmarshaller.readObject()
}
GroovyRun.assertTrue(!cf.getServerLocator().isBlockOnDurableSend());
GroovyRun.assertEquals(1048576, cf.getServerLocator().getConfirmationWindowSize());
GroovyRun.assertEquals(destination.getName(), "name")
GroovyRun.assertEquals(destination.getAddress(), "address")
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

View File

@ -24,18 +24,25 @@ import org.apache.activemq.artemis.jms.client.*
file = arg[0]
method = arg[1]
System.out.println("File::" + file);
version = arg[2]
System.out.println("File::" + file)
if (method.equals("write")) {
cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false");
queue = new ActiveMQQueue("queue");
topic = new ActiveMQTopic("topic")
if (version.equals("ARTEMIS-SNAPSHOT")) {
destination = new ActiveMQDestination("address", "name", ActiveMQDestination.TYPE.DESTINATION, null)
} else if (version.equals("ARTEMIS-155")) {
destination = new ActiveMQDestination("address", "name", false, true, null)
}
ObjectOutputStream objectOutputStream = new ObjectOutputStream(new FileOutputStream(file));
objectOutputStream.writeObject(cf);
objectOutputStream.writeObject(queue)
objectOutputStream.writeObject(topic)
objectOutputStream.writeObject(destination)
objectOutputStream.close();
} else {
ObjectInputStream inputStream = new ObjectInputStream(new FileInputStream(file))
@ -43,11 +50,14 @@ if (method.equals("write")) {
cf = inputStream.readObject();
queue = inputStream.readObject()
topic = inputStream.readObject()
destination = inputStream.readObject()
inputStream.close();
}
GroovyRun.assertTrue(!cf.getServerLocator().isBlockOnDurableSend());
GroovyRun.assertEquals(1048576, cf.getServerLocator().getConfirmationWindowSize());
GroovyRun.assertEquals(1048576, cf.getServerLocator().getConfirmationWindowSize())
GroovyRun.assertEquals(destination.getName(), "name")
GroovyRun.assertEquals(destination.getAddress(), "address")
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

View File

@ -91,15 +91,15 @@ public class SerializationTest extends VersionedBaseTest {
@Test
public void testSerializeFactory() throws Throwable {
File file = serverFolder.newFile("objects.ser");
callScript(senderClassloader, "serial/serial.groovy", file.getAbsolutePath(), "write");
callScript(receiverClassloader, "serial/serial.groovy", file.getAbsolutePath(), "read");
callScript(senderClassloader, "serial/serial.groovy", file.getAbsolutePath(), "write", sender);
callScript(receiverClassloader, "serial/serial.groovy", file.getAbsolutePath(), "read", receiver);
}
@Test
public void testJBMSerializeFactory() throws Throwable {
File file = serverFolder.newFile("objectsjbm.ser");
callScript(senderClassloader, "serial/jbmserial.groovy", file.getAbsolutePath(), "write");
callScript(receiverClassloader, "serial/jbmserial.groovy", file.getAbsolutePath(), "read");
callScript(senderClassloader, "serial/jbmserial.groovy", file.getAbsolutePath(), "write", sender);
callScript(receiverClassloader, "serial/jbmserial.groovy", file.getAbsolutePath(), "read", receiver);
}
}