diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java index 0cbafae7e2..e45809abd0 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java @@ -19,11 +19,13 @@ package org.apache.activemq; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; + import javax.jms.Destination; import javax.jms.IllegalStateException; import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import javax.jms.Message; + import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerId; @@ -118,12 +120,18 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl this.startTime = System.currentTimeMillis(); this.messageSequence = new AtomicLong(0); this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination); - this.session.addProducer(this); - this.session.asyncSendPacket(info); + try { + this.session.addProducer(this); + this.session.syncSendPacket(info); + } catch (JMSException e) { + this.session.removeProducer(this); + throw e; + } this.setSendTimeout(sendTimeout); setTransformer(session.getTransformer()); } + @Override public StatsImpl getStats() { return stats; } @@ -140,6 +148,7 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl * some internal error. * @since 1.1 */ + @Override public Destination getDestination() throws JMSException { checkClosed(); return this.info.getDestination(); @@ -157,6 +166,7 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl * @throws JMSException if the JMS provider fails to close the producer due * to some internal error. */ + @Override public void close() throws JMSException { if (!closed) { dispose(); @@ -164,6 +174,7 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl } } + @Override public void dispose() { if (!closed) { this.session.removeProducer(this); @@ -208,6 +219,7 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl * @see javax.jms.Session#createProducer * @since 1.1 */ + @Override public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { this.send(destination, message, deliveryMode, priority, timeToLive, null); } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java new file mode 100644 index 0000000000..59e8e4c802 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.fail; + +import javax.jms.JMSException; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerPluginSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ProducerInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class AMQ4213Test { + + private static BrokerService brokerService; + private static String BROKER_ADDRESS = "tcp://localhost:0"; + private static String TEST_QUEUE = "testQueue"; + private static ActiveMQQueue queue = new ActiveMQQueue(TEST_QUEUE); + + private String connectionUri; + + @Before + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setUseJmx(true); + brokerService.setDeleteAllMessagesOnStartup(true); + connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString(); + + brokerService.setPlugins(new BrokerPlugin[]{ + new BrokerPluginSupport() { + + @Override + public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { + throw new javax.jms.JMSSecurityException(connectionUri); + } + } + }); + + brokerService.start(); + brokerService.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + @Test + public void testExceptionOnProducerCreateThrows() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + connection.start(); + + try { + session.createProducer(queue); + fail("Should not be able to create this producer."); + } catch (JMSException ex) { + } + } +}