ARTEMIS-738 Improving TX support on AMQP
https://issues.apache.org/jira/browse/ARTEMIS-738
This commit is contained in:
parent
5ea53c48e8
commit
113c0c9360
|
@ -21,6 +21,8 @@ import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
@ -37,8 +39,13 @@ import org.apache.activemq.artemis.core.protocol.proton.sasl.ActiveMQPlainSASL;
|
||||||
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
||||||
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
|
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
|
||||||
|
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||||
|
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||||
|
import org.apache.qpid.proton.amqp.Binary;
|
||||||
import org.apache.qpid.proton.amqp.Symbol;
|
import org.apache.qpid.proton.amqp.Symbol;
|
||||||
import org.apache.qpid.proton.amqp.transport.AmqpError;
|
import org.apache.qpid.proton.amqp.transport.AmqpError;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
@ -47,7 +54,9 @@ import org.proton.plug.AMQPConnectionContext;
|
||||||
import org.proton.plug.AMQPSessionCallback;
|
import org.proton.plug.AMQPSessionCallback;
|
||||||
import org.proton.plug.SASLResult;
|
import org.proton.plug.SASLResult;
|
||||||
import org.proton.plug.ServerSASL;
|
import org.proton.plug.ServerSASL;
|
||||||
|
import org.proton.plug.exceptions.ActiveMQAMQPException;
|
||||||
import org.proton.plug.handler.ExtCapability;
|
import org.proton.plug.handler.ExtCapability;
|
||||||
|
import org.proton.plug.logger.ActiveMQAMQPProtocolMessageBundle;
|
||||||
import org.proton.plug.sasl.AnonymousServerSASL;
|
import org.proton.plug.sasl.AnonymousServerSASL;
|
||||||
|
|
||||||
import static org.proton.plug.AmqpSupport.CONTAINER_ID;
|
import static org.proton.plug.AmqpSupport.CONTAINER_ID;
|
||||||
|
@ -55,8 +64,11 @@ import static org.proton.plug.AmqpSupport.INVALID_FIELD;
|
||||||
import static org.proton.plug.context.AbstractConnectionContext.CONNECTION_OPEN_FAILED;
|
import static org.proton.plug.context.AbstractConnectionContext.CONNECTION_OPEN_FAILED;
|
||||||
|
|
||||||
public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback, FailureListener, CloseListener {
|
public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback, FailureListener, CloseListener {
|
||||||
|
private static final Logger logger = Logger.getLogger(ActiveMQProtonConnectionCallback.class);
|
||||||
private static final List<String> connectedContainers = Collections.synchronizedList(new ArrayList());
|
private static final List<String> connectedContainers = Collections.synchronizedList(new ArrayList());
|
||||||
|
|
||||||
|
private ConcurrentMap<XidImpl, Transaction> transactions = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private static final Logger log = Logger.getLogger(ActiveMQProtonConnectionCallback.class);
|
private static final Logger log = Logger.getLogger(ActiveMQProtonConnectionCallback.class);
|
||||||
|
|
||||||
private final ProtonProtocolManager manager;
|
private final ProtonProtocolManager manager;
|
||||||
|
@ -117,11 +129,23 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback,
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
if (registeredConnectionId.getAndSet(false)) {
|
try {
|
||||||
server.removeClientConnection(remoteContainerId);
|
if (registeredConnectionId.getAndSet(false)) {
|
||||||
|
server.removeClientConnection(remoteContainerId);
|
||||||
|
}
|
||||||
|
connection.close();
|
||||||
|
amqpConnection.close();
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
for (Transaction tx : transactions.values()) {
|
||||||
|
try {
|
||||||
|
tx.rollback();
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
logger.warn(e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
connection.close();
|
|
||||||
amqpConnection.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Executor getExeuctor() {
|
public Executor getExeuctor() {
|
||||||
|
@ -219,4 +243,43 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback,
|
||||||
public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
|
public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
|
||||||
close();
|
close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Binary newTransaction() {
|
||||||
|
XidImpl xid = newXID();
|
||||||
|
Transaction transaction = new TransactionImpl(xid, server.getStorageManager(), -1);
|
||||||
|
transactions.put(xid, transaction);
|
||||||
|
return new Binary(xid.getGlobalTransactionId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
|
||||||
|
XidImpl xid = newXID(txid.getArray());
|
||||||
|
Transaction tx = transactions.get(xid);
|
||||||
|
|
||||||
|
if (tx == null) {
|
||||||
|
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(xid.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
return tx;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeTransaction(Binary txid) {
|
||||||
|
XidImpl xid = newXID(txid.getArray());
|
||||||
|
transactions.remove(xid);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected XidImpl newXID() {
|
||||||
|
return newXID(UUIDGenerator.getInstance().generateStringUUID().getBytes());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected XidImpl newXID(byte[] bytes) {
|
||||||
|
return new XidImpl("amqp".getBytes(), 1, bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,7 +42,6 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
|
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||||
import org.apache.activemq.artemis.utils.ByteUtil;
|
|
||||||
import org.apache.activemq.artemis.utils.IDGenerator;
|
import org.apache.activemq.artemis.utils.IDGenerator;
|
||||||
import org.apache.activemq.artemis.utils.SelectorTranslator;
|
import org.apache.activemq.artemis.utils.SelectorTranslator;
|
||||||
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
|
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
|
||||||
|
@ -61,6 +60,7 @@ import org.proton.plug.AMQPSessionCallback;
|
||||||
import org.proton.plug.AMQPSessionContext;
|
import org.proton.plug.AMQPSessionContext;
|
||||||
import org.proton.plug.SASLResult;
|
import org.proton.plug.SASLResult;
|
||||||
import org.proton.plug.context.ProtonPlugSender;
|
import org.proton.plug.context.ProtonPlugSender;
|
||||||
|
import org.proton.plug.exceptions.ActiveMQAMQPException;
|
||||||
import org.proton.plug.exceptions.ActiveMQAMQPInternalErrorException;
|
import org.proton.plug.exceptions.ActiveMQAMQPInternalErrorException;
|
||||||
import org.proton.plug.exceptions.ActiveMQAMQPResourceLimitExceededException;
|
import org.proton.plug.exceptions.ActiveMQAMQPResourceLimitExceededException;
|
||||||
import org.proton.plug.sasl.PlainSASLResult;
|
import org.proton.plug.sasl.PlainSASLResult;
|
||||||
|
@ -281,46 +281,11 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
|
||||||
return (ProtonJMessage) manager.getConverter().outbound((ServerMessage) message, deliveryCount);
|
return (ProtonJMessage) manager.getConverter().outbound((ServerMessage) message, deliveryCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public Binary getCurrentTXID() {
|
|
||||||
Transaction tx = serverSession.getCurrentTransaction();
|
|
||||||
if (tx == null) {
|
|
||||||
tx = serverSession.newTransaction();
|
|
||||||
serverSession.resetTX(tx);
|
|
||||||
}
|
|
||||||
return new Binary(ByteUtil.longToBytes(tx.getID()));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String tempQueueName() {
|
public String tempQueueName() {
|
||||||
return UUIDGenerator.getInstance().generateStringUUID();
|
return UUIDGenerator.getInstance().generateStringUUID();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void commitCurrentTX() throws Exception {
|
|
||||||
recoverContext();
|
|
||||||
try {
|
|
||||||
serverSession.commit();
|
|
||||||
}
|
|
||||||
finally {
|
|
||||||
resetContext();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void rollbackCurrentTX(boolean lastMessageDelivered) throws Exception {
|
|
||||||
//need to check here as this can be called if init fails
|
|
||||||
if (serverSession != null) {
|
|
||||||
recoverContext();
|
|
||||||
try {
|
|
||||||
serverSession.rollback(lastMessageDelivered);
|
|
||||||
}
|
|
||||||
finally {
|
|
||||||
resetContext();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws Exception {
|
public void close() throws Exception {
|
||||||
//need to check here as this can be called if init fails
|
//need to check here as this can be called if init fails
|
||||||
|
@ -336,10 +301,13 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void ack(Object brokerConsumer, Object message) throws Exception {
|
public void ack(Transaction transaction, Object brokerConsumer, Object message) throws Exception {
|
||||||
|
if (transaction == null) {
|
||||||
|
transaction = serverSession.getCurrentTransaction();
|
||||||
|
}
|
||||||
recoverContext();
|
recoverContext();
|
||||||
try {
|
try {
|
||||||
((ServerConsumer) brokerConsumer).individualAcknowledge(serverSession.getCurrentTransaction(), ((ServerMessage) message).getMessageID());
|
((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, ((ServerMessage) message).getMessageID());
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
resetContext();
|
resetContext();
|
||||||
|
@ -363,7 +331,8 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void serverSend(final Receiver receiver,
|
public void serverSend(final Transaction transaction,
|
||||||
|
final Receiver receiver,
|
||||||
final Delivery delivery,
|
final Delivery delivery,
|
||||||
String address,
|
String address,
|
||||||
int messageFormat,
|
int messageFormat,
|
||||||
|
@ -382,10 +351,10 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
|
||||||
if (store.isRejectingMessages()) {
|
if (store.isRejectingMessages()) {
|
||||||
// We drop pre-settled messages (and abort any associated Tx)
|
// We drop pre-settled messages (and abort any associated Tx)
|
||||||
if (delivery.remotelySettled()) {
|
if (delivery.remotelySettled()) {
|
||||||
if (serverSession.getCurrentTransaction() != null) {
|
if (transaction != null) {
|
||||||
String amqpAddress = delivery.getLink().getTarget().getAddress();
|
String amqpAddress = delivery.getLink().getTarget().getAddress();
|
||||||
ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress);
|
ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress);
|
||||||
serverSession.getCurrentTransaction().markAsRollbackOnly(e);
|
transaction.markAsRollbackOnly(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -393,7 +362,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
serverSend(message, delivery, receiver);
|
serverSend(transaction, message, delivery, receiver);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -406,11 +375,11 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
|
||||||
connection.flush();
|
connection.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void serverSend(final ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception {
|
private void serverSend(final Transaction transaction, final ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception {
|
||||||
try {
|
try {
|
||||||
|
|
||||||
message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer());
|
message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer());
|
||||||
serverSession.send(message, false);
|
serverSession.send(transaction, message, false, false);
|
||||||
|
|
||||||
// FIXME Potential race here...
|
// FIXME Potential race here...
|
||||||
manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
|
manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
|
||||||
|
@ -543,4 +512,31 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
|
||||||
|
return protonSPI.getTransaction(txid);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Binary newTransaction() {
|
||||||
|
return protonSPI.newTransaction();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void commitTX(Binary txid) throws Exception {
|
||||||
|
Transaction tx = protonSPI.getTransaction(txid);
|
||||||
|
tx.commit(true);
|
||||||
|
protonSPI.removeTransaction(txid);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception {
|
||||||
|
Transaction tx = protonSPI.getTransaction(txid);
|
||||||
|
tx.rollback();
|
||||||
|
protonSPI.removeTransaction(txid);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,10 @@
|
||||||
package org.proton.plug;
|
package org.proton.plug;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
|
import org.apache.qpid.proton.amqp.Binary;
|
||||||
import org.apache.qpid.proton.engine.Connection;
|
import org.apache.qpid.proton.engine.Connection;
|
||||||
|
import org.proton.plug.exceptions.ActiveMQAMQPException;
|
||||||
|
|
||||||
public interface AMQPConnectionCallback {
|
public interface AMQPConnectionCallback {
|
||||||
|
|
||||||
|
@ -44,4 +47,12 @@ public interface AMQPConnectionCallback {
|
||||||
void sendSASLSupported();
|
void sendSASLSupported();
|
||||||
|
|
||||||
boolean validateConnection(Connection connection, SASLResult saslResult);
|
boolean validateConnection(Connection connection, SASLResult saslResult);
|
||||||
|
|
||||||
|
Binary newTransaction();
|
||||||
|
|
||||||
|
Transaction getTransaction(Binary txid) throws ActiveMQAMQPException;
|
||||||
|
|
||||||
|
void removeTransaction(Binary txid);
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,11 +18,13 @@ package org.proton.plug;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||||
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
import org.apache.qpid.proton.amqp.Binary;
|
import org.apache.qpid.proton.amqp.Binary;
|
||||||
import org.apache.qpid.proton.engine.Delivery;
|
import org.apache.qpid.proton.engine.Delivery;
|
||||||
import org.apache.qpid.proton.engine.Receiver;
|
import org.apache.qpid.proton.engine.Receiver;
|
||||||
import org.apache.qpid.proton.message.ProtonJMessage;
|
import org.apache.qpid.proton.message.ProtonJMessage;
|
||||||
import org.proton.plug.context.ProtonPlugSender;
|
import org.proton.plug.context.ProtonPlugSender;
|
||||||
|
import org.proton.plug.exceptions.ActiveMQAMQPException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* These are methods where the Proton Plug component will call your server
|
* These are methods where the Proton Plug component will call your server
|
||||||
|
@ -67,17 +69,20 @@ public interface AMQPSessionCallback {
|
||||||
// This one can be a lot improved
|
// This one can be a lot improved
|
||||||
ProtonJMessage encodeMessage(Object message, int deliveryCount) throws Exception;
|
ProtonJMessage encodeMessage(Object message, int deliveryCount) throws Exception;
|
||||||
|
|
||||||
Binary getCurrentTXID();
|
|
||||||
|
|
||||||
String tempQueueName();
|
String tempQueueName();
|
||||||
|
|
||||||
void commitCurrentTX() throws Exception;
|
|
||||||
|
|
||||||
void rollbackCurrentTX(boolean lastMessageReceived) throws Exception;
|
Transaction getTransaction(Binary txid) throws ActiveMQAMQPException;
|
||||||
|
|
||||||
|
Binary newTransaction();
|
||||||
|
|
||||||
|
void commitTX(Binary txid) throws Exception;
|
||||||
|
|
||||||
|
void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception;
|
||||||
|
|
||||||
void close() throws Exception;
|
void close() throws Exception;
|
||||||
|
|
||||||
void ack(Object brokerConsumer, Object message) throws Exception;
|
void ack(Transaction transaction, Object brokerConsumer, Object message) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param brokerConsumer
|
* @param brokerConsumer
|
||||||
|
@ -96,7 +101,8 @@ public interface AMQPSessionCallback {
|
||||||
* @param messageFormat
|
* @param messageFormat
|
||||||
* @param messageEncoded a Heap Buffer ByteBuffer (safe to convert into byte[])
|
* @param messageEncoded a Heap Buffer ByteBuffer (safe to convert into byte[])
|
||||||
*/
|
*/
|
||||||
void serverSend(Receiver receiver,
|
void serverSend(Transaction transaction,
|
||||||
|
Receiver receiver,
|
||||||
Delivery delivery,
|
Delivery delivery,
|
||||||
String address,
|
String address,
|
||||||
int messageFormat,
|
int messageFormat,
|
||||||
|
|
|
@ -140,7 +140,6 @@ public abstract class AbstractProtonSessionContext extends ProtonInitializable i
|
||||||
senders.clear();
|
senders.clear();
|
||||||
try {
|
try {
|
||||||
if (sessionSPI != null) {
|
if (sessionSPI != null) {
|
||||||
sessionSPI.rollbackCurrentTX(false);
|
|
||||||
sessionSPI.close();
|
sessionSPI.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,7 +72,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
|
||||||
Object action = ((AmqpValue) msg.getBody()).getValue();
|
Object action = ((AmqpValue) msg.getBody()).getValue();
|
||||||
|
|
||||||
if (action instanceof Declare) {
|
if (action instanceof Declare) {
|
||||||
Binary txID = sessionSPI.getCurrentTXID();
|
Binary txID = sessionSPI.newTransaction();
|
||||||
Declared declared = new Declared();
|
Declared declared = new Declared();
|
||||||
declared.setTxnId(txID);
|
declared.setTxnId(txID);
|
||||||
delivery.disposition(declared);
|
delivery.disposition(declared);
|
||||||
|
@ -80,9 +80,11 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
|
||||||
}
|
}
|
||||||
else if (action instanceof Discharge) {
|
else if (action instanceof Discharge) {
|
||||||
Discharge discharge = (Discharge) action;
|
Discharge discharge = (Discharge) action;
|
||||||
|
|
||||||
|
Binary txID = discharge.getTxnId();
|
||||||
if (discharge.getFail()) {
|
if (discharge.getFail()) {
|
||||||
try {
|
try {
|
||||||
sessionSPI.rollbackCurrentTX(true);
|
sessionSPI.rollbackTX(txID, true);
|
||||||
delivery.disposition(new Accepted());
|
delivery.disposition(new Accepted());
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
|
@ -91,7 +93,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
try {
|
try {
|
||||||
sessionSPI.commitCurrentTX();
|
sessionSPI.commitTX(txID);
|
||||||
delivery.disposition(new Accepted());
|
delivery.disposition(new Accepted());
|
||||||
}
|
}
|
||||||
catch (ActiveMQAMQPException amqpE) {
|
catch (ActiveMQAMQPException amqpE) {
|
||||||
|
|
|
@ -18,8 +18,10 @@ package org.proton.plug.context.server;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.PooledByteBufAllocator;
|
import io.netty.buffer.PooledByteBufAllocator;
|
||||||
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
import org.apache.qpid.proton.amqp.Symbol;
|
import org.apache.qpid.proton.amqp.Symbol;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Rejected;
|
import org.apache.qpid.proton.amqp.messaging.Rejected;
|
||||||
|
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
|
||||||
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
||||||
import org.apache.qpid.proton.engine.Delivery;
|
import org.apache.qpid.proton.engine.Delivery;
|
||||||
import org.apache.qpid.proton.engine.Receiver;
|
import org.apache.qpid.proton.engine.Receiver;
|
||||||
|
@ -130,7 +132,13 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext {
|
||||||
|
|
||||||
receiver.advance();
|
receiver.advance();
|
||||||
|
|
||||||
sessionSPI.serverSend(receiver, delivery, address, delivery.getMessageFormat(), buffer);
|
Transaction tx = null;
|
||||||
|
if (delivery.getRemoteState() instanceof TransactionalState) {
|
||||||
|
|
||||||
|
TransactionalState txState = (TransactionalState) delivery.getRemoteState();
|
||||||
|
tx = this.sessionSPI.getTransaction(txState.getTxnId());
|
||||||
|
}
|
||||||
|
sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), buffer);
|
||||||
|
|
||||||
flow(maxCreditAllocation, minCreditRefresh);
|
flow(maxCreditAllocation, minCreditRefresh);
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.util.Objects;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||||
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
|
||||||
import org.apache.activemq.artemis.selector.filter.FilterException;
|
import org.apache.activemq.artemis.selector.filter.FilterException;
|
||||||
import org.apache.activemq.artemis.selector.impl.SelectorParser;
|
import org.apache.activemq.artemis.selector.impl.SelectorParser;
|
||||||
|
@ -339,7 +340,9 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
|
||||||
if (remoteState != null) {
|
if (remoteState != null) {
|
||||||
// If we are transactional then we need ack if the msg has been accepted
|
// If we are transactional then we need ack if the msg has been accepted
|
||||||
if (remoteState instanceof TransactionalState) {
|
if (remoteState instanceof TransactionalState) {
|
||||||
|
|
||||||
TransactionalState txState = (TransactionalState) remoteState;
|
TransactionalState txState = (TransactionalState) remoteState;
|
||||||
|
Transaction tx = this.sessionSPI.getTransaction(txState.getTxnId());
|
||||||
if (txState.getOutcome() != null) {
|
if (txState.getOutcome() != null) {
|
||||||
Outcome outcome = txState.getOutcome();
|
Outcome outcome = txState.getOutcome();
|
||||||
if (outcome instanceof Accepted) {
|
if (outcome instanceof Accepted) {
|
||||||
|
@ -353,7 +356,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
|
||||||
//we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order
|
//we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order
|
||||||
// from dealer, a perf hit but a must
|
// from dealer, a perf hit but a must
|
||||||
try {
|
try {
|
||||||
sessionSPI.ack(brokerConsumer, message);
|
sessionSPI.ack(tx, brokerConsumer, message);
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
|
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
|
||||||
|
@ -365,7 +368,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
|
||||||
//we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order
|
//we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order
|
||||||
// from dealer, a perf hit but a must
|
// from dealer, a perf hit but a must
|
||||||
try {
|
try {
|
||||||
sessionSPI.ack(brokerConsumer, message);
|
sessionSPI.ack(null, brokerConsumer, message);
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
|
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.proton.plug.context.server;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.qpid.proton.amqp.Symbol;
|
||||||
import org.apache.qpid.proton.amqp.transaction.Coordinator;
|
import org.apache.qpid.proton.amqp.transaction.Coordinator;
|
||||||
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
||||||
import org.apache.qpid.proton.engine.Receiver;
|
import org.apache.qpid.proton.engine.Receiver;
|
||||||
|
@ -60,6 +61,11 @@ public class ProtonServerSessionContext extends AbstractProtonSessionContext {
|
||||||
|
|
||||||
public void addTransactionHandler(Coordinator coordinator, Receiver receiver) {
|
public void addTransactionHandler(Coordinator coordinator, Receiver receiver) {
|
||||||
ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI);
|
ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI);
|
||||||
|
|
||||||
|
coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"),
|
||||||
|
Symbol.getSymbol("amqp:multi-txns-per-ssn"),
|
||||||
|
Symbol.getSymbol("amqp:multi-ssns-per-txn"));
|
||||||
|
|
||||||
receiver.setContext(transactionHandler);
|
receiver.setContext(transactionHandler);
|
||||||
receiver.open();
|
receiver.open();
|
||||||
receiver.flow(100);
|
receiver.flow(100);
|
||||||
|
|
|
@ -74,4 +74,7 @@ public interface ActiveMQAMQPProtocolMessageBundle {
|
||||||
@Message(id = 219013, value = "error committing coordinator: {0}", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 219013, value = "error committing coordinator: {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
ActiveMQAMQPIllegalStateException errorCommittingCoordinator(String message);
|
ActiveMQAMQPIllegalStateException errorCommittingCoordinator(String message);
|
||||||
|
|
||||||
|
@Message(id = 219014, value = "Transaction not found: xid={0}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
ActiveMQAMQPIllegalStateException txNotFound(String xidToString);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,9 @@ import java.util.concurrent.Executors;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
|
import org.apache.qpid.proton.amqp.Binary;
|
||||||
import org.apache.qpid.proton.engine.Connection;
|
import org.apache.qpid.proton.engine.Connection;
|
||||||
import org.apache.qpid.proton.engine.Link;
|
import org.apache.qpid.proton.engine.Link;
|
||||||
import org.apache.qpid.proton.engine.Session;
|
import org.apache.qpid.proton.engine.Session;
|
||||||
|
@ -77,6 +79,21 @@ public class AbstractConnectionContextTest {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Binary newTransaction() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeTransaction(Binary txid) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTransport(ByteBuf bytes, AMQPConnectionContext connection) {
|
public void onTransport(ByteBuf bytes, AMQPConnectionContext connection) {
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,9 @@ import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
|
import org.apache.qpid.proton.amqp.Binary;
|
||||||
import org.apache.qpid.proton.engine.Connection;
|
import org.apache.qpid.proton.engine.Connection;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
import org.proton.plug.AMQPConnectionContext;
|
import org.proton.plug.AMQPConnectionContext;
|
||||||
|
@ -29,6 +31,7 @@ import org.proton.plug.AMQPSessionCallback;
|
||||||
import org.proton.plug.SASLResult;
|
import org.proton.plug.SASLResult;
|
||||||
import org.proton.plug.ServerSASL;
|
import org.proton.plug.ServerSASL;
|
||||||
import org.proton.plug.context.server.ProtonServerConnectionContext;
|
import org.proton.plug.context.server.ProtonServerConnectionContext;
|
||||||
|
import org.proton.plug.exceptions.ActiveMQAMQPException;
|
||||||
import org.proton.plug.sasl.AnonymousServerSASL;
|
import org.proton.plug.sasl.AnonymousServerSASL;
|
||||||
import org.proton.plug.sasl.ServerSASLPlain;
|
import org.proton.plug.sasl.ServerSASLPlain;
|
||||||
import org.proton.plug.test.minimalserver.MinimalSessionSPI;
|
import org.proton.plug.test.minimalserver.MinimalSessionSPI;
|
||||||
|
@ -132,6 +135,21 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Binary newTransaction() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeTransaction(Binary txid) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
class ReturnSPI implements AMQPConnectionCallback {
|
class ReturnSPI implements AMQPConnectionCallback {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -139,6 +157,21 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Binary newTransaction() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeTransaction(Binary txid) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ServerSASL[] getSASLMechnisms() {
|
public ServerSASL[] getSASLMechnisms() {
|
||||||
return new ServerSASL[]{new AnonymousServerSASL(), new ServerSASLPlain()};
|
return new ServerSASL[]{new AnonymousServerSASL(), new ServerSASLPlain()};
|
||||||
|
|
|
@ -22,6 +22,8 @@ import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelFutureListener;
|
import io.netty.channel.ChannelFutureListener;
|
||||||
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
|
import org.apache.qpid.proton.amqp.Binary;
|
||||||
import org.apache.qpid.proton.engine.Connection;
|
import org.apache.qpid.proton.engine.Connection;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
import org.proton.plug.AMQPConnectionContext;
|
import org.proton.plug.AMQPConnectionContext;
|
||||||
|
@ -29,6 +31,7 @@ import org.proton.plug.AMQPConnectionCallback;
|
||||||
import org.proton.plug.AMQPSessionCallback;
|
import org.proton.plug.AMQPSessionCallback;
|
||||||
import org.proton.plug.SASLResult;
|
import org.proton.plug.SASLResult;
|
||||||
import org.proton.plug.ServerSASL;
|
import org.proton.plug.ServerSASL;
|
||||||
|
import org.proton.plug.exceptions.ActiveMQAMQPException;
|
||||||
import org.proton.plug.sasl.AnonymousServerSASL;
|
import org.proton.plug.sasl.AnonymousServerSASL;
|
||||||
import org.proton.plug.sasl.ServerSASLPlain;
|
import org.proton.plug.sasl.ServerSASLPlain;
|
||||||
import org.proton.plug.util.ByteUtil;
|
import org.proton.plug.util.ByteUtil;
|
||||||
|
@ -74,6 +77,21 @@ public class AMQPClientSPI implements AMQPConnectionCallback {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Binary newTransaction() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeTransaction(Binary txid) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean validateConnection(Connection connection, SASLResult saslResult) {
|
public boolean validateConnection(Connection connection, SASLResult saslResult) {
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -24,7 +24,9 @@ import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelFutureListener;
|
import io.netty.channel.ChannelFutureListener;
|
||||||
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
|
import org.apache.qpid.proton.amqp.Binary;
|
||||||
import org.apache.qpid.proton.engine.Connection;
|
import org.apache.qpid.proton.engine.Connection;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
import org.proton.plug.AMQPConnectionContext;
|
import org.proton.plug.AMQPConnectionContext;
|
||||||
|
@ -32,6 +34,7 @@ import org.proton.plug.AMQPConnectionCallback;
|
||||||
import org.proton.plug.AMQPSessionCallback;
|
import org.proton.plug.AMQPSessionCallback;
|
||||||
import org.proton.plug.SASLResult;
|
import org.proton.plug.SASLResult;
|
||||||
import org.proton.plug.ServerSASL;
|
import org.proton.plug.ServerSASL;
|
||||||
|
import org.proton.plug.exceptions.ActiveMQAMQPException;
|
||||||
import org.proton.plug.sasl.AnonymousServerSASL;
|
import org.proton.plug.sasl.AnonymousServerSASL;
|
||||||
import org.proton.plug.sasl.ServerSASLPlain;
|
import org.proton.plug.sasl.ServerSASLPlain;
|
||||||
import org.proton.plug.util.ByteUtil;
|
import org.proton.plug.util.ByteUtil;
|
||||||
|
@ -87,6 +90,21 @@ public class MinimalConnectionSPI implements AMQPConnectionCallback {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Binary newTransaction() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeTransaction(Binary txid) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
|
public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
|
||||||
final int bufferSize = bytes.writerIndex();
|
final int bufferSize = bytes.writerIndex();
|
||||||
|
|
|
@ -22,7 +22,10 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
|
||||||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||||
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
|
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
|
||||||
import org.apache.qpid.proton.amqp.Binary;
|
import org.apache.qpid.proton.amqp.Binary;
|
||||||
import org.apache.qpid.proton.engine.Delivery;
|
import org.apache.qpid.proton.engine.Delivery;
|
||||||
import org.apache.qpid.proton.engine.Receiver;
|
import org.apache.qpid.proton.engine.Receiver;
|
||||||
|
@ -123,16 +126,23 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Binary getCurrentTXID() {
|
public Transaction getTransaction(Binary txid) {
|
||||||
return new Binary(new byte[]{1});
|
return new TransactionImpl(new NullStorageManager());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void commitCurrentTX() {
|
public Binary newTransaction() {
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void rollbackCurrentTX(boolean lastMessage) {
|
public void commitTX(Binary txid) throws Exception {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -141,7 +151,7 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void ack(Object brokerConsumer, Object message) {
|
public void ack(Transaction tx, Object brokerConsumer, Object message) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -157,7 +167,7 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void serverSend(Receiver receiver, Delivery delivery, String address, int messageFormat, ByteBuf buffer) {
|
public void serverSend(Transaction tx, Receiver receiver, Delivery delivery, String address, int messageFormat, ByteBuf buffer) {
|
||||||
ProtonServerMessage serverMessage = new ProtonServerMessage();
|
ProtonServerMessage serverMessage = new ProtonServerMessage();
|
||||||
serverMessage.decode(buffer.nioBuffer());
|
serverMessage.decode(buffer.nioBuffer());
|
||||||
|
|
||||||
|
|
|
@ -133,6 +133,8 @@ public interface ServerSession extends SecurityAuth {
|
||||||
|
|
||||||
void sendContinuations(int packetSize, long totalBodySize, byte[] body, boolean continues) throws Exception;
|
void sendContinuations(int packetSize, long totalBodySize, byte[] body, boolean continues) throws Exception;
|
||||||
|
|
||||||
|
RoutingStatus send(Transaction tx, ServerMessage message, boolean direct, boolean noAutoCreateQueue) throws Exception;
|
||||||
|
|
||||||
RoutingStatus send(ServerMessage message, boolean direct, boolean noAutoCreateQueue) throws Exception;
|
RoutingStatus send(ServerMessage message, boolean direct, boolean noAutoCreateQueue) throws Exception;
|
||||||
|
|
||||||
RoutingStatus send(ServerMessage message, boolean direct) throws Exception;
|
RoutingStatus send(ServerMessage message, boolean direct) throws Exception;
|
||||||
|
|
|
@ -1256,6 +1256,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RoutingStatus send(final ServerMessage message, final boolean direct, boolean noAutoCreateQueue) throws Exception {
|
public RoutingStatus send(final ServerMessage message, final boolean direct, boolean noAutoCreateQueue) throws Exception {
|
||||||
|
return send(getCurrentTransaction(), message, direct, noAutoCreateQueue);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RoutingStatus send(Transaction tx, final ServerMessage message, final boolean direct, boolean noAutoCreateQueue) throws Exception {
|
||||||
|
|
||||||
// If the protocol doesn't support flow control, we have no choice other than fail the communication
|
// If the protocol doesn't support flow control, we have no choice other than fail the communication
|
||||||
if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
|
if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
|
||||||
|
@ -1308,10 +1312,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
if (message.getAddress().equals(managementAddress)) {
|
if (message.getAddress().equals(managementAddress)) {
|
||||||
// It's a management message
|
// It's a management message
|
||||||
|
|
||||||
handleManagementMessage(message, direct);
|
handleManagementMessage(tx, message, direct);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
result = doSend(message, direct, noAutoCreateQueue);
|
result = doSend(tx, message, direct, noAutoCreateQueue);
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -1337,7 +1341,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
|
currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
|
||||||
}
|
}
|
||||||
|
|
||||||
doSend(currentLargeMessage, false, false);
|
doSend(tx, currentLargeMessage, false, false);
|
||||||
|
|
||||||
currentLargeMessage = null;
|
currentLargeMessage = null;
|
||||||
}
|
}
|
||||||
|
@ -1526,7 +1530,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
started = s;
|
started = s;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleManagementMessage(final ServerMessage message, final boolean direct) throws Exception {
|
private RoutingStatus handleManagementMessage(final Transaction tx, final ServerMessage message, final boolean direct) throws Exception {
|
||||||
try {
|
try {
|
||||||
securityCheck(message.getAddress(), CheckType.MANAGE, this);
|
securityCheck(message.getAddress(), CheckType.MANAGE, this);
|
||||||
}
|
}
|
||||||
|
@ -1544,8 +1548,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
if (replyTo != null) {
|
if (replyTo != null) {
|
||||||
reply.setAddress(replyTo);
|
reply.setAddress(replyTo);
|
||||||
|
|
||||||
doSend(reply, direct, false);
|
doSend(tx, reply, direct, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return RoutingStatus.OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doRollback(final boolean clientFailed,
|
private void doRollback(final boolean clientFailed,
|
||||||
|
@ -1600,7 +1606,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
theTx.rollback();
|
theTx.rollback();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected RoutingStatus doSend(final ServerMessage msg, final boolean direct, final boolean noAutoCreateQueue) throws Exception {
|
public RoutingStatus doSend(final Transaction tx, final ServerMessage msg, final boolean direct, final boolean noAutoCreateQueue) throws Exception {
|
||||||
RoutingStatus result = RoutingStatus.OK;
|
RoutingStatus result = RoutingStatus.OK;
|
||||||
// check the user has write access to this address.
|
// check the user has write access to this address.
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -228,7 +228,14 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
serializer.shutdown();
|
serializer.shutdownNow();
|
||||||
|
try {
|
||||||
|
if (!serializer.awaitTermination(10, TimeUnit.SECONDS)) {
|
||||||
|
LOG.warn("Serializer didn't shutdown cleanly");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,7 +60,8 @@ public class AmqpMessage {
|
||||||
* Creates a new AmqpMessage that wraps the information necessary to handle
|
* Creates a new AmqpMessage that wraps the information necessary to handle
|
||||||
* an outgoing message.
|
* an outgoing message.
|
||||||
*
|
*
|
||||||
* @param message the Proton message that is to be sent.
|
* @param message
|
||||||
|
* the Proton message that is to be sent.
|
||||||
*/
|
*/
|
||||||
public AmqpMessage(Message message) {
|
public AmqpMessage(Message message) {
|
||||||
this(null, message, null);
|
this(null, message, null);
|
||||||
|
@ -70,9 +71,12 @@ public class AmqpMessage {
|
||||||
* Creates a new AmqpMessage that wraps the information necessary to handle
|
* Creates a new AmqpMessage that wraps the information necessary to handle
|
||||||
* an incoming delivery.
|
* an incoming delivery.
|
||||||
*
|
*
|
||||||
* @param receiver the AmqpReceiver that received this message.
|
* @param receiver
|
||||||
* @param message the Proton message that was received.
|
* the AmqpReceiver that received this message.
|
||||||
* @param delivery the Delivery instance that produced this message.
|
* @param message
|
||||||
|
* the Proton message that was received.
|
||||||
|
* @param delivery
|
||||||
|
* the Delivery instance that produced this message.
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public AmqpMessage(AmqpReceiver receiver, Message message, Delivery delivery) {
|
public AmqpMessage(AmqpReceiver receiver, Message message, Delivery delivery) {
|
||||||
|
@ -135,11 +139,30 @@ public class AmqpMessage {
|
||||||
receiver.accept(delivery);
|
receiver.accept(delivery);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Accepts the message marking it as consumed on the remote peer.
|
||||||
|
*
|
||||||
|
* @param session
|
||||||
|
* The session that is used to manage acceptance of the message.
|
||||||
|
*
|
||||||
|
* @throws Exception if an error occurs during the accept.
|
||||||
|
*/
|
||||||
|
public void accept(AmqpSession txnSession) throws Exception {
|
||||||
|
if (receiver == null) {
|
||||||
|
throw new IllegalStateException("Can't accept non-received message.");
|
||||||
|
}
|
||||||
|
|
||||||
|
receiver.accept(delivery, txnSession);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Marks the message as Modified, indicating whether it failed to deliver and is not deliverable here.
|
* Marks the message as Modified, indicating whether it failed to deliver and is not deliverable here.
|
||||||
*
|
*
|
||||||
* @param deliveryFailed indicates that the delivery failed for some reason.
|
* @param deliveryFailed
|
||||||
* @param undeliverableHere marks the delivery as not being able to be process by link it was sent to.
|
* indicates that the delivery failed for some reason.
|
||||||
|
* @param undeliverableHere
|
||||||
|
* marks the delivery as not being able to be process by link it was sent to.
|
||||||
|
*
|
||||||
* @throws Exception if an error occurs during the process.
|
* @throws Exception if an error occurs during the process.
|
||||||
*/
|
*/
|
||||||
public void modified(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception {
|
public void modified(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception {
|
||||||
|
@ -165,10 +188,36 @@ public class AmqpMessage {
|
||||||
|
|
||||||
//----- Convenience methods for constructing outbound messages -----------//
|
//----- Convenience methods for constructing outbound messages -----------//
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the address which is applied to the AMQP message To field in the message properties
|
||||||
|
*
|
||||||
|
* @param address
|
||||||
|
* The address that should be applied in the Message To field.
|
||||||
|
*/
|
||||||
|
public void setAddress(String address) {
|
||||||
|
checkReadOnly();
|
||||||
|
lazyCreateProperties();
|
||||||
|
getWrappedMessage().setAddress(address);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the set address that was set in the Message To field.
|
||||||
|
*
|
||||||
|
* @return the set address String form or null if not set.
|
||||||
|
*/
|
||||||
|
public String getAddress() {
|
||||||
|
if (message.getProperties() == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return message.getProperties().getTo();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the MessageId property on an outbound message using the provided String
|
* Sets the MessageId property on an outbound message using the provided String
|
||||||
*
|
*
|
||||||
* @param messageId the String message ID value to set.
|
* @param messageId
|
||||||
|
* the String message ID value to set.
|
||||||
*/
|
*/
|
||||||
public void setMessageId(String messageId) {
|
public void setMessageId(String messageId) {
|
||||||
checkReadOnly();
|
checkReadOnly();
|
||||||
|
@ -207,7 +256,8 @@ public class AmqpMessage {
|
||||||
/**
|
/**
|
||||||
* Sets the MessageId property on an outbound message using the provided value
|
* Sets the MessageId property on an outbound message using the provided value
|
||||||
*
|
*
|
||||||
* @param messageId the message ID value to set.
|
* @param messageId
|
||||||
|
* the message ID value to set.
|
||||||
*/
|
*/
|
||||||
public void setRawMessageId(Object messageId) {
|
public void setRawMessageId(Object messageId) {
|
||||||
checkReadOnly();
|
checkReadOnly();
|
||||||
|
@ -218,7 +268,8 @@ public class AmqpMessage {
|
||||||
/**
|
/**
|
||||||
* Sets the CorrelationId property on an outbound message using the provided String
|
* Sets the CorrelationId property on an outbound message using the provided String
|
||||||
*
|
*
|
||||||
* @param correlationId the String Correlation ID value to set.
|
* @param correlationId
|
||||||
|
* the String Correlation ID value to set.
|
||||||
*/
|
*/
|
||||||
public void setCorrelationId(String correlationId) {
|
public void setCorrelationId(String correlationId) {
|
||||||
checkReadOnly();
|
checkReadOnly();
|
||||||
|
@ -257,7 +308,8 @@ public class AmqpMessage {
|
||||||
/**
|
/**
|
||||||
* Sets the CorrelationId property on an outbound message using the provided value
|
* Sets the CorrelationId property on an outbound message using the provided value
|
||||||
*
|
*
|
||||||
* @param correlationId the correlation ID value to set.
|
* @param correlationId
|
||||||
|
* the correlation ID value to set.
|
||||||
*/
|
*/
|
||||||
public void setRawCorrelationId(Object correlationId) {
|
public void setRawCorrelationId(Object correlationId) {
|
||||||
checkReadOnly();
|
checkReadOnly();
|
||||||
|
@ -268,7 +320,8 @@ public class AmqpMessage {
|
||||||
/**
|
/**
|
||||||
* Sets the GroupId property on an outbound message using the provided String
|
* Sets the GroupId property on an outbound message using the provided String
|
||||||
*
|
*
|
||||||
* @param groupId the String Group ID value to set.
|
* @param messageId
|
||||||
|
* the String Group ID value to set.
|
||||||
*/
|
*/
|
||||||
public void setGroupId(String groupId) {
|
public void setGroupId(String groupId) {
|
||||||
checkReadOnly();
|
checkReadOnly();
|
||||||
|
@ -293,7 +346,8 @@ public class AmqpMessage {
|
||||||
/**
|
/**
|
||||||
* Sets the durable header on the outgoing message.
|
* Sets the durable header on the outgoing message.
|
||||||
*
|
*
|
||||||
* @param durable the boolean durable value to set.
|
* @param durable
|
||||||
|
* the boolean durable value to set.
|
||||||
*/
|
*/
|
||||||
public void setDurable(boolean durable) {
|
public void setDurable(boolean durable) {
|
||||||
checkReadOnly();
|
checkReadOnly();
|
||||||
|
@ -318,8 +372,10 @@ public class AmqpMessage {
|
||||||
/**
|
/**
|
||||||
* Sets a given application property on an outbound message.
|
* Sets a given application property on an outbound message.
|
||||||
*
|
*
|
||||||
* @param key the name to assign the new property.
|
* @param key
|
||||||
* @param value the value to set for the named property.
|
* the name to assign the new property.
|
||||||
|
* @param value
|
||||||
|
* the value to set for the named property.
|
||||||
*/
|
*/
|
||||||
public void setApplicationProperty(String key, Object value) {
|
public void setApplicationProperty(String key, Object value) {
|
||||||
checkReadOnly();
|
checkReadOnly();
|
||||||
|
@ -331,8 +387,10 @@ public class AmqpMessage {
|
||||||
* Gets the application property that is mapped to the given name or null
|
* Gets the application property that is mapped to the given name or null
|
||||||
* if no property has been set with that name.
|
* if no property has been set with that name.
|
||||||
*
|
*
|
||||||
* @param key the name used to lookup the property in the application properties.
|
* @param key
|
||||||
* @return the propety value or null if not set.
|
* the name used to lookup the property in the application properties.
|
||||||
|
*
|
||||||
|
* @return the property value or null if not set.
|
||||||
*/
|
*/
|
||||||
public Object getApplicationProperty(String key) {
|
public Object getApplicationProperty(String key) {
|
||||||
if (applicationPropertiesMap == null) {
|
if (applicationPropertiesMap == null) {
|
||||||
|
@ -346,8 +404,10 @@ public class AmqpMessage {
|
||||||
* Perform a proper annotation set on the AMQP Message based on a Symbol key and
|
* Perform a proper annotation set on the AMQP Message based on a Symbol key and
|
||||||
* the target value to append to the current annotations.
|
* the target value to append to the current annotations.
|
||||||
*
|
*
|
||||||
* @param key The name of the Symbol whose value is being set.
|
* @param key
|
||||||
* @param value The new value to set in the annotations of this message.
|
* The name of the Symbol whose value is being set.
|
||||||
|
* @param value
|
||||||
|
* The new value to set in the annotations of this message.
|
||||||
*/
|
*/
|
||||||
public void setMessageAnnotation(String key, Object value) {
|
public void setMessageAnnotation(String key, Object value) {
|
||||||
checkReadOnly();
|
checkReadOnly();
|
||||||
|
@ -360,7 +420,9 @@ public class AmqpMessage {
|
||||||
* that annotation name. If the message annotations have not been created yet
|
* that annotation name. If the message annotations have not been created yet
|
||||||
* then this method will always return null.
|
* then this method will always return null.
|
||||||
*
|
*
|
||||||
* @param key the Symbol name that should be looked up in the message annotations.
|
* @param key
|
||||||
|
* the Symbol name that should be looked up in the message annotations.
|
||||||
|
*
|
||||||
* @return the value of the annotation if it exists, or null if not set or not accessible.
|
* @return the value of the annotation if it exists, or null if not set or not accessible.
|
||||||
*/
|
*/
|
||||||
public Object getMessageAnnotation(String key) {
|
public Object getMessageAnnotation(String key) {
|
||||||
|
@ -375,8 +437,10 @@ public class AmqpMessage {
|
||||||
* Perform a proper delivery annotation set on the AMQP Message based on a Symbol
|
* Perform a proper delivery annotation set on the AMQP Message based on a Symbol
|
||||||
* key and the target value to append to the current delivery annotations.
|
* key and the target value to append to the current delivery annotations.
|
||||||
*
|
*
|
||||||
* @param key The name of the Symbol whose value is being set.
|
* @param key
|
||||||
* @param value The new value to set in the delivery annotations of this message.
|
* The name of the Symbol whose value is being set.
|
||||||
|
* @param value
|
||||||
|
* The new value to set in the delivery annotations of this message.
|
||||||
*/
|
*/
|
||||||
public void setDeliveryAnnotation(String key, Object value) {
|
public void setDeliveryAnnotation(String key, Object value) {
|
||||||
checkReadOnly();
|
checkReadOnly();
|
||||||
|
@ -389,7 +453,9 @@ public class AmqpMessage {
|
||||||
* that annotation name. If the message annotations have not been created yet
|
* that annotation name. If the message annotations have not been created yet
|
||||||
* then this method will always return null.
|
* then this method will always return null.
|
||||||
*
|
*
|
||||||
* @param key the Symbol name that should be looked up in the message annotations.
|
* @param key
|
||||||
|
* the Symbol name that should be looked up in the message annotations.
|
||||||
|
*
|
||||||
* @return the value of the annotation if it exists, or null if not set or not accessible.
|
* @return the value of the annotation if it exists, or null if not set or not accessible.
|
||||||
*/
|
*/
|
||||||
public Object getDeliveryAnnotation(String key) {
|
public Object getDeliveryAnnotation(String key) {
|
||||||
|
@ -406,7 +472,9 @@ public class AmqpMessage {
|
||||||
* Sets a String value into the body of an outgoing Message, throws
|
* Sets a String value into the body of an outgoing Message, throws
|
||||||
* an exception if this is an incoming message instance.
|
* an exception if this is an incoming message instance.
|
||||||
*
|
*
|
||||||
* @param value the String value to store in the Message body.
|
* @param value
|
||||||
|
* the String value to store in the Message body.
|
||||||
|
*
|
||||||
* @throws IllegalStateException if the message is read only.
|
* @throws IllegalStateException if the message is read only.
|
||||||
*/
|
*/
|
||||||
public void setText(String value) throws IllegalStateException {
|
public void setText(String value) throws IllegalStateException {
|
||||||
|
@ -419,7 +487,9 @@ public class AmqpMessage {
|
||||||
* Sets a byte array value into the body of an outgoing Message, throws
|
* Sets a byte array value into the body of an outgoing Message, throws
|
||||||
* an exception if this is an incoming message instance.
|
* an exception if this is an incoming message instance.
|
||||||
*
|
*
|
||||||
* @param bytes the byte array value to store in the Message body.
|
* @param value
|
||||||
|
* the byte array value to store in the Message body.
|
||||||
|
*
|
||||||
* @throws IllegalStateException if the message is read only.
|
* @throws IllegalStateException if the message is read only.
|
||||||
*/
|
*/
|
||||||
public void setBytes(byte[] bytes) throws IllegalStateException {
|
public void setBytes(byte[] bytes) throws IllegalStateException {
|
||||||
|
@ -432,7 +502,9 @@ public class AmqpMessage {
|
||||||
* Sets a byte array value into the body of an outgoing Message, throws
|
* Sets a byte array value into the body of an outgoing Message, throws
|
||||||
* an exception if this is an incoming message instance.
|
* an exception if this is an incoming message instance.
|
||||||
*
|
*
|
||||||
* @param described the byte array value to store in the Message body.
|
* @param value
|
||||||
|
* the byte array value to store in the Message body.
|
||||||
|
*
|
||||||
* @throws IllegalStateException if the message is read only.
|
* @throws IllegalStateException if the message is read only.
|
||||||
*/
|
*/
|
||||||
public void setDescribedType(DescribedType described) throws IllegalStateException {
|
public void setDescribedType(DescribedType described) throws IllegalStateException {
|
||||||
|
@ -445,6 +517,7 @@ public class AmqpMessage {
|
||||||
* Attempts to retrieve the message body as an DescribedType instance.
|
* Attempts to retrieve the message body as an DescribedType instance.
|
||||||
*
|
*
|
||||||
* @return an DescribedType instance if one is stored in the message body.
|
* @return an DescribedType instance if one is stored in the message body.
|
||||||
|
*
|
||||||
* @throws NoSuchElementException if the body does not contain a DescribedType.
|
* @throws NoSuchElementException if the body does not contain a DescribedType.
|
||||||
*/
|
*/
|
||||||
public DescribedType getDescribedType() throws NoSuchElementException {
|
public DescribedType getDescribedType() throws NoSuchElementException {
|
||||||
|
@ -482,21 +555,21 @@ public class AmqpMessage {
|
||||||
|
|
||||||
private void lazyCreateMessageAnnotations() {
|
private void lazyCreateMessageAnnotations() {
|
||||||
if (messageAnnotationsMap == null) {
|
if (messageAnnotationsMap == null) {
|
||||||
messageAnnotationsMap = new HashMap<>();
|
messageAnnotationsMap = new HashMap<Symbol, Object>();
|
||||||
message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
|
message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void lazyCreateDeliveryAnnotations() {
|
private void lazyCreateDeliveryAnnotations() {
|
||||||
if (deliveryAnnotationsMap == null) {
|
if (deliveryAnnotationsMap == null) {
|
||||||
deliveryAnnotationsMap = new HashMap<>();
|
deliveryAnnotationsMap = new HashMap<Symbol, Object>();
|
||||||
message.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotationsMap));
|
message.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotationsMap));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void lazyCreateApplicationProperties() {
|
private void lazyCreateApplicationProperties() {
|
||||||
if (applicationPropertiesMap == null) {
|
if (applicationPropertiesMap == null) {
|
||||||
applicationPropertiesMap = new HashMap<>();
|
applicationPropertiesMap = new HashMap<String, Object>();
|
||||||
message.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap));
|
message.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -208,6 +208,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
* it is returned immediately otherwise this methods return null without waiting.
|
* it is returned immediately otherwise this methods return null without waiting.
|
||||||
*
|
*
|
||||||
* @return a newly received message or null if there is no currently available message.
|
* @return a newly received message or null if there is no currently available message.
|
||||||
|
*
|
||||||
* @throws Exception if an error occurs during the receive attempt.
|
* @throws Exception if an error occurs during the receive attempt.
|
||||||
*/
|
*/
|
||||||
public AmqpMessage receiveNoWait() throws Exception {
|
public AmqpMessage receiveNoWait() throws Exception {
|
||||||
|
@ -219,6 +220,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
* Request a remote peer send a Message to this client waiting until one arrives.
|
* Request a remote peer send a Message to this client waiting until one arrives.
|
||||||
*
|
*
|
||||||
* @return the pulled AmqpMessage or null if none was pulled from the remote.
|
* @return the pulled AmqpMessage or null if none was pulled from the remote.
|
||||||
|
*
|
||||||
* @throws IOException if an error occurs
|
* @throws IOException if an error occurs
|
||||||
*/
|
*/
|
||||||
public AmqpMessage pull() throws IOException {
|
public AmqpMessage pull() throws IOException {
|
||||||
|
@ -402,12 +404,38 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
|
||||||
* @throws IOException if an error occurs while sending the accept.
|
* @throws IOException if an error occurs while sending the accept.
|
||||||
*/
|
*/
|
||||||
public void accept(final Delivery delivery) throws IOException {
|
public void accept(final Delivery delivery) throws IOException {
|
||||||
|
accept(delivery, this.session);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Accepts a message that was dispatched under the given Delivery instance.
|
||||||
|
*
|
||||||
|
* This method allows for the session that is used in the accept to be specified by the
|
||||||
|
* caller. This allows for an accepted message to be involved in a transaction that is
|
||||||
|
* being managed by some other session other than the one that created this receiver.
|
||||||
|
*
|
||||||
|
* @param delivery
|
||||||
|
* the Delivery instance to accept.
|
||||||
|
* @param session
|
||||||
|
* the session under which the message is being accepted.
|
||||||
|
*
|
||||||
|
* @throws IOException if an error occurs while sending the accept.
|
||||||
|
*/
|
||||||
|
public void accept(final Delivery delivery, final AmqpSession session) throws IOException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
|
|
||||||
if (delivery == null) {
|
if (delivery == null) {
|
||||||
throw new IllegalArgumentException("Delivery to accept cannot be null");
|
throw new IllegalArgumentException("Delivery to accept cannot be null");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (session == null) {
|
||||||
|
throw new IllegalArgumentException("Session given cannot be null");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (session.getConnection() != this.session.getConnection()) {
|
||||||
|
throw new IllegalArgumentException("The session used for accept must originate from the connection that created this receiver.");
|
||||||
|
}
|
||||||
|
|
||||||
final ClientFuture request = new ClientFuture();
|
final ClientFuture request = new ClientFuture();
|
||||||
session.getScheduler().execute(new Runnable() {
|
session.getScheduler().execute(new Runnable() {
|
||||||
|
|
||||||
|
|
|
@ -117,6 +117,18 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
|
||||||
* @throws IOException if an error occurs during the send.
|
* @throws IOException if an error occurs during the send.
|
||||||
*/
|
*/
|
||||||
public void send(final AmqpMessage message) throws IOException {
|
public void send(final AmqpMessage message) throws IOException {
|
||||||
|
checkClosed();
|
||||||
|
send(message, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sends the given message to this senders assigned address using the supplied transaction ID.
|
||||||
|
*
|
||||||
|
* @param message the message to send.
|
||||||
|
* @param txId the transaction ID to assign the outgoing send.
|
||||||
|
* @throws IOException if an error occurs during the send.
|
||||||
|
*/
|
||||||
|
public void send(final AmqpMessage message, final AmqpTransactionId txId) throws IOException {
|
||||||
checkClosed();
|
checkClosed();
|
||||||
final ClientFuture sendRequest = new ClientFuture();
|
final ClientFuture sendRequest = new ClientFuture();
|
||||||
|
|
||||||
|
@ -125,7 +137,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
doSend(message, sendRequest);
|
doSend(message, sendRequest, txId);
|
||||||
session.pumpToProtonTransport(sendRequest);
|
session.pumpToProtonTransport(sendRequest);
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
|
@ -316,7 +328,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doSend(AmqpMessage message, AsyncResult request) throws Exception {
|
private void doSend(AmqpMessage message, AsyncResult request, AmqpTransactionId txId) throws Exception {
|
||||||
LOG.trace("Producer sending message: {}", message);
|
LOG.trace("Producer sending message: {}", message);
|
||||||
|
|
||||||
Delivery delivery = null;
|
Delivery delivery = null;
|
||||||
|
@ -330,8 +342,15 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
|
||||||
|
|
||||||
delivery.setContext(request);
|
delivery.setContext(request);
|
||||||
|
|
||||||
if (session.isInTransaction()) {
|
Binary amqpTxId = null;
|
||||||
Binary amqpTxId = session.getTransactionId().getRemoteTxId();
|
if (txId != null) {
|
||||||
|
amqpTxId = txId.getRemoteTxId();
|
||||||
|
}
|
||||||
|
else if (session.isInTransaction()) {
|
||||||
|
amqpTxId = session.getTransactionId().getRemoteTxId();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (amqpTxId != null) {
|
||||||
TransactionalState state = new TransactionalState();
|
TransactionalState state = new TransactionalState();
|
||||||
state.setTxnId(amqpTxId);
|
state.setTxnId(amqpTxId);
|
||||||
delivery.disposition(state);
|
delivery.disposition(state);
|
||||||
|
|
|
@ -6,7 +6,7 @@
|
||||||
* (the "License"); you may not use this file except in compliance with
|
* (the "License"); you may not use this file except in compliance with
|
||||||
* the License. You may obtain a copy of the License at
|
* the License. You may obtain a copy of the License at
|
||||||
*
|
*
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
*
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
@ -408,11 +408,15 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
|
||||||
connection.pumpToProtonTransport(request);
|
connection.pumpToProtonTransport(request);
|
||||||
}
|
}
|
||||||
|
|
||||||
AmqpTransactionId getTransactionId() {
|
public AmqpTransactionId getTransactionId() {
|
||||||
return txContext.getTransactionId();
|
if (txContext != null && txContext.isInTransaction()) {
|
||||||
|
return txContext.getTransactionId();
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public AmqpTransactionContext getTransactionContext() {
|
AmqpTransactionContext getTransactionContext() {
|
||||||
return txContext;
|
return txContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,194 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test support class for tests that will be using the AMQP Proton wrapper client.
|
||||||
|
* This is to make it easier to migrate tests from ActiveMQ5
|
||||||
|
*/
|
||||||
|
public class AmqpClientTestSupport extends ActiveMQTestBase {
|
||||||
|
|
||||||
|
|
||||||
|
ActiveMQServer server;
|
||||||
|
|
||||||
|
LinkedList<AmqpConnection> connections = new LinkedList<>();
|
||||||
|
|
||||||
|
|
||||||
|
protected AmqpConnection addConnection(AmqpConnection connection) {
|
||||||
|
connections.add(connection);
|
||||||
|
return connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
server = createServer(true, true);
|
||||||
|
server.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
super.tearDown();
|
||||||
|
|
||||||
|
for (AmqpConnection conn: connections) {
|
||||||
|
try {
|
||||||
|
conn.close();
|
||||||
|
}
|
||||||
|
catch (Throwable ignored) {
|
||||||
|
ignored.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Queue getProxyToQueue(String queueName) {
|
||||||
|
return server.locateQueue(SimpleString.toSimpleString(queueName));
|
||||||
|
}
|
||||||
|
|
||||||
|
private String connectorScheme = "amqp";
|
||||||
|
private boolean useSSL;
|
||||||
|
|
||||||
|
public String getTestName() {
|
||||||
|
return "jms.queue." + getName();
|
||||||
|
}
|
||||||
|
|
||||||
|
public AmqpClientTestSupport() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public AmqpClientTestSupport(String connectorScheme, boolean useSSL) {
|
||||||
|
this.connectorScheme = connectorScheme;
|
||||||
|
this.useSSL = useSSL;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getConnectorScheme() {
|
||||||
|
return connectorScheme;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isUseSSL() {
|
||||||
|
return useSSL;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getAmqpConnectionURIOptions() {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean isUseTcpConnector() {
|
||||||
|
return !isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("ws");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean isUseSslConnector() {
|
||||||
|
return isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("wss");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean isUseNioConnector() {
|
||||||
|
return !isUseSSL() && connectorScheme.contains("nio");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean isUseNioPlusSslConnector() {
|
||||||
|
return isUseSSL() && connectorScheme.contains("nio");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean isUseWsConnector() {
|
||||||
|
return !isUseSSL() && connectorScheme.contains("ws");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean isUseWssConnector() {
|
||||||
|
return isUseSSL() && connectorScheme.contains("wss");
|
||||||
|
}
|
||||||
|
|
||||||
|
public URI getBrokerAmqpConnectionURI() {
|
||||||
|
boolean webSocket = false;
|
||||||
|
|
||||||
|
try {
|
||||||
|
int port = 61616;
|
||||||
|
|
||||||
|
String uri = null;
|
||||||
|
|
||||||
|
if (isUseSSL()) {
|
||||||
|
if (webSocket) {
|
||||||
|
uri = "wss://127.0.0.1:" + port;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
uri = "ssl://127.0.0.1:" + port;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
if (webSocket) {
|
||||||
|
uri = "ws://127.0.0.1:" + port;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
uri = "tcp://127.0.0.1:" + port;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!getAmqpConnectionURIOptions().isEmpty()) {
|
||||||
|
uri = uri + "?" + getAmqpConnectionURIOptions();
|
||||||
|
}
|
||||||
|
|
||||||
|
return new URI(uri);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new RuntimeException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public AmqpConnection createAmqpConnection() throws Exception {
|
||||||
|
return createAmqpConnection(getBrokerAmqpConnectionURI());
|
||||||
|
}
|
||||||
|
|
||||||
|
public AmqpConnection createAmqpConnection(String username, String password) throws Exception {
|
||||||
|
return createAmqpConnection(getBrokerAmqpConnectionURI(), username, password);
|
||||||
|
}
|
||||||
|
|
||||||
|
public AmqpConnection createAmqpConnection(URI brokerURI) throws Exception {
|
||||||
|
return createAmqpConnection(brokerURI, null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public AmqpConnection createAmqpConnection(URI brokerURI, String username, String password) throws Exception {
|
||||||
|
return createAmqpClient(brokerURI, username, password).connect();
|
||||||
|
}
|
||||||
|
|
||||||
|
public AmqpClient createAmqpClient() throws Exception {
|
||||||
|
return createAmqpClient(getBrokerAmqpConnectionURI(), null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public AmqpClient createAmqpClient(URI brokerURI) throws Exception {
|
||||||
|
return createAmqpClient(brokerURI, null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public AmqpClient createAmqpClient(String username, String password) throws Exception {
|
||||||
|
return createAmqpClient(getBrokerAmqpConnectionURI(), username, password);
|
||||||
|
}
|
||||||
|
|
||||||
|
public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception {
|
||||||
|
return new AmqpClient(brokerURI, username, password);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,625 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test various aspects of Transaction support.
|
||||||
|
*/
|
||||||
|
public class AmqpTransactionTest extends AmqpClientTestSupport {
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void createQueue() throws Exception {
|
||||||
|
server.createQueue(SimpleString.toSimpleString(getTestName()), SimpleString.toSimpleString(getTestName()), null, true, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testBeginAndCommitTransaction() throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
assertNotNull(session);
|
||||||
|
|
||||||
|
session.begin();
|
||||||
|
assertTrue(session.isInTransaction());
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testBeginAndRollbackTransaction() throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
assertNotNull(session);
|
||||||
|
|
||||||
|
session.begin();
|
||||||
|
assertTrue(session.isInTransaction());
|
||||||
|
session.rollback();
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
|
||||||
|
System.err.println("Closed");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testSendMessageToQueueWithCommit() throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpSender sender = session.createSender(getTestName());
|
||||||
|
final Queue queue = getProxyToQueue(getTestName());
|
||||||
|
|
||||||
|
session.begin();
|
||||||
|
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setText("Test-Message");
|
||||||
|
sender.send(message);
|
||||||
|
|
||||||
|
assertEquals(0, queue.getMessageCount());
|
||||||
|
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
assertEquals(1, queue.getMessageCount());
|
||||||
|
|
||||||
|
sender.close();
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testSendMessageToQueueWithRollback() throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpSender sender = session.createSender(getTestName());
|
||||||
|
final Queue queue = getProxyToQueue(getTestName());
|
||||||
|
|
||||||
|
session.begin();
|
||||||
|
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setText("Test-Message");
|
||||||
|
sender.send(message);
|
||||||
|
|
||||||
|
assertEquals(0, queue.getMessageCount());
|
||||||
|
|
||||||
|
session.rollback();
|
||||||
|
|
||||||
|
assertEquals(0, queue.getMessageCount());
|
||||||
|
|
||||||
|
sender.close();
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testReceiveMessageWithCommit() throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpSender sender = session.createSender(getTestName());
|
||||||
|
final Queue queue = getProxyToQueue(getTestName());
|
||||||
|
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setText("Test-Message");
|
||||||
|
sender.send(message);
|
||||||
|
|
||||||
|
assertEquals(1, queue.getMessageCount());
|
||||||
|
|
||||||
|
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||||
|
|
||||||
|
session.begin();
|
||||||
|
|
||||||
|
receiver.flow(1);
|
||||||
|
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull(received);
|
||||||
|
received.accept();
|
||||||
|
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
assertEquals(0, queue.getMessageCount());
|
||||||
|
|
||||||
|
sender.close();
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testReceiveAfterConnectionClose() throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpSender sender = session.createSender(getTestName());
|
||||||
|
final Queue queue = getProxyToQueue(getTestName());
|
||||||
|
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setText("Test-Message");
|
||||||
|
sender.send(message);
|
||||||
|
|
||||||
|
assertEquals(1, queue.getMessageCount());
|
||||||
|
|
||||||
|
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||||
|
|
||||||
|
session.begin();
|
||||||
|
|
||||||
|
receiver.flow(1);
|
||||||
|
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull(received);
|
||||||
|
received.accept();
|
||||||
|
|
||||||
|
// this will force a rollback on the TX (It should at least)
|
||||||
|
connection.close();
|
||||||
|
|
||||||
|
connection = addConnection(client.connect());
|
||||||
|
session = connection.createSession();
|
||||||
|
receiver = session.createReceiver(getTestName());
|
||||||
|
session.begin();
|
||||||
|
receiver.flow(1);
|
||||||
|
|
||||||
|
received = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull(received);
|
||||||
|
received.accept();
|
||||||
|
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
assertEquals(0, queue.getMessageCount());
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testReceiveMessageWithRollback() throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpSender sender = session.createSender(getTestName());
|
||||||
|
final Queue queue = getProxyToQueue(getTestName());
|
||||||
|
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setText("Test-Message");
|
||||||
|
sender.send(message);
|
||||||
|
|
||||||
|
assertEquals(1, queue.getMessageCount());
|
||||||
|
|
||||||
|
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||||
|
|
||||||
|
session.begin();
|
||||||
|
|
||||||
|
receiver.flow(1);
|
||||||
|
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull(received);
|
||||||
|
received.accept();
|
||||||
|
|
||||||
|
session.rollback();
|
||||||
|
|
||||||
|
assertEquals(1, queue.getMessageCount());
|
||||||
|
|
||||||
|
sender.close();
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMultipleSessionReceiversInSingleTXNWithCommit() throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
|
||||||
|
// Load up the Queue with some messages
|
||||||
|
{
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
AmqpSender sender = session.createSender(getTestName());
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setText("Test-Message");
|
||||||
|
sender.send(message);
|
||||||
|
sender.send(message);
|
||||||
|
sender.send(message);
|
||||||
|
sender.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Root TXN session controls all TXN send lifetimes.
|
||||||
|
AmqpSession txnSession = connection.createSession();
|
||||||
|
|
||||||
|
// Create some sender sessions
|
||||||
|
AmqpSession session1 = connection.createSession();
|
||||||
|
AmqpSession session2 = connection.createSession();
|
||||||
|
AmqpSession session3 = connection.createSession();
|
||||||
|
|
||||||
|
// Sender linked to each session
|
||||||
|
AmqpReceiver receiver1 = session1.createReceiver(getTestName());
|
||||||
|
AmqpReceiver receiver2 = session2.createReceiver(getTestName());
|
||||||
|
AmqpReceiver receiver3 = session3.createReceiver(getTestName());
|
||||||
|
|
||||||
|
final Queue queue = getProxyToQueue(getTestName());
|
||||||
|
assertEquals(3, queue.getMessageCount());
|
||||||
|
|
||||||
|
// Begin the transaction that all senders will operate in.
|
||||||
|
txnSession.begin();
|
||||||
|
|
||||||
|
assertTrue(txnSession.isInTransaction());
|
||||||
|
|
||||||
|
receiver1.flow(1);
|
||||||
|
receiver2.flow(1);
|
||||||
|
receiver3.flow(1);
|
||||||
|
|
||||||
|
AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS);
|
||||||
|
AmqpMessage message2 = receiver2.receive(5, TimeUnit.SECONDS);
|
||||||
|
AmqpMessage message3 = receiver3.receive(5, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
message1.accept(txnSession);
|
||||||
|
message2.accept(txnSession);
|
||||||
|
message3.accept(txnSession);
|
||||||
|
|
||||||
|
assertEquals(3, queue.getMessageCount());
|
||||||
|
|
||||||
|
txnSession.commit();
|
||||||
|
|
||||||
|
assertEquals(0, queue.getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMultipleSessionReceiversInSingleTXNWithRollback() throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
|
||||||
|
// Load up the Queue with some messages
|
||||||
|
{
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
AmqpSender sender = session.createSender(getTestName());
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setText("Test-Message");
|
||||||
|
sender.send(message);
|
||||||
|
sender.send(message);
|
||||||
|
sender.send(message);
|
||||||
|
sender.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Root TXN session controls all TXN send lifetimes.
|
||||||
|
AmqpSession txnSession = connection.createSession();
|
||||||
|
|
||||||
|
// Create some sender sessions
|
||||||
|
AmqpSession session1 = connection.createSession();
|
||||||
|
AmqpSession session2 = connection.createSession();
|
||||||
|
AmqpSession session3 = connection.createSession();
|
||||||
|
|
||||||
|
// Sender linked to each session
|
||||||
|
AmqpReceiver receiver1 = session1.createReceiver(getTestName());
|
||||||
|
AmqpReceiver receiver2 = session2.createReceiver(getTestName());
|
||||||
|
AmqpReceiver receiver3 = session3.createReceiver(getTestName());
|
||||||
|
|
||||||
|
final Queue queue = getProxyToQueue(getTestName());
|
||||||
|
assertEquals(3, queue.getMessageCount());
|
||||||
|
|
||||||
|
// Begin the transaction that all senders will operate in.
|
||||||
|
txnSession.begin();
|
||||||
|
|
||||||
|
assertTrue(txnSession.isInTransaction());
|
||||||
|
|
||||||
|
receiver1.flow(1);
|
||||||
|
receiver2.flow(1);
|
||||||
|
receiver3.flow(1);
|
||||||
|
|
||||||
|
AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS);
|
||||||
|
AmqpMessage message2 = receiver2.receive(5, TimeUnit.SECONDS);
|
||||||
|
AmqpMessage message3 = receiver3.receive(5, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
message1.accept(txnSession);
|
||||||
|
message2.accept(txnSession);
|
||||||
|
message3.accept(txnSession);
|
||||||
|
|
||||||
|
assertEquals(3, queue.getMessageCount());
|
||||||
|
|
||||||
|
txnSession.rollback();
|
||||||
|
|
||||||
|
assertEquals(3, queue.getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMultipleSessionSendersInSingleTXNWithCommit() throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
|
||||||
|
// Root TXN session controls all TXN send lifetimes.
|
||||||
|
AmqpSession txnSession = connection.createSession();
|
||||||
|
|
||||||
|
// Create some sender sessions
|
||||||
|
AmqpSession session1 = connection.createSession();
|
||||||
|
AmqpSession session2 = connection.createSession();
|
||||||
|
AmqpSession session3 = connection.createSession();
|
||||||
|
|
||||||
|
// Sender linked to each session
|
||||||
|
AmqpSender sender1 = session1.createSender(getTestName());
|
||||||
|
AmqpSender sender2 = session2.createSender(getTestName());
|
||||||
|
AmqpSender sender3 = session3.createSender(getTestName());
|
||||||
|
|
||||||
|
final Queue queue = getProxyToQueue(getTestName());
|
||||||
|
assertEquals(0, queue.getMessageCount());
|
||||||
|
|
||||||
|
// Begin the transaction that all senders will operate in.
|
||||||
|
txnSession.begin();
|
||||||
|
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setText("Test-Message");
|
||||||
|
|
||||||
|
assertTrue(txnSession.isInTransaction());
|
||||||
|
|
||||||
|
sender1.send(message, txnSession.getTransactionId());
|
||||||
|
sender2.send(message, txnSession.getTransactionId());
|
||||||
|
sender3.send(message, txnSession.getTransactionId());
|
||||||
|
|
||||||
|
assertEquals(0, queue.getMessageCount());
|
||||||
|
|
||||||
|
txnSession.commit();
|
||||||
|
|
||||||
|
assertEquals(3, queue.getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testMultipleSessionSendersInSingleTXNWithRollback() throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
|
||||||
|
// Root TXN session controls all TXN send lifetimes.
|
||||||
|
AmqpSession txnSession = connection.createSession();
|
||||||
|
|
||||||
|
// Create some sender sessions
|
||||||
|
AmqpSession session1 = connection.createSession();
|
||||||
|
AmqpSession session2 = connection.createSession();
|
||||||
|
AmqpSession session3 = connection.createSession();
|
||||||
|
|
||||||
|
// Sender linked to each session
|
||||||
|
AmqpSender sender1 = session1.createSender(getTestName());
|
||||||
|
AmqpSender sender2 = session2.createSender(getTestName());
|
||||||
|
AmqpSender sender3 = session3.createSender(getTestName());
|
||||||
|
|
||||||
|
final Queue queue = getProxyToQueue(getTestName());
|
||||||
|
assertEquals(0, queue.getMessageCount());
|
||||||
|
|
||||||
|
// Begin the transaction that all senders will operate in.
|
||||||
|
txnSession.begin();
|
||||||
|
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setText("Test-Message");
|
||||||
|
|
||||||
|
assertTrue(txnSession.isInTransaction());
|
||||||
|
|
||||||
|
sender1.send(message, txnSession.getTransactionId());
|
||||||
|
sender2.send(message, txnSession.getTransactionId());
|
||||||
|
sender3.send(message, txnSession.getTransactionId());
|
||||||
|
|
||||||
|
assertEquals(0, queue.getMessageCount());
|
||||||
|
|
||||||
|
txnSession.rollback();
|
||||||
|
|
||||||
|
assertEquals(0, queue.getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
//----- Tests Ported from AmqpNetLite client -----------------------------//
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testSendersCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception {
|
||||||
|
final int NUM_MESSAGES = 5;
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
|
||||||
|
// Root TXN session controls all TXN send lifetimes.
|
||||||
|
AmqpSession txnSession = connection.createSession();
|
||||||
|
|
||||||
|
// Normal Session which won't create an TXN itself
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpSender sender = session.createSender(getTestName());
|
||||||
|
|
||||||
|
// Commit TXN work from a sender.
|
||||||
|
txnSession.begin();
|
||||||
|
for (int i = 0; i < NUM_MESSAGES; ++i) {
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setText("Test-Message");
|
||||||
|
sender.send(message, txnSession.getTransactionId());
|
||||||
|
}
|
||||||
|
txnSession.commit();
|
||||||
|
|
||||||
|
// Rollback an additional batch of TXN work from a sender.
|
||||||
|
txnSession.begin();
|
||||||
|
for (int i = 0; i < NUM_MESSAGES; ++i) {
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setText("Test-Message");
|
||||||
|
sender.send(message, txnSession.getTransactionId());
|
||||||
|
}
|
||||||
|
txnSession.rollback();
|
||||||
|
|
||||||
|
// Commit more TXN work from a sender.
|
||||||
|
txnSession.begin();
|
||||||
|
for (int i = 0; i < NUM_MESSAGES; ++i) {
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setText("Test-Message");
|
||||||
|
sender.send(message, txnSession.getTransactionId());
|
||||||
|
}
|
||||||
|
txnSession.commit();
|
||||||
|
|
||||||
|
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||||
|
receiver.flow(NUM_MESSAGES * 2);
|
||||||
|
for (int i = 0; i < NUM_MESSAGES * 2; ++i) {
|
||||||
|
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull(message);
|
||||||
|
message.accept(txnSession);
|
||||||
|
}
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception {
|
||||||
|
final int NUM_MESSAGES = 10;
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
|
||||||
|
// Root TXN session controls all TXN send lifetimes.
|
||||||
|
AmqpSession txnSession = connection.createSession();
|
||||||
|
|
||||||
|
// Normal Session which won't create an TXN itself
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
AmqpSender sender = session.createSender(getTestName());
|
||||||
|
|
||||||
|
for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setText("Test-Message");
|
||||||
|
message.setApplicationProperty("msgId", i);
|
||||||
|
sender.send(message, txnSession.getTransactionId());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read all messages from the Queue, do not accept them yet.
|
||||||
|
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||||
|
ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
|
||||||
|
receiver.flow((NUM_MESSAGES + 2) * 2);
|
||||||
|
for (int i = 0; i < NUM_MESSAGES; ++i) {
|
||||||
|
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull(message);
|
||||||
|
messages.add(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Commit half the consumed messages
|
||||||
|
txnSession.begin();
|
||||||
|
for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
|
||||||
|
messages.get(i).accept(txnSession);
|
||||||
|
}
|
||||||
|
txnSession.commit();
|
||||||
|
|
||||||
|
// Rollback the other half the consumed messages
|
||||||
|
txnSession.begin();
|
||||||
|
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
|
||||||
|
messages.get(i).accept(txnSession);
|
||||||
|
}
|
||||||
|
txnSession.rollback();
|
||||||
|
|
||||||
|
{
|
||||||
|
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull(message);
|
||||||
|
assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
|
||||||
|
message.release();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Commit the other half the consumed messages
|
||||||
|
// This is a variation from the .NET client tests which doesn't settle the
|
||||||
|
// messages in the TX until commit is called but on ActiveMQ they will be
|
||||||
|
// redispatched regardless and not stay in the acquired state.
|
||||||
|
txnSession.begin();
|
||||||
|
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
|
||||||
|
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull(message);
|
||||||
|
message.accept();
|
||||||
|
}
|
||||||
|
txnSession.commit();
|
||||||
|
|
||||||
|
// The final message should still be pending.
|
||||||
|
{
|
||||||
|
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
receiver.flow(1);
|
||||||
|
assertNotNull(message);
|
||||||
|
assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
|
||||||
|
message.release();
|
||||||
|
}
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception {
|
||||||
|
final int NUM_MESSAGES = 10;
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
|
||||||
|
// Root TXN session controls all TXN send lifetimes.
|
||||||
|
AmqpSession txnSession = connection.createSession();
|
||||||
|
|
||||||
|
// Normal Session which won't create an TXN itself
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
AmqpSender sender = session.createSender(getTestName());
|
||||||
|
|
||||||
|
for (int i = 0; i < NUM_MESSAGES; ++i) {
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setText("Test-Message");
|
||||||
|
message.setApplicationProperty("msgId", i);
|
||||||
|
sender.send(message, txnSession.getTransactionId());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read all messages from the Queue, do not accept them yet.
|
||||||
|
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||||
|
receiver.flow(2);
|
||||||
|
AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
// Accept the first one in a TXN and send a new message in that TXN as well
|
||||||
|
txnSession.begin();
|
||||||
|
{
|
||||||
|
message1.accept(txnSession);
|
||||||
|
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setText("Test-Message");
|
||||||
|
message.setApplicationProperty("msgId", NUM_MESSAGES);
|
||||||
|
|
||||||
|
sender.send(message, txnSession.getTransactionId());
|
||||||
|
}
|
||||||
|
txnSession.commit();
|
||||||
|
|
||||||
|
// Accept the second one in a TXN and send a new message in that TXN as well but rollback
|
||||||
|
txnSession.begin();
|
||||||
|
{
|
||||||
|
message2.accept(txnSession);
|
||||||
|
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setText("Test-Message");
|
||||||
|
message.setApplicationProperty("msgId", NUM_MESSAGES + 1);
|
||||||
|
sender.send(message, txnSession.getTransactionId());
|
||||||
|
}
|
||||||
|
txnSession.rollback();
|
||||||
|
|
||||||
|
// Variation here from .NET code, the client settles the accepted message where
|
||||||
|
// the .NET client does not and instead releases here to have it redelivered.
|
||||||
|
|
||||||
|
receiver.flow(NUM_MESSAGES);
|
||||||
|
for (int i = 1; i <= NUM_MESSAGES; ++i) {
|
||||||
|
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull(message);
|
||||||
|
assertEquals(i, message.getApplicationProperty("msgId"));
|
||||||
|
message.accept();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should be nothing left.
|
||||||
|
assertNull(receiver.receive(1, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
|
@ -815,7 +815,8 @@ public class ProtonTest extends ProtonTestBase {
|
||||||
request.setText("[]");
|
request.setText("[]");
|
||||||
|
|
||||||
sender.send(request);
|
sender.send(request);
|
||||||
AmqpMessage response = receiver.receive();
|
AmqpMessage response = receiver.receive(50, TimeUnit.SECONDS);
|
||||||
|
Assert.assertNotNull(response);
|
||||||
assertNotNull(response);
|
assertNotNull(response);
|
||||||
Object section = response.getWrappedMessage().getBody();
|
Object section = response.getWrappedMessage().getBody();
|
||||||
assertTrue(section instanceof AmqpValue);
|
assertTrue(section instanceof AmqpValue);
|
||||||
|
|
|
@ -41,7 +41,6 @@ public class ProtonTestBase extends ActiveMQTestBase {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
disableCheckThread();
|
|
||||||
|
|
||||||
server = this.createServer(true, true);
|
server = this.createServer(true, true);
|
||||||
HashMap<String, Object> params = new HashMap<>();
|
HashMap<String, Object> params = new HashMap<>();
|
||||||
|
|
|
@ -43,7 +43,6 @@ public class ProtonTestForHeader extends ActiveMQTestBase {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
disableCheckThread();
|
|
||||||
server = this.createServer(true, true);
|
server = this.createServer(true, true);
|
||||||
HashMap<String, Object> params = new HashMap<>();
|
HashMap<String, Object> params = new HashMap<>();
|
||||||
params.put(TransportConstants.PORT_PROP_NAME, "5672");
|
params.put(TransportConstants.PORT_PROP_NAME, "5672");
|
||||||
|
@ -61,8 +60,6 @@ public class ProtonTestForHeader extends ActiveMQTestBase {
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(250);
|
|
||||||
|
|
||||||
server.stop();
|
server.stop();
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
|
|
Loading…
Reference in New Issue