diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index e911579306..bc9bb386bc 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -251,24 +251,9 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne connection.setUserName(userName); connection.setPassword(password); - connection.setPrefetchPolicy(getPrefetchPolicy()); - connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault()); - connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch()); - connection.setCopyMessageOnSend(isCopyMessageOnSend()); - connection.setUseCompression(isUseCompression()); - connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered()); - connection.setDispatchAsync(isDispatchAsync()); - connection.setUseAsyncSend(isUseAsyncSend()); - connection.setAlwaysSyncSend(isAlwaysSyncSend()); - connection.setAlwaysSessionAsync(isAlwaysSessionAsync()); - connection.setOptimizeAcknowledge(isOptimizeAcknowledge()); - connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer()); - connection.setRedeliveryPolicy(getRedeliveryPolicy()); - connection.setTransformer(getTransformer()); - connection.setBlobTransferPolicy(getBlobTransferPolicy().copy()); - connection.setWatchTopicAdvisories(isWatchTopicAdvisories()); - connection.setProducerWindowSize(getProducerWindowSize()); - connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout()); + + configureConnection(connection); + transport.start(); if( clientID !=null ) @@ -293,6 +278,28 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne return connection; } + + protected void configureConnection(ActiveMQConnection connection) { + connection.setPrefetchPolicy(getPrefetchPolicy()); + connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault()); + connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch()); + connection.setCopyMessageOnSend(isCopyMessageOnSend()); + connection.setUseCompression(isUseCompression()); + connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered()); + connection.setDispatchAsync(isDispatchAsync()); + connection.setUseAsyncSend(isUseAsyncSend()); + connection.setAlwaysSyncSend(isAlwaysSyncSend()); + connection.setAlwaysSessionAsync(isAlwaysSessionAsync()); + connection.setOptimizeAcknowledge(isOptimizeAcknowledge()); + connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer()); + connection.setRedeliveryPolicy(getRedeliveryPolicy()); + connection.setTransformer(getTransformer()); + connection.setBlobTransferPolicy(getBlobTransferPolicy().copy()); + connection.setWatchTopicAdvisories(isWatchTopicAdvisories()); + connection.setProducerWindowSize(getProducerWindowSize()); + connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout()); + } + // ///////////////////////////////////////////// // // Property Accessors diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java index d11da070b3..e79150ccd0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java @@ -17,18 +17,6 @@ */ package org.apache.activemq; -import java.util.HashMap; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.IllegalStateException; -import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageFormatException; -import javax.jms.MessageProducer; - import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerId; @@ -39,6 +27,14 @@ import org.apache.activemq.management.StatsImpl; import org.apache.activemq.memory.UsageManager; import org.apache.activemq.util.IntrospectionSupport; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.Message; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicLong; + /** * A client uses a MessageProducer object to send messages to a * destination. A MessageProducer object is created by passing a @@ -72,26 +68,20 @@ import org.apache.activemq.util.IntrospectionSupport; * @see javax.jms.QueueSender * @see javax.jms.Session#createProducer */ -public class ActiveMQMessageProducer implements MessageProducer, StatsCapable, Closeable, Disposable { +public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable { - protected ActiveMQSession session; protected ProducerInfo info; private JMSProducerStatsImpl stats; private AtomicLong messageSequence; protected boolean closed; - private boolean disableMessageID; - private boolean disableMessageTimestamp; - private int defaultDeliveryMode; - private int defaultPriority; - private long defaultTimeToLive; private long startTime; private MessageTransformer transformer; private UsageManager producerWindow; protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination) throws JMSException { - this.session = session; + super(session); this.info = new ProducerInfo(producerId); this.info.setWindowSize(session.connection.getProducerWindowSize()); if (destination!=null && destination.getOptions() != null) { @@ -105,9 +95,7 @@ public class ActiveMQMessageProducer implements MessageProducer, StatsCapable, C producerWindow = new UsageManager("Producer Window: "+producerId); producerWindow.setLimit(this.info.getWindowSize()); } - - this.disableMessageID = false; - this.disableMessageTimestamp = session.connection.isDisableTimeStampsByDefault(); + this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE; this.defaultPriority = Message.DEFAULT_PRIORITY; this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE; @@ -127,183 +115,6 @@ public class ActiveMQMessageProducer implements MessageProducer, StatsCapable, C return stats; } - /** - * Sets whether message IDs are disabled. - *

