This closes #1102
This commit is contained in:
commit
224d780622
|
@ -62,7 +62,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
|
|||
|
||||
private static final Logger logger = Logger.getLogger(AMQPConnectionCallback.class);
|
||||
|
||||
private ConcurrentMap<XidImpl, Transaction> transactions = new ConcurrentHashMap<>();
|
||||
private ConcurrentMap<Binary, Transaction> transactions = new ConcurrentHashMap<>();
|
||||
|
||||
private final ProtonProtocolManager manager;
|
||||
|
||||
|
@ -224,25 +224,32 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
|
|||
|
||||
public Binary newTransaction() {
|
||||
XidImpl xid = newXID();
|
||||
Binary binary = new Binary(xid.getGlobalTransactionId());
|
||||
Transaction transaction = new ProtonTransactionImpl(xid, server.getStorageManager(), -1);
|
||||
transactions.put(xid, transaction);
|
||||
return new Binary(xid.getGlobalTransactionId());
|
||||
transactions.put(binary, transaction);
|
||||
return binary;
|
||||
}
|
||||
|
||||
public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
|
||||
XidImpl xid = newXID(txid.getArray());
|
||||
Transaction tx = transactions.get(xid);
|
||||
public Transaction getTransaction(Binary txid, boolean remove) throws ActiveMQAMQPException {
|
||||
Transaction tx;
|
||||
|
||||
if (remove) {
|
||||
tx = transactions.remove(txid);
|
||||
} else {
|
||||
tx = transactions.get(txid);
|
||||
}
|
||||
|
||||
if (tx == null) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(xid.toString());
|
||||
logger.warn("Couldn't find txid = " + txid);
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(txid.toString());
|
||||
}
|
||||
|
||||
return tx;
|
||||
}
|
||||
|
||||
public void removeTransaction(Binary txid) {
|
||||
public Transaction removeTransaction(Binary txid) {
|
||||
XidImpl xid = newXID(txid.getArray());
|
||||
transactions.remove(xid);
|
||||
return transactions.remove(xid);
|
||||
}
|
||||
|
||||
protected XidImpl newXID() {
|
||||
|
|
|
@ -72,6 +72,7 @@ public class AMQPMessage extends RefCountMessage {
|
|||
private DeliveryAnnotations _deliveryAnnotations;
|
||||
private MessageAnnotations _messageAnnotations;
|
||||
private Properties _properties;
|
||||
private int appLocation = -1;
|
||||
private ApplicationProperties applicationProperties;
|
||||
private long scheduledTime = -1;
|
||||
private String connectionID;
|
||||
|
@ -93,7 +94,7 @@ public class AMQPMessage extends RefCountMessage {
|
|||
|
||||
public AMQPMessage(long messageFormat, Message message) {
|
||||
this.messageFormat = messageFormat;
|
||||
this.protonMessage = (MessageImpl)message;
|
||||
this.protonMessage = (MessageImpl) message;
|
||||
|
||||
}
|
||||
|
||||
|
@ -124,7 +125,7 @@ public class AMQPMessage extends RefCountMessage {
|
|||
_deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
|
||||
_properties = new Properties();
|
||||
this.applicationProperties = new ApplicationProperties(new HashMap<>());
|
||||
this.protonMessage = (MessageImpl)Message.Factory.create();
|
||||
this.protonMessage = (MessageImpl) Message.Factory.create();
|
||||
this.protonMessage.setApplicationProperties(applicationProperties);
|
||||
this.protonMessage.setDeliveryAnnotations(_deliveryAnnotations);
|
||||
}
|
||||
|
@ -148,6 +149,20 @@ public class AMQPMessage extends RefCountMessage {
|
|||
|
||||
private ApplicationProperties getApplicationProperties() {
|
||||
parseHeaders();
|
||||
|
||||
if (applicationProperties == null && appLocation >= 0) {
|
||||
ByteBuffer buffer = getBuffer().nioBuffer();
|
||||
buffer.position(appLocation);
|
||||
TLSEncode.getDecoder().setByteBuffer(buffer);
|
||||
Object section = TLSEncode.getDecoder().readObject();
|
||||
if (section instanceof ApplicationProperties) {
|
||||
this.applicationProperties = (ApplicationProperties) section;
|
||||
}
|
||||
this.appLocation = -1;
|
||||
TLSEncode.getDecoder().setByteBuffer(null);
|
||||
|
||||
}
|
||||
|
||||
return applicationProperties;
|
||||
}
|
||||
|
||||
|
@ -161,6 +176,7 @@ public class AMQPMessage extends RefCountMessage {
|
|||
parsedHeaders = true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.apache.activemq.artemis.api.core.Message setConnectionID(String connectionID) {
|
||||
this.connectionID = connectionID;
|
||||
|
@ -172,7 +188,6 @@ public class AMQPMessage extends RefCountMessage {
|
|||
return connectionID;
|
||||
}
|
||||
|
||||
|
||||
public MessageAnnotations getMessageAnnotations() {
|
||||
parseHeaders();
|
||||
return _messageAnnotations;
|
||||
|
@ -202,7 +217,6 @@ public class AMQPMessage extends RefCountMessage {
|
|||
return null;
|
||||
}
|
||||
|
||||
|
||||
private void setSymbol(String symbol, Object value) {
|
||||
setSymbol(Symbol.getSymbol(symbol), value);
|
||||
}
|
||||
|
@ -231,11 +245,9 @@ public class AMQPMessage extends RefCountMessage {
|
|||
return null;
|
||||
} */
|
||||
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public SimpleString getGroupID() {
|
||||
parseHeaders();
|
||||
|
@ -247,7 +259,6 @@ public class AMQPMessage extends RefCountMessage {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Long getScheduledDeliveryTime() {
|
||||
|
||||
|
@ -339,15 +350,19 @@ public class AMQPMessage extends RefCountMessage {
|
|||
this.expiration = _properties.getAbsoluteExpiryTime().getTime();
|
||||
}
|
||||
|
||||
if (buffer.hasRemaining()) {
|
||||
section = (Section) decoder.readObject();
|
||||
} else {
|
||||
section = null;
|
||||
}
|
||||
// We don't read the next section on purpose, as we will parse ApplicationProperties
|
||||
// lazily
|
||||
section = null;
|
||||
}
|
||||
|
||||
if (section instanceof ApplicationProperties) {
|
||||
applicationProperties = (ApplicationProperties) section;
|
||||
} else {
|
||||
if (buffer.hasRemaining()) {
|
||||
this.appLocation = buffer.position();
|
||||
} else {
|
||||
this.appLocation = -1;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
decoder.setByteBuffer(null);
|
||||
|
@ -446,13 +461,11 @@ public class AMQPMessage extends RefCountMessage {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Object getDuplicateProperty() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) {
|
||||
return null;
|
||||
|
@ -463,7 +476,7 @@ public class AMQPMessage extends RefCountMessage {
|
|||
if (address == null) {
|
||||
Properties properties = getProtonMessage().getProperties();
|
||||
if (properties != null) {
|
||||
return properties.getTo();
|
||||
return properties.getTo();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
|
@ -539,7 +552,7 @@ public class AMQPMessage extends RefCountMessage {
|
|||
header.setDeliveryCount(UnsignedInteger.valueOf(deliveryCount - 1));
|
||||
TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer));
|
||||
TLSEncode.getEncoder().writeObject(header);
|
||||
TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
|
||||
TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
|
||||
}
|
||||
}
|
||||
buffer.writeBytes(data, sendFrom, data.writerIndex() - sendFrom);
|
||||
|
@ -676,27 +689,27 @@ public class AMQPMessage extends RefCountMessage {
|
|||
|
||||
@Override
|
||||
public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException {
|
||||
return (Boolean)getApplicationPropertiesMap().get(key);
|
||||
return (Boolean) getApplicationPropertiesMap().get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException {
|
||||
return (Byte)getApplicationPropertiesMap().get(key);
|
||||
return (Byte) getApplicationPropertiesMap().get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException {
|
||||
return (Double)getApplicationPropertiesMap().get(key);
|
||||
return (Double) getApplicationPropertiesMap().get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException {
|
||||
return (Integer)getApplicationPropertiesMap().get(key);
|
||||
return (Integer) getApplicationPropertiesMap().get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long getLongProperty(String key) throws ActiveMQPropertyConversionException {
|
||||
return (Long)getApplicationPropertiesMap().get(key);
|
||||
return (Long) getApplicationPropertiesMap().get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -712,12 +725,12 @@ public class AMQPMessage extends RefCountMessage {
|
|||
|
||||
@Override
|
||||
public Short getShortProperty(String key) throws ActiveMQPropertyConversionException {
|
||||
return (Short)getApplicationPropertiesMap().get(key);
|
||||
return (Short) getApplicationPropertiesMap().get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException {
|
||||
return (Float)getApplicationPropertiesMap().get(key);
|
||||
return (Float) getApplicationPropertiesMap().get(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -727,7 +740,7 @@ public class AMQPMessage extends RefCountMessage {
|
|||
} else if (key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) {
|
||||
return getConnectionID();
|
||||
} else {
|
||||
return (String)getApplicationPropertiesMap().get(key);
|
||||
return (String) getApplicationPropertiesMap().get(key);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -747,7 +760,7 @@ public class AMQPMessage extends RefCountMessage {
|
|||
|
||||
@Override
|
||||
public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
|
||||
return SimpleString.toSimpleString((String)getApplicationPropertiesMap().get(key));
|
||||
return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -842,8 +855,7 @@ public class AMQPMessage extends RefCountMessage {
|
|||
@Override
|
||||
public int getMemoryEstimate() {
|
||||
if (memoryEstimate == -1) {
|
||||
memoryEstimate = memoryOffset +
|
||||
(data != null ? data.capacity() : 0);
|
||||
memoryEstimate = memoryOffset + (data != null ? data.capacity() : 0);
|
||||
}
|
||||
|
||||
return memoryEstimate;
|
||||
|
@ -858,7 +870,6 @@ public class AMQPMessage extends RefCountMessage {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public SimpleString getReplyTo() {
|
||||
if (getProperties() != null) {
|
||||
|
@ -877,7 +888,6 @@ public class AMQPMessage extends RefCountMessage {
|
|||
return this;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int getPersistSize() {
|
||||
checkBuffer();
|
||||
|
|
|
@ -47,7 +47,6 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
|
|||
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
|
||||
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
|
||||
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
|
||||
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
|
||||
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
|
||||
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
|
||||
|
@ -92,6 +91,10 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
|
||||
private final AtomicBoolean draining = new AtomicBoolean(false);
|
||||
|
||||
public Object getProtonLock() {
|
||||
return connection.getLock();
|
||||
}
|
||||
|
||||
public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
|
||||
ProtonProtocolManager manager,
|
||||
AMQPConnectionContext connection,
|
||||
|
@ -382,8 +385,10 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
condition.setDescription(errorMessage);
|
||||
Rejected rejected = new Rejected();
|
||||
rejected.setError(condition);
|
||||
delivery.disposition(rejected);
|
||||
delivery.settle();
|
||||
synchronized (connection.getLock()) {
|
||||
delivery.disposition(rejected);
|
||||
delivery.settle();
|
||||
}
|
||||
connection.flush();
|
||||
}
|
||||
|
||||
|
@ -536,29 +541,14 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
}
|
||||
}
|
||||
|
||||
public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
|
||||
return protonSPI.getTransaction(txid);
|
||||
public Transaction getTransaction(Binary txid, boolean remove) throws ActiveMQAMQPException {
|
||||
return protonSPI.getTransaction(txid, remove);
|
||||
}
|
||||
|
||||
public Binary newTransaction() {
|
||||
return protonSPI.newTransaction();
|
||||
}
|
||||
|
||||
public void commitTX(Binary txid) throws Exception {
|
||||
Transaction tx = protonSPI.getTransaction(txid);
|
||||
tx.commit(true);
|
||||
protonSPI.removeTransaction(txid);
|
||||
}
|
||||
|
||||
public void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception {
|
||||
Transaction tx = protonSPI.getTransaction(txid);
|
||||
tx.rollback();
|
||||
protonSPI.removeTransaction(txid);
|
||||
}
|
||||
|
||||
public void dischargeTx(Binary txid) throws ActiveMQAMQPException {
|
||||
((ProtonTransactionImpl) protonSPI.getTransaction(txid)).discharge();
|
||||
}
|
||||
|
||||
public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
|
||||
return serverSession.getMatchingQueue(address, routingType);
|
||||
|
|
|
@ -142,7 +142,7 @@ public class AMQPSessionContext extends ProtonInitializable {
|
|||
}
|
||||
|
||||
public void addTransactionHandler(Coordinator coordinator, Receiver receiver) {
|
||||
ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI);
|
||||
ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI, connection);
|
||||
|
||||
coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn"));
|
||||
|
||||
|
|
|
@ -155,7 +155,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
|||
if (delivery.getRemoteState() instanceof TransactionalState) {
|
||||
|
||||
TransactionalState txState = (TransactionalState) delivery.getRemoteState();
|
||||
tx = this.sessionSPI.getTransaction(txState.getTxnId());
|
||||
tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
|
||||
}
|
||||
|
||||
sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data);
|
||||
|
@ -201,8 +201,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
|||
} else {
|
||||
synchronized (connection.getLock()) {
|
||||
receiver.flow(credits);
|
||||
connection.flush();
|
||||
}
|
||||
connection.flush();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -42,7 +42,6 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFound
|
|||
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
|
||||
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
|
||||
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
|
||||
import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
|
||||
import org.apache.activemq.artemis.reader.MessageUtil;
|
||||
import org.apache.activemq.artemis.selector.filter.FilterException;
|
||||
import org.apache.activemq.artemis.selector.impl.SelectorParser;
|
||||
|
@ -89,7 +88,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
private boolean multicast;
|
||||
//todo get this from somewhere
|
||||
private RoutingType defaultRoutingType = RoutingType.ANYCAST;
|
||||
protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0);
|
||||
private RoutingType routingTypeToUse = defaultRoutingType;
|
||||
private boolean shared = false;
|
||||
private boolean global = false;
|
||||
|
@ -110,7 +108,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
|
||||
@Override
|
||||
public void onFlow(int currentCredits, boolean drain) {
|
||||
this.creditsSemaphore.setCredits(currentCredits);
|
||||
sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain);
|
||||
}
|
||||
|
||||
|
@ -496,7 +493,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
if (remoteState instanceof TransactionalState) {
|
||||
|
||||
TransactionalState txState = (TransactionalState) remoteState;
|
||||
ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId());
|
||||
ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false);
|
||||
|
||||
if (txState.getOutcome() != null) {
|
||||
settleImmediate = false;
|
||||
|
@ -590,16 +587,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (!creditsSemaphore.tryAcquire()) {
|
||||
try {
|
||||
creditsSemaphore.acquire();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
// nothing to be done here.. we just keep going
|
||||
throw new IllegalStateException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
// presettle means we can settle the message on the dealer side before we send it, i.e.
|
||||
// for browsers
|
||||
boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
|
||||
|
|
|
@ -18,10 +18,9 @@ package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
|
|||
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
|
||||
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
|
||||
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
|
||||
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
|
||||
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler;
|
||||
import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
|
||||
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.messaging.Accepted;
|
||||
|
@ -36,9 +35,6 @@ import org.apache.qpid.proton.engine.Receiver;
|
|||
import org.apache.qpid.proton.message.impl.MessageImpl;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
|
||||
/**
|
||||
* handles an amqp Coordinator to deal with transaction boundaries etc
|
||||
*/
|
||||
|
@ -47,17 +43,18 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
|
|||
private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class);
|
||||
|
||||
public static final int DEFAULT_COORDINATOR_CREDIT = 100;
|
||||
public static final int CREDIT_LOW_WATERMARK = 30;
|
||||
|
||||
final AMQPSessionCallback sessionSPI;
|
||||
final AMQPConnectionContext connection;
|
||||
|
||||
public ProtonTransactionHandler(AMQPSessionCallback sessionSPI) {
|
||||
public ProtonTransactionHandler(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection) {
|
||||
this.sessionSPI = sessionSPI;
|
||||
this.connection = connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
|
||||
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
|
||||
|
||||
final Receiver receiver;
|
||||
try {
|
||||
receiver = ((Receiver) delivery.getLink());
|
||||
|
@ -66,9 +63,21 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
|
|||
return;
|
||||
}
|
||||
|
||||
receiver.recv(new NettyWritable(buffer));
|
||||
byte[] buffer;
|
||||
|
||||
synchronized (connection.getLock()) {
|
||||
// Replenish coordinator receiver credit on exhaustion so sender can continue
|
||||
// transaction declare and discahrge operations.
|
||||
if (receiver.getCredit() < CREDIT_LOW_WATERMARK) {
|
||||
receiver.flow(DEFAULT_COORDINATOR_CREDIT);
|
||||
}
|
||||
|
||||
buffer = new byte[delivery.available()];
|
||||
receiver.recv(buffer, 0, buffer.length);
|
||||
receiver.advance();
|
||||
}
|
||||
|
||||
|
||||
receiver.advance();
|
||||
|
||||
MessageImpl msg = DeliveryUtil.decodeMessageImpl(buffer);
|
||||
|
||||
|
@ -78,44 +87,47 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
|
|||
Binary txID = sessionSPI.newTransaction();
|
||||
Declared declared = new Declared();
|
||||
declared.setTxnId(txID);
|
||||
delivery.disposition(declared);
|
||||
synchronized (connection.getLock()) {
|
||||
delivery.disposition(declared);
|
||||
}
|
||||
} else if (action instanceof Discharge) {
|
||||
Discharge discharge = (Discharge) action;
|
||||
|
||||
Binary txID = discharge.getTxnId();
|
||||
sessionSPI.dischargeTx(txID);
|
||||
if (discharge.getFail()) {
|
||||
try {
|
||||
sessionSPI.rollbackTX(txID, true);
|
||||
delivery.disposition(new Accepted());
|
||||
} catch (Exception e) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage());
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
sessionSPI.commitTX(txID);
|
||||
delivery.disposition(new Accepted());
|
||||
} catch (ActiveMQAMQPException amqpE) {
|
||||
throw amqpE;
|
||||
} catch (Exception e) {
|
||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage());
|
||||
}
|
||||
}
|
||||
ProtonTransactionImpl tx = (ProtonTransactionImpl)sessionSPI.getTransaction(txID, true);
|
||||
tx.discharge();
|
||||
|
||||
// Replenish coordinator receiver credit on exhaustion so sender can continue
|
||||
// transaction declare and discahrge operations.
|
||||
if (receiver.getCredit() == 0) {
|
||||
receiver.flow(DEFAULT_COORDINATOR_CREDIT);
|
||||
if (discharge.getFail()) {
|
||||
tx.rollback();
|
||||
synchronized (connection.getLock()) {
|
||||
delivery.disposition(new Accepted());
|
||||
}
|
||||
connection.flush();
|
||||
} else {
|
||||
tx.commit();
|
||||
synchronized (connection.getLock()) {
|
||||
delivery.disposition(new Accepted());
|
||||
}
|
||||
connection.flush();
|
||||
}
|
||||
}
|
||||
} catch (ActiveMQAMQPException amqpE) {
|
||||
delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
|
||||
} catch (Exception e) {
|
||||
log.warn(amqpE.getMessage(), amqpE);
|
||||
synchronized (connection.getLock()) {
|
||||
delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
|
||||
}
|
||||
connection.flush();
|
||||
} catch (Throwable e) {
|
||||
log.warn(e.getMessage(), e);
|
||||
delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
|
||||
synchronized (connection.getLock()) {
|
||||
delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
|
||||
}
|
||||
connection.flush();
|
||||
} finally {
|
||||
delivery.settle();
|
||||
buffer.release();
|
||||
synchronized (connection.getLock()) {
|
||||
delivery.settle();
|
||||
}
|
||||
connection.flush();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -16,28 +16,14 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.protocol.amqp.util;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.qpid.proton.engine.Receiver;
|
||||
import org.apache.qpid.proton.message.Message;
|
||||
import org.apache.qpid.proton.message.impl.MessageImpl;
|
||||
|
||||
public class DeliveryUtil {
|
||||
|
||||
public static int readDelivery(Receiver receiver, ByteBuf buffer) {
|
||||
int initial = buffer.writerIndex();
|
||||
// optimization by norman
|
||||
int count;
|
||||
while ((count = receiver.recv(buffer.array(), buffer.arrayOffset() + buffer.writerIndex(), buffer.writableBytes())) > 0) {
|
||||
// Increment the writer index by the number of bytes written into it while calling recv.
|
||||
buffer.writerIndex(buffer.writerIndex() + count);
|
||||
buffer.ensureWritable(count);
|
||||
}
|
||||
return buffer.writerIndex() - initial;
|
||||
}
|
||||
|
||||
public static MessageImpl decodeMessageImpl(ByteBuf buffer) {
|
||||
public static MessageImpl decodeMessageImpl(byte[] data) {
|
||||
MessageImpl message = (MessageImpl) Message.Factory.create();
|
||||
message.decode(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
|
||||
message.decode(data, 0, data.length);
|
||||
return message;
|
||||
}
|
||||
|
||||
|
|
|
@ -5,9 +5,9 @@
|
|||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
|
@ -17,9 +17,20 @@
|
|||
|
||||
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
|
@ -27,6 +38,8 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
|||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
|
@ -788,4 +801,77 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
|
|||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 120000)
|
||||
public void testSendPersistentTX() throws Exception {
|
||||
int MESSAGE_COUNT = 100000;
|
||||
AtomicInteger errors = new AtomicInteger(0);
|
||||
server.createQueue(SimpleString.toSimpleString("q1"), RoutingType.ANYCAST, SimpleString.toSimpleString("q1"), null, true, false, 1, false, true);
|
||||
ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
|
||||
Connection sendConnection = factory.createConnection();
|
||||
Connection consumerConnection = factory.createConnection();
|
||||
try {
|
||||
|
||||
Thread receiverThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
consumerConnection.start();
|
||||
Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
javax.jms.Queue q1 = consumerSession.createQueue("q1");
|
||||
|
||||
MessageConsumer consumer = consumerSession.createConsumer(q1);
|
||||
|
||||
for (int i = 1; i <= MESSAGE_COUNT; i++) {
|
||||
Message message = consumer.receive(5000);
|
||||
if (message == null) {
|
||||
throw new IOException("No message read in time.");
|
||||
}
|
||||
|
||||
if (i % 100 == 0) {
|
||||
if (i % 1000 == 0) System.out.println("Read message " + i);
|
||||
consumerSession.commit();
|
||||
}
|
||||
}
|
||||
|
||||
// Assure that all messages are consumed
|
||||
consumerSession.commit();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
errors.incrementAndGet();
|
||||
}
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
receiverThread.start();
|
||||
|
||||
Session sendingSession = sendConnection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
|
||||
javax.jms.Queue q1 = sendingSession.createQueue("q1");
|
||||
MessageProducer producer = sendingSession.createProducer(q1);
|
||||
producer.setDeliveryDelay(DeliveryMode.NON_PERSISTENT);
|
||||
for (int i = 0; i < MESSAGE_COUNT; i++) {
|
||||
producer.send(sendingSession.createTextMessage("message " + i), DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
|
||||
if (i % 100 == 0) {
|
||||
if (i % 1000 == 0) System.out.println("Sending " + i);
|
||||
sendingSession.commit();
|
||||
}
|
||||
}
|
||||
|
||||
sendingSession.commit();
|
||||
|
||||
receiverThread.join(50000);
|
||||
Assert.assertFalse(receiverThread.isAlive());
|
||||
|
||||
Assert.assertEquals(0, errors.get());
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
sendConnection.close();
|
||||
consumerConnection.close();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue