Fix for AMQ-1356 : Durable Subscriptions do not work with Wildcards after broker is restarted.

- Added a RecoveryBrokerTest.testWildCardSubscriptionPreservedOnRestart() test case that was showing
   that that wildcards did not work with DurableSubscriptions
 - Fix the TransactedTopicMasterSlaveTest so that setDeleteAllMessagesOnStartup() takes effect (had to be done before the connectors are added.
 - Change the MessageStore interface so that subscriptions are created using the data in the SubscriptionInfo class
 - Added a subscribedDestination field to the SubscriptionInfo so that the original wildcard subscrption can be remembered
 - The KahaReference store now deletes it's State store too when deleteAllMessages() is called
 - Fixed KahaPersistenceAdapter.getDestinations() so that it actually returns all the destinations.
 - We now recover all the topics eagerly when the topic region is started.



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@563194 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-08-06 17:07:55 +00:00
parent 8b68c62249
commit 2728ccbb88
29 changed files with 335 additions and 109 deletions

View File

@ -17,10 +17,13 @@
*/
package org.apache.activemq.broker.region;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.JMSException;
@ -36,14 +39,13 @@ import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.ConcurrentHashMap;
/**
*
* @version $Revision: 1.14 $
@ -81,6 +83,16 @@ abstract public class AbstractRegion implements Region {
public void start() throws Exception {
started = true;
Set inactiveDests = getInactiveDestinations();
for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) {
ActiveMQDestination dest = (ActiveMQDestination) iter.next();
ConnectionContext context = new ConnectionContext();
context.setBroker(broker.getBrokerService().getBroker());
context.getBroker().addDestination(context , dest);
}
for (Iterator i = destinations.values().iterator();i.hasNext();) {
Destination dest = (Destination)i.next();
dest.start();
@ -110,18 +122,28 @@ abstract public class AbstractRegion implements Region {
dest.start();
destinations.put(destination,dest);
destinationMap.put(destination,dest);
// Add all consumers that are interested in the destination.
for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
Subscription sub=(Subscription)iter.next();
if(sub.matches(destination)){
dest.addSubscription(context,sub);
}
}
addSubscriptionsForDestination(context, dest);
}
return dest;
}
}
protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context,
Destination dest) throws Exception {
ArrayList<Subscription> rc = new ArrayList<Subscription>();
// Add all consumers that are interested in the destination.
for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
Subscription sub=(Subscription)iter.next();
if(sub.matches(dest.getActiveMQDestination())){
dest.addSubscription(context,sub);
rc.add(sub);
}
}
return rc;
}
public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout)
throws Exception{
@ -205,7 +227,6 @@ abstract public class AbstractRegion implements Region {
return (Subscription)o;
}
Subscription sub = createSubscription(context, info);
// We may need to add some destinations that are in persistent store but not active
// in the broker.
@ -216,14 +237,9 @@ abstract public class AbstractRegion implements Region {
// eagerly load all destinations into the broker but have an inactive state for the
// destination which has reduced memory usage.
//
Set inactiveDests = getInactiveDestinations();
for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) {
ActiveMQDestination dest = (ActiveMQDestination) iter.next();
if( sub.matches(dest) ) {
context.getBroker().addDestination(context, dest);
}
}
DestinationFilter destinationFilter = DestinationFilter.parseFilter(info.getDestination());
Subscription sub = createSubscription(context, info);
subscriptions.put(info.getConsumerId(), sub);

View File

@ -27,6 +27,8 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.TopicMessageStore;
/**
*
@ -53,4 +55,5 @@ public interface Destination extends Service {
public Message[] browse();
public String getName();
public MessageStore getMessageStore();
}

View File

@ -25,6 +25,7 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageStore;
import java.io.IOException;
import java.util.Iterator;
@ -114,4 +115,8 @@ public class DestinationFilter implements Destination {
dest.send(context, message);
}
}
public MessageStore getMessageStore() {
return next.getMessageStore();
}
}

View File

@ -425,7 +425,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
context.getConnection().dispatchAsync(md);
}else{
context.getConnection().dispatchSync(md);
onDispatch(node,message);
onDispatch(node,message);
}
//System.err.println(broker.getBrokerName() + " " + this + " (" + enqueueCounter + ", " + dispatchCounter +") " + node);
return true;
@ -439,11 +439,13 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
if(node!=QueueMessageReference.NULL_MESSAGE){
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
}
try{
dispatchMatched();
}catch(IOException e){
context.getConnection().serviceExceptionAsync(e);
}
}
if( info.isDispatchAsync() ) {
try{
dispatchMatched();
}catch(IOException e){
context.getConnection().serviceExceptionAsync(e);
}
}
}

View File

@ -23,6 +23,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
@ -183,7 +184,13 @@ public class Topic implements Destination {
}
// Do we need to create the subscription?
if (info == null) {
store.addSubsciption(clientId, subscriptionName, selector, subscription.getConsumerInfo().isRetroactive());
info = new SubscriptionInfo();
info.setClientId(clientId);
info.setSelector(selector);
info.setSubscriptionName(subscriptionName);
info.setDestination(getActiveMQDestination()); // This destination is an actual destination id.
info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); // This destination might be a pattern
store.addSubsciption(info, subscription.getConsumerInfo().isRetroactive());
}
final MessageEvaluationContext msgContext = new MessageEvaluationContext();

View File

@ -18,7 +18,9 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import javax.jms.InvalidDestinationException;
@ -148,19 +150,14 @@ public class TopicRegion extends AbstractRegion {
return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getPercentUsage()
+ "%";
}
// Implementation methods
// -------------------------------------------------------------------------
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
Topic topic = (Topic) super.createDestination(context, destination);
recoverDurableSubscriptions(context, topic);
return topic;
}
private void recoverDurableSubscriptions(ConnectionContext context, Topic topic) throws IOException, JMSException, Exception {
TopicMessageStore store = (TopicMessageStore) topic.getMessageStore();
@Override
protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
List<Subscription> rc = super.addSubscriptionsForDestination(context, dest);
HashSet<Subscription> dupChecker = new HashSet<Subscription>(rc);
TopicMessageStore store = (TopicMessageStore) dest.getMessageStore();
// Eagerly recover the durable subscriptions
if (store != null) {
SubscriptionInfo[] infos = store.getAllSubscriptions();
@ -181,16 +178,40 @@ public class TopicRegion extends AbstractRegion {
sub = (DurableTopicSubscription) createSubscription(c, consumerInfo );
}
topic.addSubscription(context, sub);
}
if( dupChecker.contains(sub ) ) {
continue;
}
dupChecker.add(sub);
rc.add(sub);
dest.addSubscription(context, sub);
}
// Now perhaps there other durable subscriptions (via wild card) that would match this destination..
durableSubscriptions.values();
for (Iterator iterator = durableSubscriptions.values().iterator(); iterator
.hasNext();) {
DurableTopicSubscription sub = (DurableTopicSubscription) iterator.next();
// Skip over subscriptions that we allready added..
if( dupChecker.contains(sub ) ) {
continue;
}
if( sub.matches(dest.getActiveMQDestination()) ) {
rc.add(sub);
dest.addSubscription(context, sub);
}
}
}
return rc;
}
private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
ConsumerInfo rc = new ConsumerInfo();
rc.setSelector(info.getSelector());
rc.setSubscriptionName(info.getSubscriptionName());
rc.setDestination(info.getDestination());
rc.setDestination(info.getSubscribedDestination());
rc.setConsumerId(createConsumerId());
return rc;
}

View File

@ -21,6 +21,7 @@ import org.apache.activemq.util.IntrospectionSupport;
/**
* Used to represent a durable subscription.
*
* @openwire:marshaller code="55"
* @version $Revision: 1.6 $
@ -29,6 +30,7 @@ public class SubscriptionInfo implements DataStructure {
public static final byte DATA_STRUCTURE_TYPE=CommandTypes.DURABLE_SUBSCRIPTION_INFO;
protected ActiveMQDestination subscribedDestination;
protected ActiveMQDestination destination;
protected String clientId;
protected String subscriptionName;
@ -50,6 +52,9 @@ public class SubscriptionInfo implements DataStructure {
}
/**
* This is the a resolved destination that the subscription is receiving messages from.
* This will never be a pattern or a composite destination.
*
* @openwire:property version=1 cache=true
*/
public ActiveMQDestination getDestination() {
@ -121,4 +126,23 @@ public class SubscriptionInfo implements DataStructure {
return result;
}
/**
* The destination the client originally subscribed to.. This may not match the {@see getDestination} method
* if the subscribed destination uses patterns or composites.
*
* If the subscribed destinationis not set, this just ruturns the desitination.
*
* @openwire:property version=3
*/
public ActiveMQDestination getSubscribedDestination() {
if( subscribedDestination == null ) {
return getDestination();
}
return subscribedDestination;
}
public void setSubscribedDestination(ActiveMQDestination subscribedDestination) {
this.subscribedDestination = subscribedDestination;
}
}

View File

@ -70,6 +70,7 @@ public class SubscriptionInfoMarshaller extends BaseDataStreamMarshaller {
info.setDestination((org.apache.activemq.command.ActiveMQDestination) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
info.setSelector(tightUnmarshalString(dataIn, bs));
info.setSubcriptionName(tightUnmarshalString(dataIn, bs));
info.setSubscribedDestination((org.apache.activemq.command.ActiveMQDestination) tightUnmarsalNestedObject(wireFormat, dataIn, bs));
}
@ -86,6 +87,7 @@ public class SubscriptionInfoMarshaller extends BaseDataStreamMarshaller {
rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getDestination(), bs);
rc += tightMarshalString1(info.getSelector(), bs);
rc += tightMarshalString1(info.getSubcriptionName(), bs);
rc += tightMarshalNestedObject1(wireFormat, (DataStructure)info.getSubscribedDestination(), bs);
return rc + 0;
}
@ -105,6 +107,7 @@ public class SubscriptionInfoMarshaller extends BaseDataStreamMarshaller {
tightMarshalCachedObject2(wireFormat, (DataStructure)info.getDestination(), dataOut, bs);
tightMarshalString2(info.getSelector(), dataOut, bs);
tightMarshalString2(info.getSubcriptionName(), dataOut, bs);
tightMarshalNestedObject2(wireFormat, (DataStructure)info.getSubscribedDestination(), dataOut, bs);
}
@ -123,6 +126,7 @@ public class SubscriptionInfoMarshaller extends BaseDataStreamMarshaller {
info.setDestination((org.apache.activemq.command.ActiveMQDestination) looseUnmarsalCachedObject(wireFormat, dataIn));
info.setSelector(looseUnmarshalString(dataIn));
info.setSubcriptionName(looseUnmarshalString(dataIn));
info.setSubscribedDestination((org.apache.activemq.command.ActiveMQDestination) looseUnmarsalNestedObject(wireFormat, dataIn));
}
@ -139,6 +143,7 @@ public class SubscriptionInfoMarshaller extends BaseDataStreamMarshaller {
looseMarshalCachedObject(wireFormat, (DataStructure)info.getDestination(), dataOut);
looseMarshalString(info.getSelector(), dataOut);
looseMarshalString(info.getSubcriptionName(), dataOut);
looseMarshalNestedObject(wireFormat, (DataStructure)info.getSubscribedDestination(), dataOut);
}
}

