diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 75981cf8c9..ba4387866c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -1227,6 +1227,27 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet"); } } + + @Override + public void endOfBatch(final Object connectionID) { + RemotingConnection theConn = connection; + + if (theConn != null && connectionID.equals(theConn.getID())) { + try { + theConn.endOfBatch(connectionID); + } catch (final RuntimeException e) { + ActiveMQClientLogger.LOGGER.disconnectOnErrorDecoding(e); + threadPool.execute(new Runnable() { + @Override + public void run() { + theConn.fail(new ActiveMQException(e.getMessage())); + } + }); + } + } else { + logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet"); + } + } } private final class DelegatingFailureListener implements FailureListener { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java index 56f825959f..355e502d30 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java @@ -84,6 +84,18 @@ public interface Channel { */ boolean sendBatched(Packet packet); + /** + * Sends a packet on this channel, but request it to be flushed (along with the un-flushed previous ones) only iff + * {@code flushConnection} is {@code true}. + * + * @param packet the packet to send + * @param flushConnection if {@code true} requests this {@code packet} and any un-flushed previous sent one to be flushed + * to the underlying connection + * @return false if the packet was rejected by an outgoing interceptor; true if the send was + * successful + */ + boolean send(Packet packet, boolean flushConnection); + /** * Sends a packet on this channel and then blocks until it has been written to the connection. * @@ -131,6 +143,8 @@ public interface Channel { */ ChannelHandler getHandler(); + void endOfBatch(); + /** * Closes this channel. *
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ChannelHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ChannelHandler.java
index a44b6d5234..4eef1813f4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ChannelHandler.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/ChannelHandler.java
@@ -28,4 +28,8 @@ public interface ChannelHandler {
* @param packet the packet received
*/
void handlePacket(Packet packet);
+
+ default void endOfBatch() {
+
+ }
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 84cc135fcf..47fecc8ca2 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -231,6 +231,79 @@ public final class ChannelImpl implements Channel {
}
}
+ @Override
+ public boolean send(Packet packet, boolean flushConnection) {
+ if (invokeInterceptors(packet, interceptors, connection) != null) {
+ return false;
+ }
+
+ final ResponseCache responseAsyncCache = this.responseAsyncCache;
+
+ synchronized (sendLock) {
+ packet.setChannelID(id);
+
+ if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
+ packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
+ }
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
+ }
+
+ ActiveMQBuffer buffer = packet.encode(connection);
+
+ lock.lock();
+
+ try {
+ if (failingOver) {
+ waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send");
+ }
+
+ // Sanity check
+ if (transferring) {
+ throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();
+ }
+
+ if (resendCache != null && packet.isRequiresConfirmations()) {
+ addResendPacket(packet);
+ }
+
+ } finally {
+ lock.unlock();
+ }
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id);
+ }
+
+ //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
+ //As the send could block if the response cache cannot add, preventing responses to be handled.
+ if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
+ while (!responseAsyncCache.add(packet)) {
+ try {
+ Thread.sleep(1);
+ } catch (Exception e) {
+ // Ignore
+ }
+ }
+ }
+
+ // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
+ // buffer is full, preventing any incoming buffers being handled and blocking failover
+ try {
+ connection.getTransportConnection().write(buffer, flushConnection);
+ } catch (Throwable t) {
+ //If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full.
+ //The client would get still know about this as the exception bubbles up the call stack instead.
+ if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
+ responseAsyncCache.remove(packet.getCorrelationID());
+ }
+ throw t;
+ }
+ return true;
+ }
+ }
+
@Override
public boolean sendAndFlush(final Packet packet) {
return send(packet, -1, true, false);
@@ -547,6 +620,15 @@ public final class ChannelImpl implements Channel {
return handler;
}
+ @Override
+ public void endOfBatch() {
+ ChannelHandler handler = this.handler;
+ if (handler == null) {
+ return;
+ }
+ handler.endOfBatch();
+ }
+
@Override
public void close() {
if (closed) {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
index 418e3f150b..dcc8ecbe75 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -391,6 +391,15 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
}
}
+ @Override
+ public void endOfBatch(Object connectionID) {
+ super.endOfBatch(connectionID);
+ // TODO we really need a lock here?
+ synchronized (transferLock) {
+ channels.forEach((channelID, channel) -> channel.endOfBatch());
+ }
+ }
+
@Override
public String getTransportLocalAddress() {
return getTransportConnection().getLocalAddress();
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
index de8b49ef56..acd314ce92 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
@@ -76,6 +76,12 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler {
}
}
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+ super.channelReadComplete(ctx);
+ handler.endOfBatch(channelId(ctx.channel()));
+ }
+
@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
synchronized (this) {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index 51330c727b..cde37ae7e9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -281,6 +281,17 @@ public class NettyConnection implements Connection {
write(buffer, false, false);
}
+ @Override
+ public void write(ActiveMQBuffer buffer, boolean requestFlush) {
+ final Channel channel = this.channel;
+ final ByteBuf bytes = buffer.byteBuf();
+ if (requestFlush) {
+ channel.writeAndFlush(bytes, channel.voidPromise());
+ } else {
+ channel.write(bytes, channel.voidPromise());
+ }
+ }
+
@Override
public final void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) {
write(buffer, flush, batched, null);
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/BufferHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/BufferHandler.java
index 5f390a1728..f4ddf9d748 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/BufferHandler.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/BufferHandler.java
@@ -32,4 +32,8 @@ public interface BufferHandler {
* @param buffer the buffer to decode
*/
void bufferReceived(Object connectionID, ActiveMQBuffer buffer);
+
+ default void endOfBatch(Object connectionID) {
+
+ }
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
index ebde456034..0f76354729 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
@@ -76,6 +76,15 @@ public interface Connection {
*/
Object getID();
+ /**
+ * writes the buffer to the connection and if flush is true request to flush the buffer
+ * (and any previous un-flushed ones) into the wire.
+ *
+ * @param buffer the buffer to write
+ * @param requestFlush whether to request flush onto the wire
+ */
+ void write(ActiveMQBuffer buffer, boolean requestFlush);
+
/**
* writes the buffer to the connection and if flush is true returns only when the buffer has been physically written to the connection.
*
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
index 4863028293..d2eccf3378 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
@@ -374,6 +374,11 @@ public class ChannelImplTest {
return null;
}
+ @Override
+ public void write(ActiveMQBuffer buffer, boolean requestFlush) {
+
+ }
+
@Override
public void write(ActiveMQBuffer buffer, boolean flush, boolean batched) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
index b2fc576db3..b2d8338ed0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
@@ -175,6 +175,11 @@ public class InVMConnection implements Connection {
public void checkFlushBatchBuffer() {
}
+ @Override
+ public void write(ActiveMQBuffer buffer, boolean requestFlush) {
+ write(buffer, false, false, null);
+ }
+
@Override
public void write(final ActiveMQBuffer buffer) {
write(buffer, false, false, null);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index 2a49ec3fe2..17294760e2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -21,6 +21,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
@@ -51,6 +52,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournal
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageInSync;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
+import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
@@ -73,6 +75,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage.SyncDataType;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
@@ -98,6 +101,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
private final SharedNothingBackupActivation activation;
private final boolean noSync = false;
private Channel channel;
+ private boolean supportResponseBatching;
private Journal[] journals;
private final JournalLoadInformation[] journalLoadInformation = new JournalLoadInformation[2];
@@ -130,6 +134,8 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
private List