- * Since message IDs take some effort to create and increase a message's - * size, some JMS providers may be able to optimize message overhead if - * they are given a hint that the message ID is not used by an application. - * By calling the setDisableMessageID method on this message - * producer, a JMS client enables this potential optimization for all - * messages sent by this message producer. If the JMS provider accepts this - * hint, these messages must have the message ID set to null; if the - * provider ignores the hint, the message ID must be set to its normal - * unique value. - *

- * Message IDs are enabled by default. - * - * @param value indicates if message IDs are disabled - * @throws JMSException if the JMS provider fails to close the producer due to - * some internal error. - */ - public void setDisableMessageID(boolean value) throws JMSException { - checkClosed(); - this.disableMessageID = value; - } - - /** - * Gets an indication of whether message IDs are disabled. - * - * @return an indication of whether message IDs are disabled - * @throws JMSException if the JMS provider fails to determine if message IDs are - * disabled due to some internal error. - */ - public boolean getDisableMessageID() throws JMSException { - checkClosed(); - return this.disableMessageID; - } - - /** - * Sets whether message timestamps are disabled. - *

- * Since timestamps take some effort to create and increase a message's - * size, some JMS providers may be able to optimize message overhead if - * they are given a hint that the timestamp is not used by an application. - * By calling the setDisableMessageTimestamp method on this - * message producer, a JMS client enables this potential optimization for - * all messages sent by this message producer. If the JMS provider accepts - * this hint, these messages must have the timestamp set to zero; if the - * provider ignores the hint, the timestamp must be set to its normal - * value. - *

- * Message timestamps are enabled by default. - * - * @param value indicates if message timestamps are disabled - * @throws JMSException if the JMS provider fails to close the producer due to - * some internal error. - */ - public void setDisableMessageTimestamp(boolean value) throws JMSException { - checkClosed(); - this.disableMessageTimestamp = value; - } - - /** - * Gets an indication of whether message timestamps are disabled. - * - * @return an indication of whether message timestamps are disabled - * @throws JMSException if the JMS provider fails to close the producer due to - * some internal error. - */ - public boolean getDisableMessageTimestamp() throws JMSException { - checkClosed(); - return this.disableMessageTimestamp; - } - - /** - * Sets the producer's default delivery mode. - *

- * Delivery mode is set to PERSISTENT by default. - * - * @param newDeliveryMode the message delivery mode for this message producer; legal - * values are DeliveryMode.NON_PERSISTENT and - * DeliveryMode.PERSISTENT - * @throws JMSException if the JMS provider fails to set the delivery mode due to - * some internal error. - * @see javax.jms.MessageProducer#getDeliveryMode - * @see javax.jms.DeliveryMode#NON_PERSISTENT - * @see javax.jms.DeliveryMode#PERSISTENT - * @see javax.jms.Message#DEFAULT_DELIVERY_MODE - */ - public void setDeliveryMode(int newDeliveryMode) throws JMSException { - if (newDeliveryMode != DeliveryMode.PERSISTENT && newDeliveryMode != DeliveryMode.NON_PERSISTENT) { - throw new IllegalStateException("unkown delivery mode: " + newDeliveryMode); - } - checkClosed(); - this.defaultDeliveryMode = newDeliveryMode; - } - - /** - * Gets the producer's default delivery mode. - * - * @return the message delivery mode for this message producer - * @throws JMSException if the JMS provider fails to close the producer due to - * some internal error. - */ - public int getDeliveryMode() throws JMSException { - checkClosed(); - return this.defaultDeliveryMode; - } - - /** - * Sets the producer's default priority. - *

- * The JMS API defines ten levels of priority value, with 0 as the lowest - * priority and 9 as the highest. Clients should consider priorities 0-4 as - * gradations of normal priority and priorities 5-9 as gradations of - * expedited priority. Priority is set to 4 by default. - * - * @param newDefaultPriority the message priority for this message producer; must be a - * value between 0 and 9 - * @throws JMSException if the JMS provider fails to set the delivery mode due to - * some internal error. - * @see javax.jms.MessageProducer#getPriority - * @see javax.jms.Message#DEFAULT_PRIORITY - */ - public void setPriority(int newDefaultPriority) throws JMSException { - if (newDefaultPriority < 0 || newDefaultPriority > 9) { - throw new IllegalStateException("default priority must be a value between 0 and 9"); - } - checkClosed(); - this.defaultPriority = newDefaultPriority; - } - - /** - * Gets the producer's default priority. - * - * @return the message priority for this message producer - * @throws JMSException if the JMS provider fails to close the producer due to - * some internal error. - * @see javax.jms.MessageProducer#setPriority - */ - public int getPriority() throws JMSException { - checkClosed(); - return this.defaultPriority; - } - - /** - * Sets the default length of time in milliseconds from its dispatch time - * that a produced message should be retained by the message system. - *

- * Time to live is set to zero by default. - * - * @param timeToLive the message time to live in milliseconds; zero is unlimited - * @throws JMSException if the JMS provider fails to set the time to live due to - * some internal error. - * @see javax.jms.MessageProducer#getTimeToLive - * @see javax.jms.Message#DEFAULT_TIME_TO_LIVE - */ - public void setTimeToLive(long timeToLive) throws JMSException { - if (timeToLive < 0l) { - throw new IllegalStateException("cannot set a negative timeToLive"); - } - checkClosed(); - this.defaultTimeToLive = timeToLive; - } - - /** - * Gets the default length of time in milliseconds from its dispatch time - * that a produced message should be retained by the message system. - * - * @return the message time to live in milliseconds; zero is unlimited - * @throws JMSException if the JMS provider fails to get the time to live due to - * some internal error. - * @see javax.jms.MessageProducer#setTimeToLive - */ - public long getTimeToLive() throws JMSException { - checkClosed(); - return this.defaultTimeToLive; - } - /** * Gets the destination associated with this MessageProducer. * @@ -353,91 +164,6 @@ public class ActiveMQMessageProducer implements MessageProducer, StatsCapable, C } } - /** - * Sends a message using the MessageProducer's default - * delivery mode, priority, and time to live. - * - * @param message the message to send - * @throws JMSException if the JMS provider fails to send the message due to some - * internal error. - * @throws MessageFormatException if an invalid message is specified. - * @throws InvalidDestinationException if a client uses this method with a - * MessageProducer with an invalid destination. - * @throws java.lang.UnsupportedOperationException - * if a client uses this method with a - * MessageProducer that did not specify a - * destination at creation time. - * @see javax.jms.Session#createProducer - * @see javax.jms.MessageProducer - * @since 1.1 - */ - public void send(Message message) throws JMSException { - this.send(this.getDestination(), - message, - this.defaultDeliveryMode, - this.defaultPriority, - this.defaultTimeToLive); - } - - /** - * Sends a message to the destination, specifying delivery mode, priority, - * and time to live. - * - * @param message the message to send - * @param deliveryMode the delivery mode to use - * @param priority the priority for this message - * @param timeToLive the message's lifetime (in milliseconds) - * @throws JMSException if the JMS provider fails to send the message due to some - * internal error. - * @throws MessageFormatException if an invalid message is specified. - * @throws InvalidDestinationException if a client uses this method with a - * MessageProducer with an invalid destination. - * @throws java.lang.UnsupportedOperationException - * if a client uses this method with a - * MessageProducer that did not specify a - * destination at creation time. - * @see javax.jms.Session#createProducer - * @since 1.1 - */ - public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { - this.send(this.getDestination(), - message, - deliveryMode, - priority, - timeToLive); - } - - /** - * Sends a message to a destination for an unidentified message producer. - * Uses the MessageProducer's default delivery mode, - * priority, and time to live. - *

- * Typically, a message producer is assigned a destination at creation - * time; however, the JMS API also supports unidentified message producers, - * which require that the destination be supplied every time a message is - * sent. - * - * @param destination the destination to send this message to - * @param message the message to send - * @throws JMSException if the JMS provider fails to send the message due to some - * internal error. - * @throws MessageFormatException if an invalid message is specified. - * @throws InvalidDestinationException if a client uses this method with an invalid destination. - * @throws java.lang.UnsupportedOperationException - * if a client uses this method with a - * MessageProducer that specified a destination at - * creation time. - * @see javax.jms.Session#createProducer - * @see javax.jms.MessageProducer - */ - public void send(Destination destination, Message message) throws JMSException { - this.send(destination, - message, - this.defaultDeliveryMode, - this.defaultPriority, - this.defaultTimeToLive); - } - /** * Sends a message to a destination for an unidentified message producer, * specifying delivery mode, priority and time to live. diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java new file mode 100644 index 0000000000..242fd1c36d --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.*; +import javax.jms.IllegalStateException; +import javax.jms.Message; + +/** + * A useful base class for implementing a {@link MessageProducer} + * + * @version $Revision: $ + */ +public abstract class ActiveMQMessageProducerSupport implements MessageProducer, Closeable { + protected ActiveMQSession session; + protected boolean disableMessageID; + protected boolean disableMessageTimestamp; + protected int defaultDeliveryMode; + protected int defaultPriority; + protected long defaultTimeToLive; + + public ActiveMQMessageProducerSupport(ActiveMQSession session) { + this.session = session; + disableMessageTimestamp = session.connection.isDisableTimeStampsByDefault(); + } + + /** + * Sets whether message IDs are disabled. + *

+ * Since message IDs take some effort to create and increase a message's + * size, some JMS providers may be able to optimize message overhead if + * they are given a hint that the message ID is not used by an application. + * By calling the setDisableMessageID method on this message + * producer, a JMS client enables this potential optimization for all + * messages sent by this message producer. If the JMS provider accepts this + * hint, these messages must have the message ID set to null; if the + * provider ignores the hint, the message ID must be set to its normal + * unique value. + *

+ * Message IDs are enabled by default. + * + * @param value indicates if message IDs are disabled + * @throws javax.jms.JMSException if the JMS provider fails to close the producer due to + * some internal error. + */ + public void setDisableMessageID(boolean value) throws JMSException { + checkClosed(); + this.disableMessageID = value; + } + + /** + * Gets an indication of whether message IDs are disabled. + * + * @return an indication of whether message IDs are disabled + * @throws javax.jms.JMSException if the JMS provider fails to determine if message IDs are + * disabled due to some internal error. + */ + public boolean getDisableMessageID() throws JMSException { + checkClosed(); + return this.disableMessageID; + } + + /** + * Sets whether message timestamps are disabled. + *

+ * Since timestamps take some effort to create and increase a message's + * size, some JMS providers may be able to optimize message overhead if + * they are given a hint that the timestamp is not used by an application. + * By calling the setDisableMessageTimestamp method on this + * message producer, a JMS client enables this potential optimization for + * all messages sent by this message producer. If the JMS provider accepts + * this hint, these messages must have the timestamp set to zero; if the + * provider ignores the hint, the timestamp must be set to its normal + * value. + *

+ * Message timestamps are enabled by default. + * + * @param value indicates if message timestamps are disabled + * @throws javax.jms.JMSException if the JMS provider fails to close the producer due to + * some internal error. + */ + public void setDisableMessageTimestamp(boolean value) throws JMSException { + checkClosed(); + this.disableMessageTimestamp = value; + } + + /** + * Gets an indication of whether message timestamps are disabled. + * + * @return an indication of whether message timestamps are disabled + * @throws javax.jms.JMSException if the JMS provider fails to close the producer due to + * some internal error. + */ + public boolean getDisableMessageTimestamp() throws JMSException { + checkClosed(); + return this.disableMessageTimestamp; + } + + /** + * Sets the producer's default delivery mode. + *

+ * Delivery mode is set to PERSISTENT by default. + * + * @param newDeliveryMode the message delivery mode for this message producer; legal + * values are DeliveryMode.NON_PERSISTENT and + * DeliveryMode.PERSISTENT + * @throws javax.jms.JMSException if the JMS provider fails to set the delivery mode due to + * some internal error. + * @see javax.jms.MessageProducer#getDeliveryMode + * @see javax.jms.DeliveryMode#NON_PERSISTENT + * @see javax.jms.DeliveryMode#PERSISTENT + * @see javax.jms.Message#DEFAULT_DELIVERY_MODE + */ + public void setDeliveryMode(int newDeliveryMode) throws JMSException { + if (newDeliveryMode != DeliveryMode.PERSISTENT && newDeliveryMode != DeliveryMode.NON_PERSISTENT) { + throw new javax.jms.IllegalStateException("unkown delivery mode: " + newDeliveryMode); + } + checkClosed(); + this.defaultDeliveryMode = newDeliveryMode; + } + + /** + * Gets the producer's default delivery mode. + * + * @return the message delivery mode for this message producer + * @throws javax.jms.JMSException if the JMS provider fails to close the producer due to + * some internal error. + */ + public int getDeliveryMode() throws JMSException { + checkClosed(); + return this.defaultDeliveryMode; + } + + /** + * Sets the producer's default priority. + *

+ * The JMS API defines ten levels of priority value, with 0 as the lowest + * priority and 9 as the highest. Clients should consider priorities 0-4 as + * gradations of normal priority and priorities 5-9 as gradations of + * expedited priority. Priority is set to 4 by default. + * + * @param newDefaultPriority the message priority for this message producer; must be a + * value between 0 and 9 + * @throws javax.jms.JMSException if the JMS provider fails to set the delivery mode due to + * some internal error. + * @see javax.jms.MessageProducer#getPriority + * @see javax.jms.Message#DEFAULT_PRIORITY + */ + public void setPriority(int newDefaultPriority) throws JMSException { + if (newDefaultPriority < 0 || newDefaultPriority > 9) { + throw new IllegalStateException("default priority must be a value between 0 and 9"); + } + checkClosed(); + this.defaultPriority = newDefaultPriority; + } + + /** + * Gets the producer's default priority. + * + * @return the message priority for this message producer + * @throws javax.jms.JMSException if the JMS provider fails to close the producer due to + * some internal error. + * @see javax.jms.MessageProducer#setPriority + */ + public int getPriority() throws JMSException { + checkClosed(); + return this.defaultPriority; + } + + /** + * Sets the default length of time in milliseconds from its dispatch time + * that a produced message should be retained by the message system. + *

+ * Time to live is set to zero by default. + * + * @param timeToLive the message time to live in milliseconds; zero is unlimited + * @throws javax.jms.JMSException if the JMS provider fails to set the time to live due to + * some internal error. + * @see javax.jms.MessageProducer#getTimeToLive + * @see javax.jms.Message#DEFAULT_TIME_TO_LIVE + */ + public void setTimeToLive(long timeToLive) throws JMSException { + if (timeToLive < 0l) { + throw new IllegalStateException("cannot set a negative timeToLive"); + } + checkClosed(); + this.defaultTimeToLive = timeToLive; + } + + /** + * Gets the default length of time in milliseconds from its dispatch time + * that a produced message should be retained by the message system. + * + * @return the message time to live in milliseconds; zero is unlimited + * @throws javax.jms.JMSException if the JMS provider fails to get the time to live due to + * some internal error. + * @see javax.jms.MessageProducer#setTimeToLive + */ + public long getTimeToLive() throws JMSException { + checkClosed(); + return this.defaultTimeToLive; + } + + /** + * Sends a message using the MessageProducer's default + * delivery mode, priority, and time to live. + * + * @param message the message to send + * @throws javax.jms.JMSException if the JMS provider fails to send the message due to some + * internal error. + * @throws javax.jms.MessageFormatException if an invalid message is specified. + * @throws javax.jms.InvalidDestinationException if a client uses this method with a + * MessageProducer with an invalid destination. + * @throws UnsupportedOperationException + * if a client uses this method with a + * MessageProducer that did not specify a + * destination at creation time. + * @see javax.jms.Session#createProducer + * @see javax.jms.MessageProducer + * @since 1.1 + */ + public void send(Message message) throws JMSException { + this.send(this.getDestination(), + message, + this.defaultDeliveryMode, + this.defaultPriority, + this.defaultTimeToLive); + } + + /** + * Sends a message to the destination, specifying delivery mode, priority, + * and time to live. + * + * @param message the message to send + * @param deliveryMode the delivery mode to use + * @param priority the priority for this message + * @param timeToLive the message's lifetime (in milliseconds) + * @throws javax.jms.JMSException if the JMS provider fails to send the message due to some + * internal error. + * @throws javax.jms.MessageFormatException if an invalid message is specified. + * @throws javax.jms.InvalidDestinationException if a client uses this method with a + * MessageProducer with an invalid destination. + * @throws UnsupportedOperationException + * if a client uses this method with a + * MessageProducer that did not specify a + * destination at creation time. + * @see javax.jms.Session#createProducer + * @since 1.1 + */ + public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { + this.send(this.getDestination(), + message, + deliveryMode, + priority, + timeToLive); + } + + /** + * Sends a message to a destination for an unidentified message producer. + * Uses the MessageProducer's default delivery mode, + * priority, and time to live. + *

+ * Typically, a message producer is assigned a destination at creation + * time; however, the JMS API also supports unidentified message producers, + * which require that the destination be supplied every time a message is + * sent. + * + * @param destination the destination to send this message to + * @param message the message to send + * @throws javax.jms.JMSException if the JMS provider fails to send the message due to some + * internal error. + * @throws javax.jms.MessageFormatException if an invalid message is specified. + * @throws javax.jms.InvalidDestinationException if a client uses this method with an invalid destination. + * @throws UnsupportedOperationException + * if a client uses this method with a + * MessageProducer that specified a destination at + * creation time. + * @see javax.jms.Session#createProducer + * @see javax.jms.MessageProducer + */ + public void send(Destination destination, Message message) throws JMSException { + this.send(destination, + message, + this.defaultDeliveryMode, + this.defaultPriority, + this.defaultTimeToLive); + } + + + protected abstract void checkClosed() throws IllegalStateException; +} diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index 0898ef0941..40d560f5ee 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -851,6 +851,11 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta */ public MessageProducer createProducer(Destination destination) throws JMSException { checkClosed(); + if (destination instanceof CustomDestination) { + CustomDestination customDestination = (CustomDestination) destination; + return customDestination.createProducer(this); + } + return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation .transformDestination(destination)); } @@ -904,6 +909,12 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta */ public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { checkClosed(); + + if (destination instanceof CustomDestination) { + CustomDestination customDestination = (CustomDestination) destination; + return customDestination.createConsumer(this, messageSelector); + } + int prefetch = 0; ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); @@ -958,7 +969,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * expression are delivered. A value of null or an empty string * indicates that there is no message selector for the message * consumer. - * @param NoLocal - + * @param noLocal - * if true, and the destination is a topic, inhibits the delivery * of messages published by its own connection. The behavior for * NoLocal is not specified if the destination is @@ -973,13 +984,20 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * if the message selector is invalid. * @since 1.1 */ - public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal) + public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { checkClosed(); + + if (destination instanceof CustomDestination) { + CustomDestination customDestination = (CustomDestination) destination; + return customDestination.createConsumer(this, messageSelector, noLocal); + } + + ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); return new ActiveMQMessageConsumer(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(destination), null, messageSelector, - prefetchPolicy.getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), NoLocal, false, asyncDispatch); + prefetchPolicy.getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch); } /** @@ -1145,6 +1163,12 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta public TopicSubscriber createDurableSubscriber(Topic topic,String name,String messageSelector,boolean noLocal) throws JMSException{ checkClosed(); + + if (topic instanceof CustomDestination) { + CustomDestination customDestination = (CustomDestination) topic; + return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal); + } + connection.checkClientIDWasManuallySpecified(); ActiveMQPrefetchPolicy prefetchPolicy=this.connection.getPrefetchPolicy(); int prefetch=isAutoAcknowledge()&&connection.isOptimizedMessageDispatch()?prefetchPolicy @@ -1272,6 +1296,12 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta */ public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { checkClosed(); + + if (queue instanceof CustomDestination) { + CustomDestination customDestination = (CustomDestination) queue; + return customDestination.createReceiver(this, messageSelector); + } + ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation .transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(), @@ -1294,6 +1324,11 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta */ public QueueSender createSender(Queue queue) throws JMSException { checkClosed(); + if (queue instanceof CustomDestination) { + CustomDestination customDestination = (CustomDestination) queue; + return customDestination.createSender(this); + } + return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue)); } @@ -1366,6 +1401,12 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta */ public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { checkClosed(); + + if (topic instanceof CustomDestination) { + CustomDestination customDestination = (CustomDestination) topic; + return customDestination.createSubscriber(this, messageSelector, noLocal); + } + ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation .transformDestination(topic), null, messageSelector, prefetchPolicy.getTopicPrefetch(), @@ -1392,6 +1433,11 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta */ public TopicPublisher createPublisher(Topic topic) throws JMSException { checkClosed(); + + if (topic instanceof CustomDestination) { + CustomDestination customDestination = (CustomDestination) topic; + return customDestination.createPublisher(this); + } return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic)); } @@ -1807,6 +1853,10 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta return transformer; } + public ActiveMQConnection getConnection() { + return connection; + } + /** * Sets the transformer used to transform messages before they are sent on to the JMS bus * or when they are received from the bus but before they are delivered to the JMS client @@ -1831,7 +1881,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta return executor.getUnconsumedMessages(); } - public String toString() { return "ActiveMQSession {id="+info.getSessionId()+",started="+started.get()+"}"; } diff --git a/activemq-core/src/main/java/org/apache/activemq/CustomDestination.java b/activemq-core/src/main/java/org/apache/activemq/CustomDestination.java new file mode 100644 index 0000000000..2db8be6a2b --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/CustomDestination.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.TopicSubscriber; +import javax.jms.QueueReceiver; +import javax.jms.TopicPublisher; +import javax.jms.QueueSender; +import javax.jms.JMSException; + +/** + * Represents a hook to allow the support of custom destinations + * such as to support Apache Camel + * to create and manage endpoints + * + * @version $Revision: $ + */ +public interface CustomDestination extends Destination { + + // Consumers + //----------------------------------------------------------------------- + MessageConsumer createConsumer(ActiveMQSession session, String messageSelector); + MessageConsumer createConsumer(ActiveMQSession session, String messageSelector, boolean noLocal); + + TopicSubscriber createSubscriber(ActiveMQSession session, String messageSelector, boolean noLocal); + TopicSubscriber createDurableSubscriber(ActiveMQSession session, String name, String messageSelector, boolean noLocal); + + QueueReceiver createReceiver(ActiveMQSession session, String messageSelector); + + // Producers + //----------------------------------------------------------------------- + MessageProducer createProducer(ActiveMQSession session) throws JMSException; + + TopicPublisher createPublisher(ActiveMQSession session) throws JMSException; + + QueueSender createSender(ActiveMQSession session) throws JMSException; +}