This commit is contained in:
Clebert Suconic 2019-11-19 21:51:15 -05:00
commit 153a5d2404
2 changed files with 70 additions and 3 deletions

View File

@ -302,10 +302,14 @@ public final class OpenTypeSupport {
Map<String, Object> rc = super.getFields(ref);
ICoreMessage m = ref.getMessage().toCore();
if (!m.isLargeMessage()) {
if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) {
rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]");
} else {
SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString();
rc.put(CompositeDataConstants.TEXT_BODY, text != null ? text.toString() : "");
}
} else {
rc.put(CompositeDataConstants.TEXT_BODY, "");
rc.put(CompositeDataConstants.TEXT_BODY, "[large message]");
}
return rc;
}

View File

@ -33,11 +33,14 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Test;
import javax.management.openmbean.CompositeData;
/**
* A LargeMessageCompressTest
* <br>
@ -61,6 +64,66 @@ public class LargeMessageCompressTest extends LargeMessageTest {
return super.createFactory(isNetty).setCompressLargeMessage(true);
}
@Test
public void testLargeMessageCompressionNotCompressedAndBrowsed() throws Exception {
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
ActiveMQServer server = createServer(true, isNetty());
server.start();
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = addClientSession(sf.createSession(false, false, false));
session.createTemporaryQueue(ADDRESS, ADDRESS);
ClientProducer producer = session.createProducer(ADDRESS);
Message clientFile = createLargeClientMessageStreaming(session, messageSize, true);
clientFile.setType(Message.TEXT_TYPE);
producer.send(clientFile);
session.commit();
session.close();
QueueControlImpl queueControl = (QueueControlImpl) server.getManagementService().getResource("queue.SimpleAddress");
CompositeData[] browse = queueControl.browse();
Assert.assertNotNull(browse);
Assert.assertEquals(browse.length, 1);
Assert.assertEquals(browse[0].get("text"), "[compressed]");
//clean up
session = addClientSession(sf.createSession(false, false, false));
session.start();
ClientConsumer consumer = session.createConsumer(ADDRESS);
ClientMessage msg1 = consumer.receive(1000);
Assert.assertNotNull(msg1);
for (int i = 0; i < messageSize; i++) {
byte b = msg1.getBodyBuffer().readByte();
assertEquals("position = " + i, getSamplebyte(i), b);
}
msg1.acknowledge();
session.commit();
consumer.close();
session.close();
validateNoFilesOnLargeDir();
}
@Test
public void testLargeMessageCompression() throws Exception {
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);