This commit is contained in:
Clebert Suconic 2021-02-07 11:15:26 -05:00
commit 8862c116b6
25 changed files with 149 additions and 48 deletions

View File

@ -150,7 +150,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
}
public void closeLargeMessage() throws Exception {
largeBody.releaseResources(false);
largeBody.releaseResources(false, true);
parsingData.freeDirectBuffer();
parsingData = null;
}
@ -443,8 +443,8 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
}
@Override
public void releaseResources(boolean sync) {
largeBody.releaseResources(sync);
public void releaseResources(boolean sync, boolean sendEvent) {
largeBody.releaseResources(sync, sendEvent);
}
@ -526,7 +526,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
}
largeBody.copyInto(copy, bufferNewHeader, place.intValue());
copy.releaseResources(true);
copy.releaseResources(true, true);
return copy;
} catch (Exception e) {

View File

@ -380,7 +380,7 @@ public class StompSession implements SessionCallback {
largeMessage.addBytes(bytes);
largeMessage.releaseResources(true);
largeMessage.releaseResources(true, true);
largeMessage.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, bytes.length);

View File

@ -272,6 +272,8 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
*/
SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension);
void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException;
void deleteLargeMessageBody(LargeServerMessage largeServerMessage) throws ActiveMQException;
default SequentialFile createFileForLargeMessage(long messageID, boolean durable) {

View File

@ -449,6 +449,18 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
}
}
@Override
public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException {
readLock();
try {
if (isReplicated()) {
replicator.largeMessageClosed(largeServerMessage.toMessage().getMessageID(), JournalStorageManager.this);
}
} finally {
readUnLock();
}
}
@Override
public void deleteLargeMessageBody(final LargeServerMessage largeServerMessage) throws ActiveMQException {
synchronized (largeServerMessage) {

View File

@ -116,7 +116,7 @@ public class LargeBody {
public synchronized void deleteFile() {
try {
validateFile();
releaseResources(false);
releaseResources(false, false);
storageManager.deleteLargeMessageBody(message);
} catch (Exception e) {
storageManager.criticalError(e);
@ -303,13 +303,20 @@ public class LargeBody {
}
}
public synchronized void releaseResources(boolean sync) {
/**
* sendEvent means it's a close happening from end of write largemessage.
* While reading the largemessage we don't need (and shouldn't inform the backup
*/
public synchronized void releaseResources(boolean sync, boolean sendEvent) {
if (file != null && file.isOpen()) {
try {
if (sync) {
file.sync();
}
file.close(false, false);
if (sendEvent) {
storageManager.largeMessageClosed(message);
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.largeMessageErrorReleasingResources(e);
}

View File

@ -63,7 +63,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
final int readableBytes = buffer.readableBytes();
lsm.addBytes(buffer);
lsm.releaseResources(true);
lsm.releaseResources(true, true);
lsm.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes);
return lsm.toMessage();
}
@ -254,9 +254,9 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
}
@Override
public void releaseResources(boolean sync) {
public void releaseResources(boolean sync, boolean sendEvent) {
synchronized (largeBody) {
largeBody.releaseResources(sync);
largeBody.releaseResources(sync, sendEvent);
}
}
@ -293,7 +293,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
try {
LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this);
largeBody.copyInto(newMessage);
newMessage.releaseResources(true);
newMessage.releaseResources(true, true);
return newMessage.toMessage();
} catch (Exception e) {

View File

@ -96,11 +96,11 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
}
@Override
public synchronized void releaseResources(boolean sync) {
public synchronized void releaseResources(boolean sync, boolean sendEvent) {
if (logger.isTraceEnabled()) {
logger.trace("release resources called on " + mainLM, new Exception("trace"));
}
mainLM.releaseResources(sync);
mainLM.releaseResources(sync, sendEvent);
if (appendFile != null && appendFile.isOpen()) {
try {
appendFile.close();

View File

@ -38,7 +38,7 @@ class NullStorageLargeServerMessage extends CoreMessage implements CoreLargeServ
}
@Override
public void releaseResources(boolean sync) {
public void releaseResources(boolean sync, boolean sendEvent) {
}
@Override

View File

@ -607,6 +607,11 @@ public class NullStorageManager implements StorageManager {
}
@Override
public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException {
}
@Override
public boolean addToPage(PagingStore store,
Message msg,

View File

@ -1035,7 +1035,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
currentLargeMessage.addBytes(body);
if (!continues) {
currentLargeMessage.releaseResources(true);
currentLargeMessage.releaseResources(true, true);
if (messageBodySize >= 0) {
currentLargeMessage.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);

View File

@ -24,31 +24,38 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
long messageId;
long pendingRecordId;
/**
* True = delete file, False = close file
*/
private boolean isDelete;
public ReplicationLargeMessageEndMessage() {
super(PacketImpl.REPLICATION_LARGE_MESSAGE_END);
}
public ReplicationLargeMessageEndMessage(final long messageId, final long pendingRecordId) {
public ReplicationLargeMessageEndMessage(final long messageId, final long pendingRecordId, final boolean isDelete) {
this();
this.messageId = messageId;
//we use negative value to indicate that this id is pre-generated by live node
//so that it won't be generated at backup.
//see https://issues.apache.org/jira/browse/ARTEMIS-1221
this.pendingRecordId = -pendingRecordId;
this.isDelete = isDelete;
}
@Override
public int expectedEncodeSize() {
return PACKET_HEADERS_SIZE +
DataConstants.SIZE_LONG + // buffer.writeLong(messageId)
DataConstants.SIZE_LONG; // buffer.writeLong(pendingRecordId);
DataConstants.SIZE_LONG + // buffer.writeLong(pendingRecordId);
DataConstants.SIZE_BOOLEAN; // buffer.writeBoolean(isDelete);
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeLong(messageId);
buffer.writeLong(pendingRecordId);
buffer.writeBoolean(isDelete);
}
@Override
@ -57,6 +64,9 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
pendingRecordId = buffer.readLong();
}
if (buffer.readableBytes() >= DataConstants.SIZE_BOOLEAN) {
isDelete = buffer.readBoolean();
}
}
/**
@ -70,6 +80,7 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + (isDelete ? 1231 : 1237);
result = prime * result + (int) (messageId ^ (messageId >>> 32));
return result;
}
@ -77,7 +88,7 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
@Override
public String toString() {
return "ReplicationLargeMessageEndMessage{" +
"messageId=" + messageId +
"messageId=" + messageId + ", isDelete=" + isDelete +
'}';
}
@ -92,10 +103,19 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
ReplicationLargeMessageEndMessage other = (ReplicationLargeMessageEndMessage) obj;
if (messageId != other.messageId)
return false;
if (isDelete != other.isDelete)
return false;
return true;
}
public long getPendingRecordId() {
return pendingRecordId;
}
/**
* @return the isDelete
*/
public boolean isDelete() {
return isDelete;
}
}

View File

@ -38,9 +38,9 @@ public interface ReplicatedLargeMessage {
Message setMessageID(long id);
/**
* @see org.apache.activemq.artemis.core.server.LargeServerMessage#releaseResources(boolean)
* @see org.apache.activemq.artemis.core.server.LargeServerMessage#releaseResources(boolean,boolean)
*/
void releaseResources(boolean sync);
void releaseResources(boolean sync, boolean sendEvent);
/**
* @see org.apache.activemq.artemis.core.server.LargeServerMessage#deleteFile()

View File

@ -346,7 +346,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
}
for (ReplicatedLargeMessage largeMessage : largeMessages.values()) {
largeMessage.releaseResources(true);
largeMessage.releaseResources(true, false);
}
largeMessages.clear();
@ -615,22 +615,29 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
if (logger.isTraceEnabled()) {
logger.trace("handleLargeMessageEnd on " + packet.getMessageId());
}
final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false);
final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), packet.isDelete(), false);
if (message != null) {
message.setPendingRecordID(packet.getPendingRecordId());
executor.execute(new Runnable() {
@Override
public void run() {
try {
if (logger.isTraceEnabled()) {
logger.trace("Deleting LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd");
}
message.deleteFile();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorDeletingLargeMessage(e, packet.getMessageId());
}
if (!packet.isDelete()) {
if (logger.isTraceEnabled()) {
logger.trace("Closing LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd");
}
});
message.releaseResources(true, false);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
try {
if (logger.isTraceEnabled()) {
logger.trace("Deleting LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd");
}
message.deleteFile();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorDeletingLargeMessage(e, packet.getMessageId());
}
}
});
}
}
}
@ -903,4 +910,11 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
public void setExecutor(Executor executor2) {
this.executor = executor2;
}
/**
* This is for tests basically, do not use it as its API is not guaranteed for future usage.
*/
public ConcurrentMap<Long, ReplicatedLargeMessage> getLargeMessages() {
return largeMessages;
}
}

View File

@ -256,7 +256,13 @@ public final class ReplicationManager implements ActiveMQComponent {
public void largeMessageDelete(final Long messageId, JournalStorageManager storageManager) {
if (enabled) {
long pendingRecordID = storageManager.generateID();
sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, pendingRecordID));
sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, pendingRecordID, true));
}
}
public void largeMessageClosed(final Long messageId, JournalStorageManager storageManager) {
if (enabled) {
sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, -1, false));
}
}

View File

@ -52,7 +52,7 @@ public interface LargeServerMessage extends ReplicatedLargeMessage {
* Close the files if opened
*/
@Override
void releaseResources(boolean sync);
void releaseResources(boolean sync, boolean sendEvent);
@Override
void deleteFile() throws Exception;

View File

@ -1418,7 +1418,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
context = null;
}
largeMessage.releaseResources(false);
largeMessage.releaseResources(false, false);
largeMessage.toMessage().usageDown();

View File

@ -251,6 +251,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException {
}
@Override
public void addBytesToLargeMessage(SequentialFile file, long messageId, ActiveMQBuffer bytes) throws Exception {

View File

@ -71,7 +71,7 @@ public class AmqpPageTest extends PageTest {
} else {
final AMQPLargeMessage message = createLargeMessage(storageManager, address, msgID, content);
page.write(new PagedMessageImpl(message, new long[0]));
message.releaseResources(false);
message.releaseResources(false, false);
}
}
}

View File

@ -2495,7 +2495,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
// The server would be doing this
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize);
fileMessage.releaseResources(false);
fileMessage.releaseResources(false, false);
Assert.assertEquals(largeMessageSize, fileMessage.getBodyBufferSize());
}
@ -2522,7 +2522,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
// The server would be doing this
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize);
fileMessage.releaseResources(false);
fileMessage.releaseResources(false, false);
session.createQueue(new QueueConfiguration(ADDRESS));
@ -2687,7 +2687,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
fileMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
}
fileMessage.releaseResources(false);
fileMessage.releaseResources(false, false);
session.createQueue(new QueueConfiguration(ADDRESS));

