diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java index f2aa5b48d9..45d92299e4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java @@ -111,4 +111,12 @@ public interface CoreRemotingConnection extends RemotingConnection { * @return the principal */ ActiveMQPrincipal getDefaultActiveMQPrincipal(); + + /** + * + * @param size size we are trying to write + * @param timeout + * @return + */ + boolean blockUntilWritable(int size, long timeout); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java index ddb734ee80..a86c5c102b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java @@ -24,6 +24,8 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; */ public interface Packet { + int INITIAL_PACKET_SIZE = 1500; + /** * Sets the channel id that should be used once the packet has been successfully decoded it is * sent to the correct channel. @@ -32,6 +34,14 @@ public interface Packet { */ void setChannelID(long channelID); + /** + * This will return the expected packet size for the encoding + * @return + */ + default int expectedEncodeSize() { + return INITIAL_PACKET_SIZE; + } + /** * Returns the channel id of the channel that should handle this packet. * diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 6dddf3bd4b..99c052bd3a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -16,7 +16,9 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl; +import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.DataConstants; @@ -29,8 +31,6 @@ public class PacketImpl implements Packet { public static final int PACKET_HEADERS_SIZE = DataConstants.SIZE_INT + DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG; - private static final int INITIAL_PACKET_SIZE = 1500; - protected long channelID; private final byte type; @@ -294,6 +294,17 @@ public class PacketImpl implements Packet { return buffer; } + protected ActiveMQBuffer createPacket(RemotingConnection connection) { + + int size = expectedEncodeSize(); + + if (connection == null) { + return new ChannelBufferWrapper(Unpooled.buffer(size)); + } else { + return connection.createTransportBuffer(size); + } + } + @Override public void decode(final ActiveMQBuffer buffer) { channelID = buffer.readLong(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index 8bd62ca4e6..506c60231b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -230,6 +231,11 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement callClosingListeners(); } + @Override + public boolean blockUntilWritable(int size, long timeout) { + return transportConnection.blockUntilWritable(size, timeout, TimeUnit.MILLISECONDS); + } + @Override public void disconnect(final boolean criticalError) { disconnect(null, criticalError); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java index 40d3622f9d..cd0ab91ac0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java @@ -100,15 +100,6 @@ public abstract class SessionContinuationMessage extends PacketImpl { return buffer; } - protected final ActiveMQBuffer createPacket(RemotingConnection connection) { - final int expectedEncodedSize = expectedEncodedSize(); - if (connection == null) { - return new ChannelBufferWrapper(Unpooled.buffer(expectedEncodedSize)); - } else { - return connection.createTransportBuffer(expectedEncodedSize); - } - } - @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeInt(body.length); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java index 44ad1bbf49..41e786bec3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java @@ -70,8 +70,8 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag // Protected ----------------------------------------------------- @Override - protected final int expectedEncodedSize() { - return super.expectedEncodedSize() + DataConstants.SIZE_LONG; + public int expectedEncodeSize() { + return super.expectedEncodeSize() + DataConstants.SIZE_LONG; } // Public -------------------------------------------------------- @@ -128,4 +128,4 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag return true; } -} \ No newline at end of file +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java index 2129b49ec4..300d9d7830 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java @@ -52,6 +52,7 @@ public class SessionReceiveMessage extends MessagePacket { return deliveryCount; } + @Override public ActiveMQBuffer encode(final RemotingConnection connection) { ActiveMQBuffer buffer = message.getEncodedBuffer(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java index e718b3dc93..5ace5d9145 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java @@ -93,8 +93,8 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { } @Override - protected final int expectedEncodedSize() { - return super.expectedEncodedSize() + (!continues ? DataConstants.SIZE_LONG : 0) + DataConstants.SIZE_BOOLEAN; + public int expectedEncodeSize() { + return super.expectedEncodeSize() + (!continues ? DataConstants.SIZE_LONG : 0) + DataConstants.SIZE_BOOLEAN; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java index 89d28630e7..05a8750584 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE; +import org.apache.activemq.artemis.utils.DataConstants; public final class ReplicationAddMessage extends PacketImpl { @@ -59,10 +60,20 @@ public final class ReplicationAddMessage extends PacketImpl { // Public -------------------------------------------------------- + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_BYTE + // buffer.writeByte(journalID); + DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(operation.toBoolean()); + DataConstants.SIZE_LONG + // buffer.writeLong(id); + DataConstants.SIZE_BYTE + // buffer.writeByte(journalRecordType); + DataConstants.SIZE_INT + // buffer.writeInt(persister.getEncodeSize(encodingData)); + persister.getEncodeSize(encodingData);// persister.encode(buffer, encodingData); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeByte(journalID); - buffer.writeBoolean(operation.toBoolean()); buffer.writeLong(id); buffer.writeByte(journalRecordType); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java index 59475e060e..fd686825f2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE; +import org.apache.activemq.artemis.utils.DataConstants; public class ReplicationAddTXMessage extends PacketImpl { @@ -63,6 +64,18 @@ public class ReplicationAddTXMessage extends PacketImpl { // Public -------------------------------------------------------- + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_BYTE + // buffer.writeByte(journalID); + DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(operation.toBoolean()); + DataConstants.SIZE_LONG + // buffer.writeLong(txId); + DataConstants.SIZE_LONG + // buffer.writeLong(id); + DataConstants.SIZE_BYTE + // buffer.writeByte(recordType); + DataConstants.SIZE_INT + // buffer.writeInt(persister.getEncodeSize(encodingData)); + persister.getEncodeSize(encodingData); // persister.encode(buffer, encodingData); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeByte(journalID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java index 1987b3db1b..245ec18eb4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; public final class ReplicationCommitMessage extends PacketImpl { @@ -41,6 +42,14 @@ public final class ReplicationCommitMessage extends PacketImpl { this.txId = txId; } + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_BYTE + // buffer.writeByte(journalID); + DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(rollback); + DataConstants.SIZE_LONG; // buffer.writeLong(txId); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeByte(journalID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteMessage.java index ab97b18579..fdbfb9b7fa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteMessage.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; public final class ReplicationDeleteMessage extends PacketImpl { @@ -38,6 +39,14 @@ public final class ReplicationDeleteMessage extends PacketImpl { this.id = id; } + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_BYTE + // buffer.writeByte(journalID); + DataConstants.SIZE_LONG; // buffer.writeLong(id); + } + + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeByte(journalID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteTXMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteTXMessage.java index 0d7552396b..4e942bd58f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteTXMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteTXMessage.java @@ -21,6 +21,7 @@ import java.util.Arrays; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; public class ReplicationDeleteTXMessage extends PacketImpl { @@ -52,6 +53,16 @@ public class ReplicationDeleteTXMessage extends PacketImpl { this.encodingData = encodingData; } + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_BYTE + // buffer.writeByte(journalID); + DataConstants.SIZE_LONG + // buffer.writeLong(txId); + DataConstants.SIZE_LONG + // buffer.writeLong(id); + DataConstants.SIZE_INT + // buffer.writeInt(encodingData.getEncodeSize()); + encodingData.getEncodeSize(); // encodingData.encode(buffer); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeByte(journalID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java index 20af68c82a..1ecaa683c0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; public class ReplicationLargeMessageBeginMessage extends PacketImpl { @@ -32,6 +33,14 @@ public class ReplicationLargeMessageBeginMessage extends PacketImpl { super(PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN); } + + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_LONG; // buffer.writeLong(messageId); + } + + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeLong(messageId); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java index bb779292cb..4a09cc034c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; public class ReplicationLargeMessageEndMessage extends PacketImpl { @@ -32,6 +33,13 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl { this.messageId = messageId; } + + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_LONG; // buffer.writeLong(messageId); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeLong(messageId); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java index f60c62994b..ffee14c8ac 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java @@ -20,6 +20,7 @@ import java.util.Arrays; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; public final class ReplicationLargeMessageWriteMessage extends PacketImpl { @@ -42,6 +43,15 @@ public final class ReplicationLargeMessageWriteMessage extends PacketImpl { this.body = body; } + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_LONG + // buffer.writeLong(messageId); + DataConstants.SIZE_LONG + // buffer.writeLong(messageId); + DataConstants.SIZE_INT + // buffer.writeInt(body.length); + body.length; // buffer.writeBytes(body); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeLong(messageId); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLiveIsStoppingMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLiveIsStoppingMessage.java index 339cfa82da..456a46bb0c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLiveIsStoppingMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLiveIsStoppingMessage.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; /** * Message indicating that the live is stopping (a scheduled stop). @@ -59,6 +60,12 @@ public final class ReplicationLiveIsStoppingMessage extends PacketImpl { this.liveStopping = b; } + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_INT; // buffer.writeInt(liveStopping.code); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeInt(liveStopping.code); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java index 31680daf49..ea929e4d70 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; public class ReplicationPageEventMessage extends PacketImpl { @@ -42,6 +43,14 @@ public class ReplicationPageEventMessage extends PacketImpl { this.storeName = storeName; } + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + SimpleString.sizeofString(storeName) + // buffer.writeSimpleString(storeName); + DataConstants.SIZE_INT + // buffer.writeInt(pageNumber); + DataConstants.SIZE_BOOLEAN; // buffer.writeBoolean(isDelete); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeSimpleString(storeName); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java index 7307151993..4a65e6613d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java @@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; public class ReplicationPageWriteMessage extends PacketImpl { @@ -39,6 +40,13 @@ public class ReplicationPageWriteMessage extends PacketImpl { // Public -------------------------------------------------------- + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_INT + // buffer.writeInt(pageNumber); + pagedMessage.getEncodeSize(); // pagedMessage.encode(buffer); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeInt(pageNumber); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPrepareMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPrepareMessage.java index ebdc1c4e2a..cf993a3083 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPrepareMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPrepareMessage.java @@ -21,6 +21,7 @@ import java.util.Arrays; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; public final class ReplicationPrepareMessage extends PacketImpl { @@ -48,6 +49,15 @@ public final class ReplicationPrepareMessage extends PacketImpl { // Public -------------------------------------------------------- + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_BYTE + // buffer.writeByte(journalID); + DataConstants.SIZE_LONG + // buffer.writeLong(txId); + DataConstants.SIZE_INT + // buffer.writeInt(encodingData.getEncodeSize()); + encodingData.getEncodeSize(); // encodingData.encode(buffer); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeByte(journalID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java index c7eff85466..d9d8b1a569 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java @@ -27,4 +27,11 @@ public class ReplicationResponseMessage extends PacketImpl { public ReplicationResponseMessage(byte replicationResponseV2) { super(replicationResponseV2); } + + + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE; + } + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java index f9001c641d..b26084bb4c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; public final class ReplicationResponseMessageV2 extends ReplicationResponseMessage { @@ -41,6 +42,12 @@ public final class ReplicationResponseMessageV2 extends ReplicationResponseMessa this.synchronizationIsFinishedAcknowledgement = synchronizationIsFinishedAcknowledgement; } + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_BOOLEAN; // buffer.writeBoolean(synchronizationIsFinishedAcknowledgement); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { super.encodeRest(buffer); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java index a44707f017..018535f426 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; /** * This message may signal start or end of the replication synchronization. @@ -109,6 +110,26 @@ public class ReplicationStartSyncMessage extends PacketImpl { } } + + @Override + public int expectedEncodeSize() { + int size = PACKET_HEADERS_SIZE + + DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(synchronizationIsFinished); + DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(allowsAutoFailBack); + nodeID.length() * 3; // buffer.writeString(nodeID); -- an estimate + + + if (synchronizationIsFinished) { + return size; + } + size += DataConstants.SIZE_BYTE + // buffer.writeByte(dataType.code); + DataConstants.SIZE_INT + // buffer.writeInt(ids.length); + DataConstants.SIZE_LONG * ids.length; // the write loop + + return size; + } + + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeBoolean(synchronizationIsFinished); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java index 90d2ca0b4a..4d3c32fa2c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; /** * Message is used to sync {@link org.apache.activemq.artemis.core.io.SequentialFile}s to a backup server. The {@link FileType} controls @@ -98,6 +99,38 @@ public final class ReplicationSyncFileMessage extends PacketImpl { } } + @Override + public int expectedEncodeSize() { + int size = PACKET_HEADERS_SIZE + + DataConstants.SIZE_LONG; // buffer.writeLong(fileId); + + if (fileId == -1) + return size; + + size += DataConstants.SIZE_BYTE; // buffer.writeByte(fileType.code); + switch (fileType) { + case JOURNAL: { + size += DataConstants.SIZE_BYTE; // buffer.writeByte(journalType.typeByte); + break; + } + case PAGE: { + size += SimpleString.sizeofString(pageStoreName); + break; + } + case LARGE_MESSAGE: + default: + // no-op + } + + size += DataConstants.SIZE_INT; // buffer.writeInt(dataSize); + + if (dataSize > 0) { + size += byteBuffer.writerIndex(); // buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex()); + } + + return size; + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeLong(fileId); @@ -126,11 +159,6 @@ public final class ReplicationSyncFileMessage extends PacketImpl { if (dataSize > 0) { buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex()); } - - if (byteBuffer != null) { - byteBuffer.release(); - byteBuffer = null; - } } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java index 29e39d5bfe..f9e1b3d2f2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java @@ -19,8 +19,6 @@ package org.apache.activemq.artemis.core.remoting.impl.netty; import java.util.Map; import io.netty.channel.Channel; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener; public class NettyServerConnection extends NettyConnection { @@ -33,8 +31,4 @@ public class NettyServerConnection extends NettyConnection { super(configuration, channel, listener, batchingEnabled, directDeliver); } - @Override - public ActiveMQBuffer createTransportBuffer(int size) { - return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true); - } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 7e0881cb26..1507edb0c2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -33,7 +33,6 @@ import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; -import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; @@ -42,6 +41,7 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.protocol.core.Channel; @@ -70,7 +70,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ReusableLatch; import org.jboss.logging.Logger; @@ -83,7 +82,7 @@ import org.jboss.logging.Logger; * * @see ReplicationEndpoint */ -public final class ReplicationManager implements ActiveMQComponent, ReadyListener { +public final class ReplicationManager implements ActiveMQComponent { private static final Logger logger = Logger.getLogger(ReplicationManager.class); @@ -118,8 +117,6 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene private final AtomicBoolean writable = new AtomicBoolean(true); - private final Object replicationLock = new Object(); - private final Queue pendingTokens = new ConcurrentLinkedQueue<>(); private final ExecutorFactory executorFactory; @@ -289,12 +286,9 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene replicatingChannel.getConnection().getTransportConnection().fireReady(true); } - synchronized (replicationLock) { - enabled = false; - writable.set(true); - replicationLock.notifyAll(); - clearReplicationTokens(); - } + enabled = false; + writable.set(true); + clearReplicationTokens(); RemotingConnection toStop = remotingConnection; if (toStop != null) { @@ -312,16 +306,13 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene */ public void clearReplicationTokens() { logger.trace("clearReplicationTokens initiating"); - synchronized (replicationLock) { - logger.trace("clearReplicationTokens entered the lock"); - while (!pendingTokens.isEmpty()) { - OperationContext ctx = pendingTokens.poll(); - logger.trace("Calling ctx.replicationDone()"); - try { - ctx.replicationDone(); - } catch (Throwable e) { - ActiveMQServerLogger.LOGGER.errorCompletingCallbackOnReplicationManager(e); - } + while (!pendingTokens.isEmpty()) { + OperationContext ctx = pendingTokens.poll(); + logger.trace("Calling ctx.replicationDone()"); + try { + ctx.replicationDone(); + } catch (Throwable e) { + ActiveMQServerLogger.LOGGER.errorCompletingCallbackOnReplicationManager(e); } } logger.trace("clearReplicationTokens finished"); @@ -359,24 +350,22 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene repliToken.replicationLineUp(); } - synchronized (replicationLock) { - if (enabled) { - pendingTokens.add(repliToken); - if (useExecutor) { - replicationStream.execute(() -> { - if (enabled) { - flowControl(); - replicatingChannel.send(packet); - } - }); - } else { - flowControl(); - replicatingChannel.send(packet); - } + if (enabled) { + pendingTokens.add(repliToken); + if (useExecutor) { + replicationStream.execute(() -> { + if (enabled) { + flowControl(packet.expectedEncodeSize()); + replicatingChannel.send(packet); + } + }); } else { - // Already replicating channel failed, so just play the action now - runItNow = true; + flowControl(packet.expectedEncodeSize()); + replicatingChannel.send(packet); } + } else { + // Already replicating channel failed, so just play the action now + runItNow = true; } // Execute outside lock @@ -392,47 +381,20 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene * This was written as a refactoring of sendReplicatePacket. * In case you refactor this in any way, this method must hold a lock on replication lock. . */ - private boolean flowControl() { - synchronized (replicationLock) { - // synchronized (replicationLock) { -- I'm not adding this because the caller already has it - // future maintainers of this code please be aware that the intention here is hold the lock on replication lock - if (!replicatingChannel.getConnection().isWritable(this)) { - try { - logger.trace("flowControl waiting on writable replication"); - writable.set(false); - //don't wait for ever as this may hang tests etc, we've probably been closed anyway - long now = System.currentTimeMillis(); - long deadline = now + timeout; - while (!writable.get() && now < deadline) { - replicationLock.wait(deadline - now); - now = System.currentTimeMillis(); - } - logger.trace("flow control done on replication"); + private boolean flowControl(int size) { + boolean flowWorked = replicatingChannel.getConnection().blockUntilWritable(size, timeout); - if (!writable.get()) { - ActiveMQServerLogger.LOGGER.slowReplicationResponse(); - logger.tracef("There was no response from replication backup after %s seconds, server being stopped now", System.currentTimeMillis() - now); - try { - stop(); - } catch (Exception e) { - logger.warn(e.getMessage(), e); - } - return false; - } - } catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); - } + if (!flowWorked) { + try { + ActiveMQServerLogger.LOGGER.slowReplicationResponse(); + stop(); + } catch (Exception e) { + logger.warn(e.getMessage(), e); } } - return true; - } - @Override - public void readyForWriting() { - synchronized (replicationLock) { - writable.set(true); - replicationLock.notifyAll(); - } + + return flowWorked; } /** @@ -566,15 +528,17 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene if (!file.isOpen()) { file.open(); } + int size = 32 * 1024; + final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); + try { try (final FileInputStream fis = new FileInputStream(file.getJavaFile()); final FileChannel channel = fis.getChannel()) { // We can afford having a single buffer here for this entire loop // because sendReplicatePacket will encode the packet as a NettyBuffer // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy - int size = 1 << 17; while (true) { - final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size); + buffer.clear(); ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); final int bytesRead = channel.read(byteBuffer); int toSend = bytesRead; @@ -597,6 +561,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene } } } finally { + buffer.release(); if (file.isOpen()) file.close(); }