This commit is contained in:
Clebert Suconic 2017-03-14 17:13:05 -04:00
commit 9e6c40a8de
3 changed files with 28 additions and 3 deletions

View File

@ -148,7 +148,7 @@ public class AMQPSessionContext extends ProtonInitializable {
receiver.setContext(transactionHandler);
receiver.open();
receiver.flow(100);
receiver.flow(ProtonTransactionHandler.DEFAULT_COORDINATOR_CREDIT);
}
public void addSender(Sender sender) throws Exception {

View File

@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
@ -38,6 +36,9 @@ import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
/**
* handles an amqp Coordinator to deal with transaction boundaries etc
*/
@ -45,6 +46,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class);
public static final int DEFAULT_COORDINATOR_CREDIT = 100;
final AMQPSessionCallback sessionSPI;
public ProtonTransactionHandler(AMQPSessionCallback sessionSPI) {
@ -98,6 +101,12 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage());
}
}
// Replenish coordinator receiver credit on exhaustion so sender can continue
// transaction declare and discahrge operations.
if (receiver.getCredit() == 0) {
receiver.flow(DEFAULT_COORDINATOR_CREDIT);
}
}
} catch (ActiveMQAMQPException amqpE) {
delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));

View File

@ -48,6 +48,22 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 30000)
public void testCoordinatorReplenishesCredit() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
assertNotNull(session);
for (int i = 0; i < 1000; ++i) {
session.begin();
assertTrue(session.isInTransaction());
session.commit();
}
connection.close();
}
@Test(timeout = 30000)
public void testBeginAndRollbackTransaction() throws Exception {
AmqpClient client = createAmqpClient();