fix intermittent failure in JDBCNegativeQueueTest, sync store add and cursor add such that cursor is always in order w.r.t the store. Hense, when the cursor cache is exhausted, resuming from the store is just fine. Without the sync, moving from cache to store can result in duplicate messages, out of order messages and on occasion missing messages. disabling the cache resolved all of the above but kills performance. Syncing the sore add and cursor add is the correct solution, but it has an impact on concurrent transaction completion for a destination, paralell completion across destinations can still continue, so there is still some batching potential

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@957899 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-06-25 11:49:01 +00:00
parent 00879cf683
commit fb591e7b89
4 changed files with 94 additions and 72 deletions

View File

@ -555,8 +555,13 @@ public abstract class BaseDestination implements Destination {
}
protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
if (systemUsage.isSendFailIfNoSpace()) {
getLog().debug("sendFailIfNoSpace, forcing exception on send: " + warning);
throw new ResourceAllocationException(warning);
}
if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send: " + warning);
throw new ResourceAllocationException(warning);
}
} else {

View File

@ -38,6 +38,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.ResourceAllocationException;
@ -100,7 +102,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
private final Object sendLock = new Object();
private final Lock sendLock = new ReentrantLock();
private ExecutorService executor;
protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections
.synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
@ -446,7 +448,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
redeliveredWaitingDispatch.add(qmr);
}
if (!redeliveredWaitingDispatch.isEmpty()) {
doDispatch(new ArrayList());
doDispatch(new ArrayList<QueueMessageReference>());
}
}
if (!(this.optimizedDispatch || isSlave())) {
@ -596,57 +598,57 @@ public class Queue extends BaseDestination implements Task, UsageListener {
Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
Future<Object> result = null;
synchronized (sendLock) {
if (store != null && message.isPersistent()) {
if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of "
+ systemUsage.getStoreUsage().getLimit() + ". Stopping producer ("
+ message.getProducerId() + ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
if (systemUsage.isSendFailIfNoSpace()) {
throw new ResourceAllocationException(logMessage);
}
waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
}
checkUsage(context, message);
sendLock.lockInterruptibly();
try {
if (store != null && message.isPersistent()) {
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
result = store.asyncAddQueueMessage(context, message);
}
}
if (context.isInTransaction()) {
// If this is a transacted message.. increase the usage now so that
// a big TX does not blow up
// our memory. This increment is decremented once the tx finishes..
message.incrementReferenceCount();
context.getTransaction().addSynchronization(new Synchronization() {
@Override
public void afterCommit() throws Exception {
try {
// It could take while before we receive the commit
// op, by that time the message could have expired..
if (broker.isExpired(message)) {
broker.messageExpired(context, message);
destinationStatistics.getExpired().increment();
return;
if (context.isInTransaction()) {
// If this is a transacted message.. increase the usage now so that
// a big TX does not blow up
// our memory. This increment is decremented once the tx finishes..
message.incrementReferenceCount();
context.getTransaction().addSynchronization(new Synchronization() {
@Override
public void beforeCommit() throws Exception {
sendLock.lockInterruptibly();
}
@Override
public void afterCommit() throws Exception {
try {
// It could take while before we receive the commit
// op, by that time the message could have expired..
if (broker.isExpired(message)) {
broker.messageExpired(context, message);
destinationStatistics.getExpired().increment();
return;
}
sendMessage(message);
} finally {
sendLock.unlock();
message.decrementReferenceCount();
}
sendMessage(context, message);
} finally {
messageSent(context, message);
}
@Override
public void afterRollback() throws Exception {
message.decrementReferenceCount();
}
}
@Override
public void afterRollback() throws Exception {
message.decrementReferenceCount();
}
});
} else {
// Add to the pending list, this takes care of incrementing the
// usage manager.
sendMessage(context, message);
});
} else {
// Add to the pending list, this takes care of incrementing the
// usage manager.
sendMessage(message);
}
} finally {
sendLock.unlock();
}
if (!context.isInTransaction()) {
messageSent(context, message);
}
if (result != null && !result.isCancelled()) {
try {
@ -658,6 +660,26 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
private void checkUsage(ConnectionContext context, Message message) throws ResourceAllocationException, IOException, InterruptedException {
if (message.isPersistent()) {
if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of "
+ systemUsage.getStoreUsage().getLimit() + ". Stopping producer ("
+ message.getProducerId() + ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
}
} else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull()) {
final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + message.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
}
}
private void expireMessages() {
if (LOG.isDebugEnabled()) {
LOG.debug("Expiring messages ..");
@ -1458,23 +1480,13 @@ public class Queue extends BaseDestination implements Task, UsageListener {
return answer;
}
final void sendMessage(final ConnectionContext context, Message msg) throws Exception {
if (!msg.isPersistent() && messages.getSystemUsage() != null) {
if (systemUsage.getTempUsage().isFull()) {
final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
if (systemUsage.isSendFailIfNoSpace()) {
throw new ResourceAllocationException(logMessage);
}
waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
}
}
final void sendMessage(final Message msg) throws Exception {
synchronized (messages) {
messages.addMessageLast(msg);
}
}
final void messageSent(final ConnectionContext context, final Message msg) throws Exception {
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();
messageDelivered(context, msg);

View File

@ -18,7 +18,6 @@ package org.apache.activemq.broker.region.cursors;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Properties;
@ -52,6 +51,8 @@ import org.apache.activemq.usage.StoreUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.TempUsage;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Modified CursorSupport Unit test to reproduce the negative queue issue.
@ -77,7 +78,8 @@ import org.apache.activemq.util.Wait;
*
*/
public class NegativeQueueTest extends TestCase {
private static final Log LOG = LogFactory.getLog(NegativeQueueTest.class);
public static SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd,hh:mm:ss:SSS");
private static final String QUEUE_1_NAME = "conn.test.queue.1";
@ -140,6 +142,7 @@ public class NegativeQueueTest extends TestCase {
}
public void blastAndConsume() throws Exception {
LOG.info(getName());
ConnectionFactory factory = createConnectionFactory();
//get proxy queues for statistics lookups
@ -208,7 +211,7 @@ public class NegativeQueueTest extends TestCase {
consumer.setMessageListener(new SessionAwareMessageListener(consumerSession, latch2, consumerList2));
}
latch2.await(300000, TimeUnit.MILLISECONDS);
assertTrue("got all expected messages on 2", latch2.await(300000, TimeUnit.MILLISECONDS));
producerConnection.close();
for(int ix=0; ix<NUM_CONSUMERS; ix++){
consumerConnections1[ix].close();
@ -299,6 +302,8 @@ public class NegativeQueueTest extends TestCase {
// disable the cache to be sure setBatch is the problem
// will get lots of duplicates
// real problem is sync between cursor and store add - leads to out or order messages
// in the cursor so setBatch can break.
// policy.setUseCache(false);
PolicyMap pMap = new PolicyMap();

View File

@ -21,20 +21,23 @@ public class JDBCNegativeQueueTest extends NegativeQueueTest {
dataSource = new EmbeddedDataSource();
dataSource.setDatabaseName("derbyDb");
dataSource.setCreateDatabase("create");
jdbc.setDataSource(dataSource);
jdbc.setDataSource(dataSource);
answer.setPersistenceAdapter(jdbc);
}
protected void tearDown() throws Exception {
/*Connection conn = dataSource.getConnection();
printQuery(conn, "Select * from ACTIVEMQ_MSGS", System.out); */
if (DEBUG) {
printQuery("Select * from ACTIVEMQ_MSGS", System.out);
}
super.tearDown();
}
private void printQuery(Connection c, String query, PrintStream out)
private void printQuery(String query, PrintStream out)
throws SQLException {
printQuery(c.prepareStatement(query), out);
Connection conn = dataSource.getConnection();
printQuery(conn.prepareStatement(query), out);
conn.close();
}
private void printQuery(PreparedStatement s, PrintStream out)
@ -69,7 +72,4 @@ public class JDBCNegativeQueueTest extends NegativeQueueTest {
}
}
}
}