mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3193 - consumers don't get messages after JMX remove
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1075346 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9308db8b75
commit
99653c6c8c
|
@ -1437,6 +1437,23 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
LOG.error("Failed to page in more queue messages ", e);
|
LOG.error("Failed to page in more queue messages ", e);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// if there are already paged messages
|
||||||
|
// dispatch them
|
||||||
|
if (pagedInMessages.size() != 0) {
|
||||||
|
pagedInMessagesLock.writeLock().lock();
|
||||||
|
ArrayList paged = new ArrayList();
|
||||||
|
try {
|
||||||
|
paged.addAll(pagedInMessages.values());
|
||||||
|
} finally {
|
||||||
|
pagedInMessagesLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
doDispatch(paged);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Failed to dispatch already paged messages ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pendingBrowserDispatch != null) {
|
if (pendingBrowserDispatch != null) {
|
||||||
|
|
|
@ -48,6 +48,7 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
|
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
|
||||||
import org.apache.activemq.command.ActiveMQBlobMessage;
|
import org.apache.activemq.command.ActiveMQBlobMessage;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTempQueue;
|
import org.apache.activemq.command.ActiveMQTempQueue;
|
||||||
import org.apache.activemq.util.JMXSupport;
|
import org.apache.activemq.util.JMXSupport;
|
||||||
|
@ -161,6 +162,39 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
|
||||||
assertTrue("cache enabled", queueNew.isCacheEnabled());
|
assertTrue("cache enabled", queueNew.isCacheEnabled());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testRemoveMessages() throws Exception {
|
||||||
|
ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
|
||||||
|
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
|
||||||
|
broker.addQueue(getDestinationString());
|
||||||
|
|
||||||
|
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
|
||||||
|
|
||||||
|
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||||
|
String msg1 = queue.sendTextMessage("message 1");
|
||||||
|
String msg2 = queue.sendTextMessage("message 2");
|
||||||
|
|
||||||
|
assertTrue(queue.removeMessage(msg2));
|
||||||
|
|
||||||
|
connection = connectionFactory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
ActiveMQDestination dest = createDestination();
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(dest);
|
||||||
|
Message message = consumer.receive(1000);
|
||||||
|
assertNotNull(message);
|
||||||
|
assertEquals(msg1, message.getJMSMessageID());
|
||||||
|
|
||||||
|
String msg3 = queue.sendTextMessage("message 3");
|
||||||
|
message = consumer.receive(1000);
|
||||||
|
assertNotNull(message);
|
||||||
|
assertEquals(msg3, message.getJMSMessageID());
|
||||||
|
|
||||||
|
message = consumer.receive(1000);
|
||||||
|
assertNull(message);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public void testRetryMessages() throws Exception {
|
public void testRetryMessages() throws Exception {
|
||||||
// lets speed up redelivery
|
// lets speed up redelivery
|
||||||
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory;
|
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory;
|
||||||
|
|
Loading…
Reference in New Issue