ARTEMIS-4095: fix delivering message size accounting

Signed-off-by: Artyom Tarasenko <atar4qemu@gmail.com>
This commit is contained in:
Artyom Tarasenko 2023-06-23 17:55:26 +02:00 committed by Justin Bertram
parent 5b86f8d59e
commit fb4c68681e
2 changed files with 28 additions and 3 deletions

View File

@ -288,9 +288,7 @@ public class AMQConsumer {
return 0;
}
if (session.getConnection().isNoLocal() || session.isInternal()) {
//internal session always delivers messages to noLocal advisory consumers
//so we need to remove this property too.
if (session.getConnection().isNoLocal() || (session.isInternal() && AdvisorySupport.isAdvisoryTopic(openwireDestination))) {
message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
}
//handleDeliver is performed by an executor (see JBPAPP-6030): any AMQConsumer can share the session.wireFormat()

View File

@ -16,29 +16,55 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.wireformat.WireFormat;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
public class AMQConsumerTest {
final OpenWireFormatFactory formatFactory = new OpenWireFormatFactory();
final WireFormat openWireFormat = formatFactory.createWireFormat();
@Test
public void testClientId() throws Exception {
final String CID_ID = "client-12345-6789012345678-0:-1";
ActiveMQMessage classicMessage = new ActiveMQMessage();
classicMessage.setProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING, CID_ID);
Message artemisMessage = OpenWireMessageConverter.inbound(classicMessage.getMessage(), openWireFormat, null);
assertEquals(CID_ID, artemisMessage.getStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING));
MessageReference messageReference = new MessageReferenceImpl(artemisMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = getConsumer(0);
amqConsumer.handleDeliver(messageReference, (ICoreMessage) artemisMessage);
assertEquals(CID_ID, artemisMessage.getStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING));
}
@Test
public void testCreditsWithPrefetch() throws Exception {
@ -69,6 +95,7 @@ public class AMQConsumerTest {
ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean(),
ArgumentMatchers.nullable(Integer.class))).thenReturn(Mockito.mock(ServerConsumerImpl.class));
AMQSession session = Mockito.mock(AMQSession.class);
Mockito.when(session.isInternal()).thenReturn(true);
Mockito.when(session.getConnection()).thenReturn(Mockito.mock(OpenWireConnection.class));
Mockito.when(session.getCoreServer()).thenReturn(coreServer);
Mockito.when(session.getCoreSession()).thenReturn(coreSession);