ARTEMIS-484 Large Message Loss on Initial replication

https://issues.apache.org/jira/browse/ARTEMIS-484

The File copy after the initial synchronization on large messages was broken.
On this commit we fix how the buffer is cleaned up before each read since
a previously unfinished body read would make the buffer dirty.

I'm keeping also lots of Traces I have added to debug this issue, so they will
be useful if anything like this happens again.
This commit is contained in:
Clebert Suconic 2016-04-14 13:24:46 -04:00
parent 3ecd8b7c44
commit d6c7e30594
14 changed files with 344 additions and 122 deletions

View File

@ -141,7 +141,7 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
@Override @Override
public String toString() { public String toString() {
return "ClientMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + properties.toString() + "]"; return getClass().getSimpleName() + "[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + properties.toString() + "]";
} }
@Override @Override

View File

@ -58,12 +58,12 @@ public interface ActiveMQJMSClientLogger extends BasicLogger {
void errorCallingExcListener(@Cause Exception e); void errorCallingExcListener(@Cause Exception e);
@LogMessage(level = Logger.Level.ERROR) @LogMessage(level = Logger.Level.ERROR)
@Message(id = 124002, value = "Queue Browser failed to create message", format = Message.Format.MESSAGE_FORMAT) @Message(id = 124002, value = "Queue Browser failed to create message {0}", format = Message.Format.MESSAGE_FORMAT)
void errorCreatingMessage(@Cause Throwable e); void errorCreatingMessage(String messageToString, @Cause Throwable e);
@LogMessage(level = Logger.Level.ERROR) @LogMessage(level = Logger.Level.ERROR)
@Message(id = 124003, value = "Message Listener failed to prepare message for receipt", format = Message.Format.MESSAGE_FORMAT) @Message(id = 124003, value = "Message Listener failed to prepare message for receipt, message={0}", format = Message.Format.MESSAGE_FORMAT)
void errorPreparingMessageForReceipt(@Cause Throwable e); void errorPreparingMessageForReceipt(String messagetoString, @Cause Throwable e);
@LogMessage(level = Logger.Level.ERROR) @LogMessage(level = Logger.Level.ERROR)
@Message(id = 124004, value = "Message Listener failed to process message", format = Message.Format.MESSAGE_FORMAT) @Message(id = 124004, value = "Message Listener failed to process message", format = Message.Format.MESSAGE_FORMAT)

View File

@ -32,6 +32,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
/** /**
* ActiveMQ Artemis implementation of a JMS MessageConsumer. * ActiveMQ Artemis implementation of a JMS MessageConsumer.
@ -211,7 +212,18 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
boolean needSession = ackMode == Session.CLIENT_ACKNOWLEDGE || ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE; boolean needSession = ackMode == Session.CLIENT_ACKNOWLEDGE || ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE;
jmsMsg = ActiveMQMessage.createMessage(coreMessage, needSession ? session.getCoreSession() : null); jmsMsg = ActiveMQMessage.createMessage(coreMessage, needSession ? session.getCoreSession() : null);
try {
jmsMsg.doBeforeReceive(); jmsMsg.doBeforeReceive();
}
catch (IndexOutOfBoundsException ioob) {
// In case this exception happen you will need to know where it happened.
// it has been a bug here in the past, and this was used to debug it.
// nothing better than keep it for future investigations in case it happened again
IndexOutOfBoundsException newIOOB = new IndexOutOfBoundsException(ioob.getMessage() + "@" + jmsMsg.getCoreMessage());
newIOOB.initCause(ioob);
ActiveMQClientLogger.LOGGER.warn(newIOOB.getMessage(), newIOOB);
throw ioob;
}
// We Do the ack after doBeforeRecive, as in the case of large messages, this may fail so we don't want messages redelivered // We Do the ack after doBeforeRecive, as in the case of large messages, this may fail so we don't want messages redelivered
// https://issues.jboss.org/browse/JBPAPP-6110 // https://issues.jboss.org/browse/JBPAPP-6110

View File

@ -141,7 +141,7 @@ public final class ActiveMQQueueBrowser implements QueueBrowser {
msg.doBeforeReceive(); msg.doBeforeReceive();
} }
catch (Exception e) { catch (Exception e) {
ActiveMQJMSClientLogger.LOGGER.errorCreatingMessage(e); ActiveMQJMSClientLogger.LOGGER.errorCreatingMessage(msg.getCoreMessage().toString(), e);
return null; return null;
} }

View File

@ -73,7 +73,7 @@ public class JMSMessageListenerWrapper implements MessageHandler {
msg.doBeforeReceive(); msg.doBeforeReceive();
} }
catch (Exception e) { catch (Exception e) {
ActiveMQJMSClientLogger.LOGGER.errorPreparingMessageForReceipt(e); ActiveMQJMSClientLogger.LOGGER.errorPreparingMessageForReceipt(msg.getCoreMessage().toString(), e);
return; return;
} }

View File

@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.io.DummyCallback;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.jlibaio.LibaioFile; import org.apache.activemq.artemis.jlibaio.LibaioFile;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.ReusableLatch;
public class AIOSequentialFile extends AbstractSequentialFile { public class AIOSequentialFile extends AbstractSequentialFile {
@ -202,7 +203,14 @@ public class AIOSequentialFile extends AbstractSequentialFile {
*/ */
@Override @Override
public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback) { public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback) {
try {
checkOpened(); checkOpened();
}
catch (Exception e) {
ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e);
callback.onError(-1, e.getMessage());
return;
}
final int bytesToWrite = factory.calculateBlockSize(bytes.limit()); final int bytesToWrite = factory.calculateBlockSize(bytes.limit());

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.io.util;
import java.nio.ByteBuffer;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.jboss.logging.Logger;
public class FileIOUtil {
private static final Logger logger = Logger.getLogger(Logger.class);
private static final boolean isTrace = logger.isTraceEnabled();
public static void copyData(SequentialFile from, SequentialFile to, ByteBuffer buffer) throws Exception {
boolean fromIsOpen = from.isOpen();
boolean toIsOpen = to.isOpen();
from.close();
from.open();
if (!toIsOpen) {
to.open();
}
to.position(to.size());
from.position(0);
try {
for (;;) {
// The buffer is reused...
// We need to make sure we clear the limits and the buffer before reusing it
buffer.clear();
int bytesRead = from.read(buffer);
if (isTrace) {
logger.trace("appending " + bytesRead + " bytes on " + to.getFileName());
}
if (bytesRead > 0) {
to.writeDirect(buffer, false);
}
if (bytesRead < buffer.capacity()) {
logger.trace("Interrupting reading as the whole thing was sent on " + to.getFileName());
break;
}
}
}
finally {
if (!fromIsOpen) {
from.close();
}
else {
from.position(from.size());
}
if (!toIsOpen) {
to.close();
}
else {
to.position(to.size());
}
}
}
}

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.io.aio;
import java.io.File;
import java.nio.ByteBuffer;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.io.util.FileIOUtil;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class FileIOUtilTest {
@Rule
public TemporaryFolder temporaryFolder;
public FileIOUtilTest() {
File parent = new File("./target");
parent.mkdirs();
temporaryFolder = new TemporaryFolder(parent);
}
@Test
public void testCopy() throws Exception {
System.out.println("Data at " + temporaryFolder.getRoot());
SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100);
SequentialFile file = factory.createSequentialFile("file1.bin");
file.open();
ByteBuffer buffer = ByteBuffer.allocate(204800);
buffer.put(new byte[204800]);
buffer.rewind();
file.writeDirect(buffer, true);
buffer = ByteBuffer.allocate(409605);
buffer.put(new byte[409605]);
buffer.rewind();
SequentialFile file2 = factory.createSequentialFile("file2.bin");
file2.open();
file2.writeDirect(buffer, true);
// This is allocating a reusable buffer to perform the copy, just like it's used within LargeMessageInSync
buffer = ByteBuffer.allocate(4 * 1024);
SequentialFile newFile = factory.createSequentialFile("file1.cop");
FileIOUtil.copyData(file, newFile, buffer);
SequentialFile newFile2 = factory.createSequentialFile("file2.cop");
FileIOUtil.copyData(file2, newFile2, buffer);
Assert.assertEquals(file.size(), newFile.size());
Assert.assertEquals(file2.size(), newFile2.size());
newFile.close();
newFile2.close();
file.close();
file2.close();
System.out.println("Test result::");
}
}

