This commit is contained in:
Clebert Suconic 2017-07-11 10:58:40 -04:00
commit 83256c5d0d
4 changed files with 48 additions and 16 deletions

View File

@ -389,8 +389,9 @@ public class StompSession implements SessionCallback {
long id = storageManager.generateID(); long id = storageManager.generateID();
LargeServerMessage largeMessage = storageManager.createLargeMessage(id, message); LargeServerMessage largeMessage = storageManager.createLargeMessage(id, message);
byte[] bytes = new byte[message.getBodyBuffer().writerIndex() - CoreMessage.BODY_OFFSET]; ActiveMQBuffer body = message.getReadOnlyBodyBuffer();
message.getBodyBuffer().readBytes(bytes); byte[] bytes = new byte[body.readableBytes()];
body.readBytes(bytes);
largeMessage.addBytes(bytes); largeMessage.addBytes(bytes);

View File

@ -299,8 +299,6 @@ public abstract class VersionedStompFrameHandler {
ActiveMQBuffer buffer = bodyBuffer != null ? bodyBuffer : serverMessage.getReadOnlyBodyBuffer(); ActiveMQBuffer buffer = bodyBuffer != null ? bodyBuffer : serverMessage.getReadOnlyBodyBuffer();
int bodyPos = (serverMessage).getEndOfBodyPosition() == -1 ? buffer.writerIndex() : (serverMessage).getEndOfBodyPosition();
int size = buffer.writerIndex(); int size = buffer.writerIndex();
byte[] data = new byte[size]; byte[] data = new byte[size];

View File

@ -16,6 +16,12 @@
*/ */
package org.apache.activemq.artemis.tests.integration.stomp; package org.apache.activemq.artemis.tests.integration.stomp;
import javax.jms.BytesMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.HashSet; import java.util.HashSet;
@ -26,13 +32,6 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import javax.jms.BytesMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -159,12 +158,16 @@ public class StompTest extends StompTestBase {
.setMaxUsage(0) .setMaxUsage(0)
.tick(); .tick();
// Connection should be closed by broker when disk is full and attempt to send
Exception e = null;
try {
for (int i = 1; i <= count; i++) { for (int i = 1; i <= count; i++) {
// Thread.sleep(1);
// log.info(">>> " + i);
send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!"); send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!");
} }
} catch (Exception se) {
e = se;
}
assertNotNull(e);
// It should encounter the exception on logs // It should encounter the exception on logs
AssertionLoggerHandler.findText("AMQ119119"); AssertionLoggerHandler.findText("AMQ119119");
} finally { } finally {
@ -254,6 +257,33 @@ public class StompTest extends StompTestBase {
clientProvider.disconnect(); clientProvider.disconnect();
} }
@Test
public void testSendReceiveLargeMessage() throws Exception {
String address = "testLargeMessageAddress";
server.getActiveMQServer().createQueue(SimpleString.toSimpleString(address), RoutingType.ANYCAST, SimpleString.toSimpleString(address), null, true, false);
// STOMP default is UTF-8 == 1 byte per char.
int largeMessageStringSize = 10 * 1024 * 1024; // 10MB
StringBuilder b = new StringBuilder(largeMessageStringSize);
for (int i = 0; i < largeMessageStringSize; i++) {
b.append('t');
}
String payload = b.toString();
// Set up STOMP subscription
conn.connect(defUser, defPass);
subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, null, address, true);
// Send Large Message
System.out.println("Sending Message Size: " + largeMessageStringSize);
send(conn, address, null, payload);
// Receive STOMP Message
ClientStompFrame frame = conn.receiveFrame();
System.out.println(frame.getBody().length());
assertTrue(frame.getBody().equals(payload));
}
@Test @Test
public void sendMQTTReceiveSTOMP() throws Exception { public void sendMQTTReceiveSTOMP() throws Exception {
String payload = "This is a test message"; String payload = "This is a test message";

View File

@ -137,6 +137,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
connection.start(); connection.start();
} }
/** /**
* @return * @return
* @throws Exception * @throws Exception
@ -168,6 +169,8 @@ public abstract class StompTestBase extends ActiveMQTestBase {
config.setOutgoingInterceptorClassNames(getOutgoingInterceptors()); config.setOutgoingInterceptorClassNames(getOutgoingInterceptors());
} }
config.setPersistenceEnabled(true);
ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass)); ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));
if (isSecurityEnabled()) { if (isSecurityEnabled()) {