This closes #3201
This commit is contained in:
commit
cc7c3d3721
|
@ -3992,11 +3992,19 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
addressSettingsRepository.swap(configuration.getAddressesSettings().entrySet());
|
||||
|
||||
ActiveMQServerLogger.LOGGER.reloadingConfiguration("diverts");
|
||||
final Set<SimpleString> divertsToRemove = postOffice.getAllBindings().values().stream()
|
||||
.filter(binding -> binding instanceof DivertBinding)
|
||||
.map(Binding::getUniqueName)
|
||||
.collect(Collectors.toSet());
|
||||
for (DivertConfiguration divertConfig : configuration.getDivertConfigurations()) {
|
||||
divertsToRemove.remove(SimpleString.toSimpleString(divertConfig.getName()));
|
||||
if (postOffice.getBinding(new SimpleString(divertConfig.getName())) == null) {
|
||||
deployDivert(divertConfig);
|
||||
}
|
||||
}
|
||||
for (final SimpleString divertName : divertsToRemove) {
|
||||
destroyDivert(divertName);
|
||||
}
|
||||
|
||||
ActiveMQServerLogger.LOGGER.reloadingConfiguration("addresses");
|
||||
undeployAddressesAndQueueNotInConfiguration(configuration);
|
||||
|
|
|
@ -161,10 +161,7 @@ and several sub-elements that defines the properties of a divert.
|
|||
|
||||
> **Note:**
|
||||
>
|
||||
> Reloading `<diverts>` only resulting in deploying new diverts. Existing diverts
|
||||
> won’t get undeployed even if you delete a `<divert>` element. Nor an existing
|
||||
> divert will be updated if its element is updated after reloading. To make
|
||||
> this happen you need a restart of the broker.
|
||||
> Existing diverts get undeployed if you delete their `<divert>` element.
|
||||
|
||||
Below lists the effects of adding, deleting and updating of an
|
||||
element/attribute within the diverts element, whether a change can be done or
|
||||
|
@ -172,8 +169,8 @@ can’t be done.
|
|||
|
||||
Operation | Add | Delete | Update
|
||||
---|---|---|---
|
||||
`<diverts>` | X (no more than one can be present) | Deleting it means delete (undeploy) all diverts in running broker. | N/A
|
||||
`<divert>` | Adding a new divert. It will be deployed after reloading | No effect on the deployed divert.(unless restarting broker, in which case the divert will no longer be deployed) | No effect on the deployed divert (unless restarting broker, in which case the divert will be redeployed)
|
||||
`<diverts>` | X (no more than one can be present) | Deleting it means delete (undeploy) all diverts in running broker. | N/A
|
||||
`<divert>` | Adding a new divert. It will be deployed after reloading | Deleting it means the divert will be undeployed after reloading | No effect on the deployed divert (unless restarting broker, in which case the divert will be redeployed)
|
||||
attribute `name` | N/A | X | A new divert with the name will be deployed. (if it is not already there in broker). Otherwise no effect.
|
||||
`<transformer-class-name>` | X (no more than one can be present) | No effect on the deployed divert.(unless restarting broker, in which case the divert will be deployed without the transformer class) | No effect on the deployed divert.(unless restarting broker, in which case the divert has the transformer class)
|
||||
`<exclusive>` | X (no more than one can be present) | No effect on the deployed divert.(unless restarting broker) | No effect on the deployed divert.(unless restarting broker)
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
|
|||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||
import org.apache.activemq.artemis.core.security.Role;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
|
@ -52,6 +53,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
|||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||
import org.apache.activemq.artemis.tests.unit.core.postoffice.impl.FakeQueue;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
|
@ -448,6 +450,63 @@ public class RedeployTest extends ActiveMQTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUndeployDivert() throws Exception {
|
||||
|
||||
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
|
||||
URL baseConfig = RedeployTest.class.getClassLoader().getResource("reload-divert-undeploy-before.xml");
|
||||
URL newConfig = RedeployTest.class.getClassLoader().getResource("reload-divert-undeploy-after.xml");
|
||||
Files.copy(baseConfig.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
|
||||
EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
|
||||
embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
|
||||
embeddedActiveMQ.start();
|
||||
|
||||
try {
|
||||
DivertBinding divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice()
|
||||
.getBinding(new SimpleString("divert"));
|
||||
assertNotNull(divertBinding);
|
||||
|
||||
Queue sourceQueue = (Queue) ActiveMQDestination.createDestination("queue://source", ActiveMQDestination.TYPE.QUEUE);
|
||||
Queue targetQueue = (Queue) ActiveMQDestination.createDestination("queue://target", ActiveMQDestination.TYPE.QUEUE);
|
||||
|
||||
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer sourceProducer = session.createProducer(sourceQueue);
|
||||
MessageConsumer sourceConsumer = session.createConsumer(sourceQueue);
|
||||
MessageConsumer targetConsumer = session.createConsumer(targetQueue)) {
|
||||
|
||||
connection.start();
|
||||
Message message = session.createTextMessage("Hello world");
|
||||
sourceProducer.send(message);
|
||||
assertNotNull(sourceConsumer.receive(2000));
|
||||
assertNotNull(targetConsumer.receive(2000));
|
||||
}
|
||||
|
||||
deployBrokerConfig(embeddedActiveMQ, newConfig);
|
||||
|
||||
divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice()
|
||||
.getBinding(new SimpleString("divert"));
|
||||
assertNull(divertBinding);
|
||||
|
||||
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer sourceProducer = session.createProducer(sourceQueue);
|
||||
MessageConsumer sourceConsumer = session.createConsumer(sourceQueue);
|
||||
MessageConsumer targetConsumer = session.createConsumer(targetQueue)) {
|
||||
|
||||
connection.start();
|
||||
Message message = session.createTextMessage("Hello world");
|
||||
sourceProducer.send(message);
|
||||
assertNotNull(sourceConsumer.receive(2000));
|
||||
assertNull(targetConsumer.receive(2000));
|
||||
}
|
||||
} finally {
|
||||
embeddedActiveMQ.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRedeployWithFailover() throws Exception {
|
||||
Set<Role> original = new HashSet<>();
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<configuration xmlns="urn:activemq"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||
|
||||
<core xmlns="urn:activemq:core">
|
||||
|
||||
<name>0.0.0.0</name>
|
||||
|
||||
<configuration-file-refresh-period>100</configuration-file-refresh-period>
|
||||
|
||||
<persistence-enabled>false</persistence-enabled>
|
||||
|
||||
<security-enabled>false</security-enabled>
|
||||
|
||||
<!-- this could be ASYNCIO or NIO
|
||||
-->
|
||||
<journal-type>NIO</journal-type>
|
||||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<bindings-directory>./data/bindings</bindings-directory>
|
||||
|
||||
<journal-directory>./data/journal</journal-directory>
|
||||
|
||||
<large-messages-directory>./data/large-messages</large-messages-directory>
|
||||
|
||||
<journal-min-files>2</journal-min-files>
|
||||
|
||||
<journal-pool-files>-1</journal-pool-files>
|
||||
|
||||
<!--
|
||||
This value was determined through a calculation.
|
||||
Your system could perform 25 writes per millisecond
|
||||
on the current journal configuration.
|
||||
That translates as a sync write every 40000 nanoseconds
|
||||
-->
|
||||
<journal-buffer-timeout>40000</journal-buffer-timeout>
|
||||
|
||||
|
||||
<acceptors>
|
||||
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
|
||||
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
|
||||
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
|
||||
|
||||
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
|
||||
<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor>
|
||||
|
||||
<!-- STOMP Acceptor. -->
|
||||
<acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor>
|
||||
|
||||
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
|
||||
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor>
|
||||
|
||||
<!-- MQTT Acceptor -->
|
||||
<acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor>
|
||||
|
||||
</acceptors>
|
||||
|
||||
<addresses>
|
||||
<address name="source">
|
||||
<anycast>
|
||||
<queue name="source"/>
|
||||
</anycast>
|
||||
</address>
|
||||
<address name="target">
|
||||
<multicast>
|
||||
<queue name="target"/>
|
||||
</multicast>
|
||||
</address>
|
||||
</addresses>
|
||||
|
||||
</core>
|
||||
</configuration>
|
|
@ -0,0 +1,100 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<configuration xmlns="urn:activemq"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||
|
||||
<core xmlns="urn:activemq:core">
|
||||
|
||||
<name>0.0.0.0</name>
|
||||
|
||||
<configuration-file-refresh-period>100</configuration-file-refresh-period>
|
||||
|
||||
<persistence-enabled>false</persistence-enabled>
|
||||
|
||||
<security-enabled>false</security-enabled>
|
||||
|
||||
<!-- this could be ASYNCIO or NIO
|
||||
-->
|
||||
<journal-type>NIO</journal-type>
|
||||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<bindings-directory>./data/bindings</bindings-directory>
|
||||
|
||||
<journal-directory>./data/journal</journal-directory>
|
||||
|
||||
<large-messages-directory>./data/large-messages</large-messages-directory>
|
||||
|
||||
<journal-min-files>2</journal-min-files>
|
||||
|
||||
<journal-pool-files>-1</journal-pool-files>
|
||||
|
||||
<!--
|
||||
This value was determined through a calculation.
|
||||
Your system could perform 25 writes per millisecond
|
||||
on the current journal configuration.
|
||||
That translates as a sync write every 40000 nanoseconds
|
||||
-->
|
||||
<journal-buffer-timeout>40000</journal-buffer-timeout>
|
||||
|
||||
|
||||
<acceptors>
|
||||
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
|
||||
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
|
||||
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
|
||||
|
||||
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
|
||||
<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP</acceptor>
|
||||
|
||||
<!-- STOMP Acceptor. -->
|
||||
<acceptor name="stomp">tcp://0.0.0.0:61613?protocols=STOMP</acceptor>
|
||||
|
||||
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
|
||||
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP</acceptor>
|
||||
|
||||
<!-- MQTT Acceptor -->
|
||||
<acceptor name="mqtt">tcp://0.0.0.0:1883?protocols=MQTT</acceptor>
|
||||
|
||||
</acceptors>
|
||||
|
||||
<addresses>
|
||||
<address name="source">
|
||||
<anycast>
|
||||
<queue name="source"/>
|
||||
</anycast>
|
||||
</address>
|
||||
<address name="target">
|
||||
<multicast>
|
||||
<queue name="target"/>
|
||||
</multicast>
|
||||
</address>
|
||||
</addresses>
|
||||
|
||||
<diverts>
|
||||
<divert name="divert">
|
||||
<address>source</address>
|
||||
<forwarding-address>target</forwarding-address>
|
||||
</divert>
|
||||
</diverts>
|
||||
|
||||
</core>
|
||||
</configuration>
|
Loading…
Reference in New Issue