git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1509046 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-07-31 22:05:27 +00:00
parent 55a8ef54b5
commit ed5d841c21
3 changed files with 198 additions and 63 deletions

View File

@ -102,6 +102,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class AmqpProtocolConverter {
static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
public static final EnumSet<EndpointState> UNINITIALIZED_SET = EnumSet.of(EndpointState.UNINITIALIZED);
public static final EnumSet<EndpointState> INITIALIZED_SET = EnumSet.complementOf(UNINITIALIZED_SET);
@ -151,7 +152,6 @@ class AmqpProtocolConverter {
}
}
void pumpProtonToSocket() {
try {
int size = 1024 * 64;
@ -179,6 +179,8 @@ class AmqpProtocolConverter {
long nextProducerId = 0;
long nextConsumerId = 0;
final LinkedList<ConsumerContext> consumers = new LinkedList<ConsumerContext>();
public AmqpSessionContext(ConnectionId connectionId, long id) {
sessionId = new SessionId(connectionId, id);
}
@ -368,7 +370,9 @@ class AmqpProtocolConverter {
} else if (command.isBrokerInfo()) {
// ignore
} else {
LOG.debug("Do not know how to process ActiveMQ Command " + command);
if (LOG.isDebugEnabled()) {
LOG.debug("Do not know how to process ActiveMQ Command " + command);
}
}
}
@ -379,13 +383,16 @@ class AmqpProtocolConverter {
private long nextTempDestinationId = 0;
static abstract class AmqpDeliveryListener {
abstract public void onDelivery(Delivery delivery) throws Exception;
public void onClose() throws Exception {
}
public void onClose() throws Exception {}
public void drainCheck() {
}
public void drainCheck() {}
abstract void doCommit() throws Exception;
abstract void doRollback() throws Exception;
}
private void onConnectionOpen() throws AmqpProtocolException {
@ -505,6 +512,12 @@ class AmqpProtocolConverter {
onMessage(receiver, delivery, buffer);
}
@Override
void doCommit() throws Exception {}
@Override
void doRollback() throws Exception {}
abstract protected void onMessage(Receiver receiver, Delivery delivery, Buffer buffer) throws Exception;
}
@ -573,7 +586,7 @@ class AmqpProtocolConverter {
}
}
long nextTransactionId = 0;
long nextTransactionId = 1;
class Transaction {
}
@ -592,6 +605,7 @@ class AmqpProtocolConverter {
}
AmqpDeliveryListener coordinatorContext = new BaseProducerContext() {
@Override
protected void onMessage(Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
@ -605,7 +619,7 @@ class AmqpProtocolConverter {
len -= decoded;
}
Object action = ((AmqpValue) msg.getBody()).getValue();
final Object action = ((AmqpValue) msg.getBody()).getValue();
LOG.debug("COORDINATOR received: " + action + ", [" + buffer + "]");
if (action instanceof Declare) {
Declare declare = (Declare) action;
@ -628,7 +642,7 @@ class AmqpProtocolConverter {
Discharge discharge = (Discharge) action;
long txid = toLong(discharge.getTxnId());
byte operation;
final byte operation;
if (discharge.getFail()) {
if (LOG.isTraceEnabled()) {
LOG.trace("rollback transaction " + txid);
@ -640,6 +654,16 @@ class AmqpProtocolConverter {
}
operation = TransactionInfo.COMMIT_ONE_PHASE;
}
AmqpSessionContext context = (AmqpSessionContext) receiver.getSession().getContext();
for (ConsumerContext consumer : context.consumers) {
if (operation == TransactionInfo.ROLLBACK) {
consumer.doRollback();
} else {
consumer.doCommit();
}
}
TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), operation);
sendToActiveMQ(txinfo, new ResponseHandler() {
@Override
@ -650,10 +674,20 @@ class AmqpProtocolConverter {
rejected.setError(createErrorCondition("failed", er.getException().getMessage()));
delivery.disposition(rejected);
}
if (LOG.isDebugEnabled()) {
LOG.debug("TX: {} settling {}", operation, action);
}
delivery.settle();
pumpProtonToSocket();
}
});
for (ConsumerContext consumer : context.consumers) {
if (operation == TransactionInfo.ROLLBACK) {
consumer.pumpOutbound();
}
}
} else {
throw new Exception("Expected coordinator message type: " + action.getClass());
}
@ -744,6 +778,7 @@ class AmqpProtocolConverter {
public ConsumerInfo info;
private boolean endOfBrowse = false;
protected LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
public ConsumerContext(ConsumerId consumerId, Sender sender) {
this.consumerId = consumerId;
@ -789,7 +824,10 @@ class AmqpProtocolConverter {
// called when the connection receives a JMS message from ActiveMQ
public void onMessageDispatch(MessageDispatch md) throws Exception {
if (!closed) {
outbound.addLast(md);
// Lock to prevent stepping on TX redelivery
synchronized (outbound) {
outbound.addLast(md);
}
pumpOutbound();
pumpProtonToSocket();
}
@ -853,7 +891,7 @@ class AmqpProtocolConverter {
}
}
private void settle(final Delivery delivery, int ackType) throws Exception {
private void settle(final Delivery delivery, final int ackType) throws Exception {
byte[] tag = delivery.getTag();
if (tag != null && tag.length > 0) {
checkinTag(tag);
@ -877,11 +915,16 @@ class AmqpProtocolConverter {
if (remoteState != null && remoteState instanceof TransactionalState) {
TransactionalState s = (TransactionalState) remoteState;
long txid = toLong(s.getTxnId());
ack.setTransactionId(new LocalTransactionId(connectionId, txid));
LocalTransactionId localTxId = new LocalTransactionId(connectionId, txid);
ack.setTransactionId(localTxId);
// Store the message sent in this TX we might need to re-send on rollback
md.getMessage().setTransactionId(localTxId);
dispatchedInTx.addFirst(md);
}
if (LOG.isTraceEnabled()) {
LOG.trace("Sending Ack for MessageId:{} to ActiveMQ", ack.getLastMessageId());
LOG.trace("Sending Ack to ActiveMQ: {}", ack);
}
sendToActiveMQ(ack, new ResponseHandler() {
@ -917,46 +960,129 @@ class AmqpProtocolConverter {
@Override
public void onDelivery(Delivery delivery) throws Exception {
MessageDispatch md = (MessageDispatch) delivery.getContext();
final DeliveryState state = delivery.getRemoteState();
if (state instanceof Accepted) {
if (!delivery.remotelySettled()) {
delivery.disposition(new Accepted());
DeliveryState state = delivery.getRemoteState();
if (state instanceof TransactionalState) {
TransactionalState txState = (TransactionalState) state;
if (txState.getOutcome() instanceof DeliveryState) {
if (LOG.isTraceEnabled()) {
LOG.trace("onDelivery: TX delivery state = {}", state);
}
state = (DeliveryState) txState.getOutcome();
if (state instanceof Accepted) {
if (!delivery.remotelySettled()) {
delivery.disposition(new Accepted());
}
settle(delivery, MessageAck.DELIVERED_ACK_TYPE);
}
}
settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
} else if (state instanceof Rejected) {
// re-deliver /w incremented delivery counter.
md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
settle(delivery, -1);
} else if (state instanceof Released) {
// re-deliver && don't increment the counter.
settle(delivery, -1);
} else if (state instanceof Modified) {
Modified modified = (Modified) state;
if (modified.getDeliveryFailed()) {
// increment delivery counter..
} else {
if (state instanceof Accepted) {
if (LOG.isTraceEnabled()) {
LOG.trace("onDelivery: accepted state = {}", state);
}
if (!delivery.remotelySettled()) {
delivery.disposition(new Accepted());
}
settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
} else if (state instanceof Rejected) {
// re-deliver /w incremented delivery counter.
md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
if (LOG.isTraceEnabled()) {
LOG.trace("onDelivery: Rejected state = {}, delivery count now {}", state, md.getRedeliveryCounter());
}
settle(delivery, -1);
} else if (state instanceof Released) {
if (LOG.isTraceEnabled()) {
LOG.trace("onDelivery: Released state = {}", state);
}
// re-deliver && don't increment the counter.
settle(delivery, -1);
} else if (state instanceof Modified) {
Modified modified = (Modified) state;
if (modified.getDeliveryFailed()) {
// increment delivery counter..
md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
}
if (LOG.isTraceEnabled()) {
LOG.trace("onDelivery: Modified state = {}, delivery count now {}", state, md.getRedeliveryCounter());
}
byte ackType = -1;
Boolean undeliverableHere = modified.getUndeliverableHere();
if (undeliverableHere != null && undeliverableHere) {
// receiver does not want the message..
// perhaps we should DLQ it?
ackType = MessageAck.POSION_ACK_TYPE;
}
settle(delivery, ackType);
}
byte ackType = -1;
Boolean undeliverableHere = modified.getUndeliverableHere();
if (undeliverableHere != null && undeliverableHere) {
// receiver does not want the message..
// perhaps we should DLQ it?
ackType = MessageAck.POSION_ACK_TYPE;
}
settle(delivery, ackType);
}
pumpOutbound();
}
@Override
void doCommit() throws Exception {
if (!dispatchedInTx.isEmpty()) {
MessageDispatch md = dispatchedInTx.getFirst();
MessageAck pendingTxAck = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, dispatchedInTx.size());
pendingTxAck.setTransactionId(md.getMessage().getTransactionId());
pendingTxAck.setFirstMessageId(dispatchedInTx.getLast().getMessage().getMessageId());
if (LOG.isTraceEnabled()) {
LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);
}
dispatchedInTx.clear();
sendToActiveMQ(pendingTxAck, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
exception.printStackTrace();
sender.close();
}
}
pumpProtonToSocket();
}
});
}
}
@Override
void doRollback() throws Exception {
synchronized (outbound) {
if (LOG.isTraceEnabled()) {
LOG.trace("Rolling back {} messages for redelivery. ", dispatchedInTx.size());
}
for (MessageDispatch md : dispatchedInTx) {
md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
md.getMessage().setTransactionId(null);
outbound.addFirst(md);
}
dispatchedInTx.clear();
}
}
}
private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, ConsumerContext>();
void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
@SuppressWarnings("rawtypes")
void onSenderOpen(final Sender sender, final AmqpSessionContext sessionContext) {
org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) sender.getRemoteSource();
try {
final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
ConsumerContext consumerContext = new ConsumerContext(id, sender);
final ConsumerContext consumerContext = new ConsumerContext(id, sender);
sender.setContext(consumerContext);
String selector = null;
@ -1062,6 +1188,7 @@ class AmqpProtocolConverter {
subscriptionsByConsumerId.remove(id);
sender.close();
} else {
sessionContext.consumers.add(consumerContext);
sender.open();
}
pumpProtonToSocket();

View File

@ -114,7 +114,7 @@ public class AmqpTestSupport {
p.send(message);
}
p.close();
session.close();
}
protected BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException {

View File

@ -37,7 +37,9 @@ import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.objectweb.jtests.jms.framework.TestConfig;
/**
@ -45,11 +47,13 @@ import org.objectweb.jtests.jms.framework.TestConfig;
*/
public class JMSClientTest extends AmqpTestSupport {
@Rule public TestName name = new TestName();
@SuppressWarnings("rawtypes")
@Test
public void testProducerConsume() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
QueueImpl queue = new QueueImpl("queue://txqueue");
QueueImpl queue = new QueueImpl("queue://" + name);
Connection connection = createConnection();
{
@ -78,32 +82,29 @@ public class JMSClientTest extends AmqpTestSupport {
@Test
public void testTransactedConsumer() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
QueueImpl queue = new QueueImpl("queue://txqueue");
final int msgCount = 10;
QueueImpl queue = new QueueImpl("queue://" + name);
final int msgCount = 1;
Connection connection = createConnection();
sendMessages(connection, queue, msgCount);
QueueViewMBean queueView = getProxyToQueue("txqueue");
QueueViewMBean queueView = getProxyToQueue(name.toString());
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(msgCount, queueView.getQueueSize());
// Consumer all in TX and commit.
{
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < msgCount; ++i) {
Message msg = consumer.receive(TestConfig.TIMEOUT);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
}
Message msg = consumer.receive(TestConfig.TIMEOUT);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
consumer.close();
session.commit();
}
LOG.info("Queue size before session commit is: {}", queueView.getQueueSize());
assertEquals(msgCount, queueView.getQueueSize());
LOG.info("Queue size after consumer commit is: {}", queueView.getQueueSize());
session.commit();
LOG.info("Queue size after session commit is: {}", queueView.getQueueSize());
assertEquals(0, queueView.getQueueSize());
connection.close();
@ -113,13 +114,13 @@ public class JMSClientTest extends AmqpTestSupport {
public void testRollbackRececeivedMessage() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
QueueImpl queue = new QueueImpl("queue://txqueue");
QueueImpl queue = new QueueImpl("queue://" + name);
final int msgCount = 1;
Connection connection = createConnection();
sendMessages(connection, queue, msgCount);
QueueViewMBean queueView = getProxyToQueue("txqueue");
QueueViewMBean queueView = getProxyToQueue(name.toString());
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(msgCount, queueView.getQueueSize());
@ -128,6 +129,7 @@ public class JMSClientTest extends AmqpTestSupport {
// Receive and roll back, first receive should not show redelivered.
Message msg = consumer.receive(TestConfig.TIMEOUT);
LOG.info("Test received msg: {}", msg);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals(false, msg.getJMSRedelivered());
@ -147,19 +149,22 @@ public class JMSClientTest extends AmqpTestSupport {
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(0, queueView.getQueueSize());
session.close();
connection.close();
}
@Test
public void testTXConsumerAndLargeNumberOfMessages() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
QueueImpl queue = new QueueImpl("queue://txqueue");
QueueImpl queue = new QueueImpl("queue://" + name);
final int msgCount = 500;
Connection connection = createConnection();
sendMessages(connection, queue, msgCount);
QueueViewMBean queueView = getProxyToQueue("txqueue");
QueueViewMBean queueView = getProxyToQueue(name.toString());
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(msgCount, queueView.getQueueSize());
@ -177,10 +182,13 @@ public class JMSClientTest extends AmqpTestSupport {
assertTrue(msg instanceof TextMessage);
}
consumer.close();
session.commit();
consumer.close();
session.close();
}
connection.close();
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(0, queueView.getQueueSize());
}
@ -189,7 +197,7 @@ public class JMSClientTest extends AmqpTestSupport {
@Test
public void testSelectors() throws Exception{
ActiveMQAdmin.enableJMSFrameTracing();
QueueImpl queue = new QueueImpl("queue://txqueue");
QueueImpl queue = new QueueImpl("queue://" + name);
Connection connection = createConnection();
{