git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@738904 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-01-29 16:01:35 +00:00
parent eaafb4c838
commit f6ebeecc2a
3 changed files with 132 additions and 27 deletions

View File

@ -275,7 +275,7 @@ public class ProtocolConverter {
boolean acked = false;
for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
StompSubscription sub = iter.next();
MessageAck ack = sub.onStompMessageAck(messageId);
MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
if (ack != null) {
ack.setTransactionId(activemqTx);
sendToActiveMQ(ack, createResponseHandler(command));
@ -331,6 +331,11 @@ public class ProtocolConverter {
if (activemqTx == null) {
throw new ProtocolException("Invalid transaction id: " + stompTx);
}
for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
StompSubscription sub = iter.next();
sub.onStompCommit(activemqTx);
}
TransactionInfo tx = new TransactionInfo();
tx.setConnectionId(connectionId);
@ -338,6 +343,7 @@ public class ProtocolConverter {
tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
sendToActiveMQ(tx, createResponseHandler(command));
}
protected void onStompAbort(StompFrame command) throws ProtocolException {
@ -353,6 +359,14 @@ public class ProtocolConverter {
if (activemqTx == null) {
throw new ProtocolException("Invalid transaction id: " + stompTx);
}
for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
StompSubscription sub = iter.next();
try {
sub.onStompAbort(activemqTx);
} catch (Exception e) {
throw new ProtocolException("Transaction abort failed", false, e);
}
}
TransactionInfo tx = new TransactionInfo();
tx.setConnectionId(connectionId);
@ -543,7 +557,6 @@ public class ProtocolConverter {
* @throws IOException
*/
public void onActiveMQCommad(Command command) throws IOException, JMSException {
if (command.isResponse()) {
Response response = (Response)command;

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
@ -30,9 +31,10 @@ import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
/**
* Keeps track of the STOMP susbscription so that acking is correctly done.
* Keeps track of the STOMP subscription so that acking is correctly done.
*
* @author <a href="http://hiramchirino.com">chirino</a>
*/
@ -46,11 +48,13 @@ public class StompSubscription {
private final String subscriptionId;
private final ConsumerInfo consumerInfo;
private final LinkedHashMap<String, MessageId> dispatchedMessage = new LinkedHashMap<String, MessageId>();
private final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
private final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
private String ackMode = AUTO_ACK;
private ActiveMQDestination destination;
private String transformation;
public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
this.protocolConverter = stompTransport;
@ -60,16 +64,14 @@ public class StompSubscription {
}
void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
if (ackMode == CLIENT_ACK) {
synchronized (this) {
dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId());
dispatchedMessage.put(message.getMessageId(), md);
}
} else if (ackMode == INDIVIDUAL_ACK) {
synchronized (this) {
dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId());
dispatchedMessage.put(message.getMessageId(), md);
}
} else if (ackMode == AUTO_ACK) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
@ -86,19 +88,60 @@ public class StompSubscription {
ignoreTransformation = true;
}
}
StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
command.setAction(Stomp.Responses.MESSAGE);
if (subscriptionId != null) {
command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
}
protocolConverter.getTransportFilter().sendToStomp(command);
}
synchronized void onStompAbort(TransactionId transactionId) throws IOException, JMSException {
//ack all unacked messages
for (MessageDispatch md : dispatchedMessage.values()) {
if (!unconsumedMessage.contains(md)) {
MessageAck ack = new MessageAck();
ack.setDestination(consumerInfo.getDestination());
ack.setConsumerId(consumerInfo.getConsumerId());
ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
ack.setFirstMessageId(md.getMessage().getMessageId());
ack.setLastMessageId(md.getMessage().getMessageId());
ack.setMessageCount(1);
ack.setTransactionId(transactionId);
protocolConverter.getTransportFilter().sendToActiveMQ(ack);
unconsumedMessage.add(md);
}
}
// redeliver all unconsumed messages
for (MessageDispatch md : unconsumedMessage) {
onMessageDispatch(md);
}
}
synchronized void onStompCommit(TransactionId transactionId) {
// ack all messages
MessageAck ack = new MessageAck();
ack.setDestination(consumerInfo.getDestination());
ack.setConsumerId(consumerInfo.getConsumerId());
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
ack.setFirstMessageId(unconsumedMessage.getFirst().getMessage().getMessageId());
ack.setLastMessageId(unconsumedMessage.getLast().getMessage().getMessageId());
ack.setMessageCount(unconsumedMessage.size());
ack.setTransactionId(transactionId);
protocolConverter.getTransportFilter().sendToActiveMQ(ack);
// clear lists
unconsumedMessage.clear();
dispatchedMessage.clear();
}
synchronized MessageAck onStompMessageAck(String messageId) {
if (!dispatchedMessage.containsKey(messageId)) {
synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
MessageId msgId = new MessageId(messageId);
if (!dispatchedMessage.containsKey(msgId)) {
return null;
}
@ -107,33 +150,50 @@ public class StompSubscription {
ack.setConsumerId(consumerInfo.getConsumerId());
if (ackMode == CLIENT_ACK) {
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
if (transactionId != null) {
ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
} else {
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
}
int count = 0;
for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Entry)iter.next();
String id = (String)entry.getKey();
MessageId msgid = (MessageId)entry.getValue();
MessageId id = (MessageId)entry.getKey();
MessageDispatch msg = (MessageDispatch)entry.getValue();
if (ack.getFirstMessageId() == null) {
ack.setFirstMessageId(msgid);
ack.setFirstMessageId(id);
}
iter.remove();
if (transactionId != null) {
if (!unconsumedMessage.contains(msg))
unconsumedMessage.add(msg);
} else {
iter.remove();
}
count++;
if (id.equals(messageId)) {
ack.setLastMessageId(msgid);
if (id.equals(msgId)) {
ack.setLastMessageId(id);
break;
}
}
ack.setMessageCount(count);
if (transactionId != null) {
ack.setTransactionId(transactionId);
}
}
else if (ackMode == INDIVIDUAL_ACK) {
ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
MessageId msgid = (MessageId)dispatchedMessage.get(messageId);
ack.setMessageID(msgid);
ack.setMessageID(msgId);
if (transactionId != null) {
unconsumedMessage.add(dispatchedMessage.get(msgId));
ack.setTransactionId(transactionId);
}
dispatchedMessage.remove(messageId);
}
return ack;

View File

@ -46,7 +46,6 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -937,24 +936,55 @@ public class StompTest extends CombinationTestSupport {
sendMessage("message 1");
sendMessage("message 2");
sendMessage("message 3");
sendMessage("message 4");
sendMessage("message 5");
StompFrame frame = stompConnection.receive();
StompFrame frame = stompConnection.receive();
assertEquals(frame.getBody(), "message 1");
stompConnection.begin("tx1");
stompConnection.ack(frame, "tx1");
StompFrame frame1 = stompConnection.receive();
assertEquals(frame1.getBody(), "message 2");
try {
StompFrame frame2 = stompConnection.receive(500);
if (frame2 != null) {
fail("Should not have received the second message");
}
} catch (SocketTimeoutException soe) {}
Thread.sleep(100);
stompConnection.abort("tx1");
stompConnection.begin("tx2");
StompFrame frame3 = stompConnection.receive();
assertEquals(frame3.getBody(), "message 1");
stompConnection.ack(frame3, "tx2");
StompFrame frame4 = stompConnection.receive();
assertEquals(frame4.getBody(), "message 2");
stompConnection.ack(frame4, "tx2");
StompFrame frame5 = stompConnection.receive();
assertEquals(frame5.getBody(), "message 3");
stompConnection.ack(frame5, "tx2");
stompConnection.commit("tx2");
stompConnection.begin("tx3");
StompFrame frame6 = stompConnection.receive();
assertEquals(frame6.getBody(), "message 4");
stompConnection.ack(frame6, "tx3");
stompConnection.commit("tx3");
stompDisconnect();
}
}
protected void assertClients(int expected) throws Exception {
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
int actual = clients.length;
@ -969,3 +999,5 @@ public class StompTest extends CombinationTestSupport {
Thread.sleep(2000);
}
}