https://issues.apache.org/jira/browse/AMQ-6254

Store the original subscribed destination along with the target
destination in the subscription info object to ensure that wildcard
subscriptions remain linked.
This commit is contained in:
Timothy Bish 2016-04-19 09:53:58 -04:00
parent 6541bef52d
commit b027e65553
3 changed files with 321 additions and 3 deletions

View File

@ -53,4 +53,5 @@ message SubscriptionRecord {
optional string subscription_name = 3;
optional string selector = 4;
optional string destination_name = 5;
optional string subscribed_destination_name = 6;
}

View File

@ -785,6 +785,9 @@ class DBManager(val parent:LevelDBStore) {
if( info.getDestination!=null ) {
record.setDestinationName(info.getDestination.getQualifiedName)
}
if ( info.getSubscribedDestination!=null) {
record.setSubscribedDestinationName(info.getSubscribedDestination.getQualifiedName)
}
val collection = new CollectionRecord.Bean()
collection.setType(SUBSCRIPTION_COLLECTION_TYPE)
collection.setKey(lastCollectionKey.incrementAndGet())
@ -855,7 +858,10 @@ class DBManager(val parent:LevelDBStore) {
info.setSelector(sr.getSelector)
}
if( sr.hasDestinationName ) {
info.setSubscribedDestination(ActiveMQDestination.createDestination(sr.getDestinationName, ActiveMQDestination.TOPIC_TYPE))
info.setDestination(ActiveMQDestination.createDestination(sr.getDestinationName, ActiveMQDestination.TOPIC_TYPE))
}
if( sr.hasSubscribedDestinationName ) {
info.setSubscribedDestination(ActiveMQDestination.createDestination(sr.getSubscribedDestinationName, ActiveMQDestination.TOPIC_TYPE))
}
var sub = DurableSubscription(key, sr.getTopicKey, info)

View File

@ -0,0 +1,311 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.leveldb.LevelDBStoreFactory;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.AuthorizationEntry;
import org.apache.activemq.security.AuthorizationPlugin;
import org.apache.activemq.security.DefaultAuthorizationMap;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.security.TempDestinationAuthorizationEntry;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
public class AMQ6254Test {
private static final Logger LOG = LoggerFactory.getLogger(AMQ6254Test.class);
private static final String KAHADB = "KahaDB";
private static final String LEVELDB = "LevelDB";
private BrokerService brokerService;
private String topicA = "alphabet.a";
private String topicB = "alphabet.b";
private String persistenceAdapterName;
private boolean pluginsEnabled;
@Parameters(name="{0} -> plugins = {1}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{KAHADB, true },
{KAHADB, false },
{LEVELDB, true },
{LEVELDB, false },
});
}
public AMQ6254Test(String persistenceAdapterName, boolean pluginsEnabled) {
this.persistenceAdapterName = persistenceAdapterName;
this.pluginsEnabled = pluginsEnabled;
}
@Test(timeout = 60000)
public void testReactivateKeepaliveSubscription() throws Exception {
// Create wild card durable subscription
Connection connection = createConnection();
connection.setClientID("cliID");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber subscriber = session.createDurableSubscriber(session.createTopic("alphabet.>"), "alphabet.>");
// Send message on Topic A
connection = createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createTopic(topicA));
producer.send(session.createTextMessage("Hello A"));
// Verify that message is received
TextMessage message = (TextMessage) subscriber.receive(2000);
assertNotNull("Message not received.", message);
assertEquals("Hello A", message.getText());
subscriber.close();
assertTrue("Should have only one destination", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Destination destA = getDestination(topicA);
return destA.getDestinationStatistics().getConsumers().getCount() == 1;
}
}));
// Restart broker
brokerService.stop();
brokerService.waitUntilStopped();
LOG.info("Broker stopped");
brokerService = createBroker(false);
brokerService.start();
brokerService.waitUntilStarted();
LOG.info("Broker restarted");
// Recreate wild card durable subscription
connection = createConnection();
connection.setClientID("cliID");
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
subscriber = session.createDurableSubscriber(session.createTopic("alphabet.>"), "alphabet.>");
// Send message on Topic B
connection = createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(session.createTopic(topicA));
producer.send(session.createTextMessage("Hello Again A"));
// Verify both messages are received
message = (TextMessage) subscriber.receive(2000);
assertNotNull("Message not received.", message);
assertEquals("Hello Again A", message.getText());
// Verify that we still have a single subscription
assertTrue("Should have only one destination", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Destination destA = getDestination(topicA);
return destA.getDestinationStatistics().getConsumers().getCount() == 1;
}
}));
subscriber.close();
connection.close();
}
private Destination getDestination(String topicName) {
RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
Set<Destination> destinations = topicRegion.getDestinations(new ActiveMQTopic(topicName));
assertEquals(1, destinations.size());
return destinations.iterator().next();
}
private Connection createConnection() throws Exception {
String connectionURI = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionURI);
return cf.createConnection("system", "manager");
}
@Before
public void setUp() throws Exception {
brokerService = createBroker(true);
brokerService.start();
brokerService.waitUntilStarted();
}
@After
public void tearDown() throws Exception {
if (brokerService != null) {
brokerService.stop();
brokerService.waitUntilStopped();
brokerService = null;
}
}
protected BrokerService createBroker(boolean deleteAllMessages) throws Exception {
BrokerService answer = new BrokerService();
answer.setKeepDurableSubsActive(true);
answer.setUseJmx(false);
answer.setPersistent(true);
answer.setDeleteAllMessagesOnStartup(deleteAllMessages);
answer.setAdvisorySupport(false);
switch (persistenceAdapterName) {
case KAHADB:
answer.setPersistenceAdapter(new KahaDBPersistenceAdapter());
break;
case LEVELDB:
answer.setPersistenceFactory(new LevelDBStoreFactory());
break;
}
answer.addConnector("tcp://localhost:0");
if (pluginsEnabled) {
ArrayList<BrokerPlugin> plugins = new ArrayList<BrokerPlugin>();
BrokerPlugin authenticationPlugin = configureAuthentication();
if (authenticationPlugin != null) {
plugins.add(configureAuthorization());
}
BrokerPlugin authorizationPlugin = configureAuthorization();
if (authorizationPlugin != null) {
plugins.add(configureAuthentication());
}
if (!plugins.isEmpty()) {
BrokerPlugin[] array = new BrokerPlugin[plugins.size()];
answer.setPlugins(plugins.toArray(array));
}
}
ActiveMQDestination[] destinations = { new ActiveMQTopic(topicA), new ActiveMQTopic(topicB) };
answer.setDestinations(destinations);
return answer;
}
protected BrokerPlugin configureAuthentication() throws Exception {
List<AuthenticationUser> users = new ArrayList<AuthenticationUser>();
users.add(new AuthenticationUser("system", "manager", "users,admins"));
SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users);
return authenticationPlugin;
}
protected BrokerPlugin configureAuthorization() throws Exception {
@SuppressWarnings("rawtypes")
List<DestinationMapEntry> authorizationEntries = new ArrayList<DestinationMapEntry>();
AuthorizationEntry entry = new AuthorizationEntry();
entry.setQueue(">");
entry.setRead("admins");
entry.setWrite("admins");
entry.setAdmin("admins");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setQueue("USERS.>");
entry.setRead("users");
entry.setWrite("users");
entry.setAdmin("users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setQueue("GUEST.>");
entry.setRead("guests");
entry.setWrite("guests,users");
entry.setAdmin("guests,users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic(">");
entry.setRead("admins");
entry.setWrite("admins");
entry.setAdmin("admins");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic("USERS.>");
entry.setRead("users");
entry.setWrite("users");
entry.setAdmin("users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic("GUEST.>");
entry.setRead("guests");
entry.setWrite("guests,users");
entry.setAdmin("guests,users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic("ActiveMQ.Advisory.>");
entry.setRead("guests,users");
entry.setWrite("guests,users");
entry.setAdmin("guests,users");
authorizationEntries.add(entry);
TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry();
tempEntry.setRead("admins");
tempEntry.setWrite("admins");
tempEntry.setAdmin("admins");
DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap(authorizationEntries);
authorizationMap.setTempDestinationAuthorizationEntry(tempEntry);
AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(authorizationMap);
return authorizationPlugin;
}
}