extra peformance tuning parameters

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@393294 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-04-11 19:09:11 +00:00
parent 014c62bc56
commit d964145532
5 changed files with 174 additions and 66 deletions

View File

@ -116,8 +116,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private boolean copyMessageOnSend = true;
private boolean useCompression = false;
private boolean objectMessageSerializationDefered = false;
protected boolean asyncDispatch = true;
protected boolean asyncDispatch = false;
protected boolean alwaysSessionAsync=true;
private boolean useAsyncSend = false;
private boolean optimizeAcknowledge = true;
private boolean useRetroactiveConsumer;
private int closeTimeout = 15000;
@ -247,15 +249,18 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @see Session#DUPS_OK_ACKNOWLEDGE
* @since 1.1
*/
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
public Session createSession(boolean transacted,int acknowledgeMode) throws JMSException{
checkClosedOrFailed();
ensureConnectionInfoSent();
return new ActiveMQSession(this, getNextSessionId(), (transacted ? Session.SESSION_TRANSACTED
: (acknowledgeMode == Session.SESSION_TRANSACTED ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode)), asyncDispatch);
boolean doSessionAsync=alwaysSessionAsync||sessions.size()>0||transacted
||acknowledgeMode==Session.CLIENT_ACKNOWLEDGE;
return new ActiveMQSession(this,getNextSessionId(),(transacted?Session.SESSION_TRANSACTED
:(acknowledgeMode==Session.SESSION_TRANSACTED?Session.AUTO_ACKNOWLEDGE:acknowledgeMode)),
asyncDispatch,alwaysSessionAsync);
}
/**
* @return
* @return sessionId
*/
protected SessionId getNextSessionId() {
return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
@ -1325,6 +1330,37 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
this.redeliveryPolicy = redeliveryPolicy;
}
/**
* @return Returns the alwaysSessionAsync.
*/
public boolean isAlwaysSessionAsync(){
return alwaysSessionAsync;
}
/**
* @param alwaysSessionAsync The alwaysSessionAsync to set.
*/
public void setAlwaysSessionAsync(boolean alwaysSessionAsync){
this.alwaysSessionAsync=alwaysSessionAsync;
}
/**
* @return Returns the optimizeAcknowledge.
*/
public boolean isOptimizeAcknowledge(){
return optimizeAcknowledge;
}
/**
* @param optimizeAcknowledge The optimizeAcknowledge to set.
*/
public void setOptimizeAcknowledge(boolean optimizeAcknowledge){
this.optimizeAcknowledge=optimizeAcknowledge;
}
private void waitForBrokerInfo() throws JMSException {
try {
@ -1516,7 +1552,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
public void setAsyncDispatch(boolean asyncDispatch) {
this.asyncDispatch = asyncDispatch;
//this.asyncDispatch = asyncDispatch;
}
public boolean isObjectMessageSerializationDefered() {
@ -1702,4 +1738,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public String toString() {
return "ActiveMQConnection {id="+info.getConnectionId()+",clientId="+info.getClientId()+",started="+started.get()+"}";
}
}

View File

@ -79,8 +79,10 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private boolean copyMessageOnSend = true;
private boolean useCompression = false;
private boolean objectMessageSerializationDefered = false;
protected boolean asyncDispatch = true;
protected boolean asyncDispatch = false;
protected boolean alwaysSessionAsync=true;
private boolean useAsyncSend = false;
private boolean optimizeAcknowledge = true;
private int closeTimeout = 15000;
private boolean useRetroactiveConsumer;
@ -233,6 +235,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
connection.setAsyncDispatch(isAsyncDispatch());
connection.setUseAsyncSend(isUseAsyncSend());
connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
connection.setRedeliveryPolicy(getRedeliveryPolicy());
@ -417,6 +421,9 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
props.setProperty("userName", getUserName());
props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync()));
props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
}
public boolean isOnSendPrepareMessageBody() {
@ -464,4 +471,32 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
public void setCloseTimeout(int closeTimeout){
this.closeTimeout=closeTimeout;
}
/**
* @return Returns the alwaysSessionAsync.
*/
public boolean isAlwaysSessionAsync(){
return alwaysSessionAsync;
}
/**
* @param alwaysSessionAsync The alwaysSessionAsync to set.
*/
public void setAlwaysSessionAsync(boolean alwaysSessionAsync){
this.alwaysSessionAsync=alwaysSessionAsync;
}
/**
* @return Returns the optimizeAcknowledge.
*/
public boolean isOptimizeAcknowledge(){
return optimizeAcknowledge;
}
/**
* @param optimizeAcknowledge The optimizeAcknowledge to set.
*/
public void setOptimizeAcknowledge(boolean optimizeAcknowledge){
this.optimizeAcknowledge=optimizeAcknowledge;
}
}

