resolve regression in ThreeBrokerVirtualTopicNetworkTest - asnyc tasks need to use destination in key as id is not uniqueue with virtual topics. Also, on a failed cancle, we must wait for the write to compete so the ack/remove does not lag the write leaving an outstanding message. consequence of fixes for https://issues.apache.org/activemq/browse/AMQ-2620 and

https://issues.apache.org/activemq/browse/AMQ-2568

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@948209 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-05-25 21:20:40 +00:00
parent a4983690cb
commit 33f4190343
3 changed files with 53 additions and 16 deletions

View File

@ -37,6 +37,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@ -72,19 +73,21 @@ import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
private static final Log LOG = LogFactory.getLog(KahaDBStore.class);
private static final int MAX_ASYNC_JOBS = 10000;
protected ExecutorService queueExecutor;
protected ExecutorService topicExecutor;
protected final Map<MessageId, StoreQueueTask> asyncQueueMap = new HashMap<MessageId, StoreQueueTask>();
protected final Map<AsyncJobKey, StoreQueueTask> asyncQueueMap = new HashMap<AsyncJobKey, StoreQueueTask>();
protected final Map<MessageId, StoreTopicTask> asyncTopicMap = new HashMap<MessageId, StoreTopicTask>();
private final WireFormat wireFormat = new OpenWireFormat();
private SystemUsage usageManager;
@ -95,11 +98,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
private boolean concurrentStoreAndDispatchQueues = true;
private boolean concurrentStoreAndDispatchTopics = true;
private int maxAsyncJobs = MAX_ASYNC_JOBS;
private Scheduler scheduler;
public KahaDBStore() {
}
public void setBrokerName(String brokerName) {
}
@ -197,8 +199,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
super.doStop(stopper);
}
protected StoreQueueTask removeQueueTask(MessageId id) {
StoreQueueTask task = this.asyncQueueMap.remove(id);
protected StoreQueueTask removeQueueTask(ActiveMQDestination activeMQDestination, MessageId id) {
StoreQueueTask task = this.asyncQueueMap.remove(new AsyncJobKey(id, activeMQDestination));
if (task != null) {
task.getMessage().decrementReferenceCount();
this.queueSemaphore.release();
@ -206,14 +208,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return task;
}
protected void addQueueTask(StoreQueueTask task) throws IOException {
protected void addQueueTask(ActiveMQDestination activeMQDestination, StoreQueueTask task) throws IOException {
try {
this.queueSemaphore.acquire();
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
}
this.asyncQueueMap.put(task.getMessage().getMessageId(), task);
this.asyncQueueMap.put(new AsyncJobKey(task.getMessage().getMessageId(), activeMQDestination), task);
task.getMessage().incrementReferenceCount();
this.queueExecutor.execute(task);
}
@ -302,7 +304,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
throws IOException {
if (isConcurrentStoreAndDispatchQueues()) {
StoreQueueTask result = new StoreQueueTask(this, context, message);
addQueueTask(result);
addQueueTask(destination, result);
return result.getFuture();
} else {
return super.asyncAddQueueMessage(context, message);
@ -312,9 +314,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override
public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
if (isConcurrentStoreAndDispatchQueues()) {
StoreQueueTask task = removeQueueTask(ack.getLastMessageId());
StoreQueueTask task = removeQueueTask(destination, ack.getLastMessageId());
if (task != null) {
if (!task.cancel()) {
try {
task.future.get();
} catch (InterruptedException e) {
throw new InterruptedIOException(e.toString());
} catch (Exception ignored) {
LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
}
removeMessage(context, ack);
}
} else {
@ -334,7 +343,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null);
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
@ -894,6 +902,35 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
}
static class AsyncJobKey {
MessageId id;
ActiveMQDestination destination;
AsyncJobKey(MessageId id, ActiveMQDestination destination) {
this.id = id;
this.destination = destination;
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
return obj instanceof AsyncJobKey &&
id.equals(((AsyncJobKey)obj).id) &&
destination.equals(((AsyncJobKey)obj).destination);
}
@Override
public int hashCode() {
return id.hashCode() + destination.hashCode();
}
public String toString() {
return destination.getPhysicalName() + "-" + id;
}
}
class StoreQueueTask implements Runnable {
protected final Message message;
protected final ConnectionContext context;
@ -915,8 +952,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public boolean cancel() {
if (this.done.compareAndSet(false, true)) {
this.future.cancel(false);
return true;
return this.future.cancel(false);
}
return false;
}
@ -925,7 +961,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
try {
if (this.done.compareAndSet(false, true)) {
this.store.addMessage(context, message);
removeQueueTask(this.message.getMessageId());
removeQueueTask(this.store.getDestination(), this.message.getMessageId());
this.future.complete();
}
} catch (Exception e) {

View File

@ -744,7 +744,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
synchronized (indexMutex) {
ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
inflightTx.add(new AddOpperation(command, location));
TransactionId key = key(command.getTransactionInfo());
}
} else {
synchronized (indexMutex) {

View File

@ -114,6 +114,7 @@ public class ThreeBrokerVirtualTopicNetworkTest extends JmsMultipleBrokersTestSu
// ensure we don't get any more messages
Thread.sleep(2000);
LOG.info("MessagesA: " + msgsA.getMessageIds());
assertEquals(10, msgsA.getMessageCount());
assertEquals(11, msgsB.getMessageCount());
assertEquals(11, msgsC.getMessageCount());
@ -141,6 +142,7 @@ public class ThreeBrokerVirtualTopicNetworkTest extends JmsMultipleBrokersTestSu
// ensure we don't get any more messages
Thread.sleep(5000);
LOG.info("Extra MessagesA: " + msgsA.getMessageIds());
assertEquals(0, msgsA.getMessageCount());
assertEquals(11, msgsB.getMessageCount());
assertEquals(11, msgsC.getMessageCount());