resolve failure of org.apache.activemq.store.jdbc.JDBCStoreOrderTest - async ack when in a transaction and transaction store message store proxy missing async impl, belt and braces, fix async ack and add sync imple to proxy - following async changes from http://svn.apache.org/viewvc?rev=945102&view=rev

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@946135 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-05-19 11:48:38 +00:00
parent 3713a3bdeb
commit 50daa35a32
3 changed files with 26 additions and 2 deletions

View File

@ -666,7 +666,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
ack.setLastMessageId(node.getMessageId()); ack.setLastMessageId(node.getMessageId());
ack.setMessageCount(1); ack.setMessageCount(1);
} }
store.removeAsyncMessage(context, ack); if (context.isInTransaction()) {
store.removeMessage(context, ack);
} else {
store.removeAsyncMessage(context, ack);
}
} }
} }

View File

@ -28,7 +28,7 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.MemoryUsage;
abstract public class AbstractMessageStore implements MessageStore { abstract public class AbstractMessageStore implements MessageStore {
static final FutureTask<Object> FUTURE; public static final FutureTask<Object> FUTURE;
protected final ActiveMQDestination destination; protected final ActiveMQDestination destination;
public AbstractMessageStore(ActiveMQDestination destination) { public AbstractMessageStore(ActiveMQDestination destination) {

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import javax.transaction.xa.XAException; import javax.transaction.xa.XAException;
@ -28,6 +29,7 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId; import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ProxyMessageStore; import org.apache.activemq.store.ProxyMessageStore;
@ -132,9 +134,18 @@ public class MemoryTransactionStore implements TransactionStore {
MemoryTransactionStore.this.addMessage(getDelegate(), send); MemoryTransactionStore.this.addMessage(getDelegate(), send);
} }
public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), message);
return AbstractMessageStore.FUTURE;
}
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack); MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
} }
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
}
}; };
} }
@ -144,9 +155,18 @@ public class MemoryTransactionStore implements TransactionStore {
MemoryTransactionStore.this.addMessage(getDelegate(), send); MemoryTransactionStore.this.addMessage(getDelegate(), send);
} }
public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), message);
return AbstractMessageStore.FUTURE;
}
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack); MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
} }
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
}
}; };
} }