ARTEMIS-4530 clean up SessionCallback interface

This commit is contained in:
Justin Bertram 2023-12-06 16:02:53 -06:00
parent 29a2b2a879
commit 159416bc09
9 changed files with 44 additions and 71 deletions

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.protocol.amqp.broker; package org.apache.activemq.artemis.protocol.amqp.broker;
import java.lang.invoke.MethodHandles;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -30,9 +31,9 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.CheckType;
@ -76,7 +77,6 @@ import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
public class AMQPSessionCallback implements SessionCallback { public class AMQPSessionCallback implements SessionCallback {
@ -649,7 +649,7 @@ public class AMQPSessionCallback implements SessionCallback {
} }
@Override @Override
public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) { public int sendMessage(MessageReference ref, ServerConsumer consumer, int deliveryCount) {
ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext(); ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
@ -667,7 +667,6 @@ public class AMQPSessionCallback implements SessionCallback {
@Override @Override
public int sendLargeMessage(MessageReference ref, public int sendLargeMessage(MessageReference ref,
Message message,
ServerConsumer consumer, ServerConsumer consumer,
long bodySize, long bodySize,
int deliveryCount) { int deliveryCount) {

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.artemis.core.protocol.mqtt; package org.apache.activemq.artemis.core.protocol.mqtt;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerConsumer;
@ -44,14 +43,13 @@ public class MQTTSessionCallback implements SessionCallback {
} }
@Override @Override
public int sendMessage(MessageReference reference, public int sendMessage(MessageReference ref,
Message message,
ServerConsumer consumer, ServerConsumer consumer,
int deliveryCount) { int deliveryCount) {
try { try {
session.getMqttPublishManager().sendMessage(message.toCore(), consumer, deliveryCount); session.getMqttPublishManager().sendMessage(ref.getMessage().toCore(), consumer, deliveryCount);
} catch (Exception e) { } catch (Exception e) {
MQTTLogger.LOGGER.unableToSendMessage(reference, e); MQTTLogger.LOGGER.unableToSendMessage(ref, e);
} }
return 1; return 1;
} }
@ -70,12 +68,11 @@ public class MQTTSessionCallback implements SessionCallback {
} }
@Override @Override
public int sendLargeMessage(MessageReference reference, public int sendLargeMessage(MessageReference ref,
Message message,
ServerConsumer consumer, ServerConsumer consumer,
long bodySize, long bodySize,
int deliveryCount) { int deliveryCount) {
return sendMessage(reference, message, consumer, deliveryCount); return sendMessage(ref, consumer, deliveryCount);
} }
@Override @Override

View File

@ -291,19 +291,17 @@ public class AMQSession implements SessionCallback {
} }
@Override @Override
public int sendMessage(MessageReference reference, public int sendMessage(MessageReference ref,
org.apache.activemq.artemis.api.core.Message message,
ServerConsumer consumer, ServerConsumer consumer,
int deliveryCount) { int deliveryCount) {
AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData(); AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
//clear up possible rolledback ids. //clear up possible rolledback ids.
theConsumer.removeRolledback(reference); theConsumer.removeRolledback(ref);
return theConsumer.handleDeliver(reference, message.toCore()); return theConsumer.handleDeliver(ref, ref.getMessage().toCore());
} }
@Override @Override
public int sendLargeMessage(MessageReference reference, public int sendLargeMessage(MessageReference ref,
org.apache.activemq.artemis.api.core.Message message,
ServerConsumer consumerID, ServerConsumer consumerID,
long bodySize, long bodySize,
int deliveryCount) { int deliveryCount) {

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.core.protocol.stomp; package org.apache.activemq.artemis.core.protocol.stomp;
import java.lang.invoke.MethodHandles;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -53,7 +54,6 @@ import org.apache.activemq.artemis.utils.PendingTask;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
import static org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory.STOMP_PROTOCOL_NAME; import static org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory.STOMP_PROTOCOL_NAME;
@ -153,7 +153,7 @@ public class StompSession implements SessionCallback {
} }
@Override @Override
public int sendMessage(MessageReference ref, Message serverMessage, final ServerConsumer consumer, int deliveryCount) { public int sendMessage(MessageReference ref, final ServerConsumer consumer, int deliveryCount) {
ICoreMessage message = ref.getMessage().toCore(); ICoreMessage message = ref.getMessage().toCore();
try { try {
StompSubscription subscription = subscriptions.get(consumer.getID()); StompSubscription subscription = subscriptions.get(consumer.getID());
@ -206,7 +206,6 @@ public class StompSession implements SessionCallback {
@Override @Override
public int sendLargeMessage(MessageReference ref, public int sendLargeMessage(MessageReference ref,
Message msg,
ServerConsumer consumer, ServerConsumer consumer,
long bodySize, long bodySize,
int deliveryCount) { int deliveryCount) {

View File

@ -23,7 +23,6 @@ import java.util.concurrent.Future;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.FailureListener;
@ -229,13 +228,12 @@ public class ManagementRemotingConnection implements RemotingConnection {
} }
@Override @Override
public int sendMessage(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) { public int sendMessage(MessageReference ref, ServerConsumer consumerID, int deliveryCount) {
return 0; return 0;
} }
@Override @Override
public int sendLargeMessage(MessageReference reference, public int sendLargeMessage(MessageReference ref,
Message message,
ServerConsumer consumerID, ServerConsumer consumerID,
long bodySize, long bodySize,
int deliveryCount) { int deliveryCount) {

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.artemis.core.protocol.core.impl; package org.apache.activemq.artemis.core.protocol.core.impl;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.Channel;
@ -88,11 +87,10 @@ public final class CoreSessionCallback implements SessionCallback {
@Override @Override
public int sendLargeMessage(MessageReference ref, public int sendLargeMessage(MessageReference ref,
Message message,
ServerConsumer consumer, ServerConsumer consumer,
long bodySize, long bodySize,
int deliveryCount) { int deliveryCount) {
Packet packet = new SessionReceiveLargeMessage(consumer.getID(), message, bodySize, deliveryCount); Packet packet = new SessionReceiveLargeMessage(consumer.getID(), ref.getMessage(), bodySize, deliveryCount);
channel.send(packet); channel.send(packet);
@ -114,13 +112,13 @@ public final class CoreSessionCallback implements SessionCallback {
} }
@Override @Override
public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) { public int sendMessage(MessageReference ref, ServerConsumer consumer, int deliveryCount) {
Packet packet; Packet packet;
if (channel.getConnection().isVersionBeforeAddressChange()) { if (channel.getConnection().isVersionBeforeAddressChange()) {
packet = new SessionReceiveMessage_1X(consumer.getID(), message.toCore(coreMessageObjectPools), deliveryCount); packet = new SessionReceiveMessage_1X(consumer.getID(), ref.getMessage().toCore(coreMessageObjectPools), deliveryCount);
} else { } else {
packet = new SessionReceiveMessage(consumer.getID(), message.toCore(coreMessageObjectPools), deliveryCount); packet = new SessionReceiveMessage(consumer.getID(), ref.getMessage().toCore(coreMessageObjectPools), deliveryCount);
} }
int size = 0; int size = 0;

View File

@ -487,7 +487,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// The deliverer will increase the usageUp, so the preAck has to be done after this is created // The deliverer will increase the usageUp, so the preAck has to be done after this is created
// otherwise we may have a removed message early on // otherwise we may have a removed message early on
if (message instanceof CoreLargeServerMessage && this.supportLargeMessage) { if (message instanceof CoreLargeServerMessage && this.supportLargeMessage) {
largeMessageDeliverer = new CoreLargeMessageDeliverer((LargeServerMessage) message, ref); largeMessageDeliverer = new CoreLargeMessageDeliverer(ref);
} }
if (preAcknowledge) { if (preAcknowledge) {
@ -507,8 +507,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override @Override
public void proceedDeliver(MessageReference reference) throws Exception { public void proceedDeliver(MessageReference reference) throws Exception {
try { try {
Message message = reference.getMessage();
if (AuditLogger.isMessageLoggingEnabled()) { if (AuditLogger.isMessageLoggingEnabled()) {
AuditLogger.coreConsumeMessage(session.getRemotingConnection().getSubject(), session.getRemotingConnection().getRemoteAddress(), getQueueName().toString(), reference.toString()); AuditLogger.coreConsumeMessage(session.getRemotingConnection().getSubject(), session.getRemotingConnection().getRemoteAddress(), getQueueName().toString(), reference.toString());
} }
@ -516,18 +514,18 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference)); server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference));
} }
if (message instanceof CoreLargeServerMessage && supportLargeMessage) { if (reference.getMessage() instanceof CoreLargeServerMessage && supportLargeMessage) {
if (largeMessageDeliverer == null) { if (largeMessageDeliverer == null) {
// This can't really happen as handle had already crated the deliverer // This can't really happen as handle had already created the deliverer
// instead of throwing an exception in weird cases there is no problem on just go ahead and create it // instead of throwing an exception in weird cases there is no problem on just go ahead and create it
// again here // again here
largeMessageDeliverer = new CoreLargeMessageDeliverer((LargeServerMessage) message, reference); largeMessageDeliverer = new CoreLargeMessageDeliverer(reference);
} }
// The deliverer was prepared during handle, as we can't have more than one pending large message // The deliverer was prepared during handle, as we can't have more than one pending large message
// as it would return busy if there is anything pending // as it would return busy if there is anything pending
largeMessageDeliverer.deliver(); largeMessageDeliverer.deliver();
} else { } else {
deliverStandardMessage(reference, message); deliverStandardMessage(reference);
} }
} finally { } finally {
pendingDelivery.countDown(); pendingDelivery.countDown();
@ -681,15 +679,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override @Override
public void forceDelivery(final long sequence) { public void forceDelivery(final long sequence) {
forceDelivery(sequence, () -> { forceDelivery(sequence, () -> {
Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50); Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50)
.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence)
.setAddress(messageQueue.getName());
MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue); MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue);
reference.setDeliveryCount(0); reference.setDeliveryCount(0);
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
forcedDeliveryMessage.setAddress(messageQueue.getName());
applyPrefixForLegacyConsumer(forcedDeliveryMessage); applyPrefixForLegacyConsumer(forcedDeliveryMessage);
callback.sendMessage(reference, forcedDeliveryMessage, ServerConsumerImpl.this, 0); callback.sendMessage(reference, ServerConsumerImpl.this, 0);
}); });
} }
@ -1220,13 +1218,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
messageQueue.getExecutor().execute(resumeLargeMessageRunnable); messageQueue.getExecutor().execute(resumeLargeMessageRunnable);
} }
/** private void deliverStandardMessage(final MessageReference ref) {
* @param ref applyPrefixForLegacyConsumer(ref.getMessage());
* @param message int packetSize = callback.sendMessage(ref, ServerConsumerImpl.this, ref.getDeliveryCount());
*/
private void deliverStandardMessage(final MessageReference ref, Message message) throws ActiveMQException {
applyPrefixForLegacyConsumer(message);
int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount());
if (availableCredits != null) { if (availableCredits != null) {
availableCredits.addAndGet(-packetSize); availableCredits.addAndGet(-packetSize);
@ -1301,13 +1295,13 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private ByteBuffer chunkBytes; private ByteBuffer chunkBytes;
private CoreLargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref) throws Exception { private CoreLargeMessageDeliverer(final MessageReference ref) {
largeMessage = message; this.ref = ref;
largeMessage = (LargeServerMessage) ref.getMessage();
largeMessage.toMessage().usageUp(); largeMessage.toMessage().usageUp();
this.ref = ref;
this.chunkBytes = null; this.chunkBytes = null;
} }
@ -1357,7 +1351,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
sentInitialPacket = true; sentInitialPacket = true;
int packetSize = callback.sendLargeMessage(ref, currentLargeMessage.toMessage(), ServerConsumerImpl.this, context.getSize(), ref.getDeliveryCount()); int packetSize = callback.sendLargeMessage(ref, ServerConsumerImpl.this, context.getSize(), ref.getDeliveryCount());
if (availableCredits != null) { if (availableCredits != null) {
final int credits = availableCredits.addAndGet(-packetSize); final int credits = availableCredits.addAndGet(-packetSize);

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.artemis.spi.core.protocol; package org.apache.activemq.artemis.spi.core.protocol;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerConsumer;
@ -69,16 +68,9 @@ public interface SessionCallback {
void sendProducerCreditsFailMessage(int credits, SimpleString address); void sendProducerCreditsFailMessage(int credits, SimpleString address);
// Note: don't be tempted to remove the parameter message int sendMessage(MessageReference ref, ServerConsumer consumerID, int deliveryCount);
// Even though ref will contain the message in certain cases
// such as paging the message could be a SoftReference or WeakReference
// and I wanted to avoid re-fetching paged data in case of GCs on this specific case.
//
// Future developments may change this, but beware why I have chosen to keep the parameter separated here
int sendMessage(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount);
int sendLargeMessage(MessageReference reference, int sendLargeMessage(MessageReference ref,
Message message,
ServerConsumer consumerID, ServerConsumer consumerID,
long bodySize, long bodySize,
int deliveryCount); int deliveryCount);

View File

@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -518,7 +517,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
* @see SessionCallback#sendJmsMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, int) * @see SessionCallback#sendJmsMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, int)
*/ */
@Override @Override
public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) { public int sendMessage(MessageReference ref, ServerConsumer consumer, int deliveryCount) {
inCall.countDown(); inCall.countDown();
try { try {
callbackSemaphore.acquire(); callbackSemaphore.acquire();
@ -528,7 +527,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
} }
try { try {
return targetCallback.sendMessage(ref, message, consumer, deliveryCount); return targetCallback.sendMessage(ref, consumer, deliveryCount);
} finally { } finally {
callbackSemaphore.release(); callbackSemaphore.release();
inCall.countUp(); inCall.countUp();
@ -539,12 +538,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
* @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int) * @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int)
*/ */
@Override @Override
public int sendLargeMessage(MessageReference reference, public int sendLargeMessage(MessageReference ref,
Message message,
ServerConsumer consumer, ServerConsumer consumer,
long bodySize, long bodySize,
int deliveryCount) { int deliveryCount) {
return targetCallback.sendLargeMessage(reference, message, consumer, bodySize, deliveryCount); return targetCallback.sendLargeMessage(ref, consumer, bodySize, deliveryCount);
} }
/* (non-Javadoc) /* (non-Javadoc)