AMQ-7234 - fix up memory usage wait timeout such that topic pfc in a transaction can see connection context state changes, fix and test

This commit is contained in:
gtully 2019-06-21 16:55:13 +01:00
parent efc857fc1f
commit de3f77063f
5 changed files with 153 additions and 10 deletions

View File

@ -694,7 +694,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
// While waiting for space to free up... the
// transaction may be done
if (message.isInTransaction()) {
if (context.getTransaction().getState() > IN_USE_STATE) {
if (context.getTransaction() == null || context.getTransaction().getState() > IN_USE_STATE) {
throw new JMSException("Send transaction completed while waiting for space");
}
}

View File

@ -63,6 +63,10 @@ import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import static org.apache.activemq.transaction.Transaction.IN_USE_STATE;
/**
* The Topic is a destination that sends a copy of a message to every active
* Subscription registered.
@ -409,8 +413,15 @@ public class Topic extends BaseDestination implements Task {
public void run() {
try {
// While waiting for space to free up... the
// message may have expired.
// While waiting for space to free up...
// the transaction may be done
if (message.isInTransaction()) {
if (context.getTransaction() == null || context.getTransaction().getState() > IN_USE_STATE) {
throw new JMSException("Send transaction completed while waiting for space");
}
}
// the message may have expired.
if (message.isExpired()) {
broker.messageExpired(context, message, null);
getDestinationStatistics().getExpired().increment();

View File

@ -94,7 +94,7 @@ public class MemoryUsage extends Usage<MemoryUsage> {
* @return true if space
*/
@Override
public boolean waitForSpace(long timeout) throws InterruptedException {
public boolean waitForSpace(final long timeout) throws InterruptedException {
if (parent != null) {
if (!parent.waitForSpace(timeout)) {
return false;
@ -106,12 +106,15 @@ public class MemoryUsage extends Usage<MemoryUsage> {
usageLock.readLock().unlock();
usageLock.writeLock().lock();
try {
while (percentUsage >= 100 ) {
waitForSpaceCondition.await(timeout, TimeUnit.MILLISECONDS);
final long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
long timeleft = deadline;
while (percentUsage >= 100 && timeleft > 0) {
waitForSpaceCondition.await(Math.min(getPollingTime(), timeleft), TimeUnit.MILLISECONDS);
timeleft = deadline - System.currentTimeMillis();
}
usageLock.readLock().lock();
} finally {
usageLock.writeLock().unlock();
usageLock.readLock().lock();
}
}

View File

@ -17,9 +17,6 @@
package org.apache.activemq.usage;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
@ -29,6 +26,11 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class MemoryUsageTest {
MemoryUsage underTest;
@ -83,6 +85,15 @@ public class MemoryUsageTest {
assertEquals("limits are still matched whole", underTest.getLimit(), child.getLimit());
}
@Test(timeout=2000)
public void testLimitedWaitFail() throws Exception {
underTest.setLimit(10);
underTest.start();
underTest.increaseUsage(11);
assertFalse("did not get usage within limit", underTest.waitForSpace(500));
}
@Before
public void setUp() throws Exception {
underTest = new MemoryUsage();

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.usecases;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -33,9 +34,11 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.Wait;
@ -57,6 +60,8 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis
private BrokerService broker;
protected void setUp() throws Exception {
produced.set(0);
consumed.set(0);
// Setup and start the broker
broker = new BrokerService();
broker.setBrokerName(brokerName);
@ -202,6 +207,119 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis
}
}
public void testTransactedProducerBlockedAndClosedWillRelease() throws Exception {
doTestTransactedProducerBlockedAndClosedWillRelease(false);
}
public void testTransactedSyncSendProducerBlockedAndClosedWillRelease() throws Exception {
doTestTransactedProducerBlockedAndClosedWillRelease(true);
}
public void doTestTransactedProducerBlockedAndClosedWillRelease(final boolean alwaysSyncSend) throws Exception {
// Create the connection factory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
connectionFactory.setWatchTopicAdvisories(false);
connectionFactory.setAlwaysSyncSend(alwaysSyncSend);
Connection c = connectionFactory.createConnection();
c.start();
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setAll(5000);
connectionFactory.setPrefetchPolicy(prefetchPolicy);
// Start the test destination listener
Session listenerSession = c.createSession(false, 1);
Destination destination = createDestination(listenerSession);
final AtomicInteger warnings = new AtomicInteger();
Appender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
if (event.getLevel().equals(Level.WARN) && event.getMessage().toString().contains("Usage Manager memory limit reached")) {
LOG.info("received log message: " + event.getMessage());
warnings.incrementAndGet();
}
}
};
org.apache.log4j.Logger log4jLogger =
org.apache.log4j.Logger.getLogger(Topic.class);
log4jLogger.addAppender(appender);
try {
// Start producing the test messages
final Session session = connectionFactory.createConnection().createSession(true, Session.SESSION_TRANSACTED);
final MessageProducer producer = session.createProducer(destination);
Thread producingThread = new Thread("Producing Thread") {
public void run() {
try {
for (long i = 0; i < numMessagesToSend; i++) {
producer.send(session.createTextMessage("test"));
long count = produced.incrementAndGet();
if (count % 10000 == 0) {
LOG.info("Produced " + count + " messages");
}
}
} catch (Throwable ex) {
ex.printStackTrace();
} finally {
try {
producer.close();
session.close();
} catch (Exception e) {
}
}
}
};
producingThread.start();
assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return warnings.get() > 0;
}
}, 5 * 1000));
LOG.info("Produced: " + produced.get() + ", Warnings:" + warnings.get());
assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return warnings.get() > 0;
}
}, 5 * 1000));
final long enqueueCountWhenBlocked = broker.getDestination(ActiveMQDestination.transform(destination)).getDestinationStatistics().getEnqueues().getCount();
// now whack the hung connection broker side (mimic jmx), and verify usage gone b/c of rollback
for (TransportConnection transportConnection : broker.getTransportConnectors().get(0).getConnections()) {
transportConnection.serviceException(new IOException("forcing close for hung connection"));
}
assertTrue("Usage gets released on close", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
LOG.info("Usage: " + broker.getSystemUsage().getMemoryUsage().getUsage());
return broker.getSystemUsage().getMemoryUsage().getUsage() == 0;
}
}, 5 * 1000));
c.close();
// verify no pending sends completed in rolledback tx
assertEquals("nothing sent during close", enqueueCountWhenBlocked, broker.getDestination(ActiveMQDestination.transform(destination)).getDestinationStatistics().getEnqueues().getCount());
} finally {
log4jLogger.removeAppender(appender);
}
}
protected Destination createDestination(Session listenerSession) throws Exception {
return new ActiveMQTopic("test");
}