This commit is contained in:
Clebert Suconic 2020-08-17 13:01:42 -04:00
commit f6ac058f7f
15 changed files with 243 additions and 23 deletions

View File

@ -1227,6 +1227,27 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet");
}
}
@Override
public void endOfBatch(final Object connectionID) {
RemotingConnection theConn = connection;
if (theConn != null && connectionID.equals(theConn.getID())) {
try {
theConn.endOfBatch(connectionID);
} catch (final RuntimeException e) {
ActiveMQClientLogger.LOGGER.disconnectOnErrorDecoding(e);
threadPool.execute(new Runnable() {
@Override
public void run() {
theConn.fail(new ActiveMQException(e.getMessage()));
}
});
}
} else {
logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet");
}
}
}
private final class DelegatingFailureListener implements FailureListener {

View File

@ -84,6 +84,18 @@ public interface Channel {
*/
boolean sendBatched(Packet packet);
/**
* Sends a packet on this channel, but request it to be flushed (along with the un-flushed previous ones) only iff
* {@code flushConnection} is {@code true}.
*
* @param packet the packet to send
* @param flushConnection if {@code true} requests this {@code packet} and any un-flushed previous sent one to be flushed
* to the underlying connection
* @return false if the packet was rejected by an outgoing interceptor; true if the send was
* successful
*/
boolean send(Packet packet, boolean flushConnection);
/**
* Sends a packet on this channel and then blocks until it has been written to the connection.
*
@ -131,6 +143,8 @@ public interface Channel {
*/
ChannelHandler getHandler();
void endOfBatch();
/**
* Closes this channel.
* <p>

View File

@ -28,4 +28,8 @@ public interface ChannelHandler {
* @param packet the packet received
*/
void handlePacket(Packet packet);
default void endOfBatch() {
}
}

View File

@ -231,6 +231,79 @@ public final class ChannelImpl implements Channel {
}
}
@Override
public boolean send(Packet packet, boolean flushConnection) {
if (invokeInterceptors(packet, interceptors, connection) != null) {
return false;
}
final ResponseCache responseAsyncCache = this.responseAsyncCache;
synchronized (sendLock) {
packet.setChannelID(id);
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
}
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
}
ActiveMQBuffer buffer = packet.encode(connection);
lock.lock();
try {
if (failingOver) {
waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send");
}
// Sanity check
if (transferring) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();
}
if (resendCache != null && packet.isRequiresConfirmations()) {
addResendPacket(packet);
}
} finally {
lock.unlock();
}
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id);
}
//We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
//As the send could block if the response cache cannot add, preventing responses to be handled.
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
while (!responseAsyncCache.add(packet)) {
try {
Thread.sleep(1);
} catch (Exception e) {
// Ignore
}
}
}
// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
// buffer is full, preventing any incoming buffers being handled and blocking failover
try {
connection.getTransportConnection().write(buffer, flushConnection);
} catch (Throwable t) {
//If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full.
//The client would get still know about this as the exception bubbles up the call stack instead.
if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
responseAsyncCache.remove(packet.getCorrelationID());
}
throw t;
}
return true;
}
}
@Override
public boolean sendAndFlush(final Packet packet) {
return send(packet, -1, true, false);
@ -547,6 +620,15 @@ public final class ChannelImpl implements Channel {
return handler;
}
@Override
public void endOfBatch() {
ChannelHandler handler = this.handler;
if (handler == null) {
return;
}
handler.endOfBatch();
}
@Override
public void close() {
if (closed) {

View File

@ -391,6 +391,15 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
}
}
@Override
public void endOfBatch(Object connectionID) {
super.endOfBatch(connectionID);
// TODO we really need a lock here?
synchronized (transferLock) {
channels.forEach((channelID, channel) -> channel.endOfBatch());
}
}
@Override
public String getTransportLocalAddress() {
return getTransportConnection().getLocalAddress();

View File

@ -76,6 +76,12 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler {
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
handler.endOfBatch(channelId(ctx.channel()));
}
@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
synchronized (this) {

View File

@ -281,6 +281,17 @@ public class NettyConnection implements Connection {
write(buffer, false, false);
}
@Override
public void write(ActiveMQBuffer buffer, boolean requestFlush) {
final Channel channel = this.channel;
final ByteBuf bytes = buffer.byteBuf();
if (requestFlush) {
channel.writeAndFlush(bytes, channel.voidPromise());
} else {
channel.write(bytes, channel.voidPromise());
}
}
@Override
public final void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) {
write(buffer, flush, batched, null);

View File

@ -32,4 +32,8 @@ public interface BufferHandler {
* @param buffer the buffer to decode
*/
void bufferReceived(Object connectionID, ActiveMQBuffer buffer);
default void endOfBatch(Object connectionID) {
}
}

View File

@ -76,6 +76,15 @@ public interface Connection {
*/
Object getID();
/**
* writes the buffer to the connection and if flush is true request to flush the buffer
* (and any previous un-flushed ones) into the wire.
*
* @param buffer the buffer to write
* @param requestFlush whether to request flush onto the wire
*/
void write(ActiveMQBuffer buffer, boolean requestFlush);
/**
* writes the buffer to the connection and if flush is true returns only when the buffer has been physically written to the connection.
*

View File

@ -374,6 +374,11 @@ public class ChannelImplTest {
return null;
}
@Override
public void write(ActiveMQBuffer buffer, boolean requestFlush) {
}
@Override
public void write(ActiveMQBuffer buffer, boolean flush, boolean batched) {

View File

@ -88,11 +88,14 @@ public class JournalFilesRepository {
private final Runnable pushOpenRunnable = new Runnable() {
@Override
public void run() {
try {
pushOpenedFile();
} catch (Exception e) {
ActiveMQJournalLogger.LOGGER.errorPushingFile(e);
fileFactory.onIOError(e, "unable to open ", null);
// if there's already an opened file there is no need to push a new one
if (openedFiles.isEmpty()) {
try {
pushOpenedFile();
} catch (Exception e) {
ActiveMQJournalLogger.LOGGER.errorPushingFile(e);
fileFactory.onIOError(e, "unable to open ", null);
}
}
}
};

View File

@ -47,7 +47,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.io.DummyCallback;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
@ -841,7 +840,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, maxRecordSize);
}
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(callback);
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@Override
public void run() {
@ -933,7 +932,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
Object record,
boolean sync,
IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException {
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(callback);
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@Override
public void run() {
@ -1017,7 +1016,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
private void internalAppendDeleteRecord(long id,
boolean sync,
IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException {
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(callback);
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@Override
public void run() {
@ -1064,12 +1063,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
result.get();
}
private static SimpleFuture newSyncAndCallbackResult(IOCompletion callback) {
if (callback != null && callback != DummyCallback.getInstance()) {
return SimpleFuture.dumb();
} else {
return new SimpleFutureImpl<>();
}
private static SimpleFuture newSyncAndCallbackResult(boolean sync, IOCompletion callback) {
return (sync && callback == null) ? new SimpleFutureImpl<>() : SimpleFuture.dumb();
}
@Override
@ -1295,7 +1290,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
logger.trace("scheduling appendPrepareRecord::txID=" + txID);
}
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(callback);
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@Override
@ -1381,7 +1376,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(callback);
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@Override
@ -1435,7 +1430,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(callback);
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@Override
public void run() {

View File

@ -175,6 +175,11 @@ public class InVMConnection implements Connection {
public void checkFlushBatchBuffer() {
}
@Override
public void write(ActiveMQBuffer buffer, boolean requestFlush) {
write(buffer, false, false, null);
}
@Override
public void write(final ActiveMQBuffer buffer) {
write(buffer, false, false, null);

View File

@ -21,6 +21,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
@ -51,6 +52,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournal
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageInSync;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
@ -73,6 +75,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage.SyncDataType;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
@ -98,6 +101,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
private final SharedNothingBackupActivation activation;
private final boolean noSync = false;
private Channel channel;
private boolean supportResponseBatching;
private Journal[] journals;
private final JournalLoadInformation[] journalLoadInformation = new JournalLoadInformation[2];
@ -130,6 +134,8 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
private List<Interceptor> outgoingInterceptors = null;
private final ArrayList<Packet> pendingPackets;
// Constructors --------------------------------------------------
public ReplicationEndpoint(final ActiveMQServerImpl server,
@ -140,6 +146,8 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
this.criticalErrorListener = criticalErrorListener;
this.wantedFailBack = wantedFailBack;
this.activation = activation;
this.pendingPackets = new ArrayList<>();
this.supportResponseBatching = false;
}
// Public --------------------------------------------------------
@ -242,15 +250,31 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
if (logger.isTraceEnabled()) {
logger.trace("Returning " + response);
}
sendResponse(response);
if (supportResponseBatching) {
pendingPackets.add(response);
} else {
channel.send(response);
}
} else {
logger.trace("Response is null, ignoring response");
}
}
protected void sendResponse(PacketImpl response) {
channel.send(response);
@Override
public void endOfBatch() {
final ArrayList<Packet> pendingPackets = this.pendingPackets;
if (pendingPackets.isEmpty()) {
return;
}
try {
for (int i = 0, size = pendingPackets.size(); i < size; i++) {
final Packet packet = pendingPackets.get(i);
final boolean isLast = i == (size - 1);
channel.send(packet, isLast);
}
} finally {
pendingPackets.clear();
}
}
/**
@ -365,6 +389,21 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
public void setChannel(final Channel channel) {
this.channel = channel;
if (channel == null) {
supportResponseBatching = false;
} else {
try {
final CoreRemotingConnection connection = channel.getConnection();
if (connection != null) {
this.supportResponseBatching = connection.getTransportConnection() instanceof NettyConnection;
} else {
this.supportResponseBatching = false;
}
} catch (Throwable t) {
logger.warn("Error while checking the channel connection", t);
this.supportResponseBatching = false;
}
}
if (this.channel != null && outgoingInterceptors != null) {
if (channel.getConnection() instanceof RemotingConnectionImpl) {
@ -551,7 +590,10 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
registerJournal(journalContent.typeByte, syncJournal);
// We send a response now, to avoid a situation where we handle votes during the deactivation of the live during a failback.
sendResponse(replicationResponseMessage);
if (supportResponseBatching) {
endOfBatch();
}
channel.send(replicationResponseMessage);
replicationResponseMessage = null;
// This needs to be done after the response is sent, to avoid voting shutting it down for any reason.

View File

@ -194,6 +194,11 @@ public class BackupSyncDelay implements Interceptor {
this.channel = channel;
}
@Override
public boolean send(Packet packet, boolean flushConnection) {
return channel.send(packet, flushConnection);
}
@Override
public String toString() {
return "ChannelWrapper(" + channel + ")";
@ -237,6 +242,11 @@ public class BackupSyncDelay implements Interceptor {
throw new UnsupportedOperationException();
}
@Override
public void endOfBatch() {
throw new UnsupportedOperationException();
}
@Override
public void close() {
throw new UnsupportedOperationException();