ARTEMIS-2877 Fix journal replication scalability
This is allowing journal appends to happen in burst during replication, by batching replication response into the network at the end of the append burst.
This commit is contained in:
parent
a6bf7d0e04
commit
40f20cfe6a
|
@ -1227,6 +1227,27 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void endOfBatch(final Object connectionID) {
|
||||
RemotingConnection theConn = connection;
|
||||
|
||||
if (theConn != null && connectionID.equals(theConn.getID())) {
|
||||
try {
|
||||
theConn.endOfBatch(connectionID);
|
||||
} catch (final RuntimeException e) {
|
||||
ActiveMQClientLogger.LOGGER.disconnectOnErrorDecoding(e);
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
theConn.fail(new ActiveMQException(e.getMessage()));
|
||||
}
|
||||
});
|
||||
}
|
||||
} else {
|
||||
logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final class DelegatingFailureListener implements FailureListener {
|
||||
|
|
|
@ -84,6 +84,18 @@ public interface Channel {
|
|||
*/
|
||||
boolean sendBatched(Packet packet);
|
||||
|
||||
/**
|
||||
* Sends a packet on this channel, but request it to be flushed (along with the un-flushed previous ones) only iff
|
||||
* {@code flushConnection} is {@code true}.
|
||||
*
|
||||
* @param packet the packet to send
|
||||
* @param flushConnection if {@code true} requests this {@code packet} and any un-flushed previous sent one to be flushed
|
||||
* to the underlying connection
|
||||
* @return false if the packet was rejected by an outgoing interceptor; true if the send was
|
||||
* successful
|
||||
*/
|
||||
boolean send(Packet packet, boolean flushConnection);
|
||||
|
||||
/**
|
||||
* Sends a packet on this channel and then blocks until it has been written to the connection.
|
||||
*
|
||||
|
@ -131,6 +143,8 @@ public interface Channel {
|
|||
*/
|
||||
ChannelHandler getHandler();
|
||||
|
||||
void endOfBatch();
|
||||
|
||||
/**
|
||||
* Closes this channel.
|
||||
* <p>
|
||||
|
|
|
@ -28,4 +28,8 @@ public interface ChannelHandler {
|
|||
* @param packet the packet received
|
||||
*/
|
||||
void handlePacket(Packet packet);
|
||||
|
||||
default void endOfBatch() {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -231,6 +231,79 @@ public final class ChannelImpl implements Channel {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean send(Packet packet, boolean flushConnection) {
|
||||
if (invokeInterceptors(packet, interceptors, connection) != null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final ResponseCache responseAsyncCache = this.responseAsyncCache;
|
||||
|
||||
synchronized (sendLock) {
|
||||
packet.setChannelID(id);
|
||||
|
||||
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
|
||||
packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
|
||||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
|
||||
}
|
||||
|
||||
ActiveMQBuffer buffer = packet.encode(connection);
|
||||
|
||||
lock.lock();
|
||||
|
||||
try {
|
||||
if (failingOver) {
|
||||
waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send");
|
||||
}
|
||||
|
||||
// Sanity check
|
||||
if (transferring) {
|
||||
throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();
|
||||
}
|
||||
|
||||
if (resendCache != null && packet.isRequiresConfirmations()) {
|
||||
addResendPacket(packet);
|
||||
}
|
||||
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id);
|
||||
}
|
||||
|
||||
//We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
|
||||
//As the send could block if the response cache cannot add, preventing responses to be handled.
|
||||
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
|
||||
while (!responseAsyncCache.add(packet)) {
|
||||
try {
|
||||
Thread.sleep(1);
|
||||
} catch (Exception e) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
|
||||
// buffer is full, preventing any incoming buffers being handled and blocking failover
|
||||
try {
|
||||
connection.getTransportConnection().write(buffer, flushConnection);
|
||||
} catch (Throwable t) {
|
||||
//If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full.
|
||||
//The client would get still know about this as the exception bubbles up the call stack instead.
|
||||
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
|
||||
responseAsyncCache.remove(packet.getCorrelationID());
|
||||
}
|
||||
throw t;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean sendAndFlush(final Packet packet) {
|
||||
return send(packet, -1, true, false);
|
||||
|
@ -547,6 +620,15 @@ public final class ChannelImpl implements Channel {
|
|||
return handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void endOfBatch() {
|
||||
ChannelHandler handler = this.handler;
|
||||
if (handler == null) {
|
||||
return;
|
||||
}
|
||||
handler.endOfBatch();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (closed) {
|
||||
|
|
|
@ -391,6 +391,15 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void endOfBatch(Object connectionID) {
|
||||
super.endOfBatch(connectionID);
|
||||
// TODO we really need a lock here?
|
||||
synchronized (transferLock) {
|
||||
channels.forEach((channelID, channel) -> channel.endOfBatch());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTransportLocalAddress() {
|
||||
return getTransportConnection().getLocalAddress();
|
||||
|
|
|
@ -76,6 +76,12 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
super.channelReadComplete(ctx);
|
||||
handler.endOfBatch(channelId(ctx.channel()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
|
||||
synchronized (this) {
|
||||
|
|
|
@ -281,6 +281,17 @@ public class NettyConnection implements Connection {
|
|||
write(buffer, false, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ActiveMQBuffer buffer, boolean requestFlush) {
|
||||
final Channel channel = this.channel;
|
||||
final ByteBuf bytes = buffer.byteBuf();
|
||||
if (requestFlush) {
|
||||
channel.writeAndFlush(bytes, channel.voidPromise());
|
||||
} else {
|
||||
channel.write(bytes, channel.voidPromise());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) {
|
||||
write(buffer, flush, batched, null);
|
||||
|
|
|
@ -32,4 +32,8 @@ public interface BufferHandler {
|
|||
* @param buffer the buffer to decode
|
||||
*/
|
||||
void bufferReceived(Object connectionID, ActiveMQBuffer buffer);
|
||||
|
||||
default void endOfBatch(Object connectionID) {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -76,6 +76,15 @@ public interface Connection {
|
|||
*/
|
||||
Object getID();
|
||||
|
||||
/**
|
||||
* writes the buffer to the connection and if flush is true request to flush the buffer
|
||||
* (and any previous un-flushed ones) into the wire.
|
||||
*
|
||||
* @param buffer the buffer to write
|
||||
* @param requestFlush whether to request flush onto the wire
|
||||
*/
|
||||
void write(ActiveMQBuffer buffer, boolean requestFlush);
|
||||
|
||||
/**
|
||||
* writes the buffer to the connection and if flush is true returns only when the buffer has been physically written to the connection.
|
||||
*
|
||||
|
|
|
@ -374,6 +374,11 @@ public class ChannelImplTest {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ActiveMQBuffer buffer, boolean requestFlush) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ActiveMQBuffer buffer, boolean flush, boolean batched) {
|
||||
|
||||
|
|
|
@ -175,6 +175,11 @@ public class InVMConnection implements Connection {
|
|||
public void checkFlushBatchBuffer() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ActiveMQBuffer buffer, boolean requestFlush) {
|
||||
write(buffer, false, false, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(final ActiveMQBuffer buffer) {
|
||||
write(buffer, false, false, null);
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.io.FileOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -51,6 +52,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournal
|
|||
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageInSync;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Channel;
|
||||
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
|
||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
|
||||
|
@ -73,6 +75,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage.SyncDataType;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
|
||||
import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||
|
@ -98,6 +101,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
private final SharedNothingBackupActivation activation;
|
||||
private final boolean noSync = false;
|
||||
private Channel channel;
|
||||
private boolean supportResponseBatching;
|
||||
|
||||
private Journal[] journals;
|
||||
private final JournalLoadInformation[] journalLoadInformation = new JournalLoadInformation[2];
|
||||
|
@ -130,6 +134,8 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
|
||||
private List<Interceptor> outgoingInterceptors = null;
|
||||
|
||||
private final ArrayList<Packet> pendingPackets;
|
||||
|
||||
|
||||
// Constructors --------------------------------------------------
|
||||
public ReplicationEndpoint(final ActiveMQServerImpl server,
|
||||
|
@ -140,6 +146,8 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
this.criticalErrorListener = criticalErrorListener;
|
||||
this.wantedFailBack = wantedFailBack;
|
||||
this.activation = activation;
|
||||
this.pendingPackets = new ArrayList<>();
|
||||
this.supportResponseBatching = false;
|
||||
}
|
||||
|
||||
// Public --------------------------------------------------------
|
||||
|
@ -242,15 +250,31 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Returning " + response);
|
||||
}
|
||||
|
||||
sendResponse(response);
|
||||
if (supportResponseBatching) {
|
||||
pendingPackets.add(response);
|
||||
} else {
|
||||
channel.send(response);
|
||||
}
|
||||
} else {
|
||||
logger.trace("Response is null, ignoring response");
|
||||
}
|
||||
}
|
||||
|
||||
protected void sendResponse(PacketImpl response) {
|
||||
channel.send(response);
|
||||
@Override
|
||||
public void endOfBatch() {
|
||||
final ArrayList<Packet> pendingPackets = this.pendingPackets;
|
||||
if (pendingPackets.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
for (int i = 0, size = pendingPackets.size(); i < size; i++) {
|
||||
final Packet packet = pendingPackets.get(i);
|
||||
final boolean isLast = i == (size - 1);
|
||||
channel.send(packet, isLast);
|
||||
}
|
||||
} finally {
|
||||
pendingPackets.clear();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -365,6 +389,21 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
|
||||
public void setChannel(final Channel channel) {
|
||||
this.channel = channel;
|
||||
if (channel == null) {
|
||||
supportResponseBatching = false;
|
||||
} else {
|
||||
try {
|
||||
final CoreRemotingConnection connection = channel.getConnection();
|
||||
if (connection != null) {
|
||||
this.supportResponseBatching = connection.getTransportConnection() instanceof NettyConnection;
|
||||
} else {
|
||||
this.supportResponseBatching = false;
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
logger.warn("Error while checking the channel connection", t);
|
||||
this.supportResponseBatching = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (this.channel != null && outgoingInterceptors != null) {
|
||||
if (channel.getConnection() instanceof RemotingConnectionImpl) {
|
||||
|
@ -551,7 +590,10 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
registerJournal(journalContent.typeByte, syncJournal);
|
||||
|
||||
// We send a response now, to avoid a situation where we handle votes during the deactivation of the live during a failback.
|
||||
sendResponse(replicationResponseMessage);
|
||||
if (supportResponseBatching) {
|
||||
endOfBatch();
|
||||
}
|
||||
channel.send(replicationResponseMessage);
|
||||
replicationResponseMessage = null;
|
||||
|
||||
// This needs to be done after the response is sent, to avoid voting shutting it down for any reason.
|
||||
|
|
|
@ -194,6 +194,11 @@ public class BackupSyncDelay implements Interceptor {
|
|||
this.channel = channel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean send(Packet packet, boolean flushConnection) {
|
||||
return channel.send(packet, flushConnection);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ChannelWrapper(" + channel + ")";
|
||||
|
@ -237,6 +242,11 @@ public class BackupSyncDelay implements Interceptor {
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void endOfBatch() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
throw new UnsupportedOperationException();
|
||||
|
|
Loading…
Reference in New Issue