ARTEMIS-4669 Clarify Storage Manager usage around large messages

This commit is contained in:
Clebert Suconic 2024-03-05 16:15:53 -05:00 committed by clebertsuconic
parent 661a4e6fdc
commit 5ce70f9e37
20 changed files with 37 additions and 67 deletions

View File

@ -266,11 +266,6 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
} }
} }
@Override
public void validateFile() throws ActiveMQException {
largeBody.validateFile();
}
public void setFileDurable(boolean value) { public void setFileDurable(boolean value) {
this.fileDurable = value; this.fileDurable = value;
} }

View File

@ -93,7 +93,7 @@ public class AMQPLargeMessageReader implements MessageReader {
sessionSPI.getStorageManager()); sessionSPI.getStorageManager());
currentMessage.parseHeader(dataBuffer); currentMessage.parseHeader(dataBuffer);
sessionSPI.getStorageManager().largeMessageCreated(id, currentMessage); sessionSPI.getStorageManager().onLargeMessageCreate(id, currentMessage);
} }
currentMessage.addBytes(dataBuffer); currentMessage.addBytes(dataBuffer);

View File

@ -295,7 +295,7 @@ public class AMQPTunneledCoreLargeMessageReader implements MessageReader {
coreMessage.decodeHeadersAndProperties(coreHeadersBuffer); coreMessage.decodeHeadersAndProperties(coreHeadersBuffer);
coreLargeMessage = sessionSPI.getStorageManager().createLargeMessage(id, coreMessage); coreLargeMessage = sessionSPI.getStorageManager().createCoreLargeMessage(id, coreMessage);
coreHeadersBuffer = null; // Buffer can be discarded once the decode is done coreHeadersBuffer = null; // Buffer can be discarded once the decode is done
state = State.BODY_SECTION_PENDING; state = State.BODY_SECTION_PENDING;
} catch (ActiveMQException ex) { } catch (ActiveMQException ex) {

View File

@ -420,7 +420,7 @@ public class StompSession implements SessionCallback {
StorageManager storageManager = ((ServerSessionImpl) session).getStorageManager(); StorageManager storageManager = ((ServerSessionImpl) session).getStorageManager();
long id = storageManager.generateID(); long id = storageManager.generateID();
LargeServerMessage largeMessage = storageManager.createLargeMessage(id, message); LargeServerMessage largeMessage = storageManager.createCoreLargeMessage(id, message);
ActiveMQBuffer body = message.getReadOnlyBodyBuffer(); ActiveMQBuffer body = message.getReadOnlyBodyBuffer();
byte[] bytes = new byte[body.readableBytes()]; byte[] bytes = new byte[body.readableBytes()];

View File

@ -161,7 +161,7 @@ public class PagedMessageImpl implements PagedMessage {
@Override @Override
public void initMessage(StorageManager storage) { public void initMessage(StorageManager storage) {
if (largeMessageLazyData != null) { if (largeMessageLazyData != null) {
LargeServerMessage lgMessage = storage.createLargeMessage(); LargeServerMessage lgMessage = storage.createCoreLargeMessage();
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(largeMessageLazyData); ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(largeMessageLazyData);
lgMessage = LargeMessagePersister.getInstance().decode(buffer, lgMessage, null); lgMessage = LargeMessagePersister.getInstance().decode(buffer, lgMessage, null);
@ -213,7 +213,7 @@ public class PagedMessageImpl implements PagedMessage {
largeMessageLazyData = new byte[largeMessageHeaderSize]; largeMessageLazyData = new byte[largeMessageHeaderSize];
buffer.readBytes(largeMessageLazyData); buffer.readBytes(largeMessageLazyData);
} else { } else {
this.message = storageManager.createLargeMessage().toMessage(); this.message = storageManager.createCoreLargeMessage().toMessage();
LargeMessagePersister.getInstance().decode(buffer, (LargeServerMessage) message, null); LargeMessagePersister.getInstance().decode(buffer, (LargeServerMessage) message, null);
((LargeServerMessage) message).setStorageManager(storageManager); ((LargeServerMessage) message).setStorageManager(storageManager);
((LargeServerMessage) message).toMessage().usageUp(); ((LargeServerMessage) message).toMessage().usageUp();

View File

@ -221,10 +221,10 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception; void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception;
LargeServerMessage createLargeMessage(); LargeServerMessage createCoreLargeMessage();
/** /**
* Creates a new LargeMessage with the given id. * Creates a new LargeServerMessage for the core Protocol with the given id.
* *
* @param id * @param id
* @param message This is a temporary message that holds the parsed properties. The remoting * @param message This is a temporary message that holds the parsed properties. The remoting
@ -232,9 +232,10 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
* @return a large message object * @return a large message object
* @throws Exception * @throws Exception
*/ */
LargeServerMessage createLargeMessage(long id, Message message) throws Exception; LargeServerMessage createCoreLargeMessage(long id, Message message) throws Exception;
LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception; /** Other protocols may inform the storage manager when a large message was created. */
LargeServerMessage onLargeMessageCreate(long id, LargeServerMessage largeMessage) throws Exception;
enum LargeMessageExtension { enum LargeMessageExtension {
DURABLE(".msg"), TEMPORARY(".tmp"), SYNC(".sync"); DURABLE(".msg"), TEMPORARY(".tmp"), SYNC(".sync");

View File

@ -353,7 +353,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
* @throws Exception * @throws Exception
*/ */
protected LargeServerMessage parseLargeMessage(final ActiveMQBuffer buff) throws Exception { protected LargeServerMessage parseLargeMessage(final ActiveMQBuffer buff) throws Exception {
LargeServerMessage largeMessage = createLargeMessage(); LargeServerMessage largeMessage = createCoreLargeMessage();
LargeMessagePersister.getInstance().decode(buff, largeMessage, null); LargeMessagePersister.getInstance().decode(buff, largeMessage, null);
@ -501,12 +501,12 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
} }
@Override @Override
public LargeServerMessage createLargeMessage() { public LargeServerMessage createCoreLargeMessage() {
return new LargeServerMessageImpl(this); return new LargeServerMessageImpl(this);
} }
@Override @Override
public LargeServerMessage createLargeMessage(final long id, final Message message) throws Exception { public LargeServerMessage createCoreLargeMessage(final long id, final Message message) throws Exception {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Initializing large message {}", id, new Exception("trace")); logger.trace("Initializing large message {}", id, new Exception("trace"));
} }
@ -515,16 +515,16 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
replicator.largeMessageBegin(id); replicator.largeMessageBegin(id);
} }
LargeServerMessageImpl largeMessage = (LargeServerMessageImpl) createLargeMessage(); LargeServerMessageImpl largeMessage = (LargeServerMessageImpl) createCoreLargeMessage();
largeMessage.moveHeadersAndProperties(message); largeMessage.moveHeadersAndProperties(message);
return largeMessageCreated(id, largeMessage); return onLargeMessageCreate(id, largeMessage);
} }
} }
@Override @Override
public LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception { public LargeServerMessage onLargeMessageCreate(long id, LargeServerMessage largeMessage) throws Exception {
largeMessage.setMessageID(id); largeMessage.setMessageID(id);
// Check durable large massage size before to allocate resources if it can't be stored // Check durable large massage size before to allocate resources if it can't be stored
@ -545,11 +545,6 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
} }
} }
// We do this here to avoid a case where the replication gets a list without this file
// to avoid a race
largeMessage.validateFile();
return largeMessage; return largeMessage;
} }