View File

@ -21,14 +21,19 @@ import java.nio.ByteBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.util.FileIOUtil;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.StorageManager.LargeMessageExtension; import org.apache.activemq.artemis.core.persistence.StorageManager.LargeMessageExtension;
import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage; import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.jboss.logging.Logger;
public final class LargeServerMessageInSync implements ReplicatedLargeMessage { public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
private static final Logger logger = Logger.getLogger(LargeServerMessageInSync.class);
private static final boolean isTrace = logger.isTraceEnabled();
private final LargeServerMessage mainLM; private final LargeServerMessage mainLM;
private final StorageManager storageManager; private final StorageManager storageManager;
private SequentialFile appendFile; private SequentialFile appendFile;
@ -50,20 +55,33 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
if (!mainSeqFile.isOpen()) { if (!mainSeqFile.isOpen()) {
mainSeqFile.open(); mainSeqFile.open();
} }
try {
if (appendFile != null) { if (appendFile != null) {
appendFile.close(); if (isTrace) {
appendFile.open(); logger.trace("joinSyncedData on " + mainLM + ", currentSize on mainMessage=" + mainSeqFile.size() + ", appendFile size = " + appendFile.size());
for (;;) {
buffer.rewind();
int bytesRead = appendFile.read(buffer);
if (bytesRead > 0)
mainSeqFile.writeDirect(buffer, false);
if (bytesRead < buffer.capacity()) {
break;
}
} }
FileIOUtil.copyData(appendFile, mainSeqFile, buffer);
deleteAppendFile(); deleteAppendFile();
} }
else {
if (isTrace) {
logger.trace("joinSyncedData, appendFile is null, ignoring joinSyncedData on " + mainLM);
}
}
}
catch (Throwable e) {
logger.warn("Error while sincing data on largeMessageInSync::" + mainLM);
}
if (isTrace) {
logger.trace("joinedSyncData on " + mainLM + " finished with " + mainSeqFile.size());
}
syncDone = true; syncDone = true;
} }
@ -85,6 +103,9 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
@Override @Override
public synchronized void releaseResources() { public synchronized void releaseResources() {
if (isTrace) {
logger.warn("release resources called on " + mainLM, new Exception("trace"));
}
mainLM.releaseResources(); mainLM.releaseResources();
if (appendFile != null && appendFile.isOpen()) { if (appendFile != null && appendFile.isOpen()) {
try { try {
@ -122,11 +143,19 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
public synchronized void addBytes(byte[] bytes) throws Exception { public synchronized void addBytes(byte[] bytes) throws Exception {
if (deleted) if (deleted)
return; return;
if (syncDone) { if (syncDone) {
if (isTrace) {
logger.trace("Adding " + bytes.length + " towards sync message::" + mainLM);
}
mainLM.addBytes(bytes); mainLM.addBytes(bytes);
return; return;
} }
if (isTrace) {
logger.trace("addBytes(bytes.length=" + bytes.length + ") on message=" + mainLM);
}
if (appendFile == null) { if (appendFile == null) {
appendFile = storageManager.createFileForLargeMessage(mainLM.getMessageID(), LargeMessageExtension.SYNC); appendFile = storageManager.createFileForLargeMessage(mainLM.getMessageID(), LargeMessageExtension.SYNC);
} }

View File

@ -57,6 +57,13 @@ public class ReplicationLargeMessageBeginMessage extends PacketImpl {
return result; return result;
} }
@Override
public String toString() {
return "ReplicationLargeMessageBeginMessage{" +
"messageId=" + messageId +
'}';
}
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
if (this == obj) if (this == obj)

View File

@ -57,6 +57,13 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
return result; return result;
} }
@Override
public String toString() {
return "ReplicationLargeMessageEndMessage{" +
"messageId=" + messageId +
'}';
}
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
if (this == obj) if (this == obj)

View File

@ -80,6 +80,14 @@ public final class ReplicationLargeMessageWriteMessage extends PacketImpl {
return result; return result;
} }
@Override
public String toString() {
return "ReplicationLargeMessageWriteMessage{" +
"messageId=" + messageId +
", body.size=" + body.length +
'}';
}
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
if (this == obj) if (this == obj)

View File

@ -80,6 +80,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum; import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation; import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
import org.jboss.logging.Logger;
/** /**
* Handles all the synchronization necessary for replication on the backup side (that is the * Handles all the synchronization necessary for replication on the backup side (that is the
@ -87,7 +88,8 @@ import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivatio
*/ */
public final class ReplicationEndpoint implements ChannelHandler, ActiveMQComponent { public final class ReplicationEndpoint implements ChannelHandler, ActiveMQComponent {
private static final boolean trace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); private static final Logger logger = Logger.getLogger(ReplicationEndpoint.class);
private static final boolean isTrace = logger.isTraceEnabled();
private final IOCriticalErrorListener criticalErrorListener; private final IOCriticalErrorListener criticalErrorListener;
private final ActiveMQServerImpl server; private final ActiveMQServerImpl server;
@ -153,11 +155,18 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
@Override @Override
public void handlePacket(final Packet packet) { public void handlePacket(final Packet packet) {
if (isTrace) {
logger.trace("handlePacket::handling " + packet);
}
PacketImpl response = new ReplicationResponseMessage(); PacketImpl response = new ReplicationResponseMessage();
final byte type = packet.getType(); final byte type = packet.getType();
try { try {
if (!started) { if (!started) {
if (isTrace) {
logger.trace("handlePacket::ignoring " + packet);
}
return; return;
} }
@ -340,56 +349,10 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
this.channel = channel; this.channel = channel;
} }
public void compareJournalInformation(final JournalLoadInformation[] journalInformation) throws ActiveMQException {
if (!activation.isRemoteBackupUpToDate()) {
throw ActiveMQMessageBundle.BUNDLE.journalsNotInSync();
}
if (journalLoadInformation == null || journalLoadInformation.length != journalInformation.length) {
throw ActiveMQMessageBundle.BUNDLE.replicationTooManyJournals();
}
for (int i = 0; i < journalInformation.length; i++) {
if (!journalInformation[i].equals(journalLoadInformation[i])) {
ActiveMQServerLogger.LOGGER.journalcomparisonMismatch(journalParametersToString(journalInformation));
throw ActiveMQMessageBundle.BUNDLE.replicationTooManyJournals();
}
}
}
/**
* Used on tests only. To simulate missing page deletes
*/
public void setDeletePages(final boolean deletePages) {
this.deletePages = deletePages;
}
/**
* @param journalInformation
*/
private String journalParametersToString(final JournalLoadInformation[] journalInformation) {
return "**********************************************************\n" + "parameters:\n" +
"BindingsImpl = " +
journalInformation[0] +
"\n" +
"Messaging = " +
journalInformation[1] +
"\n" +
"**********************************************************" +
"\n" +
"Expected:" +
"\n" +
"BindingsImpl = " +
journalLoadInformation[0] +
"\n" +
"Messaging = " +
journalLoadInformation[1] +
"\n" +
"**********************************************************";
}
private void finishSynchronization(String liveID) throws Exception { private void finishSynchronization(String liveID) throws Exception {
if (isTrace) {
logger.trace("finishSynchronization::" + liveID);
}
for (JournalContent jc : EnumSet.allOf(JournalContent.class)) { for (JournalContent jc : EnumSet.allOf(JournalContent.class)) {
Journal journal = journalsHolder.remove(jc); Journal journal = journalsHolder.remove(jc);
journal.synchronizationLock(); journal.synchronizationLock();
@ -427,7 +390,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
* @param msg * @param msg
* @throws Exception * @throws Exception
*/ */
private synchronized void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws Exception { private void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws Exception {
Long id = Long.valueOf(msg.getId()); Long id = Long.valueOf(msg.getId());
byte[] data = msg.getData(); byte[] data = msg.getData();
SequentialFile channel1; SequentialFile channel1;
@ -462,7 +425,6 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
} }
if (data == null) { if (data == null) {
channel1.close();
return; return;
} }
@ -477,17 +439,16 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
* {@link FileWrapperJournal} in place to store messages while synchronization is going on. * {@link FileWrapperJournal} in place to store messages while synchronization is going on.
* *
* @param packet * @param packet
* @throws Exception
* @return if the incoming packet indicates the synchronization is finished then return an acknowledgement otherwise * @return if the incoming packet indicates the synchronization is finished then return an acknowledgement otherwise
* return an empty response * return an empty response
* @throws Exception
*/ */
private ReplicationResponseMessageV2 handleStartReplicationSynchronization(final ReplicationStartSyncMessage packet) throws Exception { private ReplicationResponseMessageV2 handleStartReplicationSynchronization(final ReplicationStartSyncMessage packet) throws Exception {
ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
if (activation.isRemoteBackupUpToDate()) {
throw ActiveMQMessageBundle.BUNDLE.replicationBackupUpToDate();
}
synchronized (this) { if (isTrace) {
logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet);
}
ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
if (!started) if (!started)
return replicationResponseMessage; return replicationResponseMessage;
@ -528,18 +489,23 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
default: default:
throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType(); throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType();
} }
}
return replicationResponseMessage; return replicationResponseMessage;
} }
private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet) { private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet) {
if (isTrace) {
logger.trace("handleLargeMessageEnd on " + packet.getMessageId());
}
final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false); final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false);
if (message != null) { if (message != null) {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
if (isTrace) {
logger.trace("Deleting LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd");
}
message.deleteFile(); message.deleteFile();
} }
catch (Exception e) { catch (Exception e) {
@ -560,7 +526,9 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
} }
} }
private ReplicatedLargeMessage lookupLargeMessage(final long messageId, final boolean delete, final boolean createIfNotExists) { private ReplicatedLargeMessage lookupLargeMessage(final long messageId,
final boolean delete,
final boolean createIfNotExists) {
ReplicatedLargeMessage message; ReplicatedLargeMessage message;
if (delete) { if (delete) {
@ -590,7 +558,9 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
private void handleLargeMessageBegin(final ReplicationLargeMessageBeginMessage packet) { private void handleLargeMessageBegin(final ReplicationLargeMessageBeginMessage packet) {
final long id = packet.getMessageId(); final long id = packet.getMessageId();
createLargeMessage(id, false); createLargeMessage(id, false);
ActiveMQServerLogger.LOGGER.trace("Receiving Large Message " + id + " on backup"); if (isTrace) {
logger.trace("Receiving Large Message Begin " + id + " on backup");
}
} }
private void createLargeMessage(final long id, boolean liveToBackupSync) { private void createLargeMessage(final long id, boolean liveToBackupSync) {
@ -666,14 +636,14 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
private void handleAppendAddRecord(final ReplicationAddMessage packet) throws Exception { private void handleAppendAddRecord(final ReplicationAddMessage packet) throws Exception {
Journal journalToUse = getJournal(packet.getJournalID()); Journal journalToUse = getJournal(packet.getJournalID());
if (packet.getRecord() == ADD_OPERATION_TYPE.UPDATE) { if (packet.getRecord() == ADD_OPERATION_TYPE.UPDATE) {
if (ReplicationEndpoint.trace) { if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("Endpoint appendUpdate id = " + packet.getId()); logger.trace("Endpoint appendUpdate id = " + packet.getId());
} }
journalToUse.appendUpdateRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), noSync); journalToUse.appendUpdateRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), noSync);
} }
else { else {
if (ReplicationEndpoint.trace) { if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("Endpoint append id = " + packet.getId()); logger.trace("Endpoint append id = " + packet.getId());
} }
journalToUse.appendAddRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), noSync); journalToUse.appendAddRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), noSync);
} }
@ -807,7 +777,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
* *
* @param backupQuorum * @param backupQuorum
*/ */
public synchronized void setBackupQuorum(SharedNothingBackupQuorum backupQuorum) { public void setBackupQuorum(SharedNothingBackupQuorum backupQuorum) {
this.backupQuorum = backupQuorum; this.backupQuorum = backupQuorum;
} }

View File

@ -70,6 +70,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.ReusableLatch;
import org.jboss.logging.Logger;
/** /**
* Manages replication tasks on the live server (that is the live server side of a "remote backup" * Manages replication tasks on the live server (that is the live server side of a "remote backup"
@ -81,6 +82,10 @@ import org.apache.activemq.artemis.utils.ReusableLatch;
*/ */
public final class ReplicationManager implements ActiveMQComponent, ReadyListener { public final class ReplicationManager implements ActiveMQComponent, ReadyListener {
Logger logger = Logger.getLogger(ReplicationManager.class);
final boolean isTrace = logger.isTraceEnabled();
public enum ADD_OPERATION_TYPE { public enum ADD_OPERATION_TYPE {
UPDATE { UPDATE {
@Override @Override
@ -330,7 +335,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
return sendReplicatePacket(packet, true); return sendReplicatePacket(packet, true);
} }
private synchronized OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) { private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
if (!enabled) if (!enabled)
return null; return null;
boolean runItNow = false; boolean runItNow = false;
@ -578,6 +583,11 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
*/ */
public void sendSynchronizationDone(String nodeID, long initialReplicationSyncTimeout) { public void sendSynchronizationDone(String nodeID, long initialReplicationSyncTimeout) {
if (enabled) { if (enabled) {
if (isTrace) {
logger.trace("sendSynchronizationDone ::" + nodeID + ", " + initialReplicationSyncTimeout);
}
synchronizationIsFinishedAcknowledgement.countUp(); synchronizationIsFinishedAcknowledgement.countUp();
sendReplicatePacket(new ReplicationStartSyncMessage(nodeID)); sendReplicatePacket(new ReplicationStartSyncMessage(nodeID));
try { try {