ARTEMIS-3827 anon OpenWire producer may lose sent msg

This commit is contained in:
AntonRoskvist 2022-05-12 19:56:19 +02:00 committed by Justin Bertram
parent 5ca0d72cad
commit 548747c71d
No known key found for this signature in database
GPG Key ID: F41830B875BB8633
3 changed files with 62 additions and 1 deletions

View File

@ -368,7 +368,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
setLastCommand(command); setLastCommand(command);
response = command.visit(commandProcessorInstance); response = command.visit(commandProcessorInstance);
} catch (Exception e) { } catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn("Errors occurred during the buffering operation ", e); ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
if (responseRequired) { if (responseRequired) {
response = convertException(e); response = convertException(e);
} }
@ -1683,6 +1683,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
} catch (Exception e) { } catch (Exception e) {
if (tx != null) { if (tx != null) {
tx.markAsRollbackOnly(new ActiveMQException(e.getMessage())); tx.markAsRollbackOnly(new ActiveMQException(e.getMessage()));
} else if (e instanceof ActiveMQNonExistentQueueException && producerInfo.getDestination() == null) {
//Send exception for non transacted anonymous producers using an incorrect destination
sendException(e);
} }
throw e; throw e;
} finally { } finally {

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
@ -250,6 +251,10 @@ public class AMQSession implements SessionCallback {
coreSession.createQueue(new QueueConfiguration(queueNameToUse).setAddress(addressToUse).setRoutingType(routingTypeToUse).setTemporary(isTemporary).setAutoCreated(true).setFilterString(filter)); coreSession.createQueue(new QueueConfiguration(queueNameToUse).setAddress(addressToUse).setRoutingType(routingTypeToUse).setTemporary(isTemporary).setAutoCreated(true).setFilterString(filter));
connection.addKnownDestination(queueName); connection.addKnownDestination(queueName);
} else { } else {
if (server.getAddressInfo(queueName) == null) {
//Address does not exist and will not get autocreated
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
}
hasQueue = false; hasQueue = false;
} }
} }

View File

@ -21,8 +21,11 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest; import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.command.ActiveMQDestination;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
@ -144,4 +147,54 @@ public class ProducerAutoCreateQueueTest extends BasicOpenWireTest {
Wait.assertTrue(() -> server.locateQueue(new SimpleString("trash")) != null); Wait.assertTrue(() -> server.locateQueue(new SimpleString("trash")) != null);
Wait.assertTrue(() -> server.getAddressInfo(new SimpleString("trash")) != null); Wait.assertTrue(() -> server.getAddressInfo(new SimpleString("trash")) != null);
} }
@Rule
public ExpectedException thrown = ExpectedException.none();
@Test()
public void testSendFailsWithoutAutoCreate() throws Exception {
thrown.expect(javax.jms.JMSException.class);
Connection connection = null;
try {
AddressSettings setting = new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false);
server.getAddressSettingsRepository().addMatch("WRONG.#", setting);
connection = factory.createConnection("admin", "password");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = ActiveMQDestination.createDestination("WRONG.QUEUE", ActiveMQDestination.QUEUE_TYPE);
final MessageProducer producer = session.createProducer(null);
producer.send(destination, session.createTextMessage("foo"));
} finally {
if (connection != null) {
connection.close();
}
}
}
@Test()
public void testTransactedSendFailsWithoutAutoCreate() throws Exception {
thrown.expect(javax.jms.JMSException.class);
Connection connection = null;
try {
AddressSettings setting = new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false);
server.getAddressSettingsRepository().addMatch("WRONG.#", setting);
connection = factory.createConnection("admin", "password");
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = ActiveMQDestination.createDestination("WRONG.QUEUE", ActiveMQDestination.QUEUE_TYPE);
final MessageProducer producer = session.createProducer(null);
producer.send(destination, session.createTextMessage("foo"));
session.commit();
} finally {
if (connection != null) {
connection.close();
}
}
}
} }