mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1167582 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
251dc7bb23
commit
3060a64c8c
|
@ -113,7 +113,11 @@ public class StompSubscription {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
unconsumedMessage.clear();
|
if (!unconsumedMessage.isEmpty()) {
|
||||||
|
MessageAck ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
|
||||||
|
protocolConverter.getStompTransport().sendToActiveMQ(ack);
|
||||||
|
unconsumedMessage.clear();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
|
synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
|
||||||
|
@ -129,7 +133,11 @@ public class StompSubscription {
|
||||||
ack.setConsumerId(consumerInfo.getConsumerId());
|
ack.setConsumerId(consumerInfo.getConsumerId());
|
||||||
|
|
||||||
if (ackMode == CLIENT_ACK) {
|
if (ackMode == CLIENT_ACK) {
|
||||||
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
|
if (transactionId == null) {
|
||||||
|
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
|
||||||
|
} else {
|
||||||
|
ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
|
||||||
|
}
|
||||||
int count = 0;
|
int count = 0;
|
||||||
for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
|
for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
|
||||||
|
|
||||||
|
@ -138,20 +146,16 @@ public class StompSubscription {
|
||||||
MessageId id = (MessageId)entry.getKey();
|
MessageId id = (MessageId)entry.getKey();
|
||||||
MessageDispatch msg = (MessageDispatch)entry.getValue();
|
MessageDispatch msg = (MessageDispatch)entry.getValue();
|
||||||
|
|
||||||
if (ack.getFirstMessageId() == null) {
|
|
||||||
ack.setFirstMessageId(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (transactionId != null) {
|
if (transactionId != null) {
|
||||||
if (!unconsumedMessage.contains(msg)) {
|
if (!unconsumedMessage.contains(msg)) {
|
||||||
unconsumedMessage.add(msg);
|
unconsumedMessage.add(msg);
|
||||||
|
count++;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
iter.remove();
|
iter.remove();
|
||||||
|
count++;
|
||||||
}
|
}
|
||||||
|
|
||||||
count++;
|
|
||||||
|
|
||||||
if (id.equals(msgId)) {
|
if (id.equals(msgId)) {
|
||||||
ack.setLastMessageId(id);
|
ack.setLastMessageId(id);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -36,6 +36,7 @@ import javax.jms.MessageProducer;
|
||||||
import javax.jms.ObjectMessage;
|
import javax.jms.ObjectMessage;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
|
import javax.management.MalformedObjectNameException;
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
@ -43,6 +44,7 @@ import org.apache.activemq.CombinationTestSupport;
|
||||||
import org.apache.activemq.broker.BrokerFactory;
|
import org.apache.activemq.broker.BrokerFactory;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.jmx.BrokerViewMBean;
|
import org.apache.activemq.broker.jmx.BrokerViewMBean;
|
||||||
|
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -55,7 +57,6 @@ public class StompTest extends CombinationTestSupport {
|
||||||
protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
|
protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
|
||||||
protected String jmsUri = "vm://localhost";
|
protected String jmsUri = "vm://localhost";
|
||||||
|
|
||||||
|
|
||||||
private BrokerService broker;
|
private BrokerService broker;
|
||||||
private StompConnection stompConnection = new StompConnection();
|
private StompConnection stompConnection = new StompConnection();
|
||||||
private Connection connection;
|
private Connection connection;
|
||||||
|
@ -1398,6 +1399,8 @@ public class StompTest extends CombinationTestSupport {
|
||||||
stompConnection.ack(frame5, "tx3");
|
stompConnection.ack(frame5, "tx3");
|
||||||
stompConnection.commit("tx3");
|
stompConnection.commit("tx3");
|
||||||
|
|
||||||
|
waitForFrameToTakeEffect();
|
||||||
|
|
||||||
stompDisconnect();
|
stompDisconnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1464,7 +1467,6 @@ public class StompTest extends CombinationTestSupport {
|
||||||
TextMessage message = (TextMessage)consumer.receive(5000);
|
TextMessage message = (TextMessage)consumer.receive(5000);
|
||||||
assertNotNull(message);
|
assertNotNull(message);
|
||||||
assertEquals("system", message.getStringProperty(Stomp.Headers.Message.USERID));
|
assertEquals("system", message.getStringProperty(Stomp.Headers.Message.USERID));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testJMSXUserIDIsSetInStompMessage() throws Exception {
|
public void testJMSXUserIDIsSetInStompMessage() throws Exception {
|
||||||
|
@ -1493,10 +1495,8 @@ public class StompTest extends CombinationTestSupport {
|
||||||
headers.put(Stomp.Headers.Message.SUBSCRIPTION, "Thisisnotallowed");
|
headers.put(Stomp.Headers.Message.SUBSCRIPTION, "Thisisnotallowed");
|
||||||
headers.put(Stomp.Headers.Message.USERID, "Thisisnotallowed");
|
headers.put(Stomp.Headers.Message.USERID, "Thisisnotallowed");
|
||||||
|
|
||||||
|
|
||||||
stompConnection.connect("system", "manager");
|
stompConnection.connect("system", "manager");
|
||||||
|
|
||||||
|
|
||||||
stompConnection.send("/queue/" + getQueueName(), "msg", null, headers);
|
stompConnection.send("/queue/" + getQueueName(), "msg", null, headers);
|
||||||
|
|
||||||
stompConnection.subscribe("/queue/" + getQueueName());
|
stompConnection.subscribe("/queue/" + getQueueName());
|
||||||
|
@ -1511,7 +1511,6 @@ public class StompTest extends CombinationTestSupport {
|
||||||
assertNull(mess_headers.get(Stomp.Headers.Message.REDELIVERED));
|
assertNull(mess_headers.get(Stomp.Headers.Message.REDELIVERED));
|
||||||
assertNull(mess_headers.get(Stomp.Headers.Message.SUBSCRIPTION));
|
assertNull(mess_headers.get(Stomp.Headers.Message.SUBSCRIPTION));
|
||||||
assertEquals("system", mess_headers.get(Stomp.Headers.Message.USERID));
|
assertEquals("system", mess_headers.get(Stomp.Headers.Message.USERID));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testExpire() throws Exception {
|
public void testExpire() throws Exception {
|
||||||
|
@ -1559,30 +1558,28 @@ public class StompTest extends CombinationTestSupport {
|
||||||
assertNotNull(stompMessage);
|
assertNotNull(stompMessage);
|
||||||
assertNull(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT));
|
assertNull(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testReceiptNewQueue() throws Exception {
|
public void testReceiptNewQueue() throws Exception {
|
||||||
|
|
||||||
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
frame = stompConnection.receiveFrame();
|
frame = stompConnection.receiveFrame();
|
||||||
assertTrue(frame.startsWith("CONNECTED"));
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 1234 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-3" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-2" + "\n\n" + Stomp.NULL;
|
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 1234 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-3" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-2" + "\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
StompFrame receipt = stompConnection.receive();
|
StompFrame receipt = stompConnection.receive();
|
||||||
assertTrue(receipt.getAction().startsWith("RECEIPT"));
|
assertTrue(receipt.getAction().startsWith("RECEIPT"));
|
||||||
assertEquals("8fee4b8-4e5c9f66-4703-e936-2", receipt.getHeaders().get("receipt-id"));
|
assertEquals("8fee4b8-4e5c9f66-4703-e936-2", receipt.getHeaders().get("receipt-id"));
|
||||||
|
|
||||||
|
|
||||||
frame = "SEND\n destination:/queue/" + getQueueName() + 123 + "\ncontent-length:0" + " \n\n" + Stomp.NULL;
|
frame = "SEND\n destination:/queue/" + getQueueName() + 123 + "\ncontent-length:0" + " \n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
|
||||||
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 123 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-2" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-1" + "\n\n" + Stomp.NULL;
|
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 123 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-2" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-1" + "\n\n" + Stomp.NULL;
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
receipt = stompConnection.receive();
|
receipt = stompConnection.receive();
|
||||||
assertTrue(receipt.getAction().startsWith("RECEIPT"));
|
assertTrue(receipt.getAction().startsWith("RECEIPT"));
|
||||||
assertEquals("8fee4b8-4e5c9f66-4703-e936-1", receipt.getHeaders().get("receipt-id"));
|
assertEquals("8fee4b8-4e5c9f66-4703-e936-1", receipt.getHeaders().get("receipt-id"));
|
||||||
|
@ -1598,6 +1595,51 @@ public class StompTest extends CombinationTestSupport {
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testTransactedClientAckBrokerStats() throws Exception {
|
||||||
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
frame = stompConnection.receiveFrame();
|
||||||
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
sendMessage(getName());
|
||||||
|
sendMessage(getName());
|
||||||
|
|
||||||
|
stompConnection.begin("tx1");
|
||||||
|
|
||||||
|
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
StompFrame message = stompConnection.receive();
|
||||||
|
assertTrue(message.getAction().equals("MESSAGE"));
|
||||||
|
stompConnection.ack(message, "tx1");
|
||||||
|
|
||||||
|
message = stompConnection.receive();
|
||||||
|
assertTrue(message.getAction().equals("MESSAGE"));
|
||||||
|
stompConnection.ack(message, "tx1");
|
||||||
|
|
||||||
|
stompConnection.commit("tx1");
|
||||||
|
|
||||||
|
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
waitForFrameToTakeEffect();
|
||||||
|
|
||||||
|
QueueViewMBean queueView = getProxyToQueue(getQueueName());
|
||||||
|
assertEquals(2, queueView.getDispatchCount());
|
||||||
|
assertEquals(2, queueView.getDequeueCount());
|
||||||
|
assertEquals(0, queueView.getQueueSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
private QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
|
||||||
|
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq"
|
||||||
|
+ ":Type=Queue,Destination=" + name
|
||||||
|
+ ",BrokerName=localhost");
|
||||||
|
QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
|
||||||
|
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
|
||||||
|
return proxy;
|
||||||
|
}
|
||||||
|
|
||||||
protected void assertClients(int expected) throws Exception {
|
protected void assertClients(int expected) throws Exception {
|
||||||
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
|
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
|
||||||
int actual = clients.length;
|
int actual = clients.length;
|
||||||
|
|
Loading…
Reference in New Issue