diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java index f2aa5b48d9..45d92299e4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java @@ -111,4 +111,12 @@ public interface CoreRemotingConnection extends RemotingConnection { * @return the principal */ ActiveMQPrincipal getDefaultActiveMQPrincipal(); + + /** + * + * @param size size we are trying to write + * @param timeout + * @return + */ + boolean blockUntilWritable(int size, long timeout); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java index ddb734ee80..a86c5c102b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java @@ -24,6 +24,8 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; */ public interface Packet { + int INITIAL_PACKET_SIZE = 1500; + /** * Sets the channel id that should be used once the packet has been successfully decoded it is * sent to the correct channel. @@ -32,6 +34,14 @@ public interface Packet { */ void setChannelID(long channelID); + /** + * This will return the expected packet size for the encoding + * @return + */ + default int expectedEncodeSize() { + return INITIAL_PACKET_SIZE; + } + /** * Returns the channel id of the channel that should handle this packet. * diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 75f50862ba..97a7973f7a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -39,8 +39,6 @@ public class PacketImpl implements Packet { public static final int PACKET_HEADERS_SIZE = DataConstants.SIZE_INT + DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG; - private static final int INITIAL_PACKET_SIZE = 1500; - protected long channelID; private final byte type; @@ -329,10 +327,13 @@ public class PacketImpl implements Packet { } protected ActiveMQBuffer createPacket(RemotingConnection connection) { + + int size = expectedEncodeSize(); + if (connection == null) { - return new ChannelBufferWrapper(Unpooled.buffer(INITIAL_PACKET_SIZE)); + return new ChannelBufferWrapper(Unpooled.buffer(size)); } else { - return connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE); + return connection.createTransportBuffer(size); } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index f4efcb2308..cc1d6852b1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -230,6 +231,11 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement callClosingListeners(); } + @Override + public boolean blockUntilWritable(int size, long timeout) { + return transportConnection.blockUntilWritable(size, timeout, TimeUnit.MILLISECONDS); + } + @Override public void disconnect(final boolean criticalError) { disconnect(null, criticalError); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java index 0c32007f0c..97b476d386 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java @@ -16,12 +16,8 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; -import io.netty.buffer.Unpooled; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ICoreMessage; -import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; -import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; public abstract class MessagePacket extends PacketImpl implements MessagePacketI { @@ -43,12 +39,4 @@ public abstract class MessagePacket extends PacketImpl implements MessagePacketI return super.getParentString() + ", message=" + message; } - protected ActiveMQBuffer internalCreatePacket(int size, RemotingConnection connection) { - if (connection == null) { - return new ChannelBufferWrapper(Unpooled.buffer(size)); - } else { - return connection.createTransportBuffer(size); - } - } - } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java index d2a4266db2..a894594d32 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java @@ -18,11 +18,8 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import java.util.Arrays; -import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; -import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.DataConstants; public abstract class SessionContinuationMessage extends PacketImpl { @@ -74,18 +71,9 @@ public abstract class SessionContinuationMessage extends PacketImpl { * * @return the size in bytes of the expected encoded packet */ - protected int expectedEncodedSize() { - return SESSION_CONTINUATION_BASE_SIZE + (body == null ? 0 : body.length); - } - @Override - protected final ActiveMQBuffer createPacket(RemotingConnection connection) { - final int expectedEncodedSize = expectedEncodedSize(); - if (connection == null) { - return new ChannelBufferWrapper(Unpooled.buffer(expectedEncodedSize)); - } else { - return connection.createTransportBuffer(expectedEncodedSize); - } + public int expectedEncodeSize() { + return SESSION_CONTINUATION_BASE_SIZE + (body == null ? 0 : body.length); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java index 44ad1bbf49..41e786bec3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java @@ -70,8 +70,8 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag // Protected ----------------------------------------------------- @Override - protected final int expectedEncodedSize() { - return super.expectedEncodedSize() + DataConstants.SIZE_LONG; + public int expectedEncodeSize() { + return super.expectedEncodeSize() + DataConstants.SIZE_LONG; } // Public -------------------------------------------------------- @@ -128,4 +128,4 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag return true; } -} \ No newline at end of file +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java index d89e394b65..4fbd48f30d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java @@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessage; -import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.DataConstants; public class SessionReceiveMessage extends MessagePacket { @@ -53,9 +52,10 @@ public class SessionReceiveMessage extends MessagePacket { return deliveryCount; } + @Override - protected ActiveMQBuffer createPacket(RemotingConnection connection) { - return internalCreatePacket(message.getEncodeSize() + PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, connection); + public int expectedEncodeSize() { + return message.getEncodeSize() + PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT; } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java index 1c600e9e18..26eedd76fc 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java @@ -93,8 +93,8 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { } @Override - protected final int expectedEncodedSize() { - return super.expectedEncodedSize() + (!continues ? DataConstants.SIZE_LONG : 0) + DataConstants.SIZE_BOOLEAN; + public int expectedEncodeSize() { + return super.expectedEncodeSize() + (!continues ? DataConstants.SIZE_LONG : 0) + DataConstants.SIZE_BOOLEAN; } @Override @@ -160,4 +160,4 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage { public SendAcknowledgementHandler getHandler() { return handler; } -} \ No newline at end of file +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java index 79cb4cb6eb..9f76c2d2d1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java @@ -21,7 +21,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.core.message.impl.CoreMessage; -import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; public class SessionSendMessage extends MessagePacket { @@ -62,8 +61,8 @@ public class SessionSendMessage extends MessagePacket { } @Override - protected ActiveMQBuffer createPacket(RemotingConnection connection) { - return internalCreatePacket(message.getEncodeSize() + PACKET_HEADERS_SIZE + 1, connection); + public int expectedEncodeSize() { + return message.getEncodeSize() + PACKET_HEADERS_SIZE + 1; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java index 8d22fabf4c..4a5a8b5bcd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE; +import org.apache.activemq.artemis.utils.DataConstants; public final class ReplicationAddMessage extends PacketImpl { @@ -63,10 +64,20 @@ public final class ReplicationAddMessage extends PacketImpl { // Public -------------------------------------------------------- + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_BYTE + // buffer.writeByte(journalID); + DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(operation.toBoolean()); + DataConstants.SIZE_LONG + // buffer.writeLong(id); + DataConstants.SIZE_BYTE + // buffer.writeByte(journalRecordType); + DataConstants.SIZE_INT + // buffer.writeInt(persister.getEncodeSize(encodingData)); + persister.getEncodeSize(encodingData);// persister.encode(buffer, encodingData); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeByte(journalID); - buffer.writeBoolean(operation.toBoolean()); buffer.writeLong(id); buffer.writeByte(journalRecordType); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java index a6fd02bdf9..fd7946a3e6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE; +import org.apache.activemq.artemis.utils.DataConstants; public class ReplicationAddTXMessage extends PacketImpl { @@ -67,6 +68,18 @@ public class ReplicationAddTXMessage extends PacketImpl { // Public -------------------------------------------------------- + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_BYTE + // buffer.writeByte(journalID); + DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(operation.toBoolean()); + DataConstants.SIZE_LONG + // buffer.writeLong(txId); + DataConstants.SIZE_LONG + // buffer.writeLong(id); + DataConstants.SIZE_BYTE + // buffer.writeByte(recordType); + DataConstants.SIZE_INT + // buffer.writeInt(persister.getEncodeSize(encodingData)); + persister.getEncodeSize(encodingData); // persister.encode(buffer, encodingData); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeByte(journalID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java index 1987b3db1b..245ec18eb4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; public final class ReplicationCommitMessage extends PacketImpl { @@ -41,6 +42,14 @@ public final class ReplicationCommitMessage extends PacketImpl { this.txId = txId; } + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_BYTE + // buffer.writeByte(journalID); + DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(rollback); + DataConstants.SIZE_LONG; // buffer.writeLong(txId); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeByte(journalID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteMessage.java index ab97b18579..fdbfb9b7fa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteMessage.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; public final class ReplicationDeleteMessage extends PacketImpl { @@ -38,6 +39,14 @@ public final class ReplicationDeleteMessage extends PacketImpl { this.id = id; } + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_BYTE + // buffer.writeByte(journalID); + DataConstants.SIZE_LONG; // buffer.writeLong(id); + } + + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeByte(journalID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteTXMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteTXMessage.java index 0d7552396b..4e942bd58f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteTXMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteTXMessage.java @@ -21,6 +21,7 @@ import java.util.Arrays; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; public class ReplicationDeleteTXMessage extends PacketImpl { @@ -52,6 +53,16 @@ public class ReplicationDeleteTXMessage extends PacketImpl { this.encodingData = encodingData; } + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_BYTE + // buffer.writeByte(journalID); + DataConstants.SIZE_LONG + // buffer.writeLong(txId); + DataConstants.SIZE_LONG + // buffer.writeLong(id); + DataConstants.SIZE_INT + // buffer.writeInt(encodingData.getEncodeSize()); + encodingData.getEncodeSize(); // encodingData.encode(buffer); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeByte(journalID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java index 20af68c82a..1ecaa683c0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; public class ReplicationLargeMessageBeginMessage extends PacketImpl { @@ -32,6 +33,14 @@ public class ReplicationLargeMessageBeginMessage extends PacketImpl { super(PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN); } + + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_LONG; // buffer.writeLong(messageId); + } + + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeLong(messageId); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java index bb779292cb..4a09cc034c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; public class ReplicationLargeMessageEndMessage extends PacketImpl { @@ -32,6 +33,13 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl { this.messageId = messageId; } + + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_LONG; // buffer.writeLong(messageId); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeLong(messageId); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java index f60c62994b..ffee14c8ac 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java @@ -20,6 +20,7 @@ import java.util.Arrays; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; public final class ReplicationLargeMessageWriteMessage extends PacketImpl { @@ -42,6 +43,15 @@ public final class ReplicationLargeMessageWriteMessage extends PacketImpl { this.body = body; } + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_LONG + // buffer.writeLong(messageId); + DataConstants.SIZE_LONG + // buffer.writeLong(messageId); + DataConstants.SIZE_INT + // buffer.writeInt(body.length); + body.length; // buffer.writeBytes(body); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeLong(messageId); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLiveIsStoppingMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLiveIsStoppingMessage.java index 339cfa82da..456a46bb0c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLiveIsStoppingMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLiveIsStoppingMessage.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; /** * Message indicating that the live is stopping (a scheduled stop). @@ -59,6 +60,12 @@ public final class ReplicationLiveIsStoppingMessage extends PacketImpl { this.liveStopping = b; } + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_INT; // buffer.writeInt(liveStopping.code); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeInt(liveStopping.code); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java index 31680daf49..ea929e4d70 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; public class ReplicationPageEventMessage extends PacketImpl { @@ -42,6 +43,14 @@ public class ReplicationPageEventMessage extends PacketImpl { this.storeName = storeName; } + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + SimpleString.sizeofString(storeName) + // buffer.writeSimpleString(storeName); + DataConstants.SIZE_INT + // buffer.writeInt(pageNumber); + DataConstants.SIZE_BOOLEAN; // buffer.writeBoolean(isDelete); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeSimpleString(storeName); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java index b88e0feee2..4f5079de9d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java @@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; public class ReplicationPageWriteMessage extends PacketImpl { @@ -39,6 +40,13 @@ public class ReplicationPageWriteMessage extends PacketImpl { // Public -------------------------------------------------------- + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_INT + // buffer.writeInt(pageNumber); + pagedMessage.getEncodeSize(); // pagedMessage.encode(buffer); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeInt(pageNumber); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPrepareMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPrepareMessage.java index ebdc1c4e2a..cf993a3083 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPrepareMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPrepareMessage.java @@ -21,6 +21,7 @@ import java.util.Arrays; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; public final class ReplicationPrepareMessage extends PacketImpl { @@ -48,6 +49,15 @@ public final class ReplicationPrepareMessage extends PacketImpl { // Public -------------------------------------------------------- + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_BYTE + // buffer.writeByte(journalID); + DataConstants.SIZE_LONG + // buffer.writeLong(txId); + DataConstants.SIZE_INT + // buffer.writeInt(encodingData.getEncodeSize()); + encodingData.getEncodeSize(); // encodingData.encode(buffer); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeByte(journalID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java index c7eff85466..d9d8b1a569 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java @@ -27,4 +27,11 @@ public class ReplicationResponseMessage extends PacketImpl { public ReplicationResponseMessage(byte replicationResponseV2) { super(replicationResponseV2); } + + + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE; + } + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java index f9001c641d..b26084bb4c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; public final class ReplicationResponseMessageV2 extends ReplicationResponseMessage { @@ -41,6 +42,12 @@ public final class ReplicationResponseMessageV2 extends ReplicationResponseMessa this.synchronizationIsFinishedAcknowledgement = synchronizationIsFinishedAcknowledgement; } + @Override + public int expectedEncodeSize() { + return PACKET_HEADERS_SIZE + + DataConstants.SIZE_BOOLEAN; // buffer.writeBoolean(synchronizationIsFinishedAcknowledgement); + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { super.encodeRest(buffer); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java index a44707f017..018535f426 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; /** * This message may signal start or end of the replication synchronization. @@ -109,6 +110,26 @@ public class ReplicationStartSyncMessage extends PacketImpl { } } + + @Override + public int expectedEncodeSize() { + int size = PACKET_HEADERS_SIZE + + DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(synchronizationIsFinished); + DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(allowsAutoFailBack); + nodeID.length() * 3; // buffer.writeString(nodeID); -- an estimate + + + if (synchronizationIsFinished) { + return size; + } + size += DataConstants.SIZE_BYTE + // buffer.writeByte(dataType.code); + DataConstants.SIZE_INT + // buffer.writeInt(ids.length); + DataConstants.SIZE_LONG * ids.length; // the write loop + + return size; + } + + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeBoolean(synchronizationIsFinished); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java index 90d2ca0b4a..4d3c32fa2c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.utils.DataConstants; /** * Message is used to sync {@link org.apache.activemq.artemis.core.io.SequentialFile}s to a backup server. The {@link FileType} controls @@ -98,6 +99,38 @@ public final class ReplicationSyncFileMessage extends PacketImpl { } } + @Override + public int expectedEncodeSize() { + int size = PACKET_HEADERS_SIZE + + DataConstants.SIZE_LONG; // buffer.writeLong(fileId); + + if (fileId == -1) + return size; + + size += DataConstants.SIZE_BYTE; // buffer.writeByte(fileType.code); + switch (fileType) { + case JOURNAL: { + size += DataConstants.SIZE_BYTE; // buffer.writeByte(journalType.typeByte); + break; + } + case PAGE: { + size += SimpleString.sizeofString(pageStoreName); + break; + } + case LARGE_MESSAGE: + default: + // no-op + } + + size += DataConstants.SIZE_INT; // buffer.writeInt(dataSize); + + if (dataSize > 0) { + size += byteBuffer.writerIndex(); // buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex()); + } + + return size; + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeLong(fileId); @@ -126,11 +159,6 @@ public final class ReplicationSyncFileMessage extends PacketImpl { if (dataSize > 0) { buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex()); } - - if (byteBuffer != null) { - byteBuffer.release(); - byteBuffer = null; - } } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java index 29e39d5bfe..f9e1b3d2f2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java @@ -19,8 +19,6 @@ package org.apache.activemq.artemis.core.remoting.impl.netty; import java.util.Map; import io.netty.channel.Channel; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener; public class NettyServerConnection extends NettyConnection { @@ -33,8 +31,4 @@ public class NettyServerConnection extends NettyConnection { super(configuration, channel, listener, batchingEnabled, directDeliver); } - @Override - public ActiveMQBuffer createTransportBuffer(int size) { - return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true); - } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 8b91c02a5a..91dfa98d94 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -33,16 +33,15 @@ import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; -import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.EncodingSupport; -import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.protocol.core.Channel; @@ -71,7 +70,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ReusableLatch; import org.jboss.logging.Logger; @@ -84,7 +82,7 @@ import org.jboss.logging.Logger; * * @see ReplicationEndpoint */ -public final class ReplicationManager implements ActiveMQComponent, ReadyListener { +public final class ReplicationManager implements ActiveMQComponent { private static final Logger logger = Logger.getLogger(ReplicationManager.class); @@ -119,8 +117,6 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene private final AtomicBoolean writable = new AtomicBoolean(true); - private final Object replicationLock = new Object(); - private final Queue pendingTokens = new ConcurrentLinkedQueue<>(); private final ExecutorFactory executorFactory; @@ -292,12 +288,9 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene replicatingChannel.getConnection().getTransportConnection().fireReady(true); } - synchronized (replicationLock) { - enabled = false; - writable.set(true); - replicationLock.notifyAll(); - clearReplicationTokens(); - } + enabled = false; + writable.set(true); + clearReplicationTokens(); RemotingConnection toStop = remotingConnection; if (toStop != null) { @@ -315,16 +308,13 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene */ public void clearReplicationTokens() { logger.trace("clearReplicationTokens initiating"); - synchronized (replicationLock) { - logger.trace("clearReplicationTokens entered the lock"); - while (!pendingTokens.isEmpty()) { - OperationContext ctx = pendingTokens.poll(); - logger.trace("Calling ctx.replicationDone()"); - try { - ctx.replicationDone(); - } catch (Throwable e) { - ActiveMQServerLogger.LOGGER.errorCompletingCallbackOnReplicationManager(e); - } + while (!pendingTokens.isEmpty()) { + OperationContext ctx = pendingTokens.poll(); + logger.trace("Calling ctx.replicationDone()"); + try { + ctx.replicationDone(); + } catch (Throwable e) { + ActiveMQServerLogger.LOGGER.errorCompletingCallbackOnReplicationManager(e); } } logger.trace("clearReplicationTokens finished"); @@ -362,24 +352,22 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene repliToken.replicationLineUp(); } - synchronized (replicationLock) { - if (enabled) { - pendingTokens.add(repliToken); - if (useExecutor) { - replicationStream.execute(() -> { - if (enabled) { - flowControl(); - replicatingChannel.send(packet); - } - }); - } else { - flowControl(); - replicatingChannel.send(packet); - } + if (enabled) { + pendingTokens.add(repliToken); + if (useExecutor) { + replicationStream.execute(() -> { + if (enabled) { + flowControl(packet.expectedEncodeSize()); + replicatingChannel.send(packet); + } + }); } else { - // Already replicating channel failed, so just play the action now - runItNow = true; + flowControl(packet.expectedEncodeSize()); + replicatingChannel.send(packet); } + } else { + // Already replicating channel failed, so just play the action now + runItNow = true; } // Execute outside lock @@ -395,47 +383,20 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene * This was written as a refactoring of sendReplicatePacket. * In case you refactor this in any way, this method must hold a lock on replication lock. . */ - private boolean flowControl() { - synchronized (replicationLock) { - // synchronized (replicationLock) { -- I'm not adding this because the caller already has it - // future maintainers of this code please be aware that the intention here is hold the lock on replication lock - if (!replicatingChannel.getConnection().isWritable(this)) { - try { - logger.trace("flowControl waiting on writable replication"); - writable.set(false); - //don't wait for ever as this may hang tests etc, we've probably been closed anyway - long now = System.currentTimeMillis(); - long deadline = now + timeout; - while (!writable.get() && now < deadline) { - replicationLock.wait(deadline - now); - now = System.currentTimeMillis(); - } - logger.trace("flow control done on replication"); + private boolean flowControl(int size) { + boolean flowWorked = replicatingChannel.getConnection().blockUntilWritable(size, timeout); - if (!writable.get()) { - ActiveMQServerLogger.LOGGER.slowReplicationResponse(); - logger.tracef("There was no response from replication backup after %s seconds, server being stopped now", System.currentTimeMillis() - now); - try { - stop(); - } catch (Exception e) { - logger.warn(e.getMessage(), e); - } - return false; - } - } catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); - } + if (!flowWorked) { + try { + ActiveMQServerLogger.LOGGER.slowReplicationResponse(); + stop(); + } catch (Exception e) { + logger.warn(e.getMessage(), e); } } - return true; - } - @Override - public void readyForWriting() { - synchronized (replicationLock) { - writable.set(true); - replicationLock.notifyAll(); - } + + return flowWorked; } /** @@ -569,15 +530,17 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene if (!file.isOpen()) { file.open(); } + int size = 32 * 1024; + final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); + try { try (final FileInputStream fis = new FileInputStream(file.getJavaFile()); final FileChannel channel = fis.getChannel()) { // We can afford having a single buffer here for this entire loop // because sendReplicatePacket will encode the packet as a NettyBuffer // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy - int size = 1 << 17; while (true) { - final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size); + buffer.clear(); ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); final int bytesRead = channel.read(byteBuffer); int toSend = bytesRead; @@ -600,6 +563,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene } } } finally { + buffer.release(); if (file.isOpen()) file.close(); }