From 087d6aed5c0282330c5b64764fdbe5b29d9d1f6d Mon Sep 17 00:00:00 2001 From: Nicolas Filotto Date: Tue, 3 Oct 2023 13:54:40 +0200 Subject: [PATCH] [AMQ-9255] Initialize the transient field of the class Message (#1051) (cherry picked from commit 538b04aa0c18f61cd47b261c7372a3e559c2ca0e) --- .../activemq/command/ActiveMQMapMessage.java | 4 +- .../org/apache/activemq/command/Message.java | 13 ++ .../org/apache/activemq/bugs/AMQ9255Test.java | 150 ++++++++++++++++++ 3 files changed, 166 insertions(+), 1 deletion(-) create mode 100644 activemq-http/src/test/java/org/apache/activemq/bugs/AMQ9255Test.java diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java index 5619e87e8c..a00e868030 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java @@ -104,7 +104,9 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage { protected transient Map map = new HashMap(); - private Object readResolve() throws ObjectStreamException { + @Override + protected Object readResolve() throws ObjectStreamException { + super.readResolve(); if (this.map == null) { this.map = new HashMap(); } diff --git a/activemq-client/src/main/java/org/apache/activemq/command/Message.java b/activemq-client/src/main/java/org/apache/activemq/command/Message.java index e74e1f39a4..8516962548 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/Message.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/Message.java @@ -20,6 +20,7 @@ import java.beans.Transient; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.ObjectStreamException; import java.io.OutputStream; import java.util.Collections; import java.util.HashMap; @@ -858,4 +859,16 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess public boolean canProcessAsExpired() { return processAsExpired.compareAndSet(false, true); } + + /** + * Initialize the transient fields at deserialization to get a normal state. + * + * @see Serializable Javadoc + */ + protected Object readResolve() throws ObjectStreamException { + if (this.processAsExpired == null) { + this.processAsExpired = new AtomicBoolean(); + } + return this; + } } diff --git a/activemq-http/src/test/java/org/apache/activemq/bugs/AMQ9255Test.java b/activemq-http/src/test/java/org/apache/activemq/bugs/AMQ9255Test.java new file mode 100644 index 0000000000..ad0f9f356f --- /dev/null +++ b/activemq-http/src/test/java/org/apache/activemq/bugs/AMQ9255Test.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import jakarta.jms.Connection; +import jakarta.jms.DeliveryMode; +import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; +import jakarta.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.transport.http.WaitForJettyListener; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class AMQ9255Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ9255Test.class); + + @Rule + public TestName name = new TestName(); + private BrokerService broker; + private ActiveMQConnectionFactory connectionFactory; + private Connection sendConnection, receiveConnection; + private Session sendSession, receiveSession; + private MessageConsumer consumer; + private MessageProducer producer; + + @Before + public void setUp() throws Exception { + if (broker == null) { + broker = createBroker(); + broker.start(); + } + WaitForJettyListener.waitForJettySocketToAccept(getBrokerURL()); + connectionFactory = createConnectionFactory(); + LOG.info("Creating send connection"); + sendConnection = createSendConnection(); + LOG.info("Starting send connection"); + sendConnection.start(); + + LOG.info("Creating receive connection"); + receiveConnection = createReceiveConnection(); + LOG.info("Starting receive connection"); + receiveConnection.start(); + + sendSession = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + receiveSession = receiveConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + LOG.info("Created sendSession: " + sendSession); + LOG.info("Created receiveSession: " + receiveSession); + + producer = sendSession.createProducer(sendSession.createQueue(getProducerSubject())); + consumer = receiveSession.createConsumer(receiveSession.createQueue(getConsumerSubject())); + + LOG.info("Created consumer of type: " + consumer.getClass()); + LOG.info("Created producer of type: " + producer.getClass()); + } + + @After + public void tearDown() throws Exception { + if (receiveSession != null) { + receiveSession.close(); + } + if (sendSession != null) { + sendSession.close(); + } + if (receiveConnection != null) { + receiveConnection.close(); + } + if (sendConnection != null) { + sendConnection.close(); + } + if (broker != null) { + broker.stop(); + } + } + + private String getConsumerSubject() { + return "ActiveMQ.DLQ"; + } + + private String getProducerSubject() { + return name.getMethodName(); + } + + private Connection createReceiveConnection() throws Exception { + return connectionFactory.createConnection(); + } + + private Connection createSendConnection() throws Exception { + return connectionFactory.createConnection(); + } + + private ActiveMQConnectionFactory createConnectionFactory() { + return new ActiveMQConnectionFactory(getBrokerURL()); + } + + protected String getBrokerURL() { + return "http://localhost:8161"; + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setPersistent(false); + answer.addConnector(getBrokerURL()); + answer.setUseJmx(false); + return answer; + } + + @Test + public void testExpiredMessages() throws Exception { + // Given + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + producer.setTimeToLive(100L); + String text = name.toString(); + + // When + producer.send(sendSession.createTextMessage(text)); + + // Then + TextMessage message = (TextMessage) consumer.receive(30_000); + assertNotNull(message); + assertEquals(text, message.getText()); + } +}