mirror of https://github.com/apache/activemq.git
Apply patch from https://issues.apache.org/activemq/browse/AMQ-1808
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@669263 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
604ebd14eb
commit
b52624e66b
|
@ -1229,7 +1229,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
try {
|
try {
|
||||||
throw JMSExceptionSupport.create(er.getException());
|
throw JMSExceptionSupport.create(er.getException());
|
||||||
}catch(Throwable e) {
|
}catch(Throwable e) {
|
||||||
LOG.error("Caught an exception trying to create a JMSException",e);
|
System.err.println(er.getException());
|
||||||
|
LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -261,14 +261,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
/**
|
/**
|
||||||
* @return Returns the value.
|
* @return Returns the value.
|
||||||
*/
|
*/
|
||||||
protected ConsumerId getConsumerId() {
|
public ConsumerId getConsumerId() {
|
||||||
return info.getConsumerId();
|
return info.getConsumerId();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the consumer name - used for durable consumers
|
* @return the consumer name - used for durable consumers
|
||||||
*/
|
*/
|
||||||
protected String getConsumerName() {
|
public String getConsumerName() {
|
||||||
return this.info.getSubscriptionName();
|
return this.info.getSubscriptionName();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@ import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageListener;
|
import javax.jms.MessageListener;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQMessageConsumer;
|
||||||
import org.apache.activemq.Service;
|
import org.apache.activemq.Service;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQMessage;
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
|
@ -52,7 +53,7 @@ public class ConsumerEventSource implements Service, MessageListener {
|
||||||
private AtomicBoolean started = new AtomicBoolean(false);
|
private AtomicBoolean started = new AtomicBoolean(false);
|
||||||
private AtomicInteger consumerCount = new AtomicInteger();
|
private AtomicInteger consumerCount = new AtomicInteger();
|
||||||
private Session session;
|
private Session session;
|
||||||
private MessageConsumer consumer;
|
private ActiveMQMessageConsumer consumer;
|
||||||
|
|
||||||
public ConsumerEventSource(Connection connection, Destination destination) throws JMSException {
|
public ConsumerEventSource(Connection connection, Destination destination) throws JMSException {
|
||||||
this.connection = connection;
|
this.connection = connection;
|
||||||
|
@ -62,12 +63,16 @@ public class ConsumerEventSource implements Service, MessageListener {
|
||||||
public void setConsumerListener(ConsumerListener listener) {
|
public void setConsumerListener(ConsumerListener listener) {
|
||||||
this.listener = listener;
|
this.listener = listener;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getConsumerId() {
|
||||||
|
return consumer != null ? consumer.getConsumerId().toString() : "NOT_SET";
|
||||||
|
}
|
||||||
|
|
||||||
public void start() throws Exception {
|
public void start() throws Exception {
|
||||||
if (started.compareAndSet(false, true)) {
|
if (started.compareAndSet(false, true)) {
|
||||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
ActiveMQTopic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(destination);
|
ActiveMQTopic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(destination);
|
||||||
consumer = session.createConsumer(advisoryTopic);
|
consumer = (ActiveMQMessageConsumer) session.createConsumer(advisoryTopic);
|
||||||
consumer.setMessageListener(this);
|
consumer.setMessageListener(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,7 +44,9 @@ public class DurableSubscriptionView extends SubscriptionView implements Durable
|
||||||
super(clientId, sub);
|
super(clientId, sub);
|
||||||
this.broker = broker;
|
this.broker = broker;
|
||||||
this.durableSub=(DurableTopicSubscription) sub;
|
this.durableSub=(DurableTopicSubscription) sub;
|
||||||
this.subscriptionName = sub.getConsumerInfo().getSubscriptionName();
|
if (sub != null) {
|
||||||
|
this.subscriptionName = sub.getConsumerInfo().getSubscriptionName();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -111,7 +111,7 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport {
|
||||||
|
|
||||||
protected abstract PersistenceAdapter createPersistenceAdapter() throws Exception;
|
protected abstract PersistenceAdapter createPersistenceAdapter() throws Exception;
|
||||||
|
|
||||||
public void xtestUnsubscribeSubscription() throws Exception {
|
public void testUnsubscribeSubscription() throws Exception {
|
||||||
session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
|
session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
|
||||||
Topic topic = session.createTopic("TestTopic");
|
Topic topic = session.createTopic("TestTopic");
|
||||||
consumer = session.createDurableSubscriber(topic, "sub1");
|
consumer = session.createDurableSubscriber(topic, "sub1");
|
||||||
|
@ -144,7 +144,7 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport {
|
||||||
assertTextMessageEquals("Msg:3", consumer.receive(5000));
|
assertTextMessageEquals("Msg:3", consumer.receive(5000));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void xtestInactiveDurableSubscriptionTwoConnections() throws Exception {
|
public void testInactiveDurableSubscriptionTwoConnections() throws Exception {
|
||||||
session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
|
session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
|
||||||
Topic topic = session.createTopic("TestTopic");
|
Topic topic = session.createTopic("TestTopic");
|
||||||
consumer = session.createDurableSubscriber(topic, "sub1");
|
consumer = session.createDurableSubscriber(topic, "sub1");
|
||||||
|
@ -175,7 +175,7 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport {
|
||||||
assertTextMessageEquals("Msg:2", consumer.receive(5000));
|
assertTextMessageEquals("Msg:2", consumer.receive(5000));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void xtestInactiveDurableSubscriptionBrokerRestart() throws Exception {
|
public void testInactiveDurableSubscriptionBrokerRestart() throws Exception {
|
||||||
session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
|
session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
|
||||||
Topic topic = session.createTopic("TestTopic");
|
Topic topic = session.createTopic("TestTopic");
|
||||||
consumer = session.createDurableSubscriber(topic, "sub1");
|
consumer = session.createDurableSubscriber(topic, "sub1");
|
||||||
|
@ -266,7 +266,7 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport {
|
||||||
assertTextMessageEquals("Msg:2", consumer.receive(5000));
|
assertTextMessageEquals("Msg:2", consumer.receive(5000));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void xtestSelectorChange() throws Exception {
|
public void testSelectorChange() throws Exception {
|
||||||
session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
|
session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
|
||||||
Topic topic = session.createTopic("TestTopic");
|
Topic topic = session.createTopic("TestTopic");
|
||||||
consumer = session.createDurableSubscriber(topic, "sub1", "color='red'", false);
|
consumer = session.createDurableSubscriber(topic, "sub1", "color='red'", false);
|
||||||
|
@ -301,7 +301,7 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport {
|
||||||
assertTextMessageEquals("Msg:4", consumer.receive(5000));
|
assertTextMessageEquals("Msg:4", consumer.receive(5000));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void xtestDurableSubWorksInNewSession() throws JMSException {
|
public void testDurableSubWorksInNewSession() throws JMSException {
|
||||||
|
|
||||||
// Create the consumer.
|
// Create the consumer.
|
||||||
connection.start();
|
connection.start();
|
||||||
|
@ -328,7 +328,7 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void xtestDurableSubWorksInNewConnection() throws Exception {
|
public void testDurableSubWorksInNewConnection() throws Exception {
|
||||||
|
|
||||||
// Create the consumer.
|
// Create the consumer.
|
||||||
connection.start();
|
connection.start();
|
||||||
|
|
Loading…
Reference in New Issue