This commit is contained in:
Clebert Suconic 2021-03-03 13:51:05 -05:00
commit 4d934e364e
6 changed files with 109 additions and 16 deletions

View File

@ -470,5 +470,4 @@ public class PacketImpl implements Packet {
public void setCorrelationID(long correlationID) {
}
}

View File

@ -28,4 +28,14 @@ public class NullResponseMessage extends PacketImpl {
public boolean isResponse() {
return true;
}
@Override
public int expectedEncodeSize() {
return PACKET_HEADERS_SIZE;
}
public void reset() {
size = 0;
channelID = 0;
}
}

View File

@ -39,6 +39,11 @@ public class NullResponseMessage_V2 extends NullResponseMessage {
return correlationID;
}
@Override
public void setCorrelationID(long correlationID) {
this.correlationID = correlationID;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
@ -53,6 +58,11 @@ public class NullResponseMessage_V2 extends NullResponseMessage {
}
}
@Override
public int expectedEncodeSize() {
return super.expectedEncodeSize() + DataConstants.SIZE_LONG;
}
@Override
public final boolean isResponse() {
return true;
@ -93,4 +103,10 @@ public class NullResponseMessage_V2 extends NullResponseMessage {
}
return true;
}
@Override
public void reset() {
super.reset();
correlationID = 0;
}
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.DataConstants;
/**
* Ping is sent on the client side by {@link org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl}. At the server's
@ -56,6 +57,11 @@ public final class Ping extends PacketImpl {
return false;
}
@Override
public int expectedEncodeSize() {
return PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG;
}
@Override
public String toString() {
StringBuffer buf = new StringBuffer(getParentString());

View File

@ -20,7 +20,9 @@ import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
@ -35,7 +37,6 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
@ -144,6 +145,8 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
public class ServerSessionPacketHandler implements ChannelHandler {
private static final int MAX_CACHED_NULL_RESPONSES = 32;
private static final Logger logger = Logger.getLogger(ServerSessionPacketHandler.class);
private final ServerSession session;
@ -158,8 +161,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
private final ArtemisExecutor callExecutor;
private final CoreProtocolManager manager;
// The current currentLargeMessage being processed
private volatile LargeServerMessage currentLargeMessage;
@ -167,18 +168,18 @@ public class ServerSessionPacketHandler implements ChannelHandler {
private final Object largeMessageLock = new Object();
public ServerSessionPacketHandler(final ActiveMQServer server,
final CoreProtocolManager manager,
final ServerSession session,
final StorageManager storageManager,
final Channel channel) {
this.manager = manager;
private final Queue<NullResponseMessage> cachedNullRes;
private final Queue<NullResponseMessage_V2> cachedNullRes_V2;
public ServerSessionPacketHandler(final ActiveMQServer server,
final ServerSession session,
final Channel channel) {
this.session = session;
session.addCloseable((boolean failed) -> clearLargeMessage());
this.storageManager = storageManager;
this.storageManager = server.getStorageManager();
this.channel = channel;
@ -195,6 +196,16 @@ public class ServerSessionPacketHandler implements ChannelHandler {
this.packetActor = new Actor<>(callExecutor, this::onMessagePacket);
this.direct = conn.isDirectDeliver();
// no confirmation window size means no resend cache hence NullResponsePackets
// won't get cached on it because need confirmation
if (this.channel.getConfirmationWindowSize() == -1) {
cachedNullRes = PlatformDependent.newFixedMpscQueue(MAX_CACHED_NULL_RESPONSES);
cachedNullRes_V2 = PlatformDependent.newFixedMpscQueue(MAX_CACHED_NULL_RESPONSES);
} else {
cachedNullRes = null;
cachedNullRes_V2 = null;
}
}
private void clearLargeMessage() {
@ -653,17 +664,51 @@ public class ServerSessionPacketHandler implements ChannelHandler {
return RoutingType.MULTICAST;
}
private boolean requireNullResponseMessage_V1(Packet packet) {
return !packet.isResponseAsync() || channel.getConnection().isVersionBeforeAsyncResponseChange();
}
private Packet createNullResponseMessage(Packet packet) {
final Packet response;
if (!packet.isResponseAsync() || channel.getConnection().isVersionBeforeAsyncResponseChange()) {
private NullResponseMessage createNullResponseMessage_V1(Packet packet) {
assert requireNullResponseMessage_V1(packet);
NullResponseMessage response;
if (cachedNullRes != null) {
response = cachedNullRes.poll();
if (response == null) {
response = new NullResponseMessage();
} else {
response.reset();
}
} else {
response = new NullResponseMessage();
}
return response;
}
private NullResponseMessage_V2 createNullResponseMessage_V2(Packet packet) {
assert !requireNullResponseMessage_V1(packet);
NullResponseMessage_V2 response;
if (cachedNullRes_V2 != null) {
response = cachedNullRes_V2.poll();
if (response == null) {
response = new NullResponseMessage_V2(packet.getCorrelationID());
} else {
response.reset();
// this should be already set by the channel too, but let's do it just in case
response.setCorrelationID(packet.getCorrelationID());
}
} else {
response = new NullResponseMessage_V2(packet.getCorrelationID());
}
return response;
}
private Packet createNullResponseMessage(Packet packet) {
if (requireNullResponseMessage_V1(packet)) {
return createNullResponseMessage_V1(packet);
}
return createNullResponseMessage_V2(packet);
}
private Packet createSessionXAResponseMessage(Packet packet) {
Packet response;
if (packet.isResponseAsync()) {
@ -674,6 +719,19 @@ public class ServerSessionPacketHandler implements ChannelHandler {
return response;
}
private void releaseResponse(Packet packet) {
if (cachedNullRes == null || cachedNullRes_V2 == null) {
return;
}
if (packet instanceof NullResponseMessage) {
cachedNullRes.offer((NullResponseMessage) packet);
return;
}
if (packet instanceof NullResponseMessage_V2) {
cachedNullRes_V2.offer((NullResponseMessage_V2) packet);
}
}
private void onSessionAcknowledge(Packet packet) {
this.storageManager.setContext(session.getSessionContext());
try {
@ -921,7 +979,11 @@ public class ServerSessionPacketHandler implements ChannelHandler {
}
if (response != null) {
channel.send(response);
try {
channel.send(response);
} finally {
releaseResponse(response);
}
}
if (closeChannel) {

View File

@ -173,7 +173,7 @@ public class ActiveMQPacketHandler implements ChannelHandler {
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), sessionCallback, true, sessionOperationContext, routingTypeMap, protocolManager.getSecurityDomain());
ServerProducer serverProducer = new ServerProducerImpl(session.getName(), "CORE", request.getDefaultAddress());
session.addProducer(serverProducer);
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server, protocolManager, session, server.getStorageManager(), channel);
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server, session, channel);
channel.setHandler(handler);
sessionCallback.setSessionHandler(handler);