git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@789283 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-06-29 10:41:30 +00:00
parent a09c810859
commit 2e7d2195cc
3 changed files with 20 additions and 17 deletions

View File

@ -117,7 +117,6 @@ public class Topic extends BaseDestination implements Task{
public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
sub.add(context, this);
destinationStatistics.getConsumers().increment();
if (!sub.getConsumerInfo().isDurable()) {
@ -133,6 +132,7 @@ public class Topic extends BaseDestination implements Task{
try {
synchronized (consumers) {
sub.add(context, this);
consumers.add(sub);
}
subscriptionRecoveryPolicy.recover(context, this, sub);
@ -143,10 +143,12 @@ public class Topic extends BaseDestination implements Task{
} else {
synchronized (consumers) {
sub.add(context, this);
consumers.add(sub);
}
}
} else {
sub.add(context, this);
DurableTopicSubscription dsub = (DurableTopicSubscription)sub;
durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
}
@ -178,11 +180,7 @@ public class Topic extends BaseDestination implements Task{
// we are recovering a subscription to avoid out of order messages.
dispatchValve.turnOff();
try {
synchronized (consumers) {
consumers.add(subscription);
}
if (topicStore == null) {
return;
}
@ -199,6 +197,10 @@ public class Topic extends BaseDestination implements Task{
// Need to delete the subscription
topicStore.deleteSubscription(clientId, subscriptionName);
info = null;
} else {
synchronized (consumers) {
consumers.add(subscription);
}
}
}
// Do we need to create the subscription?
@ -208,11 +210,15 @@ public class Topic extends BaseDestination implements Task{
info.setSelector(selector);
info.setSubscriptionName(subscriptionName);
info.setDestination(getActiveMQDestination());
// Thi destination is an actual destination id.
// This destination is an actual destination id.
info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
// This destination might be a pattern
topicStore.addSubsciption(info,subscription.getConsumerInfo().isRetroactive());
synchronized (consumers) {
consumers.add(subscription);
topicStore.addSubsciption(info,subscription.getConsumerInfo().isRetroactive());
}
}
final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
msgContext.setDestination(destination);
@ -244,7 +250,6 @@ public class Topic extends BaseDestination implements Task{
}
});
}
} finally {
dispatchValve.turnOn();
}

View File

@ -565,7 +565,6 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
*/
private void recover() throws IllegalStateException, IOException {
referenceStoreAdapter.clearMessages();
referenceStoreAdapter.recoverState();
Location pos = null;
int redoCounter = 0;
LOG.info("Journal Recovery Started from: " + asyncDataManager);

View File

@ -136,11 +136,11 @@ public class DurableConsumerTest extends TestCase {
topicConnectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL);
try {
topic = new ActiveMQTopic(TOPIC_NAME);
topicConnection = topicConnectionFactory.createTopicConnection();
topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topicPublisher = topicSession.createPublisher(topic);
message = topicSession.createMessage();
topic = new ActiveMQTopic(TOPIC_NAME);
topicConnection = topicConnectionFactory.createTopicConnection();
topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topicPublisher = topicSession.createPublisher(topic);
message = topicSession.createMessage();
} catch( Exception ex ) {
exceptions.add(ex);
}
@ -174,13 +174,12 @@ public class DurableConsumerTest extends TestCase {
} );
thread.start();
LOG.info( "subscribed " + i + " of 100" );
}
Thread.sleep(5000);
broker.stop();
broker = createBroker(false);
Thread.sleep(5000);
Thread.sleep(10000);
assertEquals(0, exceptions.size());
}