View File

@ -158,7 +158,7 @@ public class LargeBody {
bodySize += readableBytes; bodySize += readableBytes;
} }
public synchronized void validateFile() throws ActiveMQException { private void validateFile() throws ActiveMQException {
this.ensureFileExists(true); this.ensureFileExists(true);
} }

View File

@ -76,7 +76,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("asLargeMessage create largeMessage with id={}", id); logger.debug("asLargeMessage create largeMessage with id={}", id);
} }
LargeServerMessage lsm = storageManager.createLargeMessage(id, coreMessage); LargeServerMessage lsm = storageManager.createCoreLargeMessage(id, coreMessage);
ActiveMQBuffer messageBodyBuffer = coreMessage.getReadOnlyBodyBuffer(); ActiveMQBuffer messageBodyBuffer = coreMessage.getReadOnlyBodyBuffer();
final int readableBytes = messageBodyBuffer.readableBytes(); final int readableBytes = messageBodyBuffer.readableBytes();
@ -327,7 +327,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Copy large message id={} as newID={}", this.getMessageID(), newID); logger.debug("Copy large message id={} as newID={}", this.getMessageID(), newID);
} }
LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this); LargeServerMessage newMessage = storageManager.createCoreLargeMessage(newID, this);
largeBody.copyInto(newMessage); largeBody.copyInto(newMessage);
newMessage.releaseResources(true, true); newMessage.releaseResources(true, true);
return newMessage.toMessage(); return newMessage.toMessage();
@ -361,17 +361,4 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
return "LargeServerMessage[messageID=" + messageID + "]"; return "LargeServerMessage[messageID=" + messageID + "]";
} }
} }
@Override
public void validateFile() throws ActiveMQException {
this.ensureFileExists(true);
}
public void ensureFileExists(boolean toOpen) throws ActiveMQException {
synchronized (largeBody) {
largeBody.ensureFileExists(toOpen);
}
}
} }

