Clean up a bit, extend test timeout to account for slow CI machines,
remove System prints and replace with LOG.
This commit is contained in:
Timothy Bish 2015-07-06 16:47:58 -04:00
parent 3ba28f6221
commit c38a61d7ae
3 changed files with 48 additions and 76 deletions

View File

@ -648,17 +648,17 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
synchronized(dispatchLock) {
pending.remove();
node.decrementReferenceCount();
if( !isDropped(node) && canDispatch(node)) {
if (!isDropped(node) && canDispatch(node)) {
// Message may have been sitting in the pending
// list a while waiting for the consumer to ak the message.
if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
if (node != QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
//increment number to dispatch
numberToDispatch++;
if (broker.isExpired(node)) {
((Destination)node.getRegionDestination()).messageExpired(context, this, node);
}
//AMQ-5340
if (!isBrowser()) {
continue;
}

View File

@ -492,20 +492,15 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
} else if (md.getMessage() == null) {
return null;
//AMQ-5340 - only check for expired if not a browser
} else if (!isBrowser() && isConsumerExpiryCheckEnabled() && md.getMessage().isExpired()) {
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " received expired message: " + md);
}
} else if (consumeExpiredMessage(md)) {
LOG.debug("{} received expired message: {}", getConsumerId(), md);
beforeMessageIsConsumed(md);
afterMessageIsConsumed(md, true);
if (timeout > 0) {
timeout = Math.max(deadline - System.currentTimeMillis(), 0);
}
} else if (redeliveryExceeded(md)) {
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " received with excessive redelivered: " + md);
}
LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md);
posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
} else {
if (LOG.isTraceEnabled()) {
@ -520,6 +515,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
private boolean consumeExpiredMessage(MessageDispatch dispatch) {
if (dispatch.getMessage().isExpired()) {
return !isBrowser() && isConsumerExpiryCheckEnabled();
}
return false;
}
private void posionAck(MessageDispatch md, String cause) throws JMSException {
MessageAck posionAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
posionAck.setFirstMessageId(md.getMessage().getMessageId());
@ -721,9 +724,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
boolean interrupted = Thread.interrupted();
dispose();
RemoveInfo removeCommand = info.createRemoveCommand();
if (LOG.isDebugEnabled()) {
LOG.debug("remove: " + this.getConsumerId() + ", lastDeliveredSequenceId:" + lastDeliveredSequenceId);
}
LOG.debug("remove: {}, lastDeliveredSequenceId: {}", getConsumerId(), lastDeliveredSequenceId);
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
this.session.asyncSendPacket(removeCommand);
if (interrupted) {
@ -741,9 +742,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (inProgressClearRequiredFlag.get() > 0) {
synchronized (unconsumedMessages.getMutex()) {
if (inProgressClearRequiredFlag.get() > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " clearing unconsumed list (" + unconsumedMessages.size() + ") on transport interrupt");
}
LOG.debug("{} clearing unconsumed list ({}) on transport interrupt", getConsumerId(), unconsumedMessages.size());
// ensure unconsumed are rolledback up front as they may get redelivered to another consumer
List<MessageDispatch> list = unconsumedMessages.removeAll();
if (!this.info.isBrowser()) {
@ -849,9 +848,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (!this.info.isBrowser()) {
for (MessageDispatch old : list) {
// ensure we don't filter this as a duplicate
if (LOG.isDebugEnabled()) {
LOG.debug("on close, rollback duplicate: " + old.getMessage().getMessageId());
}
LOG.debug("on close, rollback duplicate: {}", old.getMessage().getMessageId());
session.connection.rollbackDuplicate(this, old.getMessage());
}
}
@ -1000,8 +997,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
*/
private MessageAck makeAckForAllDeliveredMessages(byte type) {
synchronized (deliveredMessages) {
if (deliveredMessages.isEmpty())
if (deliveredMessages.isEmpty()) {
return null;
}
MessageDispatch md = deliveredMessages.getFirst();
MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
@ -1030,23 +1028,17 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} else {
// old pending ack being superseded by ack of another type, if is is not a delivered
// ack and hence important, send it now so it is not lost.
if ( !oldPendingAck.isDeliveredAck()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
}
if (!oldPendingAck.isDeliveredAck()) {
LOG.debug("Sending old pending ack {}, new pending: {}", oldPendingAck, pendingAck);
session.sendAck(oldPendingAck);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
}
LOG.debug("dropping old pending ack {}, new pending: {}", oldPendingAck, pendingAck);
}
}
// AMQ-3956 evaluate both expired and normal msgs as
// otherwise consumer may get stalled
if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) {
if (LOG.isDebugEnabled()) {
LOG.debug("ackLater: sending: " + pendingAck);
}
LOG.debug("ackLater: sending: {}", pendingAck);
session.sendAck(pendingAck);
pendingAck=null;
deliveredCounter = 0;
@ -1100,8 +1092,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
synchronized(deliveredMessages) {
// Acknowledge all messages so far.
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack == null)
if (ack == null) {
return; // no msgs
}
if (session.getTransacted()) {
rollbackOnFailedRecoveryRedelivery();
@ -1138,8 +1131,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
if (numberNotReplayed > 0) {
LOG.info("waiting for redelivery of " + numberNotReplayed + " in transaction: "
+ previouslyDeliveredMessages.transactionId + ", to consumer :" + this.getConsumerId());
LOG.info("waiting for redelivery of {} in transaction: {}, to consumer: {}",
numberNotReplayed, this.getConsumerId(), previouslyDeliveredMessages.transactionId);
try {
Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4));
} catch (InterruptedException outOfhere) {
@ -1161,11 +1154,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
if (!entry.getValue()) {
numberNotReplayed++;
if (LOG.isDebugEnabled()) {
LOG.debug("previously delivered message has not been replayed in transaction: "
+ previouslyDeliveredMessages.transactionId
+ " , messageId: " + entry.getKey());
}
LOG.debug("previously delivered message has not been replayed in transaction: {}, messageId: {}",
previouslyDeliveredMessages.transactionId, entry.getKey());
}
}
if (numberNotReplayed > 0) {
@ -1338,9 +1328,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (previouslyDeliveredMessages != null) {
for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
if (!entry.getValue()) {
if (LOG.isTraceEnabled()) {
LOG.trace("rollback non redelivered: " + entry.getKey());
}
LOG.trace("rollback non redelivered: {}" + entry.getKey());
removeFromDeliveredMessages(entry.getKey());
}
}
@ -1396,7 +1384,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
afterMessageIsConsumed(md, expired);
} catch (RuntimeException e) {
LOG.error(getConsumerId() + " Exception while processing message: " + md.getMessage().getMessageId(), e);
LOG.error("{} Exception while processing message: {}", getConsumerId(), md.getMessage().getMessageId(), e);
if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
// schedual redelivery and possible dlq processing
md.setRollbackCause(e);
@ -1421,9 +1409,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
// deal with duplicate delivery
ConsumerId consumerWithPendingTransaction;
if (redeliveryExpectedInCurrentTransaction(md, true)) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage());
}
LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage());
if (transactedIndividualAck) {
immediateIndividualTransactedAck(md);
} else {
@ -1490,15 +1476,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
for (MessageDispatch delivered : deliveredMessages) {
previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
}
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " tracking existing transacted " + previouslyDeliveredMessages.transactionId +
" delivered list (" + deliveredMessages.size() + ") on transport interrupt");
}
LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt",
getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size());
} else {
if (session.isClientAcknowledge()) {
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " rolling back delivered list (" + deliveredMessages.size() + ") on transport interrupt");
}
LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
// allow redelivery
if (!this.info.isBrowser()) {
for (MessageDispatch md: deliveredMessages) {
@ -1506,9 +1488,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt");
}
LOG.debug("{} clearing delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
deliveredMessages.clear();
pendingAck = null;
}
@ -1608,9 +1588,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
public void run() {
try {
if (optimizeAcknowledge && !unconsumedMessages.isClosed()) {
if (LOG.isInfoEnabled()) {
LOG.info("Consumer:{} is performing scheduled delivery of outstanding optimized Acks", info.getConsumerId());
}
LOG.info("Consumer:{} is performing scheduled delivery of outstanding optimized Acks", info.getConsumerId());
deliverAcks();
}
} catch (Exception e) {

View File

@ -58,8 +58,7 @@ public class JmsQueueBrowserExpirationTest {
// Message expires after 1 second
private static final long TTL = 1000;
private static final Logger LOG = LoggerFactory
.getLogger(JmsQueueBrowserExpirationTest.class);
private static final Logger LOG = LoggerFactory.getLogger(JmsQueueBrowserExpirationTest.class);
private BrokerService broker;
private URI connectUri;
@ -81,12 +80,13 @@ public class JmsQueueBrowserExpirationTest {
@After
public void stopBroker() throws Exception {
broker.stop();
broker.waitUntilStopped();
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
}
}
//This should finish in under 3 seconds because the messages should be expired
@Test(timeout=3000)
@Test(timeout=10000)
public void testBrowsingExpiration() throws JMSException, InterruptedException {
sendTestMessages();
@ -106,19 +106,14 @@ public class JmsQueueBrowserExpirationTest {
// Give JMS threads more opportunity to do their work.
Thread.sleep(100);
browsed = browse(queue, browserConnection);
String time =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin)
+ " ms";
System.out.println("[" + time + "] found " + browsed + " messages");
LOG.info("[{}ms] found {}", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin), browsed);
}
System.out.println("Finished");
LOG.info("Finished");
browserConnection.close();
}
private int browse(ActiveMQQueue queue, Connection connection)
throws JMSException {
Session session =
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
private int browse(ActiveMQQueue queue, Connection connection) throws JMSException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> enumeration = browser.getEnumeration();
int browsed = 0;
@ -146,8 +141,7 @@ public class JmsQueueBrowserExpirationTest {
producer.send(prodSession.createTextMessage(msgStr));
LOG.info("P&C: {}", msgStr);
}
prodSession.close();
}
}