This commit is contained in:
Clebert Suconic 2020-11-22 23:42:50 -05:00
commit a5fd97d2c9
3 changed files with 68 additions and 1 deletions

View File

@ -65,6 +65,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
@ -3460,7 +3461,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
copy.reencode();
return copy;
// in some edge cases a large message can become large during the copy
return LargeServerMessageImpl.checkLargeMessage(copy, storageManager);
}
private void expire(final Transaction tx, final MessageReference ref) throws Exception {

View File

@ -67,6 +67,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@ -125,6 +126,7 @@ import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.LiveOnlyActivation;
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
@ -1598,6 +1600,11 @@ public abstract class ActiveMQTestBase extends Assert {
}
}
protected void createAnycastPair(ActiveMQServer server, String queueName) throws Exception {
server.addAddressInfo(new AddressInfo(queueName).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setAddress(queueName));
}
protected void createQueue(final String address, final String queue) throws Exception {
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sf = locator.createSessionFactory();

View File

@ -686,6 +686,64 @@ public class LargeMessageTest extends LargeMessageTestBase {
}
@Test
public void testDLQAlmostLarge() throws Exception {
SimpleString addressName = SimpleString.toSimpleString("SomewhatHugeNameToBeUsedxxxxxxxxxxxxxxxxxxxiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiixxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
SimpleString dlqName = SimpleString.toSimpleString("DLQ" + addressName.toString());
ClientSession session = null;
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.getConfiguration().setJournalFileSize(1024 * 1024);
server.getConfiguration().setJournalBufferSize_AIO(100 * 1024);
server.start();
server.getAddressSettingsRepository().clear();
AddressSettings settings = new AddressSettings().setDeadLetterAddress(dlqName).setMaxDeliveryAttempts(1);
server.getAddressSettingsRepository().addMatch("#", settings);
createAnycastPair(server, dlqName.toString());
createAnycastPair(server, addressName.toString());
locator.setMinLargeMessageSize(1024 * 1024);
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
session = sf.createSession(false, false, false);
ClientProducer producer = session.createProducer(addressName);
ClientMessage clientMessage = session.createMessage(true);
clientMessage.getBodyBuffer().writeBytes(new byte[100 * 1024 - 900]);
producer.send(clientMessage);
session.commit();
session.start();
ClientConsumer consumer = session.createConsumer(addressName);
for (int i = 0; i < 2; i++) {
if (i == 0) {
ClientMessage msg = consumer.receive(10000);
Assert.assertNotNull(msg);
msg.acknowledge();
session.rollback();
} else {
ClientMessage msg = consumer.receiveImmediate();
Assert.assertNull(msg);
}
}
consumer.close();
consumer = session.createConsumer(dlqName);
ClientMessage msg = consumer.receive(1000);
Assert.assertNotNull(msg);
}
@Test
public void testDLAOnExpiryNonDurableMessage() throws Exception {
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);