diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java index 9cef3b974c..6389f97cfe 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java @@ -113,7 +113,11 @@ public class StompSubscription { } } - unconsumedMessage.clear(); + if (!unconsumedMessage.isEmpty()) { + MessageAck ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size()); + protocolConverter.getStompTransport().sendToActiveMQ(ack); + unconsumedMessage.clear(); + } } synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) { @@ -129,7 +133,11 @@ public class StompSubscription { ack.setConsumerId(consumerInfo.getConsumerId()); if (ackMode == CLIENT_ACK) { - ack.setAckType(MessageAck.STANDARD_ACK_TYPE); + if (transactionId == null) { + ack.setAckType(MessageAck.STANDARD_ACK_TYPE); + } else { + ack.setAckType(MessageAck.DELIVERED_ACK_TYPE); + } int count = 0; for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) { @@ -138,20 +146,16 @@ public class StompSubscription { MessageId id = (MessageId)entry.getKey(); MessageDispatch msg = (MessageDispatch)entry.getValue(); - if (ack.getFirstMessageId() == null) { - ack.setFirstMessageId(id); - } - if (transactionId != null) { if (!unconsumedMessage.contains(msg)) { unconsumedMessage.add(msg); + count++; } } else { iter.remove(); + count++; } - count++; - if (id.equals(msgId)) { ack.setLastMessageId(id); break; diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java index 3b65152ab3..45224e8d8c 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java @@ -36,6 +36,7 @@ import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; +import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import org.apache.activemq.ActiveMQConnectionFactory; @@ -43,6 +44,7 @@ import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; import org.slf4j.Logger; @@ -55,7 +57,6 @@ public class StompTest extends CombinationTestSupport { protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml"; protected String jmsUri = "vm://localhost"; - private BrokerService broker; private StompConnection stompConnection = new StompConnection(); private Connection connection; @@ -1398,6 +1399,8 @@ public class StompTest extends CombinationTestSupport { stompConnection.ack(frame5, "tx3"); stompConnection.commit("tx3"); + waitForFrameToTakeEffect(); + stompDisconnect(); } @@ -1464,7 +1467,6 @@ public class StompTest extends CombinationTestSupport { TextMessage message = (TextMessage)consumer.receive(5000); assertNotNull(message); assertEquals("system", message.getStringProperty(Stomp.Headers.Message.USERID)); - } public void testJMSXUserIDIsSetInStompMessage() throws Exception { @@ -1493,10 +1495,8 @@ public class StompTest extends CombinationTestSupport { headers.put(Stomp.Headers.Message.SUBSCRIPTION, "Thisisnotallowed"); headers.put(Stomp.Headers.Message.USERID, "Thisisnotallowed"); - stompConnection.connect("system", "manager"); - stompConnection.send("/queue/" + getQueueName(), "msg", null, headers); stompConnection.subscribe("/queue/" + getQueueName()); @@ -1511,7 +1511,6 @@ public class StompTest extends CombinationTestSupport { assertNull(mess_headers.get(Stomp.Headers.Message.REDELIVERED)); assertNull(mess_headers.get(Stomp.Headers.Message.SUBSCRIPTION)); assertEquals("system", mess_headers.get(Stomp.Headers.Message.USERID)); - } public void testExpire() throws Exception { @@ -1559,30 +1558,28 @@ public class StompTest extends CombinationTestSupport { assertNotNull(stompMessage); assertNull(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT)); } - + public void testReceiptNewQueue() throws Exception { - + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); frame = stompConnection.receiveFrame(); assertTrue(frame.startsWith("CONNECTED")); - + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 1234 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-3" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-2" + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); - + StompFrame receipt = stompConnection.receive(); assertTrue(receipt.getAction().startsWith("RECEIPT")); assertEquals("8fee4b8-4e5c9f66-4703-e936-2", receipt.getHeaders().get("receipt-id")); - frame = "SEND\n destination:/queue/" + getQueueName() + 123 + "\ncontent-length:0" + " \n\n" + Stomp.NULL; stompConnection.sendFrame(frame); - frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 123 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-2" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-1" + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); - + receipt = stompConnection.receive(); assertTrue(receipt.getAction().startsWith("RECEIPT")); assertEquals("8fee4b8-4e5c9f66-4703-e936-1", receipt.getHeaders().get("receipt-id")); @@ -1598,6 +1595,51 @@ public class StompTest extends CombinationTestSupport { stompConnection.sendFrame(frame); } + public void testTransactedClientAckBrokerStats() throws Exception { + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + sendMessage(getName()); + sendMessage(getName()); + + stompConnection.begin("tx1"); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + StompFrame message = stompConnection.receive(); + assertTrue(message.getAction().equals("MESSAGE")); + stompConnection.ack(message, "tx1"); + + message = stompConnection.receive(); + assertTrue(message.getAction().equals("MESSAGE")); + stompConnection.ack(message, "tx1"); + + stompConnection.commit("tx1"); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + waitForFrameToTakeEffect(); + + QueueViewMBean queueView = getProxyToQueue(getQueueName()); + assertEquals(2, queueView.getDispatchCount()); + assertEquals(2, queueView.getDequeueCount()); + assertEquals(0, queueView.getQueueSize()); + } + + private QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException { + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + + ":Type=Queue,Destination=" + name + + ",BrokerName=localhost"); + QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext() + .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + return proxy; + } + protected void assertClients(int expected) throws Exception { org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients(); int actual = clients.length;