mirror of https://github.com/apache/activemq.git
Fix and test for variations in configuration settings that result in a non-region broker as the AdvisoryBroker's next instance.
This commit is contained in:
parent
42d006f2b6
commit
4c38b03d14
|
@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
|||
|
||||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.BrokerFilter;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
|
@ -34,7 +35,21 @@ import org.apache.activemq.broker.region.RegionBroker;
|
|||
import org.apache.activemq.broker.region.Subscription;
|
||||
import org.apache.activemq.broker.region.TopicRegion;
|
||||
import org.apache.activemq.broker.region.TopicSubscription;
|
||||
import org.apache.activemq.command.*;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.BrokerInfo;
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.command.ConnectionId;
|
||||
import org.apache.activemq.command.ConnectionInfo;
|
||||
import org.apache.activemq.command.ConsumerId;
|
||||
import org.apache.activemq.command.ConsumerInfo;
|
||||
import org.apache.activemq.command.DestinationInfo;
|
||||
import org.apache.activemq.command.Message;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
import org.apache.activemq.command.ProducerId;
|
||||
import org.apache.activemq.command.ProducerInfo;
|
||||
import org.apache.activemq.command.RemoveSubscriptionInfo;
|
||||
import org.apache.activemq.security.SecurityContext;
|
||||
import org.apache.activemq.state.ProducerState;
|
||||
import org.apache.activemq.usage.Usage;
|
||||
|
@ -261,7 +276,21 @@ public class AdvisoryBroker extends BrokerFilter {
|
|||
@Override
|
||||
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
|
||||
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
|
||||
DurableTopicSubscription sub = ((TopicRegion) ((RegionBroker) next).getTopicRegion()).getDurableSubscription(key);
|
||||
|
||||
RegionBroker regionBroker = null;
|
||||
if (next instanceof RegionBroker) {
|
||||
regionBroker = (RegionBroker) next;
|
||||
} else {
|
||||
BrokerService service = next.getBrokerService();
|
||||
regionBroker = (RegionBroker) service.getRegionBroker();
|
||||
}
|
||||
|
||||
if (regionBroker == null) {
|
||||
LOG.warn("Cannot locate a RegionBroker instance to pass along the removeSubscription call");
|
||||
throw new IllegalStateException("No RegionBroker found.");
|
||||
}
|
||||
|
||||
DurableTopicSubscription sub = ((TopicRegion) regionBroker.getTopicRegion()).getDurableSubscription(key);
|
||||
|
||||
super.removeSubscription(context, info);
|
||||
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.bugs;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.Topic;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.jmx.BrokerViewMBean;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class AMQ5035Test {
|
||||
|
||||
private static final String CLIENT_ID = "amq-test-client-id";
|
||||
private static final String DURABLE_SUB_NAME = "testDurable";
|
||||
|
||||
private final String xbean = "xbean:";
|
||||
private final String confBase = "src/test/resources/org/apache/activemq/bugs/amq5035";
|
||||
|
||||
private static BrokerService brokerService;
|
||||
private String connectionUri;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
brokerService = BrokerFactory.createBroker(xbean + confBase + "/activemq.xml");
|
||||
connectionUri = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
|
||||
brokerService.start();
|
||||
brokerService.waitUntilStarted();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
brokerService.stop();
|
||||
brokerService.waitUntilStopped();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFoo() throws Exception {
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
|
||||
Connection connection = factory.createConnection();
|
||||
connection.setClientID(CLIENT_ID);
|
||||
connection.start();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Topic topic = session.createTopic("Test.Topic");
|
||||
MessageConsumer consumer = session.createDurableSubscriber(topic, DURABLE_SUB_NAME);
|
||||
consumer.close();
|
||||
|
||||
BrokerViewMBean brokerView = getBrokerView(DURABLE_SUB_NAME);
|
||||
brokerView.destroyDurableSubscriber(CLIENT_ID, DURABLE_SUB_NAME);
|
||||
}
|
||||
|
||||
private BrokerViewMBean getBrokerView(String testDurable) throws MalformedObjectNameException {
|
||||
ObjectName brokerName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
|
||||
BrokerViewMBean view = (BrokerViewMBean) brokerService.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
|
||||
assertNotNull(view);
|
||||
return view;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,109 @@
|
|||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<!-- START SNIPPET: example -->
|
||||
<beans
|
||||
xmlns="http://www.springframework.org/schema/beans"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
|
||||
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
|
||||
|
||||
<!--
|
||||
The <broker> element is used to configure the ActiveMQ broker.
|
||||
-->
|
||||
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
|
||||
|
||||
<destinationPolicy>
|
||||
<policyMap>
|
||||
<policyEntries>
|
||||
<policyEntry topic=">" >
|
||||
<!-- The constantPendingMessageLimitStrategy is used to prevent
|
||||
slow topic consumers to block producers and affect other consumers
|
||||
by limiting the number of messages that are retained
|
||||
For more information, see:
|
||||
|
||||
http://activemq.apache.org/slow-consumer-handling.html
|
||||
-->
|
||||
<pendingMessageLimitStrategy>
|
||||
<constantPendingMessageLimitStrategy limit="1000"/>
|
||||
</pendingMessageLimitStrategy>
|
||||
</policyEntry>
|
||||
</policyEntries>
|
||||
</policyMap>
|
||||
</destinationPolicy>
|
||||
|
||||
<!--
|
||||
The managementContext is used to configure how ActiveMQ is exposed in
|
||||
JMX. By default, ActiveMQ uses the MBean server that is started by
|
||||
the JVM. For more information, see:
|
||||
|
||||
http://activemq.apache.org/jmx.html
|
||||
-->
|
||||
<managementContext>
|
||||
<managementContext createConnector="false"/>
|
||||
</managementContext>
|
||||
|
||||
<!--
|
||||
Configure message persistence for the broker. The default persistence
|
||||
mechanism is the KahaDB store (identified by the kahaDB tag).
|
||||
For more information, see:
|
||||
|
||||
http://activemq.apache.org/persistence.html
|
||||
-->
|
||||
<persistenceAdapter>
|
||||
<kahaDB directory="${activemq.data}/kahadb"/>
|
||||
</persistenceAdapter>
|
||||
|
||||
|
||||
<!--
|
||||
The systemUsage controls the maximum amount of space the broker will
|
||||
use before disabling caching and/or slowing down producers. For more information, see:
|
||||
http://activemq.apache.org/producer-flow-control.html
|
||||
-->
|
||||
<systemUsage>
|
||||
<systemUsage>
|
||||
<memoryUsage>
|
||||
<memoryUsage percentOfJvmHeap="70" />
|
||||
</memoryUsage>
|
||||
<storeUsage>
|
||||
<storeUsage limit="100 gb"/>
|
||||
</storeUsage>
|
||||
<tempUsage>
|
||||
<tempUsage limit="50 gb"/>
|
||||
</tempUsage>
|
||||
</systemUsage>
|
||||
</systemUsage>
|
||||
|
||||
<!--
|
||||
The transport connectors expose ActiveMQ over a given protocol to
|
||||
clients and other brokers. For more information, see:
|
||||
|
||||
http://activemq.apache.org/configuring-transports.html
|
||||
-->
|
||||
<transportConnectors>
|
||||
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
|
||||
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
|
||||
</transportConnectors>
|
||||
|
||||
<!-- destroy the spring context on shutdown to stop jetty -->
|
||||
<shutdownHooks>
|
||||
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
|
||||
</shutdownHooks>
|
||||
|
||||
</broker>
|
||||
|
||||
</beans>
|
||||
<!-- END SNIPPET: example -->
|
Loading…
Reference in New Issue