From fb4c68681edde5225980b1acf0935af124d0b766 Mon Sep 17 00:00:00 2001 From: Artyom Tarasenko Date: Fri, 23 Jun 2023 17:55:26 +0200 Subject: [PATCH] ARTEMIS-4095: fix delivering message size accounting Signed-off-by: Artyom Tarasenko --- .../protocol/openwire/amq/AMQConsumer.java | 4 +-- .../openwire/amq/AMQConsumerTest.java | 27 +++++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index a98369f48e..3165b14faf 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -288,9 +288,7 @@ public class AMQConsumer { return 0; } - if (session.getConnection().isNoLocal() || session.isInternal()) { - //internal session always delivers messages to noLocal advisory consumers - //so we need to remove this property too. + if (session.getConnection().isNoLocal() || (session.isInternal() && AdvisorySupport.isAdvisoryTopic(openwireDestination))) { message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME); } //handleDeliver is performed by an executor (see JBPAPP-6030): any AMQConsumer can share the session.wireFormat() diff --git a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java index 18dcfc055f..7992c4f040 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java +++ b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java @@ -16,29 +16,55 @@ */ package org.apache.activemq.artemis.core.protocol.openwire.amq; +import static org.junit.Assert.assertEquals; + import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.NodeManager; +import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; +import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; +import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.openwire.OpenWireFormatFactory; +import org.apache.activemq.wireformat.WireFormat; import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; public class AMQConsumerTest { + final OpenWireFormatFactory formatFactory = new OpenWireFormatFactory(); + final WireFormat openWireFormat = formatFactory.createWireFormat(); + + @Test + public void testClientId() throws Exception { + final String CID_ID = "client-12345-6789012345678-0:-1"; + + ActiveMQMessage classicMessage = new ActiveMQMessage(); + classicMessage.setProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING, CID_ID); + Message artemisMessage = OpenWireMessageConverter.inbound(classicMessage.getMessage(), openWireFormat, null); + assertEquals(CID_ID, artemisMessage.getStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING)); + MessageReference messageReference = new MessageReferenceImpl(artemisMessage, Mockito.mock(Queue.class)); + AMQConsumer amqConsumer = getConsumer(0); + amqConsumer.handleDeliver(messageReference, (ICoreMessage) artemisMessage); + assertEquals(CID_ID, artemisMessage.getStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING)); + } @Test public void testCreditsWithPrefetch() throws Exception { @@ -69,6 +95,7 @@ public class AMQConsumerTest { ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.nullable(Integer.class))).thenReturn(Mockito.mock(ServerConsumerImpl.class)); AMQSession session = Mockito.mock(AMQSession.class); + Mockito.when(session.isInternal()).thenReturn(true); Mockito.when(session.getConnection()).thenReturn(Mockito.mock(OpenWireConnection.class)); Mockito.when(session.getCoreServer()).thenReturn(coreServer); Mockito.when(session.getCoreSession()).thenReturn(coreSession);