diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java index 31d9aad389..926ac1bd3b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java @@ -141,7 +141,7 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter @Override public String toString() { - return "ClientMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + properties.toString() + "]"; + return getClass().getSimpleName() + "[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + properties.toString() + "]"; } @Override diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSClientLogger.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSClientLogger.java index 7eb56cd1dd..3f116969c2 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSClientLogger.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSClientLogger.java @@ -58,12 +58,12 @@ public interface ActiveMQJMSClientLogger extends BasicLogger { void errorCallingExcListener(@Cause Exception e); @LogMessage(level = Logger.Level.ERROR) - @Message(id = 124002, value = "Queue Browser failed to create message", format = Message.Format.MESSAGE_FORMAT) - void errorCreatingMessage(@Cause Throwable e); + @Message(id = 124002, value = "Queue Browser failed to create message {0}", format = Message.Format.MESSAGE_FORMAT) + void errorCreatingMessage(String messageToString, @Cause Throwable e); @LogMessage(level = Logger.Level.ERROR) - @Message(id = 124003, value = "Message Listener failed to prepare message for receipt", format = Message.Format.MESSAGE_FORMAT) - void errorPreparingMessageForReceipt(@Cause Throwable e); + @Message(id = 124003, value = "Message Listener failed to prepare message for receipt, message={0}", format = Message.Format.MESSAGE_FORMAT) + void errorPreparingMessageForReceipt(String messagetoString, @Cause Throwable e); @LogMessage(level = Logger.Level.ERROR) @Message(id = 124004, value = "Message Listener failed to process message", format = Message.Format.MESSAGE_FORMAT) diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java index 3f47209910..04e4f41c07 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; +import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; /** * ActiveMQ Artemis implementation of a JMS MessageConsumer. @@ -211,7 +212,18 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr boolean needSession = ackMode == Session.CLIENT_ACKNOWLEDGE || ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE; jmsMsg = ActiveMQMessage.createMessage(coreMessage, needSession ? session.getCoreSession() : null); - jmsMsg.doBeforeReceive(); + try { + jmsMsg.doBeforeReceive(); + } + catch (IndexOutOfBoundsException ioob) { + // In case this exception happen you will need to know where it happened. + // it has been a bug here in the past, and this was used to debug it. + // nothing better than keep it for future investigations in case it happened again + IndexOutOfBoundsException newIOOB = new IndexOutOfBoundsException(ioob.getMessage() + "@" + jmsMsg.getCoreMessage()); + newIOOB.initCause(ioob); + ActiveMQClientLogger.LOGGER.warn(newIOOB.getMessage(), newIOOB); + throw ioob; + } // We Do the ack after doBeforeRecive, as in the case of large messages, this may fail so we don't want messages redelivered // https://issues.jboss.org/browse/JBPAPP-6110 diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueueBrowser.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueueBrowser.java index 5022fcd90a..4cf34ea3b4 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueueBrowser.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueueBrowser.java @@ -141,7 +141,7 @@ public final class ActiveMQQueueBrowser implements QueueBrowser { msg.doBeforeReceive(); } catch (Exception e) { - ActiveMQJMSClientLogger.LOGGER.errorCreatingMessage(e); + ActiveMQJMSClientLogger.LOGGER.errorCreatingMessage(msg.getCoreMessage().toString(), e); return null; } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java index 0d831f0f1a..ab62dbc87b 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java @@ -73,7 +73,7 @@ public class JMSMessageListenerWrapper implements MessageHandler { msg.doBeforeReceive(); } catch (Exception e) { - ActiveMQJMSClientLogger.LOGGER.errorPreparingMessageForReceipt(e); + ActiveMQJMSClientLogger.LOGGER.errorPreparingMessageForReceipt(msg.getCoreMessage().toString(), e); return; } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java index efeeb2e28d..1a109cb4df 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.io.DummyCallback; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; import org.apache.activemq.artemis.jlibaio.LibaioFile; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.utils.ReusableLatch; public class AIOSequentialFile extends AbstractSequentialFile { @@ -202,7 +203,14 @@ public class AIOSequentialFile extends AbstractSequentialFile { */ @Override public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback) { - checkOpened(); + try { + checkOpened(); + } + catch (Exception e) { + ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e); + callback.onError(-1, e.getMessage()); + return; + } final int bytesToWrite = factory.calculateBlockSize(bytes.limit()); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/FileIOUtil.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/FileIOUtil.java new file mode 100644 index 0000000000..f9d3ab7b6d --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/FileIOUtil.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.io.util; + +import java.nio.ByteBuffer; + +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.jboss.logging.Logger; + +public class FileIOUtil { + + private static final Logger logger = Logger.getLogger(Logger.class); + private static final boolean isTrace = logger.isTraceEnabled(); + + public static void copyData(SequentialFile from, SequentialFile to, ByteBuffer buffer) throws Exception { + + boolean fromIsOpen = from.isOpen(); + boolean toIsOpen = to.isOpen(); + + from.close(); + from.open(); + + if (!toIsOpen) { + to.open(); + } + + to.position(to.size()); + + from.position(0); + + try { + for (;;) { + // The buffer is reused... + // We need to make sure we clear the limits and the buffer before reusing it + buffer.clear(); + int bytesRead = from.read(buffer); + + if (isTrace) { + logger.trace("appending " + bytesRead + " bytes on " + to.getFileName()); + } + + if (bytesRead > 0) { + to.writeDirect(buffer, false); + } + + if (bytesRead < buffer.capacity()) { + logger.trace("Interrupting reading as the whole thing was sent on " + to.getFileName()); + break; + } + } + } + finally { + if (!fromIsOpen) { + from.close(); + } + else { + from.position(from.size()); + } + if (!toIsOpen) { + to.close(); + } + else { + to.position(to.size()); + } + } + + } + +} diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/FileIOUtilTest.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/FileIOUtilTest.java new file mode 100644 index 0000000000..0e3d7d7a2e --- /dev/null +++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/FileIOUtilTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.io.aio; + +import java.io.File; +import java.nio.ByteBuffer; + +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.util.FileIOUtil; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class FileIOUtilTest { + + @Rule + public TemporaryFolder temporaryFolder; + + public FileIOUtilTest() { + File parent = new File("./target"); + parent.mkdirs(); + temporaryFolder = new TemporaryFolder(parent); + } + + @Test + public void testCopy() throws Exception { + System.out.println("Data at " + temporaryFolder.getRoot()); + SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100); + SequentialFile file = factory.createSequentialFile("file1.bin"); + file.open(); + + ByteBuffer buffer = ByteBuffer.allocate(204800); + buffer.put(new byte[204800]); + buffer.rewind(); + file.writeDirect(buffer, true); + + buffer = ByteBuffer.allocate(409605); + buffer.put(new byte[409605]); + buffer.rewind(); + + SequentialFile file2 = factory.createSequentialFile("file2.bin"); + + file2.open(); + file2.writeDirect(buffer, true); + + + // This is allocating a reusable buffer to perform the copy, just like it's used within LargeMessageInSync + buffer = ByteBuffer.allocate(4 * 1024); + + SequentialFile newFile = factory.createSequentialFile("file1.cop"); + FileIOUtil.copyData(file, newFile, buffer); + + SequentialFile newFile2 = factory.createSequentialFile("file2.cop"); + FileIOUtil.copyData(file2, newFile2, buffer); + + Assert.assertEquals(file.size(), newFile.size()); + Assert.assertEquals(file2.size(), newFile2.size()); + + newFile.close(); + newFile2.close(); + file.close(); + file2.close(); + + System.out.println("Test result::"); + + } + + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java index f264415052..274abeb994 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java @@ -21,14 +21,19 @@ import java.nio.ByteBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.util.FileIOUtil; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager.LargeMessageExtension; import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.jboss.logging.Logger; public final class LargeServerMessageInSync implements ReplicatedLargeMessage { + private static final Logger logger = Logger.getLogger(LargeServerMessageInSync.class); + private static final boolean isTrace = logger.isTraceEnabled(); + private final LargeServerMessage mainLM; private final StorageManager storageManager; private SequentialFile appendFile; @@ -50,20 +55,33 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage { if (!mainSeqFile.isOpen()) { mainSeqFile.open(); } - if (appendFile != null) { - appendFile.close(); - appendFile.open(); - for (;;) { - buffer.rewind(); - int bytesRead = appendFile.read(buffer); - if (bytesRead > 0) - mainSeqFile.writeDirect(buffer, false); - if (bytesRead < buffer.capacity()) { - break; + + try { + if (appendFile != null) { + if (isTrace) { + logger.trace("joinSyncedData on " + mainLM + ", currentSize on mainMessage=" + mainSeqFile.size() + ", appendFile size = " + appendFile.size()); + } + + FileIOUtil.copyData(appendFile, mainSeqFile, buffer); + deleteAppendFile(); + } + else { + if (isTrace) { + logger.trace("joinSyncedData, appendFile is null, ignoring joinSyncedData on " + mainLM); } } - deleteAppendFile(); } + catch (Throwable e) { + logger.warn("Error while sincing data on largeMessageInSync::" + mainLM); + } + + + if (isTrace) { + logger.trace("joinedSyncData on " + mainLM + " finished with " + mainSeqFile.size()); + } + + + syncDone = true; } @@ -85,6 +103,9 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage { @Override public synchronized void releaseResources() { + if (isTrace) { + logger.warn("release resources called on " + mainLM, new Exception("trace")); + } mainLM.releaseResources(); if (appendFile != null && appendFile.isOpen()) { try { @@ -122,11 +143,19 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage { public synchronized void addBytes(byte[] bytes) throws Exception { if (deleted) return; + if (syncDone) { + if (isTrace) { + logger.trace("Adding " + bytes.length + " towards sync message::" + mainLM); + } mainLM.addBytes(bytes); return; } + if (isTrace) { + logger.trace("addBytes(bytes.length=" + bytes.length + ") on message=" + mainLM); + } + if (appendFile == null) { appendFile = storageManager.createFileForLargeMessage(mainLM.getMessageID(), LargeMessageExtension.SYNC); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java index 0a36a56471..20af68c82a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java @@ -57,6 +57,13 @@ public class ReplicationLargeMessageBeginMessage extends PacketImpl { return result; } + @Override + public String toString() { + return "ReplicationLargeMessageBeginMessage{" + + "messageId=" + messageId + + '}'; + } + @Override public boolean equals(Object obj) { if (this == obj) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java index eea788a0d3..bb779292cb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java @@ -57,6 +57,13 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl { return result; } + @Override + public String toString() { + return "ReplicationLargeMessageEndMessage{" + + "messageId=" + messageId + + '}'; + } + @Override public boolean equals(Object obj) { if (this == obj) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java index 0970f0594a..f60c62994b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java @@ -80,6 +80,14 @@ public final class ReplicationLargeMessageWriteMessage extends PacketImpl { return result; } + @Override + public String toString() { + return "ReplicationLargeMessageWriteMessage{" + + "messageId=" + messageId + + ", body.size=" + body.length + + '}'; + } + @Override public boolean equals(Object obj) { if (this == obj) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index 7cf5450130..3cd5bfd02f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -80,6 +80,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation; +import org.jboss.logging.Logger; /** * Handles all the synchronization necessary for replication on the backup side (that is the @@ -87,7 +88,8 @@ import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivatio */ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQComponent { - private static final boolean trace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); + private static final Logger logger = Logger.getLogger(ReplicationEndpoint.class); + private static final boolean isTrace = logger.isTraceEnabled(); private final IOCriticalErrorListener criticalErrorListener; private final ActiveMQServerImpl server; @@ -153,11 +155,18 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon @Override public void handlePacket(final Packet packet) { + if (isTrace) { + logger.trace("handlePacket::handling " + packet); + } PacketImpl response = new ReplicationResponseMessage(); final byte type = packet.getType(); try { if (!started) { + if (isTrace) { + logger.trace("handlePacket::ignoring " + packet); + } + return; } @@ -340,56 +349,10 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon this.channel = channel; } - public void compareJournalInformation(final JournalLoadInformation[] journalInformation) throws ActiveMQException { - if (!activation.isRemoteBackupUpToDate()) { - throw ActiveMQMessageBundle.BUNDLE.journalsNotInSync(); - } - - if (journalLoadInformation == null || journalLoadInformation.length != journalInformation.length) { - throw ActiveMQMessageBundle.BUNDLE.replicationTooManyJournals(); - } - - for (int i = 0; i < journalInformation.length; i++) { - if (!journalInformation[i].equals(journalLoadInformation[i])) { - ActiveMQServerLogger.LOGGER.journalcomparisonMismatch(journalParametersToString(journalInformation)); - throw ActiveMQMessageBundle.BUNDLE.replicationTooManyJournals(); - } - } - - } - - /** - * Used on tests only. To simulate missing page deletes - */ - public void setDeletePages(final boolean deletePages) { - this.deletePages = deletePages; - } - - /** - * @param journalInformation - */ - private String journalParametersToString(final JournalLoadInformation[] journalInformation) { - return "**********************************************************\n" + "parameters:\n" + - "BindingsImpl = " + - journalInformation[0] + - "\n" + - "Messaging = " + - journalInformation[1] + - "\n" + - "**********************************************************" + - "\n" + - "Expected:" + - "\n" + - "BindingsImpl = " + - journalLoadInformation[0] + - "\n" + - "Messaging = " + - journalLoadInformation[1] + - "\n" + - "**********************************************************"; - } - private void finishSynchronization(String liveID) throws Exception { + if (isTrace) { + logger.trace("finishSynchronization::" + liveID); + } for (JournalContent jc : EnumSet.allOf(JournalContent.class)) { Journal journal = journalsHolder.remove(jc); journal.synchronizationLock(); @@ -427,7 +390,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon * @param msg * @throws Exception */ - private synchronized void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws Exception { + private void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws Exception { Long id = Long.valueOf(msg.getId()); byte[] data = msg.getData(); SequentialFile channel1; @@ -462,7 +425,6 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon } if (data == null) { - channel1.close(); return; } @@ -477,69 +439,73 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon * {@link FileWrapperJournal} in place to store messages while synchronization is going on. * * @param packet - * @throws Exception * @return if the incoming packet indicates the synchronization is finished then return an acknowledgement otherwise - * return an empty response + * return an empty response + * @throws Exception */ private ReplicationResponseMessageV2 handleStartReplicationSynchronization(final ReplicationStartSyncMessage packet) throws Exception { + + if (isTrace) { + logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet); + } ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2(); - if (activation.isRemoteBackupUpToDate()) { - throw ActiveMQMessageBundle.BUNDLE.replicationBackupUpToDate(); + if (!started) + return replicationResponseMessage; + + if (packet.isSynchronizationFinished()) { + finishSynchronization(packet.getNodeID()); + replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true); + return replicationResponseMessage; } - synchronized (this) { - if (!started) - return replicationResponseMessage; + switch (packet.getDataType()) { + case LargeMessages: + for (long msgID : packet.getFileIds()) { + createLargeMessage(msgID, true); + } + break; + case JournalBindings: + case JournalMessages: + if (wantedFailBack && !packet.isServerToFailBack()) { + ActiveMQServerLogger.LOGGER.autoFailBackDenied(); + } - if (packet.isSynchronizationFinished()) { - finishSynchronization(packet.getNodeID()); - replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true); - return replicationResponseMessage; - } + final JournalContent journalContent = SyncDataType.getJournalContentType(packet.getDataType()); + final Journal journal = journalsHolder.get(journalContent); - switch (packet.getDataType()) { - case LargeMessages: - for (long msgID : packet.getFileIds()) { - createLargeMessage(msgID, true); - } - break; - case JournalBindings: - case JournalMessages: - if (wantedFailBack && !packet.isServerToFailBack()) { - ActiveMQServerLogger.LOGGER.autoFailBackDenied(); - } + if (packet.getNodeID() != null) { + // At the start of replication, we still do not know which is the nodeID that the live uses. + // This is the point where the backup gets this information. + backupQuorum.liveIDSet(packet.getNodeID()); + } + Map mapToFill = filesReservedForSync.get(journalContent); - final JournalContent journalContent = SyncDataType.getJournalContentType(packet.getDataType()); - final Journal journal = journalsHolder.get(journalContent); - - if (packet.getNodeID() != null) { - // At the start of replication, we still do not know which is the nodeID that the live uses. - // This is the point where the backup gets this information. - backupQuorum.liveIDSet(packet.getNodeID()); - } - Map mapToFill = filesReservedForSync.get(journalContent); - - for (Entry entry : journal.createFilesForBackupSync(packet.getFileIds()).entrySet()) { - mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue())); - } - FileWrapperJournal syncJournal = new FileWrapperJournal(journal); - registerJournal(journalContent.typeByte, syncJournal); - break; - default: - throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType(); - } + for (Entry entry : journal.createFilesForBackupSync(packet.getFileIds()).entrySet()) { + mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue())); + } + FileWrapperJournal syncJournal = new FileWrapperJournal(journal); + registerJournal(journalContent.typeByte, syncJournal); + break; + default: + throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType(); } return replicationResponseMessage; } private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet) { + if (isTrace) { + logger.trace("handleLargeMessageEnd on " + packet.getMessageId()); + } final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false); if (message != null) { executor.execute(new Runnable() { @Override public void run() { try { + if (isTrace) { + logger.trace("Deleting LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd"); + } message.deleteFile(); } catch (Exception e) { @@ -560,7 +526,9 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon } } - private ReplicatedLargeMessage lookupLargeMessage(final long messageId, final boolean delete, final boolean createIfNotExists) { + private ReplicatedLargeMessage lookupLargeMessage(final long messageId, + final boolean delete, + final boolean createIfNotExists) { ReplicatedLargeMessage message; if (delete) { @@ -590,7 +558,9 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon private void handleLargeMessageBegin(final ReplicationLargeMessageBeginMessage packet) { final long id = packet.getMessageId(); createLargeMessage(id, false); - ActiveMQServerLogger.LOGGER.trace("Receiving Large Message " + id + " on backup"); + if (isTrace) { + logger.trace("Receiving Large Message Begin " + id + " on backup"); + } } private void createLargeMessage(final long id, boolean liveToBackupSync) { @@ -666,14 +636,14 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon private void handleAppendAddRecord(final ReplicationAddMessage packet) throws Exception { Journal journalToUse = getJournal(packet.getJournalID()); if (packet.getRecord() == ADD_OPERATION_TYPE.UPDATE) { - if (ReplicationEndpoint.trace) { - ActiveMQServerLogger.LOGGER.trace("Endpoint appendUpdate id = " + packet.getId()); + if (isTrace) { + logger.trace("Endpoint appendUpdate id = " + packet.getId()); } journalToUse.appendUpdateRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), noSync); } else { - if (ReplicationEndpoint.trace) { - ActiveMQServerLogger.LOGGER.trace("Endpoint append id = " + packet.getId()); + if (isTrace) { + logger.trace("Endpoint append id = " + packet.getId()); } journalToUse.appendAddRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), noSync); } @@ -807,7 +777,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon * * @param backupQuorum */ - public synchronized void setBackupQuorum(SharedNothingBackupQuorum backupQuorum) { + public void setBackupQuorum(SharedNothingBackupQuorum backupQuorum) { this.backupQuorum = backupQuorum; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 4aabbeae07..4081dd99dc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -70,6 +70,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ReusableLatch; +import org.jboss.logging.Logger; /** * Manages replication tasks on the live server (that is the live server side of a "remote backup" @@ -81,6 +82,10 @@ import org.apache.activemq.artemis.utils.ReusableLatch; */ public final class ReplicationManager implements ActiveMQComponent, ReadyListener { + + Logger logger = Logger.getLogger(ReplicationManager.class); + final boolean isTrace = logger.isTraceEnabled(); + public enum ADD_OPERATION_TYPE { UPDATE { @Override @@ -330,7 +335,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene return sendReplicatePacket(packet, true); } - private synchronized OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) { + private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) { if (!enabled) return null; boolean runItNow = false; @@ -578,6 +583,11 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene */ public void sendSynchronizationDone(String nodeID, long initialReplicationSyncTimeout) { if (enabled) { + + if (isTrace) { + logger.trace("sendSynchronizationDone ::" + nodeID + ", " + initialReplicationSyncTimeout); + } + synchronizationIsFinishedAcknowledgement.countUp(); sendReplicatePacket(new ReplicationStartSyncMessage(nodeID)); try {