mirror of
https://github.com/apache/activemq.git
synced 2025-02-09 11:35:36 +00:00
https://issues.apache.org/jira/browse/AMQ-3653 - content-length in ack frames
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1227577 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
71d5fef019
commit
a35fdd13fb
@ -16,21 +16,15 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.activemq.transport.stomp;
|
package org.apache.activemq.transport.stomp;
|
||||||
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.DataInputStream;
|
|
||||||
import java.io.DataOutput;
|
|
||||||
import java.io.DataOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.PushbackInputStream;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.activemq.util.ByteArrayInputStream;
|
import org.apache.activemq.util.ByteArrayInputStream;
|
||||||
import org.apache.activemq.util.ByteArrayOutputStream;
|
import org.apache.activemq.util.ByteArrayOutputStream;
|
||||||
import org.apache.activemq.util.ByteSequence;
|
import org.apache.activemq.util.ByteSequence;
|
||||||
import org.apache.activemq.wireformat.WireFormat;
|
import org.apache.activemq.wireformat.WireFormat;
|
||||||
|
|
||||||
|
import java.io.*;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implements marshalling and unmarsalling the <a
|
* Implements marshalling and unmarsalling the <a
|
||||||
* href="http://stomp.codehaus.org/">Stomp</a> protocol.
|
* href="http://stomp.codehaus.org/">Stomp</a> protocol.
|
||||||
@ -103,7 +97,7 @@ public class StompWireFormat implements WireFormat {
|
|||||||
// Read in the data part.
|
// Read in the data part.
|
||||||
byte[] data = NO_DATA;
|
byte[] data = NO_DATA;
|
||||||
String contentLength = headers.get(Stomp.Headers.CONTENT_LENGTH);
|
String contentLength = headers.get(Stomp.Headers.CONTENT_LENGTH);
|
||||||
if (contentLength != null) {
|
if ((action.equals(Stomp.Commands.SEND) || action.equals(Stomp.Responses.MESSAGE)) && contentLength != null) {
|
||||||
|
|
||||||
// Bless the client, he's telling us how much data to read in.
|
// Bless the client, he's telling us how much data to read in.
|
||||||
int length = parseContentLength(contentLength);
|
int length = parseContentLength(contentLength);
|
||||||
|
@ -729,6 +729,39 @@ public class StompTest extends CombinationTestSupport {
|
|||||||
assertTrue(message.getJMSRedelivered());
|
assertTrue(message.getJMSRedelivered());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testSubscribeWithClientAckAndContentLength() throws Exception {
|
||||||
|
|
||||||
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
frame = stompConnection.receiveFrame();
|
||||||
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
|
||||||
|
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
sendMessage(getName());
|
||||||
|
StompFrame msg = stompConnection.receive();
|
||||||
|
|
||||||
|
|
||||||
|
assertTrue(msg.getAction().equals("MESSAGE"));
|
||||||
|
|
||||||
|
HashMap<String, String> ackHeaders = new HashMap<String, String>();
|
||||||
|
ackHeaders.put("message-id", msg.getHeaders().get("message-id"));
|
||||||
|
ackHeaders.put("content-length", "8511");
|
||||||
|
|
||||||
|
StompFrame ack = new StompFrame("ACK", ackHeaders);
|
||||||
|
stompConnection.sendFrame(ack.format());
|
||||||
|
|
||||||
|
|
||||||
|
stompDisconnect();
|
||||||
|
|
||||||
|
// message should not be received since it was acknowledged
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
TextMessage message = (TextMessage)consumer.receive(500);
|
||||||
|
assertNull(message);
|
||||||
|
}
|
||||||
|
|
||||||
public void testUnsubscribe() throws Exception {
|
public void testUnsubscribe() throws Exception {
|
||||||
|
|
||||||
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
|
@ -68,7 +68,7 @@
|
|||||||
</plugins>
|
</plugins>
|
||||||
|
|
||||||
<transportConnectors>
|
<transportConnectors>
|
||||||
<transportConnector name="stomp" uri="stomp://localhost:61613?transport.keepAlive=true&transport.soLinger=0"/>
|
<transportConnector name="stomp" uri="stomp://localhost:61613?trace=true&transport.keepAlive=true&transport.soLinger=0"/>
|
||||||
</transportConnectors>
|
</transportConnectors>
|
||||||
|
|
||||||
</broker>
|
</broker>
|
||||||
|
Loading…
x
Reference in New Issue
Block a user