View File

@ -45,7 +45,7 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
* @param storageManager * @param storageManager
*/ */
public LargeServerMessageInSync(StorageManager storageManager) { public LargeServerMessageInSync(StorageManager storageManager) {
mainLM = storageManager.createLargeMessage(); mainLM = storageManager.createCoreLargeMessage();
this.storageManager = storageManager; this.storageManager = storageManager;
} }

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.persistence.impl.nullpm;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
@ -67,11 +66,6 @@ class NullStorageLargeServerMessage extends CoreMessage implements CoreLargeServ
buffer.writeBytes(bytes); buffer.writeBytes(bytes);
} }
@Override
public void validateFile() throws ActiveMQException {
}
@Override @Override
public void setStorageManager(StorageManager storageManager) { public void setStorageManager(StorageManager storageManager) {
this.storageManager = storageManager; this.storageManager = storageManager;

View File

@ -315,12 +315,12 @@ public class NullStorageManager implements StorageManager {
} }
@Override @Override
public LargeServerMessage createLargeMessage() { public LargeServerMessage createCoreLargeMessage() {
return new NullStorageLargeServerMessage(); return new NullStorageLargeServerMessage();
} }
@Override @Override
public LargeServerMessage createLargeMessage(final long id, final Message message) { public LargeServerMessage createCoreLargeMessage(final long id, final Message message) {
NullStorageLargeServerMessage largeMessage = new NullStorageLargeServerMessage(); NullStorageLargeServerMessage largeMessage = new NullStorageLargeServerMessage();
largeMessage.moveHeadersAndProperties(message); largeMessage.moveHeadersAndProperties(message);
@ -331,7 +331,7 @@ public class NullStorageManager implements StorageManager {
} }
@Override @Override
public LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception { public LargeServerMessage onLargeMessageCreate(long id, LargeServerMessage largeMessage) throws Exception {
return null; return null;
} }

View File

@ -1083,7 +1083,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("initializing large message {}", id); logger.debug("initializing large message {}", id);
} }
LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message); LargeServerMessage largeMsg = storageManager.createCoreLargeMessage(id, message);
logger.trace("sendLarge::{}", largeMsg); logger.trace("sendLarge::{}", largeMsg);

View File

@ -683,7 +683,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
if (liveToBackupSync) { if (liveToBackupSync) {
msg = new LargeServerMessageInSync(storageManager); msg = new LargeServerMessageInSync(storageManager);
} else { } else {
msg = storageManager.createLargeMessage(); msg = storageManager.createCoreLargeMessage();
} }
msg.setDurable(true); msg.setDurable(true);

