diff --git a/activemq-leveldb-store/src/main/proto/records.proto b/activemq-leveldb-store/src/main/proto/records.proto index d7ec58d356..629bdce308 100644 --- a/activemq-leveldb-store/src/main/proto/records.proto +++ b/activemq-leveldb-store/src/main/proto/records.proto @@ -12,7 +12,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -// +// package org.apache.activemq.leveldb.record; option java_multiple_files = true; @@ -53,4 +53,5 @@ message SubscriptionRecord { optional string subscription_name = 3; optional string selector = 4; optional string destination_name = 5; + optional string subscribed_destination_name = 6; } \ No newline at end of file diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index b0051ccc57..bc53710cd8 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -785,6 +785,9 @@ class DBManager(val parent:LevelDBStore) { if( info.getDestination!=null ) { record.setDestinationName(info.getDestination.getQualifiedName) } + if ( info.getSubscribedDestination!=null) { + record.setSubscribedDestinationName(info.getSubscribedDestination.getQualifiedName) + } val collection = new CollectionRecord.Bean() collection.setType(SUBSCRIPTION_COLLECTION_TYPE) collection.setKey(lastCollectionKey.incrementAndGet()) @@ -854,8 +857,11 @@ class DBManager(val parent:LevelDBStore) { if( sr.hasSelector ) { info.setSelector(sr.getSelector) } - if(sr.hasDestinationName) { - info.setSubscribedDestination(ActiveMQDestination.createDestination(sr.getDestinationName, ActiveMQDestination.TOPIC_TYPE)) + if( sr.hasDestinationName ) { + info.setDestination(ActiveMQDestination.createDestination(sr.getDestinationName, ActiveMQDestination.TOPIC_TYPE)) + } + if( sr.hasSubscribedDestinationName ) { + info.setSubscribedDestination(ActiveMQDestination.createDestination(sr.getSubscribedDestinationName, ActiveMQDestination.TOPIC_TYPE)) } var sub = DurableSubscription(key, sr.getTopicKey, info) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6254Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6254Test.java new file mode 100644 index 0000000000..9c9a35b8a9 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6254Test.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.TopicSubscriber; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.TopicRegion; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.filter.DestinationMapEntry; +import org.apache.activemq.leveldb.LevelDBStoreFactory; +import org.apache.activemq.security.AuthenticationUser; +import org.apache.activemq.security.AuthorizationEntry; +import org.apache.activemq.security.AuthorizationPlugin; +import org.apache.activemq.security.DefaultAuthorizationMap; +import org.apache.activemq.security.SimpleAuthenticationPlugin; +import org.apache.activemq.security.TempDestinationAuthorizationEntry; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(Parameterized.class) +public class AMQ6254Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ6254Test.class); + + private static final String KAHADB = "KahaDB"; + private static final String LEVELDB = "LevelDB"; + + private BrokerService brokerService; + + private String topicA = "alphabet.a"; + private String topicB = "alphabet.b"; + + private String persistenceAdapterName; + private boolean pluginsEnabled; + + @Parameters(name="{0} -> plugins = {1}") + public static Collection data() { + return Arrays.asList(new Object[][] { + {KAHADB, true }, + {KAHADB, false }, + {LEVELDB, true }, + {LEVELDB, false }, + }); + } + + public AMQ6254Test(String persistenceAdapterName, boolean pluginsEnabled) { + this.persistenceAdapterName = persistenceAdapterName; + this.pluginsEnabled = pluginsEnabled; + } + + @Test(timeout = 60000) + public void testReactivateKeepaliveSubscription() throws Exception { + // Create wild card durable subscription + Connection connection = createConnection(); + connection.setClientID("cliID"); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber subscriber = session.createDurableSubscriber(session.createTopic("alphabet.>"), "alphabet.>"); + + // Send message on Topic A + connection = createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createTopic(topicA)); + producer.send(session.createTextMessage("Hello A")); + + // Verify that message is received + TextMessage message = (TextMessage) subscriber.receive(2000); + assertNotNull("Message not received.", message); + assertEquals("Hello A", message.getText()); + + subscriber.close(); + + assertTrue("Should have only one destination", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + Destination destA = getDestination(topicA); + return destA.getDestinationStatistics().getConsumers().getCount() == 1; + } + })); + + // Restart broker + brokerService.stop(); + brokerService.waitUntilStopped(); + LOG.info("Broker stopped"); + + brokerService = createBroker(false); + brokerService.start(); + brokerService.waitUntilStarted(); + LOG.info("Broker restarted"); + + // Recreate wild card durable subscription + connection = createConnection(); + connection.setClientID("cliID"); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + subscriber = session.createDurableSubscriber(session.createTopic("alphabet.>"), "alphabet.>"); + + // Send message on Topic B + connection = createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(session.createTopic(topicA)); + producer.send(session.createTextMessage("Hello Again A")); + + // Verify both messages are received + message = (TextMessage) subscriber.receive(2000); + assertNotNull("Message not received.", message); + assertEquals("Hello Again A", message.getText()); + + // Verify that we still have a single subscription + assertTrue("Should have only one destination", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + Destination destA = getDestination(topicA); + return destA.getDestinationStatistics().getConsumers().getCount() == 1; + } + })); + + subscriber.close(); + connection.close(); + } + + private Destination getDestination(String topicName) { + RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); + TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion(); + + Set destinations = topicRegion.getDestinations(new ActiveMQTopic(topicName)); + assertEquals(1, destinations.size()); + + return destinations.iterator().next(); + } + + private Connection createConnection() throws Exception { + String connectionURI = brokerService.getTransportConnectors().get(0).getPublishableConnectString(); + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionURI); + return cf.createConnection("system", "manager"); + } + + @Before + public void setUp() throws Exception { + brokerService = createBroker(true); + brokerService.start(); + brokerService.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + if (brokerService != null) { + brokerService.stop(); + brokerService.waitUntilStopped(); + brokerService = null; + } + } + + protected BrokerService createBroker(boolean deleteAllMessages) throws Exception { + BrokerService answer = new BrokerService(); + + answer.setKeepDurableSubsActive(true); + answer.setUseJmx(false); + answer.setPersistent(true); + answer.setDeleteAllMessagesOnStartup(deleteAllMessages); + answer.setAdvisorySupport(false); + + switch (persistenceAdapterName) { + case KAHADB: + answer.setPersistenceAdapter(new KahaDBPersistenceAdapter()); + break; + case LEVELDB: + answer.setPersistenceFactory(new LevelDBStoreFactory()); + break; + } + + answer.addConnector("tcp://localhost:0"); + + if (pluginsEnabled) { + ArrayList plugins = new ArrayList(); + + BrokerPlugin authenticationPlugin = configureAuthentication(); + if (authenticationPlugin != null) { + plugins.add(configureAuthorization()); + } + + BrokerPlugin authorizationPlugin = configureAuthorization(); + if (authorizationPlugin != null) { + plugins.add(configureAuthentication()); + } + + if (!plugins.isEmpty()) { + BrokerPlugin[] array = new BrokerPlugin[plugins.size()]; + answer.setPlugins(plugins.toArray(array)); + } + } + + ActiveMQDestination[] destinations = { new ActiveMQTopic(topicA), new ActiveMQTopic(topicB) }; + answer.setDestinations(destinations); + return answer; + } + + protected BrokerPlugin configureAuthentication() throws Exception { + List users = new ArrayList(); + users.add(new AuthenticationUser("system", "manager", "users,admins")); + SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users); + + return authenticationPlugin; + } + + protected BrokerPlugin configureAuthorization() throws Exception { + + @SuppressWarnings("rawtypes") + List authorizationEntries = new ArrayList(); + + AuthorizationEntry entry = new AuthorizationEntry(); + entry.setQueue(">"); + entry.setRead("admins"); + entry.setWrite("admins"); + entry.setAdmin("admins"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setQueue("USERS.>"); + entry.setRead("users"); + entry.setWrite("users"); + entry.setAdmin("users"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setQueue("GUEST.>"); + entry.setRead("guests"); + entry.setWrite("guests,users"); + entry.setAdmin("guests,users"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setTopic(">"); + entry.setRead("admins"); + entry.setWrite("admins"); + entry.setAdmin("admins"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setTopic("USERS.>"); + entry.setRead("users"); + entry.setWrite("users"); + entry.setAdmin("users"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setTopic("GUEST.>"); + entry.setRead("guests"); + entry.setWrite("guests,users"); + entry.setAdmin("guests,users"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setTopic("ActiveMQ.Advisory.>"); + entry.setRead("guests,users"); + entry.setWrite("guests,users"); + entry.setAdmin("guests,users"); + authorizationEntries.add(entry); + + TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry(); + tempEntry.setRead("admins"); + tempEntry.setWrite("admins"); + tempEntry.setAdmin("admins"); + + DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap(authorizationEntries); + authorizationMap.setTempDestinationAuthorizationEntry(tempEntry); + AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(authorizationMap); + + return authorizationPlugin; + } +}