synchronize around the send on a Message

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@487232 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-12-14 15:18:25 +00:00
parent 87d15d6b98
commit e500f2ecc4

View File

@ -199,6 +199,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
protected boolean closed;
protected boolean asyncDispatch;
protected boolean sessionAsyncDispatch;
protected Object sendMutex = new Object();
/**
* Construct the Session
@ -1493,92 +1494,87 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* message expiration.
* @throws JMSException
*/
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode,
int priority, long timeToLive) throws JMSException {
checkClosed();
protected void send(ActiveMQMessageProducer producer,
ActiveMQDestination destination,Message message,int deliveryMode,
int priority,long timeToLive) throws JMSException{
checkClosed();
if(destination.isTemporary()&&connection.isDeleted(destination)){
throw new JMSException("Cannot publish to a deleted Destination: "
+destination);
}
synchronized(sendMutex){
// tell the Broker we are about to start a new transaction
doStartTransaction();
TransactionId txid=transactionContext.getTransactionId();
message.setJMSDestination(destination);
message.setJMSDeliveryMode(deliveryMode);
long expiration=0L;
if(!producer.getDisableMessageTimestamp()){
long timeStamp=System.currentTimeMillis();
message.setJMSTimestamp(timeStamp);
if(timeToLive>0){
expiration=timeToLive+timeStamp;
}
}
message.setJMSExpiration(expiration);
message.setJMSPriority(priority);
long sequenceNumber=producer.getMessageSequence();
message.setJMSRedelivered(false);
// transform to our own message format here
ActiveMQMessage msg=ActiveMQMessageTransformation.transformMessage(
message,connection);
// Set the message id.
if(msg==message){
msg.setMessageId(new MessageId(producer.getProducerInfo()
.getProducerId(),sequenceNumber));
}else{
msg.setMessageId(new MessageId(producer.getProducerInfo()
.getProducerId(),sequenceNumber));
message.setJMSMessageID(msg.getMessageId().toString());
}
msg.setTransactionId(txid);
if(connection.isCopyMessageOnSend()){
msg=(ActiveMQMessage)msg.copy();
}
msg.onSend();
msg.setProducerId(msg.getMessageId().getProducerId());
if(log.isDebugEnabled()){
log.debug("Sending message: "+msg);
}
if(!msg.isPersistent()||connection.isUseAsyncSend()||txid!=null){
this.connection.asyncSendPacket(msg);
}else{
this.connection.syncSendPacket(msg);
}
}
}
if( destination.isTemporary() && connection.isDeleted(destination) ) {
throw new JMSException("Cannot publish to a deleted Destination: "+destination);
}
// tell the Broker we are about to start a new transaction
doStartTransaction();
TransactionId txid = transactionContext.getTransactionId();
message.setJMSDestination(destination);
message.setJMSDeliveryMode(deliveryMode);
long expiration = 0L;
if (!producer.getDisableMessageTimestamp()) {
long timeStamp = System.currentTimeMillis();
message.setJMSTimestamp(timeStamp);
if (timeToLive > 0) {
expiration = timeToLive + timeStamp;
}
}
message.setJMSExpiration(expiration);
message.setJMSPriority(priority);
long sequenceNumber = producer.getMessageSequence();
message.setJMSRedelivered(false);
// transform to our own message format here
ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
// Set the message id.
if( msg == message ) {
msg.setMessageId( new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber) );
} else {
msg.setMessageId( new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber) );
message.setJMSMessageID(msg.getMessageId().toString());
}
msg.setTransactionId(txid);
if ( connection.isCopyMessageOnSend() ){
msg = (ActiveMQMessage) msg.copy();
}
msg.onSend();
msg.setProducerId(msg.getMessageId().getProducerId());
if (log.isDebugEnabled()) {
log.debug("Sending message: " + msg);
}
if(!msg.isPersistent() || connection.isUseAsyncSend() || txid!=null) {
this.connection.asyncSendPacket(msg);
} else {
this.connection.syncSendPacket(msg);
}
}
/**
* Send TransactionInfo to indicate transaction has started
*
* @throws JMSException
* if some internal error occurs
*/
protected void doStartTransaction() throws JMSException{
if(getTransacted()&&!transactionContext.isInXATransaction()){
transactionContext.begin();
}
}
/**
* Send TransactionInfo to indicate transaction has started
*
* @throws JMSException
* if some internal error occurs
*/
protected void doStartTransaction() throws JMSException {
if (getTransacted() && !transactionContext.isInXATransaction()) {
transactionContext.begin();
}
}
/**
* Checks whether the session has unconsumed messages.
*
* @return true - if there are unconsumed messages.
*/
* Checks whether the session has unconsumed messages.
*
* @return true - if there are unconsumed messages.
*/
public boolean hasUncomsumedMessages() {
return executor.hasUncomsumedMessages();
}
/**
* Checks whether the session uses transactions.
*
* @return true - if the session uses transactions.
*/
* Checks whether the session uses transactions.
*
* @return true - if the session uses transactions.
*/
public boolean isTransacted() {
return this.acknowledgementMode == Session.SESSION_TRANSACTED;
}