ARTEMIS-1039 Transaction Coordinator credit refill

The coordinator needs to refill credit on the receiver once it has been
exhausted, otherwise the remote cannot send additional declare or
discharge commands to the broker.
This commit is contained in:
Timothy Bish 2017-03-14 16:07:58 -04:00 committed by Clebert Suconic
parent b674c6b846
commit 7282b6890a
3 changed files with 28 additions and 3 deletions

View File

@ -148,7 +148,7 @@ public class AMQPSessionContext extends ProtonInitializable {
receiver.setContext(transactionHandler);
receiver.open();
receiver.flow(100);
receiver.flow(ProtonTransactionHandler.DEFAULT_COORDINATOR_CREDIT);
}
public void addSender(Sender sender) throws Exception {

View File

@ -16,8 +16,6 @@
*/
package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
@ -38,6 +36,9 @@ import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
/**
* handles an amqp Coordinator to deal with transaction boundaries etc
*/
@ -45,6 +46,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class);
public static final int DEFAULT_COORDINATOR_CREDIT = 100;
final AMQPSessionCallback sessionSPI;
public ProtonTransactionHandler(AMQPSessionCallback sessionSPI) {
@ -98,6 +101,12 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage());
}
}
// Replenish coordinator receiver credit on exhaustion so sender can continue
// transaction declare and discahrge operations.
if (receiver.getCredit() == 0) {
receiver.flow(DEFAULT_COORDINATOR_CREDIT);
}
}
} catch (ActiveMQAMQPException amqpE) {
delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));

View File

@ -48,6 +48,22 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 30000)
public void testCoordinatorReplenishesCredit() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
assertNotNull(session);
for (int i = 0; i < 1000; ++i) {
session.begin();
assertTrue(session.isInTransaction());
session.commit();
}
connection.close();
}
@Test(timeout = 30000)
public void testBeginAndRollbackTransaction() throws Exception {
AmqpClient client = createAmqpClient();