From e67d48680f16061d0a5fbd09129ce8f0a7759255 Mon Sep 17 00:00:00 2001 From: gtully Date: Thu, 2 Mar 2017 17:00:16 +0000 Subject: [PATCH] [AMQ-6614] fix up jmx blockedSendsCount and producer view blocking flag for async send case. fix and test --- .../broker/ProducerBrokerExchange.java | 3 +- .../apache/activemq/broker/region/Queue.java | 5 ++ .../nio/NIOAsyncSendWithPFCTest.java | 83 ++++++++++++++++--- 3 files changed, 78 insertions(+), 13 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java b/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java index bf1d21e800..26652debe6 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java @@ -43,7 +43,7 @@ public class ProducerBrokerExchange { private boolean auditProducerSequenceIds; private boolean isNetworkProducer; private BrokerService brokerService; - private final FlowControlInfo flowControlInfo = new FlowControlInfo(); + private FlowControlInfo flowControlInfo = new FlowControlInfo(); public ProducerBrokerExchange() { } @@ -55,6 +55,7 @@ public class ProducerBrokerExchange { rc.region = region; rc.producerState = producerState; rc.mutable = mutable; + rc.flowControlInfo = flowControlInfo; return rc; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 6283232fb4..2b5c0c37ad 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -689,10 +689,15 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } else { LOG.debug("unexpected exception on deferred send of: {}", message, e); } + } finally { + getDestinationStatistics().getBlockedSends().decrement(); + producerExchangeCopy.blockingOnFlowControl(false); } } }); + getDestinationStatistics().getBlockedSends().increment(); + producerExchange.blockingOnFlowControl(true); if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage .getSendFailIfNoSpaceAfterTimeout())); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java index a9cc901e67..8c971bfd32 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/nio/NIOAsyncSendWithPFCTest.java @@ -19,9 +19,9 @@ package org.apache.activemq.transport.nio; import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.TestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.DestinationView; +import org.apache.activemq.broker.jmx.ProducerViewMBean; import org.apache.activemq.broker.jmx.QueueView; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; @@ -109,7 +109,7 @@ public class NIOAsyncSendWithPFCTest extends TestCase { try { - sendMessagesAsync(1, DESTINATION_TWO); + sendMessages(1, DESTINATION_TWO, false); } catch (Exception ex) { LOG.error("Ex on send new connection", ex); fail("*** received the following exception when creating addition producer new connection:" + ex); @@ -148,6 +148,7 @@ public class NIOAsyncSendWithPFCTest extends TestCase { //wait till producer follow control kicks in waitForProducerFlowControl(broker, queueView); + assertTrue("Producer view blocked", getProducerView(broker, DESTINATION_ONE).isProducerBlocked()); try { Session producerSession = exisitngConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -164,6 +165,34 @@ public class NIOAsyncSendWithPFCTest extends TestCase { + } + + public void testSyncSendPFCExistingConnection() throws Exception { + + BrokerService broker = createBroker(); + broker.waitUntilStarted(); + + ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_PRODUCERS); + QueueView queueView = getQueueView(broker, DESTINATION_ONE); + + try { + + for (int i = 0; i < NUMBER_OF_PRODUCERS; i++) { + + executorService.submit(new ProducerTask(true)); + + } + + //wait till producer follow control kicks in + waitForProducerFlowControl(broker, queueView); + assertTrue("Producer view blocked", getProducerView(broker, DESTINATION_ONE).isProducerBlocked()); + + + } finally { + broker.stop(); + broker.waitUntilStopped(); + } + } private void waitForProducerFlowControl(BrokerService broker, QueueView queueView) throws Exception { @@ -171,20 +200,30 @@ public class NIOAsyncSendWithPFCTest extends TestCase { boolean blockingAllSends; do { - blockingAllSends = queueView.getBlockedSends() > 10; + blockingAllSends = queueView.getBlockedSends() >= 10; + LOG.info("Blocking all sends:" + queueView.getBlockedSends()); Thread.sleep(1000); } while (!blockingAllSends); } class ProducerTask implements Runnable { + boolean sync = false; + + ProducerTask() { + this(false); + } + + ProducerTask(boolean sync) { + this.sync = sync; + } @Override public void run() { try { //send X messages - sendMessagesAsync(MESSAGES_TO_SEND, DESTINATION_ONE); + sendMessages(MESSAGES_TO_SEND, DESTINATION_ONE, sync); } catch (Exception e) { e.printStackTrace(); } @@ -192,7 +231,7 @@ public class NIOAsyncSendWithPFCTest extends TestCase { } - private Long sendMessagesAsync(int messageCount, String destination) throws Exception { + private Long sendMessages(int messageCount, String destination, boolean sync) throws Exception { long numberOfMessageSent = 0; @@ -201,7 +240,12 @@ public class NIOAsyncSendWithPFCTest extends TestCase { ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); - connection.setUseAsyncSend(true); + if (sync) { + connection.setUseAsyncSend(false); + connection.setAlwaysSyncSend(true); + } else { + connection.setUseAsyncSend(true); + } connection.start(); try { @@ -221,16 +265,16 @@ public class NIOAsyncSendWithPFCTest extends TestCase { LOG.info(" Finished after producing : " + numberOfMessageSent); return numberOfMessageSent; - } catch (Exception ex) { - LOG.info("Exception received producing ", ex); - LOG.info("finishing after exception :" + numberOfMessageSent); - return numberOfMessageSent; + } catch (JMSException expected) { + LOG.debug("Exception received producing ", expected); } finally { if (connection != null) { - connection.close(); + try { + connection.close(); + } catch (JMSException ignored) {} } } - + return numberOfMessageSent; } private TextMessage createTextMessage(Session session) throws JMSException { @@ -264,5 +308,20 @@ public class NIOAsyncSendWithPFCTest extends TestCase { return null; } + private ProducerViewMBean getProducerView(BrokerService broker, String qName) throws Exception { + ObjectName[] qProducers = broker.getAdminView().getQueueProducers(); + for (ObjectName name : qProducers) { + ProducerViewMBean proxy = (ProducerViewMBean) broker.getManagementContext() + .newProxyInstance(name, ProducerViewMBean.class, true); + + LOG.info("" + proxy.getProducerId() + ", dest: " + proxy.getDestinationName() + ", blocked: " + proxy.isProducerBlocked()); + + if (proxy.getDestinationName().contains(qName)) { + return proxy; + } + } + return null; + } + }