git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@646437 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-04-09 17:02:04 +00:00
parent aba1195cc5
commit 114a923779
7 changed files with 69 additions and 15 deletions

View File

@ -142,6 +142,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private int closeTimeout = 15000; private int closeTimeout = 15000;
private boolean watchTopicAdvisories = true; private boolean watchTopicAdvisories = true;
private long warnAboutUnstartedConnectionTimeout = 500L; private long warnAboutUnstartedConnectionTimeout = 500L;
private int sendTimeout =0;
private final Transport transport; private final Transport transport;
private final IdGenerator clientIdGenerator; private final IdGenerator clientIdGenerator;
@ -1519,6 +1520,21 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout; this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
} }
/**
* @return the sendTimeout
*/
public int getSendTimeout() {
return sendTimeout;
}
/**
* @param sendTimeout the sendTimeout to set
*/
public void setSendTimeout(int sendTimeout) {
this.sendTimeout = sendTimeout;
}
/** /**
* Returns the time this connection was created * Returns the time this connection was created
*/ */
@ -2091,5 +2107,4 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) { protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
connectionAudit.rollbackDuplicate(dispatcher, message); connectionAudit.rollbackDuplicate(dispatcher, message);
} }
} }

View File

@ -106,6 +106,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private boolean watchTopicAdvisories = true; private boolean watchTopicAdvisories = true;
private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE; private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
private long warnAboutUnstartedConnectionTimeout = 500L; private long warnAboutUnstartedConnectionTimeout = 500L;
private int sendTimeout =0;
private TransportListener transportListener; private TransportListener transportListener;
// ///////////////////////////////////////////// // /////////////////////////////////////////////
@ -302,6 +303,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
connection.setWatchTopicAdvisories(isWatchTopicAdvisories()); connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
connection.setProducerWindowSize(getProducerWindowSize()); connection.setProducerWindowSize(getProducerWindowSize());
connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout()); connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
connection.setSendTimeout(getSendTimeout());
if (transportListener != null) { if (transportListener != null) {
connection.addTransportListener(transportListener); connection.addTransportListener(transportListener);
} }
@ -534,6 +536,21 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
return transformer; return transformer;
} }
/**
* @return the sendTimeout
*/
public int getSendTimeout() {
return sendTimeout;
}
/**
* @param sendTimeout the sendTimeout to set
*/
public void setSendTimeout(int sendTimeout) {
this.sendTimeout = sendTimeout;
}
/** /**
* Sets the transformer used to transform messages before they are sent on * Sets the transformer used to transform messages before they are sent on
* to the JMS bus or when they are received from the bus but before they are * to the JMS bus or when they are received from the bus but before they are
@ -627,6 +644,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled())); props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled()));
props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend())); props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize())); props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
} }
public boolean isUseCompression() { public boolean isUseCompression() {

View File

@ -79,7 +79,7 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl
private MessageTransformer transformer; private MessageTransformer transformer;
private MemoryUsage producerWindow; private MemoryUsage producerWindow;
protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination) throws JMSException { protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException {
super(session); super(session);
this.info = new ProducerInfo(producerId); this.info = new ProducerInfo(producerId);
this.info.setWindowSize(session.connection.getProducerWindowSize()); this.info.setWindowSize(session.connection.getProducerWindowSize());
@ -104,6 +104,7 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl
this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination); this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
this.session.addProducer(this); this.session.addProducer(this);
this.session.asyncSendPacket(info); this.session.asyncSendPacket(info);
this.setSendTimeout(sendTimeout);
setTransformer(session.getTransformer()); setTransformer(session.getTransformer());
} }
@ -223,7 +224,7 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl
} }
} }
this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow); this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow,sendTimeout);
stats.onMessage(); stats.onMessage();
} }

View File

@ -35,6 +35,7 @@ public abstract class ActiveMQMessageProducerSupport implements MessageProducer,
protected int defaultDeliveryMode; protected int defaultDeliveryMode;
protected int defaultPriority; protected int defaultPriority;
protected long defaultTimeToLive; protected long defaultTimeToLive;
protected int sendTimeout=0;
public ActiveMQMessageProducerSupport(ActiveMQSession session) { public ActiveMQMessageProducerSupport(ActiveMQSession session) {
this.session = session; this.session = session;
@ -305,4 +306,18 @@ public abstract class ActiveMQMessageProducerSupport implements MessageProducer,
protected abstract void checkClosed() throws IllegalStateException; protected abstract void checkClosed() throws IllegalStateException;
/**
* @return the sendTimeout
*/
public int getSendTimeout() {
return sendTimeout;
}
/**
* @param sendTimeout the sendTimeout to set
*/
public void setSendTimeout(int sendTimeout) {
this.sendTimeout = sendTimeout;
}
} }