View File

@ -862,6 +862,11 @@ public class SendAckFailTest extends SpawnedTestBase {
manager.deleteLargeMessageBody(largeServerMessage);
}
@Override
public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException {
manager.largeMessageClosed(largeServerMessage);
}
@Override
public void addBytesToLargeMessage(SequentialFile file, long messageId, ActiveMQBuffer bytes) throws Exception {
manager.addBytesToLargeMessage(file, messageId, bytes);

View File

@ -116,7 +116,7 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
// The server would be doing this
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
fileMessage.releaseResources(false);
fileMessage.releaseResources(false, true);
session.createQueue(new QueueConfiguration("A").setRoutingType(RoutingType.ANYCAST));
@ -339,7 +339,7 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
largeServerMessage.setMessageID(1234);
largeServerMessage.addBytes(new byte[0]);
assertTrue(open.get());
largeServerMessage.releaseResources(true);
largeServerMessage.releaseResources(true, true);
assertTrue(sync.get());
}

View File

@ -472,7 +472,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
fileMessage.releaseResources(false);
fileMessage.releaseResources(false, true);
session.createQueue(new QueueConfiguration("A"));
@ -544,7 +544,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
fileMessage.releaseResources(false);
fileMessage.releaseResources(false, true);
session.createQueue(new QueueConfiguration("A"));
@ -888,7 +888,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
fileMessage.releaseResources(false);
fileMessage.releaseResources(false, true);
producer.send(fileMessage);

