mirror of https://github.com/apache/activemq.git
provide the option to always send messages synchronously - even if the messages are
non-persistent git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@513543 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c0cd83870c
commit
4e2acd4a77
|
@ -132,6 +132,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
private boolean optimizeAcknowledge = false;
|
private boolean optimizeAcknowledge = false;
|
||||||
private boolean nestedMapAndListEnabled = true;
|
private boolean nestedMapAndListEnabled = true;
|
||||||
private boolean useRetroactiveConsumer;
|
private boolean useRetroactiveConsumer;
|
||||||
|
private boolean alwaysSyncSend;
|
||||||
private int closeTimeout = 15000;
|
private int closeTimeout = 15000;
|
||||||
|
|
||||||
private final Transport transport;
|
private final Transport transport;
|
||||||
|
@ -1302,6 +1303,22 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
this.useAsyncSend = useAsyncSend;
|
this.useAsyncSend = useAsyncSend;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if always sync send messages
|
||||||
|
*/
|
||||||
|
public boolean isAlwaysSyncSend(){
|
||||||
|
return this.alwaysSyncSend;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set true if always require messages to be sync sent
|
||||||
|
* @param alwaysSyncSend
|
||||||
|
*/
|
||||||
|
public void setAlwaysSyncSend(boolean alwaysSyncSend){
|
||||||
|
this.alwaysSyncSend=alwaysSyncSend;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Cleans up this connection so that it's state is as if the connection was
|
* Cleans up this connection so that it's state is as if the connection was
|
||||||
* just created. This allows the Resource Adapter to clean up a connection
|
* just created. This allows the Resource Adapter to clean up a connection
|
||||||
|
@ -1930,7 +1947,4 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "ActiveMQConnection {id="+info.getConnectionId()+",clientId="+info.getClientId()+",started="+started.get()+"}";
|
return "ActiveMQConnection {id="+info.getConnectionId()+",clientId="+info.getClientId()+",started="+started.get()+"}";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -87,6 +87,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
||||||
private boolean useRetroactiveConsumer;
|
private boolean useRetroactiveConsumer;
|
||||||
private boolean nestedMapAndListEnabled = true;
|
private boolean nestedMapAndListEnabled = true;
|
||||||
JMSStatsImpl factoryStats = new JMSStatsImpl();
|
JMSStatsImpl factoryStats = new JMSStatsImpl();
|
||||||
|
private boolean alwaysSyncSend;
|
||||||
|
|
||||||
static protected final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
|
static protected final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
|
||||||
public Thread newThread(Runnable run) {
|
public Thread newThread(Runnable run) {
|
||||||
|
@ -426,6 +427,21 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
||||||
this.useAsyncSend = useAsyncSend;
|
this.useAsyncSend = useAsyncSend;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if always sync send messages
|
||||||
|
*/
|
||||||
|
public boolean isAlwaysSyncSend(){
|
||||||
|
return this.alwaysSyncSend;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set true if always require messages to be sync sent
|
||||||
|
* @param alwaysSyncSend
|
||||||
|
*/
|
||||||
|
public void setAlwaysSyncSend(boolean alwaysSyncSend){
|
||||||
|
this.alwaysSyncSend=alwaysSyncSend;
|
||||||
|
}
|
||||||
|
|
||||||
public String getUserName() {
|
public String getUserName() {
|
||||||
return userName;
|
return userName;
|
||||||
}
|
}
|
||||||
|
@ -553,6 +569,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
||||||
props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync()));
|
props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync()));
|
||||||
props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
|
props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
|
||||||
props.setProperty("statsEnabled",Boolean.toString(isStatsEnabled()));
|
props.setProperty("statsEnabled",Boolean.toString(isStatsEnabled()));
|
||||||
|
props.setProperty("alwaysSyncSend",Boolean.toString(isAlwaysSyncSend()));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1594,7 +1594,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
if(this.debug){
|
if(this.debug){
|
||||||
log.debug("Sending message: "+msg);
|
log.debug("Sending message: "+msg);
|
||||||
}
|
}
|
||||||
if(!msg.isPersistent()||connection.isUseAsyncSend()||txid!=null){
|
if(!connection.isAlwaysSyncSend()&&(!msg.isPersistent()||connection.isUseAsyncSend()||txid!=null)){
|
||||||
this.connection.asyncSendPacket(msg);
|
this.connection.asyncSendPacket(msg);
|
||||||
}else{
|
}else{
|
||||||
this.connection.syncSendPacket(msg);
|
this.connection.syncSendPacket(msg);
|
||||||
|
|
Loading…
Reference in New Issue