AMQ-6707 - fix destination filter delegate param, refactor-auto-gen method; jees

This commit is contained in:
gtully 2018-05-18 14:44:05 +01:00
parent 2eff835ee2
commit 01384c714d
2 changed files with 48 additions and 16 deletions

View File

@ -380,7 +380,7 @@ public class DestinationFilter implements Destination {
@Override @Override
public void clearPendingMessages(int pendingAdditionsCount) { public void clearPendingMessages(int pendingAdditionsCount) {
next.clearPendingMessages(0); next.clearPendingMessages(pendingAdditionsCount);
} }
@Override @Override

View File

@ -21,14 +21,22 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQXAConnection; import org.apache.activemq.ActiveMQXAConnection;
import org.apache.activemq.ActiveMQXAConnectionFactory; import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.TestSupport; import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.XATransactionId; import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.filter.AnyDestination;
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.jaas.GroupPrincipal;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.security.AuthorizationPlugin;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.security.SimpleAuthorizationMap;
import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
@ -464,6 +472,7 @@ public class XACompletionTest extends TestSupport {
// set maxBatchSize=1 // set maxBatchSize=1
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=" + 1); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=" + 1);
factory.setWatchTopicAdvisories(false);
javax.jms.Connection connection = factory.createConnection(); javax.jms.Connection connection = factory.createConnection();
connection.start(); connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@ -472,6 +481,7 @@ public class XACompletionTest extends TestSupport {
consumer.close(); consumer.close();
ActiveMQConnectionFactory receiveFactory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0"); ActiveMQConnectionFactory receiveFactory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0");
receiveFactory.setWatchTopicAdvisories(false);
// recover/rollback the second tx // recover/rollback the second tx
ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0"); ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0");
@ -870,6 +880,7 @@ public class XACompletionTest extends TestSupport {
private Message regularReceive(String qName) throws Exception { private Message regularReceive(String qName) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
factory.setWatchTopicAdvisories(false);
return regularReceiveWith(factory, qName); return regularReceiveWith(factory, qName);
} }
@ -889,6 +900,7 @@ public class XACompletionTest extends TestSupport {
private int drainUnack(int limit, String qName) throws Exception { private int drainUnack(int limit, String qName) throws Exception {
int drained = 0; int drained = 0;
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=" + limit); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=" + limit);
factory.setWatchTopicAdvisories(false);
javax.jms.Connection connection = factory.createConnection(); javax.jms.Connection connection = factory.createConnection();
try { try {
connection.start(); connection.start();
@ -897,7 +909,8 @@ public class XACompletionTest extends TestSupport {
MessageConsumer consumer = session.createConsumer(destination); MessageConsumer consumer = session.createConsumer(destination);
while (drained < limit && consumer.receive(2000) != null) { while (drained < limit && consumer.receive(2000) != null) {
drained++; drained++;
}; }
;
consumer.close(); consumer.close();
} finally { } finally {
connection.close(); connection.close();
@ -921,6 +934,7 @@ public class XACompletionTest extends TestSupport {
connection.close(); connection.close();
} }
} }
protected void sendMessages(int messagesExpected) throws Exception { protected void sendMessages(int messagesExpected) throws Exception {
sendMessagesWith(factory, messagesExpected); sendMessagesWith(factory, messagesExpected);
} }
@ -985,6 +999,24 @@ public class XACompletionTest extends TestSupport {
setPersistenceAdapter(broker, persistenceAdapterChoice); setPersistenceAdapter(broker, persistenceAdapterChoice);
broker.setPersistent(true); broker.setPersistent(true);
connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
// ensure we run through a destination filter
final String id = "a";
AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin();
SimpleAuthorizationMap map = new SimpleAuthorizationMap();
DestinationMap destinationMap = new DestinationMap();
GroupPrincipal anaGroup = new GroupPrincipal(id);
destinationMap.put(new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">")}), anaGroup);
map.setWriteACLs(destinationMap);
map.setAdminACLs(destinationMap);
map.setReadACLs(destinationMap);
authorizationPlugin.setMap(map);
SimpleAuthenticationPlugin simpleAuthenticationPlugin = new SimpleAuthenticationPlugin();
simpleAuthenticationPlugin.setAnonymousAccessAllowed(true);
simpleAuthenticationPlugin.setAnonymousGroup(id);
simpleAuthenticationPlugin.setAnonymousUser(id);
broker.setPlugins(new BrokerPlugin[]{simpleAuthenticationPlugin, authorizationPlugin});
broker.start(); broker.start();
return broker; return broker;
} }