View File

@ -71,6 +71,7 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
@ -78,6 +79,7 @@ import org.apache.activemq.artemis.core.replication.ReplicatedJournal;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@ -539,6 +541,29 @@ public final class ReplicationTest extends ActiveMQTestBase {
Assert.assertEquals(0, manager.getActiveTokens().size());
}
@Test
public void testReplicationLargeMessageFileClose() throws Exception {
setupServer(true);
JournalStorageManager storage = getStorage();
manager = liveServer.getReplicationManager();
waitForComponent(manager);
CoreMessage msg = new CoreMessage().initBuffer(1024).setMessageID(1);
LargeServerMessage largeMsg = liveServer.getStorageManager().createLargeMessage(500, msg);
largeMsg.addBytes(new byte[1024]);
largeMsg.releaseResources(true, true);
blockOnReplication(storage, manager);
LargeServerMessageImpl message1 = (LargeServerMessageImpl) backupServer.getReplicationEndpoint().getLargeMessages().get(Long.valueOf(500));
Assert.assertNotNull(message1);
Assert.assertFalse(largeMsg.getAppendFile().isOpen());
Assert.assertFalse(message1.getAppendFile().isOpen());
}
class FakeData implements EncodingSupport {
@Override

View File

@ -144,7 +144,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
fileMessage.releaseResources(false);
fileMessage.releaseResources(false, false);
message = fileMessage;
} else {

View File

@ -293,7 +293,7 @@ public class PageTest extends ActiveMQTestBase {
msg.addBytes(content);
msg.setAddress(address);
page.write(new PagedMessageImpl(msg, new long[0]));
msg.releaseResources(false);
msg.releaseResources(false, false);
} else {
ICoreMessage msg = new CoreMessage().initBuffer(100);
msg.setMessageID(msgID);