From 6613b8f1f64b07e98efc6bb3487ac2ef6f0ad60d Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 14 Jun 2017 15:15:09 -0400 Subject: [PATCH] AMQ-6699 Fix STOMP over WS not encoding header values When sending STOMP frames out over WS the marshal isn't doing a proper encode based on the STOMP version in use and so header values can be transmitted without proper escaping. (cherry picked from commit 2490c85fc5cc27cb32f01553784ef6bc63cd15f0) --- .../transport/ws/StompWSConnection.java | 4 +- .../transport/ws/jetty9/StompSocket.java | 2 +- .../transport/ws/StompWSTransportTest.java | 32 ++++++++++++ .../transport/stomp/StompWireFormat.java | 52 +++++++++++++------ 4 files changed, 72 insertions(+), 18 deletions(-) diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java index 340505a71c..3d04847b33 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java @@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import org.apache.activemq.transport.stomp.StompFrame; +import org.apache.activemq.transport.stomp.StompWireFormat; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.WebSocketAdapter; import org.eclipse.jetty.websocket.api.WebSocketListener; @@ -40,6 +41,7 @@ public class StompWSConnection extends WebSocketAdapter implements WebSocketList private final CountDownLatch connectLatch = new CountDownLatch(1); private final BlockingQueue prefetch = new LinkedBlockingDeque(); + private final StompWireFormat wireFormat = new StompWireFormat(); private int closeCode = -1; private String closeMessage; @@ -68,7 +70,7 @@ public class StompWSConnection extends WebSocketAdapter implements WebSocketList public synchronized void sendFrame(StompFrame frame) throws Exception { checkConnected(); - connection.getRemote().sendString(frame.format()); + connection.getRemote().sendString(wireFormat.marshalToString(frame)); } public synchronized void keepAlive() throws Exception { diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java index 72efef7a28..1e3d312b07 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java @@ -47,7 +47,7 @@ public class StompSocket extends AbstractStompSocket implements WebSocketListene public void sendToStomp(StompFrame command) throws IOException { try { //timeout after a period of time so we don't wait forever and hold the protocol lock - session.getRemote().sendStringByFuture(command.format()).get(getDefaultSendTimeOut(), TimeUnit.SECONDS); + session.getRemote().sendStringByFuture(getWireFormat().marshalToString(command)).get(getDefaultSendTimeOut(), TimeUnit.SECONDS); } catch (Exception e) { throw IOExceptionSupport.create(e); } diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java index 12e31c09b6..03818083f4 100644 --- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java +++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java @@ -301,4 +301,36 @@ public class StompWSTransportTest extends WSTransportTestSupport { LOG.info("Caught exception on write of disconnect", ex); } } + + @Test(timeout = 60000) + public void testEscapedHeaders() throws Exception { + String connectFrame = "STOMP\n" + + "login:system\n" + + "passcode:manager\n" + + "accept-version:1.1\n" + + "heart-beat:0,0\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + + wsStompConnection.sendRawFrame(connectFrame); + String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS); + assertTrue(incoming.startsWith("CONNECTED")); + + String message = "SEND\n" + "destination:/queue/" + getTestName() + "\nescaped-header:one\\ntwo\\cthree\n\n" + "Hello World" + Stomp.NULL; + wsStompConnection.sendRawFrame(message); + + String frame = "SUBSCRIBE\n" + "destination:/queue/" + getTestName() + "\n" + + "id:12345\n" + "ack:auto\n\n" + Stomp.NULL; + wsStompConnection.sendRawFrame(frame); + + incoming = wsStompConnection.receive(30, TimeUnit.SECONDS); + assertTrue(incoming.startsWith("MESSAGE")); + assertTrue(incoming.indexOf("escaped-header:one\\ntwo\\cthree") >= 0); + + try { + wsStompConnection.sendFrame(new StompFrame(Stomp.Commands.DISCONNECT)); + } catch (Exception ex) { + LOG.info("Caught exception on write of disconnect", ex); + } + } } diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java index 01debf19e2..a172e67795 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -74,16 +74,7 @@ public class StompWireFormat implements WireFormat { return unmarshal(dis); } - @Override - public void marshal(Object command, DataOutput os) throws IOException { - StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame)command; - - if (stomp.getAction().equals(Stomp.Commands.KEEPALIVE)) { - os.write(Stomp.BREAK); - return; - } - - StringBuilder buffer = new StringBuilder(); + private StringBuilder marshalHeaders(StompFrame stomp, StringBuilder buffer) throws IOException { buffer.append(stomp.getAction()); buffer.append(Stomp.NEWLINE); @@ -95,19 +86,48 @@ public class StompWireFormat implements WireFormat { buffer.append(Stomp.NEWLINE); } - // Add a newline to seperate the headers from the content. + // Add a newline to separate the headers from the content. buffer.append(Stomp.NEWLINE); - os.write(buffer.toString().getBytes("UTF-8")); + return buffer; + } + + @Override + public void marshal(Object command, DataOutput os) throws IOException { + StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame)command; + + if (stomp.getAction().equals(Stomp.Commands.KEEPALIVE)) { + os.write(Stomp.BREAK); + return; + } + + StringBuilder builder = new StringBuilder(); + + os.write(marshalHeaders(stomp, builder).toString().getBytes("UTF-8")); os.write(stomp.getContent()); os.write(END_OF_FRAME); } + public String marshalToString(StompFrame stomp) throws IOException { + if (stomp.getAction().equals(Stomp.Commands.KEEPALIVE)) { + return String.valueOf((char)Stomp.BREAK); + } + + StringBuilder buffer = new StringBuilder(); + marshalHeaders(stomp, buffer); + + if (stomp.getContent() != null) { + String contentString = new String(stomp.getContent(), "UTF-8"); + buffer.append(contentString); + } + + buffer.append('\u0000'); + return buffer.toString(); + } + @Override public Object unmarshal(DataInput in) throws IOException { - try { - // parse action String action = parseAction(in, frameSize); @@ -212,7 +232,7 @@ public class StompWireFormat implements WireFormat { } protected HashMap parseHeaders(DataInput in, AtomicLong frameSize) throws IOException { - HashMap headers = new HashMap(25); + HashMap headers = new HashMap<>(25); while (true) { ByteSequence line = readHeaderLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded"); if (line != null && line.length > 1) {