View File

@ -72,9 +72,9 @@ import org.apache.activemq.command.ActiveMQDestination;
public class ActiveMQQueueSender extends ActiveMQMessageProducer implements QueueSender { public class ActiveMQQueueSender extends ActiveMQMessageProducer implements QueueSender {
protected ActiveMQQueueSender(ActiveMQSession session, ActiveMQDestination destination) protected ActiveMQQueueSender(ActiveMQSession session, ActiveMQDestination destination,int sendTimeout)
throws JMSException { throws JMSException {
super(session, session.getNextProducerId(), destination); super(session, session.getNextProducerId(), destination,sendTimeout);
} }
/** /**

View File

@ -812,8 +812,8 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
CustomDestination customDestination = (CustomDestination)destination; CustomDestination customDestination = (CustomDestination)destination;
return customDestination.createProducer(this); return customDestination.createProducer(this);
} }
int timeSendOut = connection.getSendTimeout();
return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination)); return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
} }
/** /**
@ -1293,8 +1293,8 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
CustomDestination customDestination = (CustomDestination)queue; CustomDestination customDestination = (CustomDestination)queue;
return customDestination.createSender(this); return customDestination.createSender(this);
} }
int timeSendOut = connection.getSendTimeout();
return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue)); return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
} }
/** /**
@ -1390,7 +1390,8 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
CustomDestination customDestination = (CustomDestination)topic; CustomDestination customDestination = (CustomDestination)topic;
return customDestination.createPublisher(this); return customDestination.createPublisher(this);
} }
return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic)); int timeSendOut = connection.getSendTimeout();
return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
} }
/** /**
@ -1576,7 +1577,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws JMSException * @throws JMSException
*/ */
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
MemoryUsage producerWindow) throws JMSException { MemoryUsage producerWindow, int sendTimeout) throws JMSException {
checkClosed(); checkClosed();
if (destination.isTemporary() && connection.isDeleted(destination)) { if (destination.isTemporary() && connection.isDeleted(destination)) {
@ -1623,7 +1624,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
if (this.debug) { if (this.debug) {
LOG.debug(getSessionId() + " sending message: " + msg); LOG.debug(getSessionId() + " sending message: " + msg);
} }
if (!msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { if (sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
this.connection.asyncSendPacket(msg); this.connection.asyncSendPacket(msg);
if (producerWindow != null) { if (producerWindow != null) {
// Since we defer lots of the marshaling till we hit the // Since we defer lots of the marshaling till we hit the
@ -1636,9 +1637,13 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
int size = msg.getSize(); int size = msg.getSize();
producerWindow.increaseUsage(size); producerWindow.increaseUsage(size);
} }
} else {
if (sendTimeout > 0) {
this.connection.syncSendPacket(msg,sendTimeout);
}else { }else {
this.connection.syncSendPacket(msg); this.connection.syncSendPacket(msg);
} }
}
} }
} }

View File

@ -85,8 +85,8 @@ public class ActiveMQTopicPublisher extends ActiveMQMessageProducer implements
TopicPublisher { TopicPublisher {
protected ActiveMQTopicPublisher(ActiveMQSession session, protected ActiveMQTopicPublisher(ActiveMQSession session,
ActiveMQDestination destination) throws JMSException { ActiveMQDestination destination, int sendTimeout) throws JMSException {
super(session, session.getNextProducerId(), destination); super(session, session.getNextProducerId(), destination,sendTimeout);
} }
/** /**