mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-2973 - removing composite subscription
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1021768 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1e900b300a
commit
3a2d93958d
|
@ -547,14 +547,16 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
||||||
|
|
||||||
// Synchronized to DispatchLock
|
// Synchronized to DispatchLock
|
||||||
synchronized(dispatchLock) {
|
synchronized(dispatchLock) {
|
||||||
|
ArrayList<MessageReference> references = new ArrayList<MessageReference>();
|
||||||
for (MessageReference r : dispatched) {
|
for (MessageReference r : dispatched) {
|
||||||
if( r.getRegionDestination() == destination) {
|
if( r.getRegionDestination() == destination) {
|
||||||
rc.add(r);
|
references.add(r);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
destination.getDestinationStatistics().getDispatched().subtract(dispatched.size());
|
rc.addAll(references);
|
||||||
destination.getDestinationStatistics().getInflight().subtract(dispatched.size());
|
destination.getDestinationStatistics().getDispatched().subtract(references.size());
|
||||||
dispatched.clear();
|
destination.getDestinationStatistics().getInflight().subtract(references.size());
|
||||||
|
dispatched.removeAll(references);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return rc;
|
return rc;
|
||||||
|
|
|
@ -51,7 +51,7 @@ public class ListenerTest {
|
||||||
Thread.sleep(3000);
|
Thread.sleep(3000);
|
||||||
|
|
||||||
LOG.info("messages received= " + listener.messages.size());
|
LOG.info("messages received= " + listener.messages.size());
|
||||||
Assert.assertEquals(listener.messages.size(), msgNum);
|
Assert.assertEquals(msgNum, listener.messages.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -63,7 +63,7 @@ public class ListenerTest {
|
||||||
Thread.sleep(3000);
|
Thread.sleep(3000);
|
||||||
|
|
||||||
LOG.info("messages received= " + listener.messages.size());
|
LOG.info("messages received= " + listener.messages.size());
|
||||||
Assert.assertEquals(listener.messages.size(), 6 * msgNum);
|
Assert.assertEquals(6 * msgNum, listener.messages.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendMessages(String destName, int msgNum) throws Exception {
|
public void sendMessages(String destName, int msgNum) throws Exception {
|
||||||
|
|
|
@ -58,9 +58,8 @@
|
||||||
<bean id="compositeContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
|
<bean id="compositeContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
|
||||||
<property name="connectionFactory" ref="connectionFactory"/>
|
<property name="connectionFactory" ref="connectionFactory"/>
|
||||||
<property name="messageListener" ref="messageListener"/>
|
<property name="messageListener" ref="messageListener"/>
|
||||||
<property name="destinationName" value="TEST.>"/>
|
<property name="destinationName" value="TEST.>?consumer.prefetchSize=1"/>
|
||||||
<property name="transactionManager" ref="transactionManager"/>
|
<property name="transactionManager" ref="transactionManager"/>
|
||||||
<property name="cacheLevelName" value="CACHE_CONSUMER"/>
|
|
||||||
</bean>
|
</bean>
|
||||||
|
|
||||||
</beans>
|
</beans>
|
Loading…
Reference in New Issue