ARTEMIS-1416 Fix regressions in Joram tests

This closes #1621
This commit is contained in:
Howard Gao 2017-11-01 23:34:05 +08:00 committed by Clebert Suconic
parent f3ace6afd7
commit ec13ed6df0
3 changed files with 23 additions and 7 deletions

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
@ -386,7 +387,8 @@ public class AMQPSessionCallback implements SessionCallback {
((ServerConsumer) consumer).receiveCredits(-1);
}
public void serverSend(final Transaction transaction,
public void serverSend(final ProtonServerReceiverContext context,
final Transaction transaction,
final Receiver receiver,
final Delivery delivery,
String address,
@ -405,7 +407,10 @@ public class AMQPSessionCallback implements SessionCallback {
}
//here check queue-autocreation
if (!bindingQuery(address, RoutingType.ANYCAST)) {
org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
RoutingType routingType = context.getRoutingType(receiver);
if (!bindingQuery(address, routingType)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
}

View File

@ -58,6 +58,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
protected final AMQPSessionCallback sessionSPI;
private RoutingType defRoutingType;
/*
The maximum number of credits we will allocate to clients.
This number is also used by the broker when refresh client credits.
@ -98,12 +100,13 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
if (target != null) {
if (target.getDynamic()) {
defRoutingType = getRoutingType(target.getCapabilities());
// if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
// will be deleted on closing of the session
address = sessionSPI.tempQueueName();
try {
sessionSPI.createTemporaryQueue(address, getRoutingType(target.getCapabilities()));
sessionSPI.createTemporaryQueue(address, defRoutingType);
} catch (ActiveMQSecurityException e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingTempDestination(e.getMessage());
} catch (Exception e) {
@ -118,8 +121,9 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
address = target.getAddress();
if (address != null && !address.isEmpty()) {
defRoutingType = getRoutingType(target.getCapabilities());
try {
if (!sessionSPI.bindingQuery(address, getRoutingType(target.getCapabilities()))) {
if (!sessionSPI.bindingQuery(address, defRoutingType)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
}
} catch (ActiveMQAMQPNotFoundException e) {
@ -177,6 +181,14 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
flow(amqpCredits, minCreditRefresh);
}
public RoutingType getRoutingType(Receiver receiver) {
if (receiver == this.receiver) {
return defRoutingType;
}
org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
return target != null ? getRoutingType(target.getCapabilities()) : getRoutingType((Symbol[])null);
}
private RoutingType getRoutingType(Symbol[] symbols) {
if (symbols != null) {
for (Symbol symbol : symbols) {
@ -223,7 +235,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
}
sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data);
sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data);
flow(amqpCredits, minCreditRefresh);
} catch (Exception e) {

View File

@ -161,8 +161,7 @@ public abstract class PubSubTestCase extends JMSTestCase {
subscriberTCF = null;
subscriberSession = null;
subscriberConnection = null;
super.tearDown();
}
super.tearDown();
}
}