[AMQ-6614] fix up jmx blockedSendsCount and producer view blocking flag for async send case. fix and test

This commit is contained in:
gtully 2017-03-02 17:00:16 +00:00
parent eab9a0d057
commit e67d48680f
3 changed files with 78 additions and 13 deletions

View File

@ -43,7 +43,7 @@ public class ProducerBrokerExchange {
private boolean auditProducerSequenceIds;
private boolean isNetworkProducer;
private BrokerService brokerService;
private final FlowControlInfo flowControlInfo = new FlowControlInfo();
private FlowControlInfo flowControlInfo = new FlowControlInfo();
public ProducerBrokerExchange() {
}
@ -55,6 +55,7 @@ public class ProducerBrokerExchange {
rc.region = region;
rc.producerState = producerState;
rc.mutable = mutable;
rc.flowControlInfo = flowControlInfo;
return rc;
}

View File

@ -689,10 +689,15 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
} else {
LOG.debug("unexpected exception on deferred send of: {}", message, e);
}
} finally {
getDestinationStatistics().getBlockedSends().decrement();
producerExchangeCopy.blockingOnFlowControl(false);
}
}
});
getDestinationStatistics().getBlockedSends().increment();
producerExchange.blockingOnFlowControl(true);
if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage
.getSendFailIfNoSpaceAfterTimeout()));

View File

@ -19,9 +19,9 @@ package org.apache.activemq.transport.nio;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DestinationView;
import org.apache.activemq.broker.jmx.ProducerViewMBean;
import org.apache.activemq.broker.jmx.QueueView;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
@ -109,7 +109,7 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
try {
sendMessagesAsync(1, DESTINATION_TWO);
sendMessages(1, DESTINATION_TWO, false);
} catch (Exception ex) {
LOG.error("Ex on send new connection", ex);
fail("*** received the following exception when creating addition producer new connection:" + ex);
@ -148,6 +148,7 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
//wait till producer follow control kicks in
waitForProducerFlowControl(broker, queueView);
assertTrue("Producer view blocked", getProducerView(broker, DESTINATION_ONE).isProducerBlocked());
try {
Session producerSession = exisitngConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -164,6 +165,34 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
}
public void testSyncSendPFCExistingConnection() throws Exception {
BrokerService broker = createBroker();
broker.waitUntilStarted();
ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_PRODUCERS);
QueueView queueView = getQueueView(broker, DESTINATION_ONE);
try {
for (int i = 0; i < NUMBER_OF_PRODUCERS; i++) {
executorService.submit(new ProducerTask(true));
}
//wait till producer follow control kicks in
waitForProducerFlowControl(broker, queueView);
assertTrue("Producer view blocked", getProducerView(broker, DESTINATION_ONE).isProducerBlocked());
} finally {
broker.stop();
broker.waitUntilStopped();
}
}
private void waitForProducerFlowControl(BrokerService broker, QueueView queueView) throws Exception {
@ -171,20 +200,30 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
boolean blockingAllSends;
do {
blockingAllSends = queueView.getBlockedSends() > 10;
blockingAllSends = queueView.getBlockedSends() >= 10;
LOG.info("Blocking all sends:" + queueView.getBlockedSends());
Thread.sleep(1000);
} while (!blockingAllSends);
}
class ProducerTask implements Runnable {
boolean sync = false;
ProducerTask() {
this(false);
}
ProducerTask(boolean sync) {
this.sync = sync;
}
@Override
public void run() {
try {
//send X messages
sendMessagesAsync(MESSAGES_TO_SEND, DESTINATION_ONE);
sendMessages(MESSAGES_TO_SEND, DESTINATION_ONE, sync);
} catch (Exception e) {
e.printStackTrace();
}
@ -192,7 +231,7 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
}
private Long sendMessagesAsync(int messageCount, String destination) throws Exception {
private Long sendMessages(int messageCount, String destination, boolean sync) throws Exception {
long numberOfMessageSent = 0;
@ -201,7 +240,12 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.setUseAsyncSend(true);
if (sync) {
connection.setUseAsyncSend(false);
connection.setAlwaysSyncSend(true);
} else {
connection.setUseAsyncSend(true);
}
connection.start();
try {
@ -221,16 +265,16 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
LOG.info(" Finished after producing : " + numberOfMessageSent);
return numberOfMessageSent;
} catch (Exception ex) {
LOG.info("Exception received producing ", ex);
LOG.info("finishing after exception :" + numberOfMessageSent);
return numberOfMessageSent;
} catch (JMSException expected) {
LOG.debug("Exception received producing ", expected);
} finally {
if (connection != null) {
connection.close();
try {
connection.close();
} catch (JMSException ignored) {}
}
}
return numberOfMessageSent;
}
private TextMessage createTextMessage(Session session) throws JMSException {
@ -264,5 +308,20 @@ public class NIOAsyncSendWithPFCTest extends TestCase {
return null;
}
private ProducerViewMBean getProducerView(BrokerService broker, String qName) throws Exception {
ObjectName[] qProducers = broker.getAdminView().getQueueProducers();
for (ObjectName name : qProducers) {
ProducerViewMBean proxy = (ProducerViewMBean) broker.getManagementContext()
.newProxyInstance(name, ProducerViewMBean.class, true);
LOG.info("" + proxy.getProducerId() + ", dest: " + proxy.getDestinationName() + ", blocked: " + proxy.isProducerBlocked());
if (proxy.getDestinationName().contains(qName)) {
return proxy;
}
}
return null;
}
}