This closes #72 Stomp changes

This commit is contained in:
Clebert Suconic 2015-07-13 09:37:34 -04:00
commit 87ba02b9e5
5 changed files with 15 additions and 85 deletions

View File

@ -29,7 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
*/
public class StompFrame
{
private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
protected static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
protected final String command;

View File

@ -165,7 +165,7 @@ public abstract class VersionedStompFrameHandler
public StompFrame handleReceipt(String receiptID)
{
StompFrame receipt = new StompFrame(Stomp.Responses.RECEIPT);
StompFrame receipt = createStompFrame(Stomp.Responses.RECEIPT);
receipt.addHeader(Stomp.Headers.Response.RECEIPT_ID, receiptID);
return receipt;

View File

@ -103,7 +103,8 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
else
{
// not valid
response = new StompFrame(Stomp.Responses.ERROR, true);
response = createStompFrame(Stomp.Responses.ERROR);
response.setNeedsDisconnect(true);
response.addHeader(Stomp.Headers.Error.MESSAGE, "Failed to connect");
response.setBody("The login account is not valid.");
}
@ -268,7 +269,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
public StompFrame createPingFrame()
{
StompFrame frame = new StompFrame(Stomp.Commands.STOMP);
StompFrame frame = createStompFrame(Stomp.Commands.STOMP);
frame.setPing(true);
return frame;
}

View File

@ -28,10 +28,8 @@ import org.apache.activemq.artemis.core.protocol.stomp.StompFrame;
public class StompFrameV11 extends StompFrame
{
private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
//stomp 1.1 talks about repetitive headers.
private final List<Header> allHeaders = new ArrayList<Header>();
protected final List<Header> allHeaders = new ArrayList<Header>();
public StompFrameV11(String command, Map<String, String> headers, byte[] content)
{
@ -68,6 +66,13 @@ public class StompFrameV11 extends StompFrame
head.append(h.getEncodedValue());
head.append(Stomp.NEWLINE);
}
if (bytesBody != null && bytesBody.length > 0)
{
head.append(Stomp.Headers.CONTENT_LENGTH);
head.append(Stomp.Headers.SEPARATOR);
head.append(bytesBody.length);
head.append(Stomp.NEWLINE);
}
// Add a newline to separate the headers from the content.
head.append(Stomp.NEWLINE);
@ -97,6 +102,4 @@ public class StompFrameV11 extends StompFrame
allHeaders.add(new Header(key, val));
}
}
}

View File

@ -16,23 +16,12 @@
*/
package org.apache.activemq.artemis.core.protocol.stomp.v12;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompFrame;
import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameV11;
public class StompFrameV12 extends StompFrame
public class StompFrameV12 extends StompFrameV11
{
private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
//stomp 1.1 talks about repetitive headers.
private final List<Header> allHeaders = new ArrayList<Header>();
public StompFrameV12(String command, Map<String, String> headers, byte[] content)
{
super(command, headers, content);
@ -42,67 +31,4 @@ public class StompFrameV12 extends StompFrame
{
super(command);
}
@Override
public ActiveMQBuffer toActiveMQBuffer() throws Exception
{
if (buffer == null)
{
if (bytesBody != null)
{
buffer = ActiveMQBuffers.dynamicBuffer(bytesBody.length + 512);
}
else
{
buffer = ActiveMQBuffers.dynamicBuffer(512);
}
StringBuffer head = new StringBuffer();
head.append(command);
head.append(Stomp.NEWLINE);
// Output the headers.
for (Header h : allHeaders)
{
head.append(h.getEncodedKey());
head.append(Stomp.Headers.SEPARATOR);
head.append(h.getEncodedValue());
head.append(Stomp.NEWLINE);
}
if ((bytesBody != null) && (bytesBody.length > 0))
{
head.append(Stomp.Headers.CONTENT_LENGTH);
head.append(Stomp.Headers.SEPARATOR);
head.append(bytesBody.length);
head.append(Stomp.NEWLINE);
}
// Add a newline to separate the headers from the content.
head.append(Stomp.NEWLINE);
buffer.writeBytes(head.toString().getBytes(StandardCharsets.UTF_8));
if (bytesBody != null)
{
buffer.writeBytes(bytesBody);
}
buffer.writeBytes(END_OF_FRAME);
size = buffer.writerIndex();
}
return buffer;
}
@Override
public void addHeader(String key, String val)
{
if (!headers.containsKey(key))
{
headers.put(key, val);
allHeaders.add(new Header(key, val));
}
else if (!key.equals(Stomp.Headers.CONTENT_LENGTH))
{
allHeaders.add(new Header(key, val));
}
}
}