This commit is contained in:
Clebert Suconic 2020-04-16 10:27:56 -04:00
commit c1a000c870
18 changed files with 390 additions and 23 deletions

View File

@ -174,6 +174,11 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
}
}
@Override
public void validateFile() throws ActiveMQException {
largeBody.validateFile();
}
public void setFileDurable(boolean value) {
this.fileDurable = value;
}
@ -207,7 +212,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
return parsingData;
}
protected void parseHeader(ReadableBuffer buffer) {
public void parseHeader(ReadableBuffer buffer) {
DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setBuffer(buffer);

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
import java.util.Map;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.Closeable;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -153,6 +154,10 @@ public class AMQPSessionCallback implements SessionCallback {
}
public void addCloseable(Closeable closeable) {
serverSession.addCloseable(closeable);
}
public void withinContext(Runnable run) throws Exception {
OperationContext context = recoverContext();
try {

View File

@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
@ -140,6 +141,24 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver, connection).setRan();
useModified = this.connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors();
this.minLargeMessageSize = connection.getProtocolManager().getAmqpMinLargeMessageSize();
if (sessionSPI != null) {
sessionSPI.addCloseable((boolean failed) -> clearLargeMessage());
}
}
protected void clearLargeMessage() {
connection.runNow(() -> {
if (currentLargeMessage != null) {
try {
currentLargeMessage.deleteFile();
} catch (Throwable error) {
ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
} finally {
currentLargeMessage = null;
}
}
});
}
@Override
@ -288,6 +307,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
try {
if (delivery.isAborted()) {
clearLargeMessage();
// Aborting implicitly remotely settles, so advance
// receiver to the next delivery and settle locally.
receiver.advance();
@ -352,7 +373,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
private void initializeCurrentLargeMessage(Delivery delivery, Receiver receiver) throws Exception {
long id = sessionSPI.getStorageManager().generateID();
currentLargeMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager());
currentLargeMessage.addBytes(receiver.recv());
ReadableBuffer dataBuffer = receiver.recv();
currentLargeMessage.parseHeader(dataBuffer);
sessionSPI.getStorageManager().largeMessageCreated(id, currentLargeMessage);
currentLargeMessage.addBytes(dataBuffer);
}
private void actualDelivery(AMQPMessage message, Delivery delivery, Receiver receiver, Transaction tx) {
@ -439,6 +465,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
receiver.setCondition(condition);
close(false);
clearLargeMessage();
}
public void flow() {

View File

@ -30,6 +30,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -50,6 +51,7 @@ import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.stubbing.Answer;
@ -96,7 +98,14 @@ public class ProtonServerReceiverContextTest {
when(mockConnContext.getProtocolManager()).thenReturn(mock(ProtonProtocolManager.class));
ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver);
AtomicInteger clearLargeMessage = new AtomicInteger(0);
ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver) {
@Override
protected void clearLargeMessage() {
super.clearLargeMessage();
clearLargeMessage.incrementAndGet();
}
};
Delivery mockDelivery = mock(Delivery.class);
when(mockDelivery.isAborted()).thenReturn(true);
@ -120,6 +129,8 @@ public class ProtonServerReceiverContextTest {
verify(mockReceiver, times(1)).flow(1);
}
verifyNoMoreInteractions(mockReceiver);
Assert.assertTrue(clearLargeMessage.get() > 0);
}
private void doOnMessageWithDeliveryException(List<Symbol> sourceSymbols,

View File

@ -245,6 +245,8 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
*/
LargeServerMessage createLargeMessage(long id, Message message) throws Exception;
LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception;
enum LargeMessageExtension {
DURABLE(".msg"), TEMPORARY(".tmp"), SYNC(".sync");
final String extension;

View File

@ -536,13 +536,23 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
largeMessage.moveHeadersAndProperties(message);
largeMessage.setMessageID(id);
return largeMessageCreated(id, largeMessage);
} finally {
readUnLock();
}
}
@Override
public LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception {
largeMessage.setMessageID(id);
// Check durable large massage size before to allocate resources if it can't be stored
if (largeMessage.isDurable()) {
final long maxRecordSize = getMaxRecordSize();
final int messageEncodeSize = largeMessage.getEncodeSize();
// Check durable large massage size before to allocate resources if it can't be stored
if (largeMessage.toMessage().isDurable()) {
final long maxRecordSize = getMaxRecordSize();
if (largeMessage instanceof LargeServerMessageImpl) {
// the following check only applies to Core
LargeServerMessageImpl coreLarge = (LargeServerMessageImpl)largeMessage;
final int messageEncodeSize = coreLarge.getEncodeSize();
if (messageEncodeSize > maxRecordSize) {
ActiveMQServerLogger.LOGGER.messageWithHeaderTooLarge(largeMessage.getMessageID(), logger.getName());
@ -554,22 +564,20 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(messageEncodeSize, maxRecordSize);
}
}
// We do this here to avoid a case where the replication gets a list without this file
// to avoid a race
largeMessage.validateFile();
if (largeMessage.isDurable()) {
// We store a marker on the journal that the large file is pending
long pendingRecordID = storePendingLargeMessage(id);
largeMessage.setPendingRecordID(pendingRecordID);
}
return largeMessage;
} finally {
readUnLock();
}
// We do this here to avoid a case where the replication gets a list without this file
// to avoid a race
largeMessage.validateFile();
if (largeMessage.toMessage().isDurable()) {
// We store a marker on the journal that the large file is pending
long pendingRecordID = storePendingLargeMessage(id);
largeMessage.setPendingRecordID(pendingRecordID);
}
return largeMessage;
}
@Override

View File

@ -309,6 +309,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
}
}
@Override
public synchronized void validateFile() throws ActiveMQException {
this.ensureFileExists(true);
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.persistence.impl.nullpm;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.SequentialFile;
@ -70,6 +71,11 @@ class NullStorageLargeServerMessage extends CoreMessage implements CoreLargeServ
}
@Override
public void validateFile() throws ActiveMQException {
}
@Override
public void setStorageManager(StorageManager storageManager) {

View File

@ -303,6 +303,11 @@ public class NullStorageManager implements StorageManager {
return largeMessage;
}
@Override
public LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception {
return null;
}
@Override
public long generateID() {
long id = idSequence.getAndIncrement();

View File

@ -609,6 +609,10 @@ public interface ActiveMQServer extends ServiceComponent {
Queue locateQueue(SimpleString queueName);
default Queue locateQueue(String queueName) {
return locateQueue(SimpleString.toSimpleString(queueName));
}
default BindingQueryResult bindingQuery(SimpleString address) throws Exception {
return bindingQuery(address, true);
}

View File

@ -66,5 +66,7 @@ public interface LargeServerMessage extends ReplicatedLargeMessage {
void setStorageManager(StorageManager storageManager);
void validateFile() throws ActiveMQException;
void finishParse() throws Exception;
}

View File

@ -464,6 +464,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
return null;
}
@Override
public LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception {
return null;
}
@Override
public SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension) {
return null;

View File

@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.largemessages;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.junit.Assert;
import org.junit.Test;
public class InterruptedAMQPLargeMessage extends AmqpClientTestSupport {
private static final int NUMBER_OF_THREADS = 10;
private static final int MINIMAL_SEND = 2;
private static final int MESSAGE_SIZE = 1024 * 300;
private static final String smallFrameAcceptor = new String("tcp://localhost:" + (AMQP_PORT + 8));
@Override
protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception {
server.getConfiguration().addAcceptorConfiguration("flow", smallFrameAcceptor + "?protocols=AMQP;useEpoll=false;maxFrameSize=" + 512 + ";amqpMinLargeMessageSize=" + 10000);
}
public static void main(String[] arg) {
// have everybody aligned on sending before we start
CyclicBarrier startFlag = new CyclicBarrier(NUMBER_OF_THREADS);
CountDownLatch minimalKill = new CountDownLatch(MINIMAL_SEND * NUMBER_OF_THREADS);
Runnable runnable = () -> {
try {
AmqpClient client = createLocalClient();
AmqpConnection connection = client.createConnection();
connection.setMaxFrameSize(2 * 1024);
connection.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(arg[0]);
startFlag.await();
for (int m = 0; m < 1000; m++) {
AmqpMessage message = new AmqpMessage();
message.setDurable(true);
byte[] bytes = new byte[MESSAGE_SIZE];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = (byte) 'z';
}
message.setBytes(bytes);
sender.send(message);
minimalKill.countDown();
}
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
};
for (int t = 0; t < NUMBER_OF_THREADS; t++) {
Thread thread = new Thread(runnable);
thread.start();
}
try {
minimalKill.await();
} catch (Exception e) {
e.printStackTrace();
}
System.exit(-1);
}
private static AmqpClient createLocalClient() throws URISyntaxException {
return new AmqpClient(new URI(smallFrameAcceptor), null, null);
}
@Test
public void testInterruptedLargeMessage() throws Exception {
Process p = SpawnedVMSupport.spawnVM(InterruptedAMQPLargeMessage.class.getName(), getQueueName());
p.waitFor();
Queue serverQueue = server.locateQueue(getQueueName());
Assert.assertTrue(serverQueue.getMessageCount() >= MINIMAL_SEND * NUMBER_OF_THREADS);
LinkedListIterator<MessageReference> browserIterator = serverQueue.browserIterator();
while (browserIterator.hasNext()) {
MessageReference ref = browserIterator.next();
Message message = ref.getMessage();
Assert.assertNotNull(message);
Assert.assertTrue(message instanceof LargeServerMessage);
Assert.assertFalse(((LargeServerMessage)message).hasPendingRecord());
}
browserIterator.close();
System.out.println("There are " + serverQueue.getMessageCount() + " on the queue");
int messageCount = (int)serverQueue.getMessageCount();
AmqpClient client = createLocalClient();
AmqpConnection connection = addConnection(client.createConnection());
connection.setMaxFrameSize(2 * 1024);
connection.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getQueueName());
int received = 0;
receiver.flow((int) (messageCount + 10));
for (int m = 0; m < messageCount; m++) {
receiver.flow(1);
AmqpMessage message = receiver.receive(10, TimeUnit.SECONDS);
Assert.assertNotNull(message);
message.accept(true);
received++;
System.out.println("Received " + received);
Data data = (Data)message.getWrappedMessage().getBody();
byte[] byteArray = data.getValue().getArray();
Assert.assertEquals(MESSAGE_SIZE, byteArray.length);
for (int i = 0; i < byteArray.length; i++) {
Assert.assertEquals((byte)'z', byteArray[i]);
}
}
Assert.assertNull(receiver.receiveNoWait());
validateNoFilesOnLargeDir();
}
}

View File

@ -30,10 +30,14 @@ import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
@ -322,6 +326,35 @@ public class SimpleStreamingLargeMessageTest extends AmqpClientTestSupport {
session.commit();
Queue queue = server.locateQueue(SimpleString.toSimpleString(getQueueName()));
Wait.assertEquals(1, queue::getMessageCount);
LinkedListIterator<MessageReference> browserIterator = queue.browserIterator();
while (browserIterator.hasNext()) {
MessageReference ref = browserIterator.next();
org.apache.activemq.artemis.api.core.Message message = ref.getMessage();
Assert.assertNotNull(message);
Assert.assertTrue(message instanceof LargeServerMessage);
Assert.assertFalse(((LargeServerMessage)message).hasPendingRecord());
}
browserIterator.close();
connection.close();
server.stop();
server.start();
connection = client.createConnection();
addConnection(connection);
connection.setMaxFrameSize(2 * 1024);
connection.connect();
session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1);
for (int i = 0; i < 1; i++) {

View File

@ -44,6 +44,10 @@ public class LargeMessageAvoidLargeMessagesTest extends LargeMessageTest {
isCompressedTest = true;
}
@Override
protected void validateLargeMessageComplete(ActiveMQServer server) throws Exception {
}
@Override
protected boolean isNetty() {
return false;

View File

@ -55,6 +55,10 @@ public class LargeMessageCompressTest extends LargeMessageTest {
isCompressedTest = true;
}
@Override
protected void validateLargeMessageComplete(ActiveMQServer server) throws Exception {
}
@Override
protected boolean isNetty() {
return false;

View File

@ -45,6 +45,8 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -53,6 +55,7 @@ import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTe
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -247,6 +250,76 @@ public class LargeMessageTest extends LargeMessageTestBase {
validateNoFilesOnLargeDir();
}
@Test
public void testPendingRecord() throws Exception {
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
ClientSession session = addClientSession(sf.createSession(false, true, false));
session.createQueue(new QueueConfiguration(ADDRESS));
ClientProducer producer = session.createProducer(ADDRESS);
Message clientFile = createLargeClientMessageStreaming(session, messageSize, true);
// Send large message which should be dropped and deleted from the filesystem
producer.send(clientFile);
validateLargeMessageComplete(server);
sf.close();
server.stop();
server = createServer(true, isNetty(), storeType);
server.start();
sf = addSessionFactory(createSessionFactory(locator));
session = addClientSession(sf.createSession(false, true, false));
ClientConsumer consumer = session.createConsumer(ADDRESS);
session.start();
ClientMessage message = consumer.receiveImmediate();
Assert.assertNotNull(message);
for (int i = 0; i < messageSize; i++) {
assertEquals("position = " + i, getSamplebyte(i), message.getBodyBuffer().readByte());
}
message.acknowledge();
validateNoFilesOnLargeDir();
}
protected void validateLargeMessageComplete(ActiveMQServer server) throws Exception {
Queue queue = server.locateQueue(ADDRESS);
Wait.assertEquals(1, queue::getMessageCount);
LinkedListIterator<MessageReference> browserIterator = queue.browserIterator();
while (browserIterator.hasNext()) {
MessageReference ref = browserIterator.next();
Message message = ref.getMessage();
Assert.assertNotNull(message);
Assert.assertTrue(message instanceof LargeServerMessage);
Assert.assertFalse(((LargeServerMessage)message).hasPendingRecord());
}
browserIterator.close();
}
@Test
public void testDeleteOnDrop() throws Exception {
fillAddress();

View File

@ -286,6 +286,11 @@ public class SendAckFailTest extends SpawnedTestBase {
manager.start();
}
@Override
public LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception {
return manager.largeMessageCreated(id, largeMessage);
}
@Override
public void stop() throws Exception {
manager.stop();