From d8b5d5c4cd6b695b6e28e56fe18909c3a12a865e Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Tue, 20 May 2008 08:14:47 +0000 Subject: [PATCH] Fix for https://issues.apache.org/activemq/browse/AMQ-1735 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@658154 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/activemq/ActiveMQConnection.java | 15 +++++++++++++++ .../activemq/ActiveMQConnectionFactory.java | 17 +++++++++++++++++ .../activemq/ActiveMQMessageConsumer.java | 16 ++++++++-------- .../org/apache/activemq/ActiveMQSession.java | 12 ++++++++++++ 4 files changed, 52 insertions(+), 8 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index 4fdcab094e..a1ae464c35 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -143,6 +143,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private boolean watchTopicAdvisories = true; private long warnAboutUnstartedConnectionTimeout = 500L; private int sendTimeout =0; + private boolean sendAcksAsync=true; private final Transport transport; private final IdGenerator clientIdGenerator; @@ -1533,6 +1534,20 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon public void setSendTimeout(int sendTimeout) { this.sendTimeout = sendTimeout; } + + /** + * @return the sendAcksAsync + */ + public boolean isSendAcksAsync() { + return sendAcksAsync; + } + + /** + * @param sendAcksAsync the sendAcksAsync to set + */ + public void setSendAcksAsync(boolean sendAcksAsync) { + this.sendAcksAsync = sendAcksAsync; + } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index 2b85362932..9ec4789584 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -107,6 +107,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE; private long warnAboutUnstartedConnectionTimeout = 500L; private int sendTimeout =0; + private boolean sendAcksAsync=true; private TransportListener transportListener; // ///////////////////////////////////////////// @@ -304,6 +305,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne connection.setProducerWindowSize(getProducerWindowSize()); connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout()); connection.setSendTimeout(getSendTimeout()); + connection.setSendAcksAsync(isSendAcksAsync()); if (transportListener != null) { connection.addTransportListener(transportListener); } @@ -549,6 +551,20 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne public void setSendTimeout(int sendTimeout) { this.sendTimeout = sendTimeout; } + + /** + * @return the sendAcksAsync + */ + public boolean isSendAcksAsync() { + return sendAcksAsync; + } + + /** + * @param sendAcksAsync the sendAcksAsync to set + */ + public void setSendAcksAsync(boolean sendAcksAsync) { + this.sendAcksAsync = sendAcksAsync; + } /** @@ -645,6 +661,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend())); props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize())); props.setProperty("sendTimeout", Integer.toString(getSendTimeout())); + props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync())); } public boolean isUseCompression() { diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 1f65c28ca0..4e52850ef6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -624,7 +624,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC executorService.submit(new Runnable() { public void run() { try { - session.asyncSendPacket(ackToSend); + session.sendAck(ackToSend,true); } catch (JMSException e) { LOG.error(getConsumerId() + " failed to delivered acknowledgements", e); } finally { @@ -757,7 +757,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if (ackCounter >= (info .getCurrentPrefetchSize() * .65)) { MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size()); - session.asyncSendPacket(ack); + session.sendAck(ack); ackCounter = 0; deliveredMessages.clear(); } @@ -765,7 +765,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } else { MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size()); - session.asyncSendPacket(ack); + session.sendAck(ack); deliveredMessages.clear(); } } @@ -815,7 +815,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) { MessageAck ack = new MessageAck(md, ackType, deliveredCounter); ack.setTransactionId(session.getTransactionContext().getTransactionId()); - session.asyncSendPacket(ack); + session.sendAck(ack); additionalWindowSize = deliveredCounter; // When using DUPS ok, we do a real ack. @@ -845,7 +845,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC session.doStartTransaction(); ack.setTransactionId(session.getTransactionContext().getTransactionId()); } - session.asyncSendPacket(ack); + session.sendAck(ack); // Adjust the counters deliveredCounter -= deliveredMessages.size(); @@ -859,7 +859,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC void acknowledge(MessageDispatch md) throws JMSException { MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1); - session.asyncSendPacket(ack); + session.sendAck(ack); synchronized(deliveredMessages){ deliveredMessages.remove(md); } @@ -910,7 +910,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC // Acknowledge the last message. MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size()); - session.asyncSendPacket(ack); + session.sendAck(ack,true); // ensure we don't filter this as a duplicate session.connection.rollbackDuplicate(this, lastMd.getMessage()); // Adjust the window size. @@ -919,7 +919,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } else { MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size()); - session.asyncSendPacket(ack); + session.sendAck(ack,true); // stop the delivery of messages. unconsumedMessages.stop(); diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index f050776616..688f99510e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -1903,5 +1903,17 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta } return false; } + + protected void sendAck(MessageAck ack) throws JMSException { + sendAck(ack,false); + } + + protected void sendAck(MessageAck ack, boolean lazy) throws JMSException { + if (lazy || connection.isSendAcksAsync() || isTransacted()) { + asyncSendPacket(ack); + } else { + syncSendPacket(ack); + } + } }