View File

@ -71,8 +71,8 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
throws IOException {
delegate.acknowledge(context, clientId, subscriptionName, messageId);
}
public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
delegate.addSubsciption(clientId, subscriptionName, selector, retroactive);
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
delegate.addSubsciption(subscriptionInfo, retroactive);
}
public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
delegate.deleteSubscription(clientId, subscriptionName);

View File

@ -106,7 +106,7 @@ public interface TopicMessageStore extends MessageStore{
public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException;
/**
* Lists all the durable subscirptions for a given destination.
* Lists all the durable subscriptions for a given destination.
*
* @return an array SubscriptionInfos
* @throws IOException
@ -126,6 +126,6 @@ public interface TopicMessageStore extends MessageStore{
* @throws IOException
*
*/
public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive)
throws IOException;
}

View File

@ -132,6 +132,6 @@ public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore {
* @throws IOException
*
*/
public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
public void addSubsciption(SubscriptionInfo subscriptionInfo,boolean retroactive)
throws IOException;
}

View File

@ -70,9 +70,9 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
return topicReferenceStore.lookupSubscription(clientId,subscriptionName);
}
public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
public void addSubsciption(SubscriptionInfo subscriptionInfo,boolean retroactive)
throws IOException{
topicReferenceStore.addSubsciption(clientId,subscriptionName,selector,retroactive);
topicReferenceStore.addSubsciption(subscriptionInfo,retroactive);
}
/**

View File

@ -56,8 +56,7 @@ public interface JDBCAdapter{
public abstract void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,String clientId,
String subscriptionName,long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception;
public abstract void doSetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,String clientId,
String subscriptionName,String selector,boolean retroactive) throws SQLException,IOException;
public abstract void doSetSubscriberEntry(TransactionContext c,SubscriptionInfo subscriptionInfo,boolean retroactive) throws SQLException,IOException;
public abstract SubscriptionInfo doGetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,
String clientId,String subscriptionName) throws SQLException,IOException;

View File

@ -138,16 +138,16 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
* @see org.apache.activemq.store.TopicMessageStore#storeSubsciption(org.apache.activemq.service.SubscriptionInfo,
* boolean)
*/
public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive)
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive)
throws IOException {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
c = persistenceAdapter.getTransactionContext();
adapter.doSetSubscriberEntry(c, destination, clientId, subscriptionName, selector, retroactive);
adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ",e);
throw IOExceptionSupport
.create("Failed to lookup subscription for info: " + clientId + ". Reason: " + e, e);
.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
} finally {
c.close();
}

