ARTEMIS-2534 Adding additional test for OpenWire and TempQueue

This commit is contained in:
Michael Pearce 2020-05-05 18:18:37 -04:00 committed by Clebert Suconic
parent 593207eb8a
commit ed4086c687
3 changed files with 377 additions and 0 deletions

View File

@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.jms;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.junit.Test;
public class RedeployTempTest extends ActiveMQTestBase {
@Test
public void testRedeployAddressQueueOpenWire() throws Exception {
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
URL url1 = RedeployTest.class.getClassLoader().getResource("RedeployTempTest-reload-temp.xml");
URL url2 = RedeployTest.class.getClassLoader().getResource("RedeployTempTest-reload-temp-updated.xml");
Files.copy(url1.openStream(), brokerXML);
EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
embeddedActiveMQ.start();
final ReusableLatch latch = new ReusableLatch(1);
Runnable tick = latch::countDown;
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
ConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory();
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("queue");
MessageProducer messageProducer = session.createProducer(destination);
Destination replyTo = session.createTemporaryQueue();
Message message = session.createTextMessage("hello");
message.setJMSReplyTo(replyTo);
messageProducer.send(message);
try {
latch.await(10, TimeUnit.SECONDS);
Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
latch.setCount(1);
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
latch.await(10, TimeUnit.SECONDS);
try (Connection connectionConsumer = connectionFactory.createConnection()) {
connectionConsumer.start();
try (Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
Destination destinationConsumer = session.createQueue("queue");
MessageConsumer messageConsumer = sessionConsumer.createConsumer(destinationConsumer);
Message receivedMessage = messageConsumer.receive(1000);
assertEquals("hello", ((TextMessage) receivedMessage).getText());
Destination replyToDest = receivedMessage.getJMSReplyTo();
Message message1 = sessionConsumer.createTextMessage("hi there");
session.createProducer(replyToDest).send(message1);
}
}
MessageConsumer messageConsumerProducer = session.createConsumer(replyTo);
Message message2 = messageConsumerProducer.receive(1000);
assertEquals("hi there", ((TextMessage) message2).getText());
} finally {
connection.close();
embeddedActiveMQ.stop();
}
}
private AddressSettings getAddressSettings(EmbeddedActiveMQ embeddedActiveMQ, String address) {
return embeddedActiveMQ.getActiveMQServer().getAddressSettingsRepository().getMatch(address);
}
private Set<Role> getSecurityRoles(EmbeddedActiveMQ embeddedActiveMQ, String address) {
return embeddedActiveMQ.getActiveMQServer().getSecurityRepository().getMatch(address);
}
private AddressInfo getAddressInfo(EmbeddedActiveMQ embeddedActiveMQ, String address) {
return embeddedActiveMQ.getActiveMQServer().getPostOffice().getAddressInfo(SimpleString.toSimpleString(address));
}
private org.apache.activemq.artemis.core.server.Queue getQueue(EmbeddedActiveMQ embeddedActiveMQ,
String queueName) throws Exception {
QueueBinding queueBinding = (QueueBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(SimpleString.toSimpleString(queueName));
return queueBinding == null ? null : queueBinding.getQueue();
}
private List<String> listQueuesNamesForAddress(EmbeddedActiveMQ embeddedActiveMQ, String address) throws Exception {
return embeddedActiveMQ.getActiveMQServer().getPostOffice().listQueuesForAddress(SimpleString.toSimpleString(address)).stream().map(org.apache.activemq.artemis.core.server.Queue::getName).map(SimpleString::toString).collect(Collectors.toList());
}
}

View File

@ -0,0 +1,121 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<name>0.0.0.0</name>
<configuration-file-refresh-period>100</configuration-file-refresh-period>
<persistence-enabled>false</persistence-enabled>
<security-enabled>false</security-enabled>
<!-- this could be ASYNCIO or NIO
-->
<journal-type>NIO</journal-type>
<paging-directory>./data/paging</paging-directory>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/large-messages</large-messages-directory>
<journal-min-files>2</journal-min-files>
<journal-pool-files>-1</journal-pool-files>
<!--
This value was determined through a calculation.
Your system could perform 25 writes per millisecond
on the current journal configuration.
That translates as a sync write every 40000 nanoseconds
-->
<journal-buffer-timeout>40000</journal-buffer-timeout>
<acceptors>
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="a"/>
<permission type="deleteNonDurableQueue" roles="a"/>
<permission type="createDurableQueue" roles="a"/>
<permission type="deleteDurableQueue" roles="a"/>
<permission type="browse" roles="a"/>
<permission type="send" roles="a"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="a"/>
</security-setting>
</security-settings>
<address-settings>
<address-setting match="#">
<auto-create-queues>false</auto-create-queues>
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10Mb</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
<config-delete-queues>FORCE</config-delete-queues>
<config-delete-addresses>FORCE</config-delete-addresses>
</address-setting>
</address-settings>
<wildcard-addresses>
<delimiter>_</delimiter>
</wildcard-addresses>
<addresses>
<address name="queue">
<anycast>
<queue name="queue"/>
</anycast>
</address>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,121 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<name>0.0.0.0</name>
<configuration-file-refresh-period>100</configuration-file-refresh-period>
<persistence-enabled>false</persistence-enabled>
<security-enabled>false</security-enabled>
<!-- this could be ASYNCIO or NIO
-->
<journal-type>NIO</journal-type>
<paging-directory>./data/paging</paging-directory>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/large-messages</large-messages-directory>
<journal-min-files>2</journal-min-files>
<journal-pool-files>-1</journal-pool-files>
<!--
This value was determined through a calculation.
Your system could perform 25 writes per millisecond
on the current journal configuration.
That translates as a sync write every 40000 nanoseconds
-->
<journal-buffer-timeout>40000</journal-buffer-timeout>
<acceptors>
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="a"/>
<permission type="deleteNonDurableQueue" roles="a"/>
<permission type="createDurableQueue" roles="a"/>
<permission type="deleteDurableQueue" roles="a"/>
<permission type="browse" roles="a"/>
<permission type="send" roles="a"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="a"/>
</security-setting>
</security-settings>
<address-settings>
<address-setting match="#">
<auto-create-queues>false</auto-create-queues>
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10Mb</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
<config-delete-queues>FORCE</config-delete-queues>
<config-delete-addresses>FORCE</config-delete-addresses>
</address-setting>
</address-settings>
<wildcard-addresses>
<delimiter>_</delimiter>
</wildcard-addresses>
<addresses>
<address name="queue">
<anycast>
<queue name="queue"/>
</anycast>
</address>
</addresses>
</core>
</configuration>