diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index a9c607ac75..aea6794e63 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -132,6 +132,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private boolean optimizeAcknowledge = false; private boolean nestedMapAndListEnabled = true; private boolean useRetroactiveConsumer; + private boolean alwaysSyncSend; private int closeTimeout = 15000; private final Transport transport; @@ -1301,6 +1302,22 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon public void setUseAsyncSend(boolean useAsyncSend) { this.useAsyncSend = useAsyncSend; } + + /** + * @return true if always sync send messages + */ + public boolean isAlwaysSyncSend(){ + return this.alwaysSyncSend; + } + + /** + * Set true if always require messages to be sync sent + * @param alwaysSyncSend + */ + public void setAlwaysSyncSend(boolean alwaysSyncSend){ + this.alwaysSyncSend=alwaysSyncSend; + } + /** * Cleans up this connection so that it's state is as if the connection was @@ -1929,8 +1946,5 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon public String toString() { return "ActiveMQConnection {id="+info.getConnectionId()+",clientId="+info.getClientId()+",started="+started.get()+"}"; - } - - - + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index f561e9ccb9..cef01052db 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -87,6 +87,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne private boolean useRetroactiveConsumer; private boolean nestedMapAndListEnabled = true; JMSStatsImpl factoryStats = new JMSStatsImpl(); + private boolean alwaysSyncSend; static protected final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { public Thread newThread(Runnable run) { @@ -425,6 +426,21 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne public void setUseAsyncSend(boolean useAsyncSend) { this.useAsyncSend = useAsyncSend; } + + /** + * @return true if always sync send messages + */ + public boolean isAlwaysSyncSend(){ + return this.alwaysSyncSend; + } + + /** + * Set true if always require messages to be sync sent + * @param alwaysSyncSend + */ + public void setAlwaysSyncSend(boolean alwaysSyncSend){ + this.alwaysSyncSend=alwaysSyncSend; + } public String getUserName() { return userName; @@ -553,6 +569,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync())); props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge())); props.setProperty("statsEnabled",Boolean.toString(isStatsEnabled())); + props.setProperty("alwaysSyncSend",Boolean.toString(isAlwaysSyncSend())); } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index b3142dd84a..b6a16e381b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -1594,11 +1594,11 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta if(this.debug){ log.debug("Sending message: "+msg); } - if(!msg.isPersistent()||connection.isUseAsyncSend()||txid!=null){ - this.connection.asyncSendPacket(msg); - }else{ - this.connection.syncSendPacket(msg); - } + if(!connection.isAlwaysSyncSend()&&(!msg.isPersistent()||connection.isUseAsyncSend()||txid!=null)){ + this.connection.asyncSendPacket(msg); + }else{ + this.connection.syncSendPacket(msg); + } } }