View File

@ -84,6 +84,7 @@ public class Statements {
"CREATE INDEX " + getFullMessageTableName() + "_EIDX ON " + getFullMessageTableName()
+ " (EXPIRATION)",
"CREATE TABLE " + getFullAckTableName() + "(" + "CONTAINER " + containerNameDataType + " NOT NULL"
+ ", SUB_DEST " + stringIdDataType
+ ", CLIENT_ID " + stringIdDataType + " NOT NULL" + ", SUB_NAME " + stringIdDataType
+ " NOT NULL" + ", SELECTOR " + stringIdDataType + ", LAST_ACKED_ID " + sequenceDataType
+ ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))",
@ -165,14 +166,14 @@ public class Statements {
public String getCreateDurableSubStatement() {
if (createDurableSubStatement == null) {
createDurableSubStatement = "INSERT INTO " + getFullAckTableName()
+ "(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID) " + "VALUES (?, ?, ?, ?, ?)";
+ "(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID, SUB_DEST) " + "VALUES (?, ?, ?, ?, ?, ?)";
}
return createDurableSubStatement;
}
public String getFindDurableSubStatement() {
if (findDurableSubStatement == null) {
findDurableSubStatement = "SELECT SELECTOR, SUB_NAME " + "FROM " + getFullAckTableName()
findDurableSubStatement = "SELECT SELECTOR, SUB_DEST " + "FROM " + getFullAckTableName()
+ " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
}
return findDurableSubStatement;
@ -180,7 +181,7 @@ public class Statements {
public String getFindAllDurableSubsStatement() {
if (findAllDurableSubsStatement == null) {
findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID" + " FROM " + getFullAckTableName()
findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID, SUB_DEST" + " FROM " + getFullAckTableName()
+ " WHERE CONTAINER=?";
}
return findAllDurableSubsStatement;

View File

@ -48,6 +48,7 @@ public class AxionJDBCAdapter extends StreamJDBCAdapter {
"CREATE INDEX "+statements.getFullMessageTableName()+"_EIDX ON "+statements.getFullMessageTableName()+" (EXPIRATION)",
"CREATE TABLE "+statements.getFullAckTableName()+"("
+"CONTAINER "+statements.getContainerNameDataType()+" NOT NULL"
+", SUB_DEST " + statements.getContainerNameDataType()
+", CLIENT_ID "+statements.getStringIdDataType()+" NOT NULL"
+", SUB_NAME "+statements.getStringIdDataType()+" NOT NULL"
+", SELECTOR "+statements.getStringIdDataType()

View File

@ -431,8 +431,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter{
* @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object,
* org.apache.activemq.service.SubscriptionInfo)
*/
public void doSetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,String clientId,
String subscriptionName,String selector,boolean retroactive) throws SQLException,IOException{
public void doSetSubscriberEntry(TransactionContext c,SubscriptionInfo info,boolean retroactive) throws SQLException,IOException{
// dumpTables(c, destination.getQualifiedName(), clientId, subscriptionName);
PreparedStatement s=null;
try{
@ -451,13 +450,14 @@ public class DefaultJDBCAdapter implements JDBCAdapter{
}
}
s=c.getConnection().prepareStatement(statements.getCreateDurableSubStatement());
s.setString(1,destination.getQualifiedName());
s.setString(2,clientId);
s.setString(3,subscriptionName);
s.setString(4,selector);
s.setString(1,info.getDestination().getQualifiedName());
s.setString(2,info.getClientId());
s.setString(3,info.getSubscriptionName());
s.setString(4,info.getSelector());
s.setLong(5,lastMessageId);
s.setString(6, info.getSubscribedDestination().getQualifiedName());
if(s.executeUpdate()!=1){
throw new IOException("Could not create durable subscription for: "+clientId);
throw new IOException("Could not create durable subscription for: "+info.getClientId());
}
}finally{
close(s);
@ -480,8 +480,9 @@ public class DefaultJDBCAdapter implements JDBCAdapter{
SubscriptionInfo subscription=new SubscriptionInfo();
subscription.setDestination(destination);
subscription.setClientId(clientId);
subscription.setSubcriptionName(subscriptionName);
subscription.setSubscriptionName(subscriptionName);
subscription.setSelector(rs.getString(1));
subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2), ActiveMQDestination.QUEUE_TYPE));
return subscription;
}finally{
close(rs);
@ -502,8 +503,9 @@ public class DefaultJDBCAdapter implements JDBCAdapter{
SubscriptionInfo subscription=new SubscriptionInfo();
subscription.setDestination(destination);
subscription.setSelector(rs.getString(1));
subscription.setSubcriptionName(rs.getString(2));
subscription.setSubscriptionName(rs.getString(2));
subscription.setClientId(rs.getString(3));
subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),ActiveMQDestination.QUEUE_TYPE));
rc.add(subscription);
}
return (SubscriptionInfo[])rc.toArray(new SubscriptionInfo[rc.size()]);

View File

@ -68,9 +68,9 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
return longTermStore.lookupSubscription(clientId, subscriptionName);
}
public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
this.peristenceAdapter.checkpoint(true, true);
longTermStore.addSubsciption(clientId, subscriptionName, selector, retroactive);
longTermStore.addSubsciption(subscriptionInfo, retroactive);
}
public void addMessage(ConnectionContext context, Message message) throws IOException {

View File

@ -27,6 +27,7 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
@ -71,7 +72,8 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
try{
Store store=getStore();
for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
Object obj=i.next();
ContainerId id=(ContainerId) i.next();
Object obj = id.getKey();
if(obj instanceof ActiveMQDestination){
rc.add((ActiveMQDestination)obj);
}

View File

@ -211,7 +211,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
for(Iterator i=durableSubscribers.iterator();i.hasNext();){
SubscriptionInfo info=(SubscriptionInfo)i.next();
TopicReferenceStore ts=createTopicReferenceStore((ActiveMQTopic)info.getDestination());
ts.addSubsciption(info.getClientId(),info.getSubscriptionName(),info.getSelector(),false);
ts.addSubsciption(info,false);
}
}
@ -249,6 +249,20 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
return this.stateStore;
}
public void deleteAllMessages() throws IOException{
super.deleteAllMessages();
if(stateStore!=null){
if(stateStore.isInitialized()){
stateStore.clear();
}else{
stateStore.delete();
}
}else{
File stateDirectory=new File(getDirectory(),"kr-state");
StoreFactory.delete(stateDirectory.getAbsolutePath());
}
}
private Store createStateStore(File directory){
File stateDirectory=new File(directory,"state");
stateDirectory.mkdirs();

View File

@ -106,14 +106,9 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
}
public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
public synchronized void addSubsciption(SubscriptionInfo info,boolean retroactive)
throws IOException{
SubscriptionInfo info=new SubscriptionInfo();
info.setDestination(destination);
info.setClientId(clientId);
info.setSelector(selector);
info.setSubcriptionName(subscriptionName);
String key=getSubscriptionKey(clientId,subscriptionName);
String key=getSubscriptionKey(info.getClientId(),info.getSubscriptionName());
// if already exists - won't add it again as it causes data files
// to hang around
if(!subscriberContainer.containsKey(key)){

View File

@ -18,6 +18,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@ -46,7 +47,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
subscriberContainer=subsContainer;
// load all the Ack containers
for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
Object key=i.next();
String key=(String) i.next();
addSubscriberMessageContainer(key);
}
}
@ -102,8 +103,8 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
}
}
protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{
ListContainer container=store.getListContainer(key,"topic-subs-references");
protected ListContainer addSubscriberMessageContainer(String key) throws IOException{
ListContainer container=store.getListContainer(destination,"topic-subs-references-"+key);
Marshaller marshaller=new ConsumerMessageRefMarshaller();
container.setMarshaller(marshaller);
TopicSubContainer tsc=new TopicSubContainer(container);
@ -141,14 +142,9 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
}
}
public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
public synchronized void addSubsciption(SubscriptionInfo info,boolean retroactive)
throws IOException{
SubscriptionInfo info=new SubscriptionInfo();
info.setDestination(destination);
info.setClientId(clientId);
info.setSelector(selector);
info.setSubcriptionName(subscriptionName);
String key=getSubscriptionKey(clientId,subscriptionName);
String key=getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
// if already exists - won't add it again as it causes data files
// to hang around
if(!subscriberContainer.containsKey(key)){
@ -253,7 +249,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
}
}
protected void removeSubscriberMessageContainer(Object key) throws IOException{
protected void removeSubscriberMessageContainer(String key) throws IOException{
subscriberContainer.remove(key);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
for(Iterator i=container.iterator();i.hasNext();){
@ -270,7 +266,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
}
}
}
store.deleteListContainer(key,"topic-subs-references");
store.deleteListContainer(destination,"topic-subs-references-"+key);
}
protected String getSubscriptionKey(String clientId,String subscriberName){

View File

@ -73,14 +73,9 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
return (SubscriptionInfo)subscriberDatabase.get(new SubscriptionKey(clientId,subscriptionName));
}
public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
public synchronized void addSubsciption(SubscriptionInfo info,boolean retroactive)
throws IOException{
SubscriptionInfo info=new SubscriptionInfo();
info.setDestination(destination);
info.setClientId(clientId);
info.setSelector(selector);
info.setSubcriptionName(subscriptionName);
SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
SubscriptionKey key=new SubscriptionKey(info);
MemoryTopicSub sub=new MemoryTopicSub();
topicSubMap.put(key,sub);
if(retroactive){

View File

@ -17,6 +17,8 @@
*/
package org.apache.activemq.broker;
import java.util.ArrayList;
import javax.jms.DeliveryMode;
import junit.framework.Test;
@ -29,6 +31,7 @@ import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
@ -39,6 +42,113 @@ import org.apache.activemq.command.SessionInfo;
*/
public class RecoveryBrokerTest extends BrokerRestartTestSupport {
/**
* Used to verify that after a broker restart durable subscriptions that
* use wild cards are still wild card subscription after broker restart.
*
* @throws Exception
*/
public void testWildCardSubscriptionPreservedOnRestart() throws Exception {
ActiveMQDestination dest1 = new ActiveMQTopic("TEST.A");
ActiveMQDestination dest2 = new ActiveMQTopic("TEST.B");
ActiveMQDestination dest3 = new ActiveMQTopic("TEST.C");
ActiveMQDestination wildDest = new ActiveMQTopic("TEST.>");
ArrayList<MessageId> sentBeforeRestart = new ArrayList<MessageId>();
ArrayList<MessageId> sentBeforeCreateConsumer = new ArrayList<MessageId>();
ArrayList<MessageId> sentAfterCreateConsumer = new ArrayList<MessageId>();
// Setup a first connection
{
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
connectionInfo1.setClientId("A");
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
connection1.send(connectionInfo1);
connection1.send(sessionInfo1);
connection1.send(producerInfo1);
// Create the durable subscription.
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, wildDest);
consumerInfo1.setSubscriptionName("test");
consumerInfo1.setPrefetchSize(100);
connection1.send(consumerInfo1);
// Close the subscription.
connection1.send(closeConsumerInfo(consumerInfo1));
// Send the messages
for( int i=0; i < 4; i++) {
Message m = createMessage(producerInfo1, dest1, DeliveryMode.PERSISTENT);
connection1.send(m);
sentBeforeRestart.add(m.getMessageId());
}
connection1.request(closeConnectionInfo(connectionInfo1));
connection1.stop();
}
// Restart the broker.
restartBroker();
// Get a connection to the new broker.
{
StubConnection connection2 = createConnection();
ConnectionInfo connectionInfo2 = createConnectionInfo();
connectionInfo2.setClientId("A");
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
connection2.send(producerInfo2);
// Send messages before the durable subscription is re-activated.
for( int i=0; i < 4; i++) {
Message m = createMessage(producerInfo2, dest2, DeliveryMode.PERSISTENT);
connection2.send(m);
sentBeforeCreateConsumer.add(m.getMessageId());
}
// Re-open the subscription.
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, wildDest);
consumerInfo2.setSubscriptionName("test");
consumerInfo2.setPrefetchSize(100);
connection2.send(consumerInfo2);
// Send messages after the subscription is activated.
for( int i=0; i < 4; i++) {
Message m = createMessage(producerInfo2, dest3, DeliveryMode.PERSISTENT);
connection2.send(m);
sentAfterCreateConsumer.add(m.getMessageId());
}
// We should get the recovered messages...
for( int i=0; i < 4 ; i++ ) {
Message m2 = receiveMessage(connection2);
assertNotNull("Recovered message missing: "+i, m2);
assertEquals(sentBeforeRestart.get(i), m2.getMessageId());
}
// We should get get the messages that were sent before the sub was reactivated.
for( int i=0; i < 4 ; i++ ) {
Message m2 = receiveMessage(connection2);
assertNotNull("Before activated message missing: "+i, m2);
assertEquals(sentBeforeCreateConsumer.get(i), m2.getMessageId());
}
// We should get get the messages that were sent after the sub was reactivated.
for( int i=0; i < 4 ; i++ ) {
Message m2 = receiveMessage(connection2);
assertNotNull("After activated message missing: "+i, m2);
assertEquals(""+i, sentAfterCreateConsumer.get(i), m2.getMessageId());
}
assertNoMessagesLeft(connection2);
}
}
public void testConsumedQueuePersistentMessagesLostOnRestart() throws Exception {
ActiveMQDestination destination = new ActiveMQQueue("TEST");

View File

@ -39,9 +39,9 @@ public class TransactedTopicMasterSlaveTest extends JmsTopicTransactionTest{
broker.start();
slave = new BrokerService();
slave.setBrokerName("slave");
slave.addConnector("tcp://localhost:62002");
slave.setDeleteAllMessagesOnStartup(true);
slave.setMasterConnectorURI("tcp://localhost:62001");
slave.addConnector("tcp://localhost:62002");
slave.start();
// wait for thing to connect
Thread.sleep(1000);
@ -62,8 +62,8 @@ public class TransactedTopicMasterSlaveTest extends JmsTopicTransactionTest{
protected BrokerService createBroker() throws Exception,URISyntaxException{
BrokerService broker=new BrokerService();
broker.setBrokerName("master");
broker.addConnector("tcp://localhost:62001");
broker.setDeleteAllMessagesOnStartup(true);
broker.addConnector("tcp://localhost:62001");
return broker;
}

View File

@ -55,5 +55,6 @@ public class SubscriptionInfoTest extends DataFileGeneratorTestSupport {
info.setDestination(createActiveMQDestination("Destination:2"));
info.setSelector("Selector:3");
info.setSubcriptionName("SubcriptionName:4");
info.setSubscribedDestination(createActiveMQDestination("SubscribedDestination:5"));
}
}

View File

@ -59,14 +59,15 @@ public class JPATopicMessageStore extends JPAMessageStore implements TopicMessag
adapter.commitEntityManager(context,manager);
}
public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
EntityManager manager = adapter.beginEntityManager(null);
try {
StoredSubscription ss = new StoredSubscription();
ss.setClientId(clientId);
ss.setSubscriptionName(subscriptionName);
ss.setClientId(info.getClientId());
ss.setSubscriptionName(info.getSubscriptionName());
ss.setDestination(destinationName);
ss.setSelector(selector);
ss.setSelector(info.getSelector());
ss.setSubscribedDestination(info.getSubscribedDestination().getQualifiedName());
ss.setLastAckedId(-1);
if( !retroactive ) {
@ -125,7 +126,8 @@ public class JPATopicMessageStore extends JPAMessageStore implements TopicMessag
info.setClientId(ss.getClientId());
info.setDestination(destination);
info.setSelector(ss.getSelector());
info.setSubcriptionName(ss.getSubscriptionName());
info.setSubscriptionName(ss.getSubscriptionName());
info.setSubscribedDestination(toSubscribedDestination(ss));
l.add(info);
}
@ -171,7 +173,8 @@ public class JPATopicMessageStore extends JPAMessageStore implements TopicMessag
rc.setClientId(ss.getClientId());
rc.setDestination(destination);
rc.setSelector(ss.getSelector());
rc.setSubcriptionName(ss.getSubscriptionName());
rc.setSubscriptionName(ss.getSubscriptionName());
rc.setSubscribedDestination(toSubscribedDestination(ss));
}
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);
@ -181,6 +184,12 @@ public class JPATopicMessageStore extends JPAMessageStore implements TopicMessag
return rc;
}
private ActiveMQDestination toSubscribedDestination(StoredSubscription ss) {
if( ss.getSubscribedDestination() == null )
return null;
return ActiveMQDestination.createDestination(ss.getSubscribedDestination(), ActiveMQDestination.QUEUE_TYPE);
}
public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
EntityManager manager = adapter.beginEntityManager(null);
try {

View File

@ -57,14 +57,15 @@ public class JPATopicReferenceStore extends JPAReferenceStore implements TopicRe
adapter.commitEntityManager(context,manager);
}
public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
EntityManager manager = adapter.beginEntityManager(null);
try {
StoredSubscription ss = new StoredSubscription();
ss.setClientId(clientId);
ss.setSubscriptionName(subscriptionName);
ss.setClientId(info.getClientId());
ss.setSubscriptionName(info.getSubcriptionName());
ss.setDestination(destinationName);
ss.setSelector(selector);
ss.setSelector(info.getSelector());
ss.setSubscribedDestination(info.getSubscribedDestination().getQualifiedName());
ss.setLastAckedId(-1);
if( !retroactive ) {
@ -123,7 +124,8 @@ public class JPATopicReferenceStore extends JPAReferenceStore implements TopicRe
info.setClientId(ss.getClientId());
info.setDestination(destination);
info.setSelector(ss.getSelector());
info.setSubcriptionName(ss.getSubscriptionName());
info.setSubscriptionName(ss.getSubscriptionName());
info.setSubscribedDestination(toSubscribedDestination(ss));
l.add(info);
}
@ -136,6 +138,12 @@ public class JPATopicReferenceStore extends JPAReferenceStore implements TopicRe
adapter.commitEntityManager(null,manager);
return rc;
}
private ActiveMQDestination toSubscribedDestination(StoredSubscription ss) {
if( ss.getSubscribedDestination() == null )
return null;
return ActiveMQDestination.createDestination(ss.getSubscribedDestination(), ActiveMQDestination.QUEUE_TYPE);
}
public int getMessageCount(String clientId, String subscriptionName) throws IOException {
Long rc;
@ -169,7 +177,8 @@ public class JPATopicReferenceStore extends JPAReferenceStore implements TopicRe
rc.setClientId(ss.getClientId());
rc.setDestination(destination);
rc.setSelector(ss.getSelector());
rc.setSubcriptionName(ss.getSubscriptionName());
rc.setSubscriptionName(ss.getSubscriptionName());
rc.setSubscribedDestination(toSubscribedDestination(ss));
}
} catch (Throwable e) {
adapter.rollbackEntityManager(null,manager);

View File

@ -106,7 +106,8 @@ public class StoredSubscription {
private long lastAckedId;
@Basic
private String selector;
@Basic
private String subscribedDestination;
public long getLastAckedId() {
return lastAckedId;
@ -155,4 +156,12 @@ public class StoredSubscription {
public void setId(long id) {
this.id = id;
}
public String getSubscribedDestination() {
return subscribedDestination;
}
public void setSubscribedDestination(String subscribedDestination) {
this.subscribedDestination = subscribedDestination;
}
}