View File

@ -100,7 +100,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private int additionalWindowSize = 0;
private int rollbackCounter = 0;
private long redeliveryDelay = 0;
private int ackCounter = 0;
private MessageListener messageListener;
private JMSConsumerStatsImpl stats;
@ -111,6 +111,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private MessageAvailableListener availableListener;
private RedeliveryPolicy redeliveryPolicy;
private boolean optimizeAcknowledge;
/**
* Create a MessageConsumer
@ -188,7 +189,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
this.session.removeConsumer(this);
throw e;
}
this.optimizeAcknowledge=session.connection.isOptimizeAcknowledge()&&!info.isDurable()
&&!info.getDestination().isQueue()
&&session.isAutoAcknowledge();
if (session.connection.isStarted())
start();
}
@ -540,27 +543,36 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
deliveredMessages.addFirst(md);
}
private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
if (unconsumedMessages.isClosed())
private void afterMessageIsConsumed(MessageDispatch md,boolean messageExpired) throws JMSException{
if(unconsumedMessages.isClosed())
return;
if (messageExpired) {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
} else {
if(messageExpired){
ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
}else{
stats.onMessage();
if (session.isTransacted()) {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
} else if (session.isAutoAcknowledge()) {
if (!deliveredMessages.isEmpty()) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
session.asyncSendPacket(ack);
deliveredMessages.clear();
if(session.isTransacted()){
ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
}else if(session.isAutoAcknowledge()){
if(!deliveredMessages.isEmpty()){
if(this.optimizeAcknowledge){
ackCounter++;
if(ackCounter>=(info.getPrefetchSize()*.75)){
MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,ackCounter);
session.asyncSendPacket(ack);
ackCounter=0;
deliveredMessages.clear();
}
}else{
MessageAck ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
session.asyncSendPacket(ack);
deliveredMessages.clear();
}
}
} else if (session.isDupsOkAcknowledge()) {
ackLater(md, MessageAck.STANDARD_ACK_TYPE);
} else if (session.isClientAcknowledge()) {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
} else {
}else if(session.isDupsOkAcknowledge()){
ackLater(md,MessageAck.STANDARD_ACK_TYPE);
}else if(session.isClientAcknowledge()){
ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
}else{
throw new IllegalStateException("Invalid session state.");
}
}
@ -645,63 +657,59 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
redeliveryDelay = 0;
}
public void rollback() throws JMSException {
synchronized (unconsumedMessages.getMutex()) {
if (deliveredMessages.isEmpty())
public void rollback() throws JMSException{
synchronized(unconsumedMessages.getMutex()){
if(optimizeAcknowledge){
// remove messages read but not acked at the broker yet through optimizeAcknowledge
for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){
deliveredMessages.removeLast();
}
}
if(deliveredMessages.isEmpty())
return;
rollbackCounter++;
if (rollbackCounter > redeliveryPolicy.getMaximumRedeliveries()) {
if(rollbackCounter>redeliveryPolicy.getMaximumRedeliveries()){
// We need to NACK the messages so that they get sent to the
// DLQ.
// Acknowledge the last message.
MessageDispatch lastMd = (MessageDispatch) deliveredMessages.get(0);
MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
MessageDispatch lastMd=(MessageDispatch) deliveredMessages.get(0);
MessageAck ack=new MessageAck(lastMd,MessageAck.POSION_ACK_TYPE,deliveredMessages.size());
session.asyncSendPacket(ack);
// Adjust the window size.
additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
rollbackCounter = 0;
redeliveryDelay = 0;
} else {
additionalWindowSize=Math.max(0,additionalWindowSize-deliveredMessages.size());
rollbackCounter=0;
redeliveryDelay=0;
}else{
// stop the delivery of messages.
unconsumedMessages.stop();
// Start up the delivery again a little later.
if (redeliveryDelay == 0) {
redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
} else {
if (redeliveryPolicy.isUseExponentialBackOff())
redeliveryDelay *= redeliveryPolicy.getBackOffMultiplier();
if(redeliveryDelay==0){
redeliveryDelay=redeliveryPolicy.getInitialRedeliveryDelay();
}else{
if(redeliveryPolicy.isUseExponentialBackOff())
redeliveryDelay*=redeliveryPolicy.getBackOffMultiplier();
}
Scheduler.executeAfterDelay(new Runnable() {
public void run() {
try {
if (started.get())
Scheduler.executeAfterDelay(new Runnable(){
public void run(){
try{
if(started.get())
start();
} catch (JMSException e) {
}catch(JMSException e){
session.connection.onAsyncException(e);
}
}
}, redeliveryDelay);
for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
MessageDispatch md = (MessageDispatch) iter.next();
},redeliveryDelay);
for(Iterator iter=deliveredMessages.iterator();iter.hasNext();){
MessageDispatch md=(MessageDispatch) iter.next();
md.getMessage().onMessageRolledBack();
unconsumedMessages.enqueueFirst(md);
}
}
deliveredCounter -= deliveredMessages.size();
deliveredCounter-=deliveredMessages.size();
deliveredMessages.clear();
}
if (messageListener != null) {
if(messageListener!=null){
session.redispatch(unconsumedMessages);
}
}

View File

@ -72,6 +72,7 @@ import org.apache.activemq.management.JMSSessionStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.LongSequenceGenerator;
@ -196,24 +197,29 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
protected boolean closed;
protected boolean asyncDispatch;
protected boolean sessionAsyncDispatch;
protected TaskRunner taskRunner;
/**
* Construct the Session
*
* @param connection
* @param sessionId
* @param acknowledgeMode
* n.b if transacted - the acknowledgeMode ==
* Session.SESSION_TRANSACTED
* @param asyncDispatch
* @param sessionAsyncDispatch
* @throws JMSException
* on internal error
*/
protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch)
protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch,boolean sessionAsyncDispatch)
throws JMSException {
this.connection = connection;
this.acknowledgementMode = acknowledgeMode;
this.asyncDispatch=asyncDispatch;
this.sessionAsyncDispatch = sessionAsyncDispatch;
this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
setTransactionContext(new TransactionContext(connection));
connection.addSession(this);
@ -224,6 +230,10 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
start();
}
protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch)throws JMSException {
this(connection,sessionId,acknowledgeMode,asyncDispatch,true);
}
/**
* Sets the transaction context of the session.
@ -1663,6 +1673,20 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
public void setAsyncDispatch(boolean asyncDispatch) {
this.asyncDispatch = asyncDispatch;
}
/**
* @return Returns the sessionAsyncDispatch.
*/
public boolean isSessionAsyncDispatch(){
return sessionAsyncDispatch;
}
/**
* @param sessionAsyncDispatch The sessionAsyncDispatch to set.
*/
public void setSessionAsyncDispatch(boolean sessionAsyncDispatch){
this.sessionAsyncDispatch=sessionAsyncDispatch;
}
public List getUnconsumedMessages() {
return executor.getUnconsumedMessages();
@ -1684,4 +1708,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
}
}
}

View File

@ -52,7 +52,7 @@ public class ActiveMQSessionExecutor implements Task {
void execute(MessageDispatch message) throws InterruptedException {
if (!session.isAsyncDispatch() && !dispatchedBySessionPool){
if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool){
dispatch(message);
}else {
messageQueue.enqueue(message);