View File

@ -65,6 +65,4 @@ public interface LargeServerMessage extends ReplicatedLargeMessage {
LargeBody getLargeBody(); LargeBody getLargeBody();
void setStorageManager(StorageManager storageManager); void setStorageManager(StorageManager storageManager);
void validateFile() throws ActiveMQException;
} }

View File

@ -277,7 +277,7 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
if (message instanceof ClientLargeMessageInternal) { if (message instanceof ClientLargeMessageInternal) {
final StorageManager storageManager = server.getStorageManager(); final StorageManager storageManager = server.getStorageManager();
LargeServerMessage lsm = storageManager.createLargeMessage(storageManager.generateID(), message); LargeServerMessage lsm = storageManager.createCoreLargeMessage(storageManager.generateID(), message);
LargeData largeData = null; LargeData largeData = null;
do { do {

View File

@ -3879,7 +3879,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
for (Pair<Long, Long> msgToDelete : pendingLargeMessages) { for (Pair<Long, Long> msgToDelete : pendingLargeMessages) {
ActiveMQServerLogger.LOGGER.deletingPendingMessage(msgToDelete); ActiveMQServerLogger.LOGGER.deletingPendingMessage(msgToDelete);
LargeServerMessage msg = storageManager.createLargeMessage(); LargeServerMessage msg = storageManager.createCoreLargeMessage();
msg.setMessageID(msgToDelete.getB()); msg.setMessageID(msgToDelete.getB());
msg.setDurable(true); msg.setDurable(true);
msg.deleteFile(); msg.deleteFile();

View File

@ -579,17 +579,17 @@ public class TransactionImplTest extends ServerTestBase {
} }
@Override @Override
public LargeServerMessage createLargeMessage() { public LargeServerMessage createCoreLargeMessage() {
return null; return null;
} }
@Override @Override
public LargeServerMessage createLargeMessage(long id, Message message) throws Exception { public LargeServerMessage createCoreLargeMessage(long id, Message message) throws Exception {
return null; return null;
} }
@Override @Override
public LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception { public LargeServerMessage onLargeMessageCreate(long id, LargeServerMessage largeMessage) throws Exception {
return null; return null;
} }

View File

@ -300,8 +300,8 @@ public class SendAckFailTest extends SpawnedTestBase {
} }
@Override @Override
public LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception { public LargeServerMessage onLargeMessageCreate(long id, LargeServerMessage largeMessage) throws Exception {
return manager.largeMessageCreated(id, largeMessage); return manager.onLargeMessageCreate(id, largeMessage);
} }
@Override @Override
@ -547,13 +547,13 @@ public class SendAckFailTest extends SpawnedTestBase {
} }
@Override @Override
public LargeServerMessage createLargeMessage() { public LargeServerMessage createCoreLargeMessage() {
return manager.createLargeMessage(); return manager.createCoreLargeMessage();
} }
@Override @Override
public LargeServerMessage createLargeMessage(long id, Message message) throws Exception { public LargeServerMessage createCoreLargeMessage(long id, Message message) throws Exception {
return manager.createLargeMessage(id, message); return manager.createCoreLargeMessage(id, message);
} }
@Override @Override

View File

@ -574,7 +574,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
waitForComponent(manager); waitForComponent(manager);
CoreMessage msg = new CoreMessage().initBuffer(1024).setMessageID(1); CoreMessage msg = new CoreMessage().initBuffer(1024).setMessageID(1);
LargeServerMessage largeMsg = liveServer.getStorageManager().createLargeMessage(500, msg); LargeServerMessage largeMsg = liveServer.getStorageManager().createCoreLargeMessage(500, msg);
largeMsg.addBytes(new byte[1024]); largeMsg.addBytes(new byte[1024]);
largeMsg.releaseResources(true, true); largeMsg.releaseResources(true, true);