https://issues.apache.org/jira/browse/AMQ-6254

Store the original subscribed destination along with the target
destination in the subscription info object to ensure that wildcard
subscriptions remain linked.
(cherry picked from commit b027e65553)
This commit is contained in:
Timothy Bish 2016-04-19 09:53:58 -04:00
parent 46dadf9866
commit ad07ee4612
3 changed files with 321 additions and 3 deletions

View File

@ -53,4 +53,5 @@ message SubscriptionRecord {
optional string subscription_name = 3;
optional string selector = 4;
optional string destination_name = 5;
optional string subscribed_destination_name = 6;
}

View File

@ -785,6 +785,9 @@ class DBManager(val parent:LevelDBStore) {
if( info.getDestination!=null ) {
record.setDestinationName(info.getDestination.getQualifiedName)
}
if ( info.getSubscribedDestination!=null) {
record.setSubscribedDestinationName(info.getSubscribedDestination.getQualifiedName)
}
val collection = new CollectionRecord.Bean()
collection.setType(SUBSCRIPTION_COLLECTION_TYPE)
collection.setKey(lastCollectionKey.incrementAndGet())
@ -854,8 +857,11 @@ class DBManager(val parent:LevelDBStore) {
if( sr.hasSelector ) {
info.setSelector(sr.getSelector)
}
if(sr.hasDestinationName) {
info.setSubscribedDestination(ActiveMQDestination.createDestination(sr.getDestinationName, ActiveMQDestination.TOPIC_TYPE))
if( sr.hasDestinationName ) {
info.setDestination(ActiveMQDestination.createDestination(sr.getDestinationName, ActiveMQDestination.TOPIC_TYPE))
}
if( sr.hasSubscribedDestinationName ) {
info.setSubscribedDestination(ActiveMQDestination.createDestination(sr.getSubscribedDestinationName, ActiveMQDestination.TOPIC_TYPE))
}
var sub = DurableSubscription(key, sr.getTopicKey, info)

View File

@ -0,0 +1,311 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.leveldb.LevelDBStoreFactory;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.AuthorizationEntry;
import org.apache.activemq.security.AuthorizationPlugin;
import org.apache.activemq.security.DefaultAuthorizationMap;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.security.TempDestinationAuthorizationEntry;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
public class AMQ6254Test {
private static final Logger LOG = LoggerFactory.getLogger(AMQ6254Test.class);
private static final String KAHADB = "KahaDB";
private static final String LEVELDB = "LevelDB";
private BrokerService brokerService;
private String topicA = "alphabet.a";
private String topicB = "alphabet.b";
private String persistenceAdapterName;
private boolean pluginsEnabled;
@Parameters(name="{0} -> plugins = {1}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{KAHADB, true },
{KAHADB, false },
{LEVELDB, true },
{LEVELDB, false },
});
}
public AMQ6254Test(String persistenceAdapterName, boolean pluginsEnabled) {
this.persistenceAdapterName = persistenceAdapterName;
this.pluginsEnabled = pluginsEnabled;
}
@Test(timeout = 60000)
public void testReactivateKeepaliveSubscription() throws Exception {
// Create wild card durable subscription
Connection connection = createConnection();
connection.setClientID("cliID");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber subscriber = session.createDurableSubscriber(session.createTopic("alphabet.>"), "alphabet.>");
// Send message on Topic A
connection = createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createTopic(topicA));
producer.send(session.createTextMessage("Hello A"));
// Verify that message is received
TextMessage message = (TextMessage) subscriber.receive(2000);
assertNotNull("Message not received.", message);
assertEquals("Hello A", message.getText());
subscriber.close();
assertTrue("Should have only one destination", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Destination destA = getDestination(topicA);
return destA.getDestinationStatistics().getConsumers().getCount() == 1;
}
}));
// Restart broker
brokerService.stop();
brokerService.waitUntilStopped();
LOG.info("Broker stopped");
brokerService = createBroker(false);
brokerService.start();
brokerService.waitUntilStarted();
LOG.info("Broker restarted");
// Recreate wild card durable subscription
connection = createConnection();
connection.setClientID("cliID");
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
subscriber = session.createDurableSubscriber(session.createTopic("alphabet.>"), "alphabet.>");
// Send message on Topic B
connection = createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(session.createTopic(topicA));
producer.send(session.createTextMessage("Hello Again A"));
// Verify both messages are received
message = (TextMessage) subscriber.receive(2000);
assertNotNull("Message not received.", message);
assertEquals("Hello Again A", message.getText());
// Verify that we still have a single subscription
assertTrue("Should have only one destination", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Destination destA = getDestination(topicA);
return destA.getDestinationStatistics().getConsumers().getCount() == 1;
}
}));
subscriber.close();
connection.close();
}
private Destination getDestination(String topicName) {
RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
Set<Destination> destinations = topicRegion.getDestinations(new ActiveMQTopic(topicName));
assertEquals(1, destinations.size());
return destinations.iterator().next();
}
private Connection createConnection() throws Exception {
String connectionURI = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionURI);
return cf.createConnection("system", "manager");
}
@Before
public void setUp() throws Exception {
brokerService = createBroker(true);
brokerService.start();
brokerService.waitUntilStarted();
}
@After
public void tearDown() throws Exception {
if (brokerService != null) {
brokerService.stop();
brokerService.waitUntilStopped();
brokerService = null;
}
}
protected BrokerService createBroker(boolean deleteAllMessages) throws Exception {
BrokerService answer = new BrokerService();
answer.setKeepDurableSubsActive(true);
answer.setUseJmx(false);
answer.setPersistent(true);
answer.setDeleteAllMessagesOnStartup(deleteAllMessages);
answer.setAdvisorySupport(false);
switch (persistenceAdapterName) {
case KAHADB:
answer.setPersistenceAdapter(new KahaDBPersistenceAdapter());
break;
case LEVELDB:
answer.setPersistenceFactory(new LevelDBStoreFactory());
break;
}
answer.addConnector("tcp://localhost:0");
if (pluginsEnabled) {
ArrayList<BrokerPlugin> plugins = new ArrayList<BrokerPlugin>();
BrokerPlugin authenticationPlugin = configureAuthentication();
if (authenticationPlugin != null) {
plugins.add(configureAuthorization());
}
BrokerPlugin authorizationPlugin = configureAuthorization();
if (authorizationPlugin != null) {
plugins.add(configureAuthentication());
}
if (!plugins.isEmpty()) {
BrokerPlugin[] array = new BrokerPlugin[plugins.size()];
answer.setPlugins(plugins.toArray(array));
}
}
ActiveMQDestination[] destinations = { new ActiveMQTopic(topicA), new ActiveMQTopic(topicB) };
answer.setDestinations(destinations);
return answer;
}
protected BrokerPlugin configureAuthentication() throws Exception {
List<AuthenticationUser> users = new ArrayList<AuthenticationUser>();
users.add(new AuthenticationUser("system", "manager", "users,admins"));
SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users);
return authenticationPlugin;
}
protected BrokerPlugin configureAuthorization() throws Exception {
@SuppressWarnings("rawtypes")
List<DestinationMapEntry> authorizationEntries = new ArrayList<DestinationMapEntry>();
AuthorizationEntry entry = new AuthorizationEntry();
entry.setQueue(">");
entry.setRead("admins");
entry.setWrite("admins");
entry.setAdmin("admins");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setQueue("USERS.>");
entry.setRead("users");
entry.setWrite("users");
entry.setAdmin("users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setQueue("GUEST.>");
entry.setRead("guests");
entry.setWrite("guests,users");
entry.setAdmin("guests,users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic(">");
entry.setRead("admins");
entry.setWrite("admins");
entry.setAdmin("admins");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic("USERS.>");
entry.setRead("users");
entry.setWrite("users");
entry.setAdmin("users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic("GUEST.>");
entry.setRead("guests");
entry.setWrite("guests,users");
entry.setAdmin("guests,users");
authorizationEntries.add(entry);
entry = new AuthorizationEntry();
entry.setTopic("ActiveMQ.Advisory.>");
entry.setRead("guests,users");
entry.setWrite("guests,users");
entry.setAdmin("guests,users");
authorizationEntries.add(entry);
TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry();
tempEntry.setRead("admins");
tempEntry.setWrite("admins");
tempEntry.setAdmin("admins");
DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap(authorizationEntries);
authorizationMap.setTempDestinationAuthorizationEntry(tempEntry);
AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(authorizationMap);
return authorizationPlugin;
}
}