ARTEMIS-3105 large message file not closed on backup side
This commit is contained in:
parent
e65eff0f24
commit
9c5ec1b07c
|
@ -150,7 +150,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
|
|||
}
|
||||
|
||||
public void closeLargeMessage() throws Exception {
|
||||
largeBody.releaseResources(false);
|
||||
largeBody.releaseResources(false, true);
|
||||
parsingData.freeDirectBuffer();
|
||||
parsingData = null;
|
||||
}
|
||||
|
@ -443,8 +443,8 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
|
|||
}
|
||||
|
||||
@Override
|
||||
public void releaseResources(boolean sync) {
|
||||
largeBody.releaseResources(sync);
|
||||
public void releaseResources(boolean sync, boolean sendEvent) {
|
||||
largeBody.releaseResources(sync, sendEvent);
|
||||
|
||||
}
|
||||
|
||||
|
@ -526,7 +526,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
|
|||
}
|
||||
|
||||
largeBody.copyInto(copy, bufferNewHeader, place.intValue());
|
||||
copy.releaseResources(true);
|
||||
copy.releaseResources(true, true);
|
||||
return copy;
|
||||
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -380,7 +380,7 @@ public class StompSession implements SessionCallback {
|
|||
|
||||
largeMessage.addBytes(bytes);
|
||||
|
||||
largeMessage.releaseResources(true);
|
||||
largeMessage.releaseResources(true, true);
|
||||
|
||||
largeMessage.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, bytes.length);
|
||||
|
||||
|
|
|
@ -272,6 +272,8 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
|
|||
*/
|
||||
SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension);
|
||||
|
||||
void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException;
|
||||
|
||||
void deleteLargeMessageBody(LargeServerMessage largeServerMessage) throws ActiveMQException;
|
||||
|
||||
default SequentialFile createFileForLargeMessage(long messageID, boolean durable) {
|
||||
|
|
|
@ -449,6 +449,18 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException {
|
||||
readLock();
|
||||
try {
|
||||
if (isReplicated()) {
|
||||
replicator.largeMessageClosed(largeServerMessage.toMessage().getMessageID(), JournalStorageManager.this);
|
||||
}
|
||||
} finally {
|
||||
readUnLock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteLargeMessageBody(final LargeServerMessage largeServerMessage) throws ActiveMQException {
|
||||
synchronized (largeServerMessage) {
|
||||
|
|
|
@ -116,7 +116,7 @@ public class LargeBody {
|
|||
public synchronized void deleteFile() {
|
||||
try {
|
||||
validateFile();
|
||||
releaseResources(false);
|
||||
releaseResources(false, false);
|
||||
storageManager.deleteLargeMessageBody(message);
|
||||
} catch (Exception e) {
|
||||
storageManager.criticalError(e);
|
||||
|
@ -303,13 +303,20 @@ public class LargeBody {
|
|||
}
|
||||
}
|
||||
|
||||
public synchronized void releaseResources(boolean sync) {
|
||||
/**
|
||||
* sendEvent means it's a close happening from end of write largemessage.
|
||||
* While reading the largemessage we don't need (and shouldn't inform the backup
|
||||
*/
|
||||
public synchronized void releaseResources(boolean sync, boolean sendEvent) {
|
||||
if (file != null && file.isOpen()) {
|
||||
try {
|
||||
if (sync) {
|
||||
file.sync();
|
||||
}
|
||||
file.close(false, false);
|
||||
if (sendEvent) {
|
||||
storageManager.largeMessageClosed(message);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.largeMessageErrorReleasingResources(e);
|
||||
}
|
||||
|
|
|
@ -63,7 +63,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
|
|||
ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
|
||||
final int readableBytes = buffer.readableBytes();
|
||||
lsm.addBytes(buffer);
|
||||
lsm.releaseResources(true);
|
||||
lsm.releaseResources(true, true);
|
||||
lsm.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes);
|
||||
return lsm.toMessage();
|
||||
}
|
||||
|
@ -254,9 +254,9 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
|
|||
}
|
||||
|
||||
@Override
|
||||
public void releaseResources(boolean sync) {
|
||||
public void releaseResources(boolean sync, boolean sendEvent) {
|
||||
synchronized (largeBody) {
|
||||
largeBody.releaseResources(sync);
|
||||
largeBody.releaseResources(sync, sendEvent);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -293,7 +293,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
|
|||
try {
|
||||
LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this);
|
||||
largeBody.copyInto(newMessage);
|
||||
newMessage.releaseResources(true);
|
||||
newMessage.releaseResources(true, true);
|
||||
return newMessage.toMessage();
|
||||
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -96,11 +96,11 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void releaseResources(boolean sync) {
|
||||
public synchronized void releaseResources(boolean sync, boolean sendEvent) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("release resources called on " + mainLM, new Exception("trace"));
|
||||
}
|
||||
mainLM.releaseResources(sync);
|
||||
mainLM.releaseResources(sync, sendEvent);
|
||||
if (appendFile != null && appendFile.isOpen()) {
|
||||
try {
|
||||
appendFile.close();
|
||||
|
|
|
@ -38,7 +38,7 @@ class NullStorageLargeServerMessage extends CoreMessage implements CoreLargeServ
|
|||
}
|
||||
|
||||
@Override
|
||||
public void releaseResources(boolean sync) {
|
||||
public void releaseResources(boolean sync, boolean sendEvent) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -607,6 +607,11 @@ public class NullStorageManager implements StorageManager {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean addToPage(PagingStore store,
|
||||
Message msg,
|
||||
|
|
|
@ -1035,7 +1035,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
|||
currentLargeMessage.addBytes(body);
|
||||
|
||||
if (!continues) {
|
||||
currentLargeMessage.releaseResources(true);
|
||||
currentLargeMessage.releaseResources(true, true);
|
||||
|
||||
if (messageBodySize >= 0) {
|
||||
currentLargeMessage.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
|
||||
|
|
|
@ -24,31 +24,38 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
|
|||
|
||||
long messageId;
|
||||
long pendingRecordId;
|
||||
/**
|
||||
* True = delete file, False = close file
|
||||
*/
|
||||
private boolean isDelete;
|
||||
|
||||
public ReplicationLargeMessageEndMessage() {
|
||||
super(PacketImpl.REPLICATION_LARGE_MESSAGE_END);
|
||||
}
|
||||
|
||||
public ReplicationLargeMessageEndMessage(final long messageId, final long pendingRecordId) {
|
||||
public ReplicationLargeMessageEndMessage(final long messageId, final long pendingRecordId, final boolean isDelete) {
|
||||
this();
|
||||
this.messageId = messageId;
|
||||
//we use negative value to indicate that this id is pre-generated by live node
|
||||
//so that it won't be generated at backup.
|
||||
//see https://issues.apache.org/jira/browse/ARTEMIS-1221
|
||||
this.pendingRecordId = -pendingRecordId;
|
||||
this.isDelete = isDelete;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int expectedEncodeSize() {
|
||||
return PACKET_HEADERS_SIZE +
|
||||
DataConstants.SIZE_LONG + // buffer.writeLong(messageId)
|
||||
DataConstants.SIZE_LONG; // buffer.writeLong(pendingRecordId);
|
||||
DataConstants.SIZE_LONG + // buffer.writeLong(pendingRecordId);
|
||||
DataConstants.SIZE_BOOLEAN; // buffer.writeBoolean(isDelete);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void encodeRest(final ActiveMQBuffer buffer) {
|
||||
buffer.writeLong(messageId);
|
||||
buffer.writeLong(pendingRecordId);
|
||||
buffer.writeBoolean(isDelete);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -57,6 +64,9 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
|
|||
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
|
||||
pendingRecordId = buffer.readLong();
|
||||
}
|
||||
if (buffer.readableBytes() >= DataConstants.SIZE_BOOLEAN) {
|
||||
isDelete = buffer.readBoolean();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -70,6 +80,7 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
|
|||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = super.hashCode();
|
||||
result = prime * result + (isDelete ? 1231 : 1237);
|
||||
result = prime * result + (int) (messageId ^ (messageId >>> 32));
|
||||
return result;
|
||||
}
|
||||
|
@ -77,7 +88,7 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
|
|||
@Override
|
||||
public String toString() {
|
||||
return "ReplicationLargeMessageEndMessage{" +
|
||||
"messageId=" + messageId +
|
||||
"messageId=" + messageId + ", isDelete=" + isDelete +
|
||||
'}';
|
||||
}
|
||||
|
||||
|
@ -92,10 +103,19 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
|
|||
ReplicationLargeMessageEndMessage other = (ReplicationLargeMessageEndMessage) obj;
|
||||
if (messageId != other.messageId)
|
||||
return false;
|
||||
if (isDelete != other.isDelete)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
public long getPendingRecordId() {
|
||||
return pendingRecordId;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the isDelete
|
||||
*/
|
||||
public boolean isDelete() {
|
||||
return isDelete;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,9 +38,9 @@ public interface ReplicatedLargeMessage {
|
|||
Message setMessageID(long id);
|
||||
|
||||
/**
|
||||
* @see org.apache.activemq.artemis.core.server.LargeServerMessage#releaseResources(boolean)
|
||||
* @see org.apache.activemq.artemis.core.server.LargeServerMessage#releaseResources(boolean,boolean)
|
||||
*/
|
||||
void releaseResources(boolean sync);
|
||||
void releaseResources(boolean sync, boolean sendEvent);
|
||||
|
||||
/**
|
||||
* @see org.apache.activemq.artemis.core.server.LargeServerMessage#deleteFile()
|
||||
|
|
|
@ -346,7 +346,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
}
|
||||
|
||||
for (ReplicatedLargeMessage largeMessage : largeMessages.values()) {
|
||||
largeMessage.releaseResources(true);
|
||||
largeMessage.releaseResources(true, false);
|
||||
}
|
||||
largeMessages.clear();
|
||||
|
||||
|
@ -615,22 +615,29 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("handleLargeMessageEnd on " + packet.getMessageId());
|
||||
}
|
||||
final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false);
|
||||
final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), packet.isDelete(), false);
|
||||
if (message != null) {
|
||||
message.setPendingRecordID(packet.getPendingRecordId());
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Deleting LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd");
|
||||
}
|
||||
message.deleteFile();
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.errorDeletingLargeMessage(e, packet.getMessageId());
|
||||
}
|
||||
if (!packet.isDelete()) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Closing LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd");
|
||||
}
|
||||
});
|
||||
message.releaseResources(true, false);
|
||||
} else {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Deleting LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd");
|
||||
}
|
||||
message.deleteFile();
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.errorDeletingLargeMessage(e, packet.getMessageId());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -903,4 +910,11 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
public void setExecutor(Executor executor2) {
|
||||
this.executor = executor2;
|
||||
}
|
||||
|
||||
/**
|
||||
* This is for tests basically, do not use it as its API is not guaranteed for future usage.
|
||||
*/
|
||||
public ConcurrentMap<Long, ReplicatedLargeMessage> getLargeMessages() {
|
||||
return largeMessages;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -256,7 +256,13 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
public void largeMessageDelete(final Long messageId, JournalStorageManager storageManager) {
|
||||
if (enabled) {
|
||||
long pendingRecordID = storageManager.generateID();
|
||||
sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, pendingRecordID));
|
||||
sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, pendingRecordID, true));
|
||||
}
|
||||
}
|
||||
|
||||
public void largeMessageClosed(final Long messageId, JournalStorageManager storageManager) {
|
||||
if (enabled) {
|
||||
sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, -1, false));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -52,7 +52,7 @@ public interface LargeServerMessage extends ReplicatedLargeMessage {
|
|||
* Close the files if opened
|
||||
*/
|
||||
@Override
|
||||
void releaseResources(boolean sync);
|
||||
void releaseResources(boolean sync, boolean sendEvent);
|
||||
|
||||
@Override
|
||||
void deleteFile() throws Exception;
|
||||
|
|
|
@ -1418,7 +1418,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
context = null;
|
||||
}
|
||||
|
||||
largeMessage.releaseResources(false);
|
||||
largeMessage.releaseResources(false, false);
|
||||
|
||||
largeMessage.toMessage().usageDown();
|
||||
|
||||
|
|
|
@ -251,6 +251,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addBytesToLargeMessage(SequentialFile file, long messageId, ActiveMQBuffer bytes) throws Exception {
|
||||
|
||||
|
|
|
@ -71,7 +71,7 @@ public class AmqpPageTest extends PageTest {
|
|||
} else {
|
||||
final AMQPLargeMessage message = createLargeMessage(storageManager, address, msgID, content);
|
||||
page.write(new PagedMessageImpl(message, new long[0]));
|
||||
message.releaseResources(false);
|
||||
message.releaseResources(false, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2495,7 +2495,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
// The server would be doing this
|
||||
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize);
|
||||
|
||||
fileMessage.releaseResources(false);
|
||||
fileMessage.releaseResources(false, false);
|
||||
|
||||
Assert.assertEquals(largeMessageSize, fileMessage.getBodyBufferSize());
|
||||
}
|
||||
|
@ -2522,7 +2522,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
// The server would be doing this
|
||||
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize);
|
||||
|
||||
fileMessage.releaseResources(false);
|
||||
fileMessage.releaseResources(false, false);
|
||||
|
||||
session.createQueue(new QueueConfiguration(ADDRESS));
|
||||
|
||||
|
@ -2687,7 +2687,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
fileMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
|
||||
}
|
||||
|
||||
fileMessage.releaseResources(false);
|
||||
fileMessage.releaseResources(false, false);
|
||||
|
||||
session.createQueue(new QueueConfiguration(ADDRESS));
|
||||
|
||||
|
|
|
@ -862,6 +862,11 @@ public class SendAckFailTest extends SpawnedTestBase {
|
|||
manager.deleteLargeMessageBody(largeServerMessage);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException {
|
||||
manager.largeMessageClosed(largeServerMessage);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addBytesToLargeMessage(SequentialFile file, long messageId, ActiveMQBuffer bytes) throws Exception {
|
||||
manager.addBytesToLargeMessage(file, messageId, bytes);
|
||||
|
|
|
@ -116,7 +116,7 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
|
|||
// The server would be doing this
|
||||
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
|
||||
|
||||
fileMessage.releaseResources(false);
|
||||
fileMessage.releaseResources(false, true);
|
||||
|
||||
session.createQueue(new QueueConfiguration("A").setRoutingType(RoutingType.ANYCAST));
|
||||
|
||||
|
@ -339,7 +339,7 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
|
|||
largeServerMessage.setMessageID(1234);
|
||||
largeServerMessage.addBytes(new byte[0]);
|
||||
assertTrue(open.get());
|
||||
largeServerMessage.releaseResources(true);
|
||||
largeServerMessage.releaseResources(true, true);
|
||||
assertTrue(sync.get());
|
||||
}
|
||||
|
||||
|
|
|
@ -472,7 +472,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
|||
|
||||
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
|
||||
|
||||
fileMessage.releaseResources(false);
|
||||
fileMessage.releaseResources(false, true);
|
||||
|
||||
session.createQueue(new QueueConfiguration("A"));
|
||||
|
||||
|
@ -544,7 +544,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
|||
|
||||
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
|
||||
|
||||
fileMessage.releaseResources(false);
|
||||
fileMessage.releaseResources(false, true);
|
||||
|
||||
session.createQueue(new QueueConfiguration("A"));
|
||||
|
||||
|
@ -888,7 +888,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
|
|||
|
||||
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
|
||||
|
||||
fileMessage.releaseResources(false);
|
||||
fileMessage.releaseResources(false, true);
|
||||
|
||||
producer.send(fileMessage);
|
||||
|
||||
|
|
|
@ -71,6 +71,7 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
|
|||
import org.apache.activemq.artemis.core.persistence.Persister;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
|
||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
|
@ -78,6 +79,7 @@ import org.apache.activemq.artemis.core.replication.ReplicatedJournal;
|
|||
import org.apache.activemq.artemis.core.replication.ReplicationManager;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||
|
@ -539,6 +541,29 @@ public final class ReplicationTest extends ActiveMQTestBase {
|
|||
Assert.assertEquals(0, manager.getActiveTokens().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplicationLargeMessageFileClose() throws Exception {
|
||||
setupServer(true);
|
||||
|
||||
JournalStorageManager storage = getStorage();
|
||||
|
||||
manager = liveServer.getReplicationManager();
|
||||
waitForComponent(manager);
|
||||
|
||||
CoreMessage msg = new CoreMessage().initBuffer(1024).setMessageID(1);
|
||||
LargeServerMessage largeMsg = liveServer.getStorageManager().createLargeMessage(500, msg);
|
||||
largeMsg.addBytes(new byte[1024]);
|
||||
largeMsg.releaseResources(true, true);
|
||||
|
||||
blockOnReplication(storage, manager);
|
||||
|
||||
LargeServerMessageImpl message1 = (LargeServerMessageImpl) backupServer.getReplicationEndpoint().getLargeMessages().get(Long.valueOf(500));
|
||||
|
||||
Assert.assertNotNull(message1);
|
||||
Assert.assertFalse(largeMsg.getAppendFile().isOpen());
|
||||
Assert.assertFalse(message1.getAppendFile().isOpen());
|
||||
}
|
||||
|
||||
class FakeData implements EncodingSupport {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -144,7 +144,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
|
|||
|
||||
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
|
||||
|
||||
fileMessage.releaseResources(false);
|
||||
fileMessage.releaseResources(false, false);
|
||||
|
||||
message = fileMessage;
|
||||
} else {
|
||||
|
|
|
@ -293,7 +293,7 @@ public class PageTest extends ActiveMQTestBase {
|
|||
msg.addBytes(content);
|
||||
msg.setAddress(address);
|
||||
page.write(new PagedMessageImpl(msg, new long[0]));
|
||||
msg.releaseResources(false);
|
||||
msg.releaseResources(false, false);
|
||||
} else {
|
||||
ICoreMessage msg = new CoreMessage().initBuffer(100);
|
||||
msg.setMessageID(msgID);
|
||||
|
|
Loading…
Reference in New Issue