git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@901300 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-01-20 17:42:00 +00:00
parent 862cd718ed
commit affd91eda3
9 changed files with 284 additions and 19 deletions

View File

@ -19,7 +19,6 @@ package org.apache.activemq.broker.region.cursors;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map.Entry;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.Message;
@ -47,6 +46,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
this.regionDestination=destination;
}
@Override
public final synchronized void start() throws Exception{
if (!isStarted()) {
super.start();
@ -60,6 +60,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
}
@Override
public final synchronized void stop() throws Exception {
resetBatch();
super.stop();
@ -91,6 +92,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
return recovered;
}
@Override
public final void reset() {
if (batchList.isEmpty()) {
try {
@ -104,6 +106,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
size();
}
@Override
public synchronized void release() {
clearIterator(false);
}
@ -127,6 +130,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
public final void finished() {
}
@Override
public final synchronized boolean hasNext() {
if (batchList.isEmpty()) {
try {
@ -140,6 +144,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
return this.iterator.hasNext();
}
@Override
public final synchronized MessageReference next() {
MessageReference result = null;
if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
@ -149,6 +154,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
return result;
}
@Override
public final synchronized void addMessageLast(MessageReference node) throws Exception {
if (cacheEnabled && hasSpace()) {
recoverMessage(node.getMessage(),true);
@ -171,11 +177,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
protected void setBatch(MessageId messageId) throws Exception {
}
@Override
public final synchronized void addMessageFirst(MessageReference node) throws Exception {
cacheEnabled=false;
size++;
}
@Override
public final synchronized void remove() {
size--;
if (iterator!=null) {
@ -184,7 +192,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
if (last != null) {
last.decrementReferenceCount();
}
if (size==0 && isStarted() && useCache && hasSpace() && getStoreSize() == 0) {
if (size==0 && isStarted() && useCache && hasSpace() && isStoreEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " enabling cache on last remove");
}
@ -192,16 +200,19 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
}
@Override
public final synchronized void remove(MessageReference node) {
size--;
cacheEnabled=false;
batchList.remove(node.getMessageId());
}
@Override
public final synchronized void clear() {
gc();
}
@Override
public final synchronized void gc() {
for (Message msg : batchList.values()) {
rollback(msg.getMessageId());
@ -218,6 +229,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
}
@Override
protected final synchronized void fillBatch() {
if (batchResetNeeded) {
resetBatch();
@ -237,15 +249,18 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
}
@Override
public final synchronized boolean isEmpty() {
// negative means more messages added to store through queue.send since last reset
return size == 0;
}
@Override
public final synchronized boolean hasMessagesBufferedToDeliver() {
return !batchList.isEmpty();
}
@Override
public final synchronized int size() {
if (size < 0) {
this.size = getStoreSize();
@ -259,4 +274,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
protected abstract void resetBatch();
protected abstract int getStoreSize();
protected abstract boolean isStoreEmpty();
}

View File

@ -17,7 +17,6 @@
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
@ -33,7 +32,7 @@ import org.apache.commons.logging.LogFactory;
*/
class QueueStorePrefetch extends AbstractStoreCursor {
private static final Log LOG = LogFactory.getLog(QueueStorePrefetch.class);
private MessageStore store;
private final MessageStore store;
/**
* Construct it
@ -41,7 +40,7 @@ class QueueStorePrefetch extends AbstractStoreCursor {
*/
public QueueStorePrefetch(Queue queue) {
super(queue);
this.store = (MessageStore)queue.getMessageStore();
this.store = queue.getMessageStore();
}
@ -58,29 +57,47 @@ class QueueStorePrefetch extends AbstractStoreCursor {
@Override
protected synchronized int getStoreSize() {
try {
return this.store.getMessageCount();
int result = this.store.getMessageCount();
return result;
} catch (IOException e) {
LOG.error("Failed to get message count", e);
throw new RuntimeException(e);
}
}
@Override
protected synchronized boolean isStoreEmpty() {
try {
return this.store.isEmpty();
} catch (Exception e) {
LOG.error("Failed to get message count", e);
throw new RuntimeException(e);
}
}
@Override
protected void resetBatch() {
this.store.resetBatching();
}
@Override
protected void setBatch(MessageId messageId) throws Exception {
store.setBatch(messageId);
batchResetNeeded = false;
}
@Override
protected void doFillBatch() throws Exception {
this.store.recoverNextMessages(this.maxBatchSize, this);
}
@Override
public String toString() {
return "QueueStorePrefetch" + System.identityHashCode(this);
}

View File

@ -17,7 +17,6 @@
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
@ -36,10 +35,10 @@ import org.apache.commons.logging.LogFactory;
*/
class TopicStorePrefetch extends AbstractStoreCursor {
private static final Log LOG = LogFactory.getLog(TopicStorePrefetch.class);
private TopicMessageStore store;
private String clientId;
private String subscriberName;
private Subscription subscription;
private final TopicMessageStore store;
private final String clientId;
private final String subscriberName;
private final Subscription subscription;
/**
* @param topic
@ -62,6 +61,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
}
@Override
public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
MessageEvaluationContext messageEvaluationContext = new NonCachedMessageEvaluationContext();
messageEvaluationContext.setMessageReference(message);
@ -73,6 +73,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
}
@Override
protected synchronized int getStoreSize() {
try {
return store.getMessageCount(clientId, subscriberName);
@ -81,17 +82,31 @@ class TopicStorePrefetch extends AbstractStoreCursor {
throw new RuntimeException(e);
}
}
@Override
protected synchronized boolean isStoreEmpty() {
try {
return this.store.isEmpty();
} catch (Exception e) {
LOG.error("Failed to get message count", e);
throw new RuntimeException(e);
}
}
@Override
protected void resetBatch() {
this.store.resetBatching(clientId, subscriberName);
}
@Override
protected void doFillBatch() throws Exception {
this.store.recoverNextMessages(clientId, subscriberName,
maxBatchSize, this);
}
@Override
public String toString() {
return "TopicStorePrefetch" + System.identityHashCode(this) + "(" + clientId + "," + subscriberName + ")";
}

View File

@ -17,10 +17,9 @@
package org.apache.activemq.store;
import java.io.IOException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.usage.MemoryUsage;
abstract public class AbstractMessageStore implements MessageStore {
@ -48,4 +47,13 @@ abstract public class AbstractMessageStore implements MessageStore {
public void setBatch(MessageId messageId) throws IOException, Exception {
}
/**
* flag to indicate if the store is empty
* @return true if the message count is 0
* @throws Exception
*/
public boolean isEmpty() throws Exception{
return getMessageCount()==0;
}
}

View File

@ -17,7 +17,6 @@
package org.apache.activemq.store;
import java.io.IOException;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@ -25,7 +24,6 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
/**
* Represents a message store which is used by the persistent implementations
@ -114,7 +112,15 @@ public interface MessageStore extends Service {
/**
* allow caching cursors to set the current batch offset when cache is exhausted
* @param messageId
* @throws Exception
*/
void setBatch(MessageId messageId) throws Exception;
/**
* flag to indicate if the store is empty
* @return true if the message count is 0
* @throws Exception
*/
boolean isEmpty() throws Exception;
}

View File

@ -96,4 +96,8 @@ public class ProxyMessageStore implements MessageStore {
public void setBatch(MessageId messageId) throws Exception {
delegate.setBatch(messageId);
}
public boolean isEmpty() throws Exception {
return delegate.isEmpty();
}
}

View File

@ -17,7 +17,6 @@
package org.apache.activemq.store;
import java.io.IOException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@ -25,7 +24,6 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
/**
* A simple proxy that delegates to another MessageStore.
@ -138,4 +136,8 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
public void setBatch(MessageId messageId) throws Exception {
delegate.setBatch(messageId);
}
public boolean isEmpty() throws Exception {
return delegate.isEmpty();
}
}

View File

@ -24,7 +24,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@ -69,7 +68,7 @@ import org.apache.kahadb.page.Transaction;
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
private WireFormat wireFormat = new OpenWireFormat();
private final WireFormat wireFormat = new OpenWireFormat();
public void setBrokerName(String brokerName) {
}
@ -128,6 +127,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
this.dest = convert( destination );
}
@Override
public ActiveMQDestination getDestination() {
return destination;
}
@ -200,6 +200,25 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
});
}
}
public boolean isEmpty() throws IOException {
synchronized(indexMutex) {
return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>(){
public Boolean execute(Transaction tx) throws IOException {
// Iterate through all index entries to get a count of messages in the destination.
StoredDestination sd = getStoredDestination(dest, tx);
boolean result = true;
for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
iterator.next();
result = false;
break;
}
return result;
}
});
}
}
public void recover(final MessageRecoveryListener listener) throws Exception {
synchronized(indexMutex) {
@ -266,10 +285,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
@Override
public void setMemoryUsage(MemoryUsage memoeyUSage) {
}
@Override
public void start() throws Exception {
}
@Override
public void stop() throws Exception {
}

View File

@ -0,0 +1,174 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.IOHelper;
public class AMQ2512Test extends EmbeddedBrokerTestSupport {
private static Connection connection;
private final static String QUEUE_NAME = "dee.q";
private final static int INITIAL_MESSAGES_CNT = 1000;
private final static int WORKER_INTERNAL_ITERATIONS = 100;
private final static int TOTAL_MESSAGES_CNT = INITIAL_MESSAGES_CNT * WORKER_INTERNAL_ITERATIONS
+ INITIAL_MESSAGES_CNT;
private final static byte[] payload = new byte[5 * 1024];
private final static String TEXT = new String(payload);
private final static String PRP_INITIAL_ID = "initial-id";
private final static String PRP_WORKER_ID = "worker-id";
private final static CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT);
private final static AtomicInteger ON_MSG_COUNTER = new AtomicInteger();
public void testKahaDBFailure() throws Exception {
final ConnectionFactory fac = new ActiveMQConnectionFactory(this.bindAddress);
connection = fac.createConnection();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue queue = session.createQueue(QUEUE_NAME);
final MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
final long startTime = System.nanoTime();
final List<Consumer> consumers = new ArrayList<Consumer>();
for (int i = 0; i < 20; i++) {
consumers.add(new Consumer("worker-" + i));
}
for (int i = 0; i < INITIAL_MESSAGES_CNT; i++) {
final TextMessage msg = session.createTextMessage(TEXT);
msg.setStringProperty(PRP_INITIAL_ID, "initial-" + i);
producer.send(msg);
}
LATCH.await();
final long endTime = System.nanoTime();
System.out.println("Total execution time = "
+ TimeUnit.MILLISECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + " [ms].");
System.out.println("Rate = " + TOTAL_MESSAGES_CNT
/ TimeUnit.SECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + " [msg/s].");
for (Consumer c : consumers) {
c.close();
}
connection.close();
}
private final static class Consumer implements MessageListener {
private final String name;
private final Session session;
private final MessageProducer producer;
private Consumer(String name) {
this.name = name;
try {
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final Queue queue = session.createQueue(QUEUE_NAME + "?consumer.prefetchSize=10");
producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
final MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(this);
} catch (JMSException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
public void onMessage(Message message) {
final TextMessage msg = (TextMessage) message;
try {
if (!msg.propertyExists(PRP_WORKER_ID)) {
for (int i = 0; i < WORKER_INTERNAL_ITERATIONS; i++) {
final TextMessage newMsg = session.createTextMessage(msg.getText());
newMsg.setStringProperty(PRP_WORKER_ID, name + "-" + i);
newMsg.setStringProperty(PRP_INITIAL_ID, msg.getStringProperty(PRP_INITIAL_ID));
producer.send(newMsg);
}
}
msg.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
final int onMsgCounter = ON_MSG_COUNTER.getAndIncrement();
if (onMsgCounter % 1000 == 0) {
System.out.println("message received: " + onMsgCounter);
}
LATCH.countDown();
}
}
private void close() {
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
}
@Override
protected void setUp() throws Exception {
bindAddress = "tcp://0.0.0.0:61617";
super.setUp();
}
@Override
protected BrokerService createBroker() throws Exception {
File dataFileDir = new File("target/test-amq-2512/datadb");
IOHelper.mkdirs(dataFileDir);
IOHelper.deleteChildren(dataFileDir);
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(dataFileDir);
BrokerService answer = new BrokerService();
answer.setPersistenceAdapter(kaha);
kaha.setEnableJournalDiskSyncs(false);
//kaha.setIndexCacheSize(10);
answer.setDataDirectoryFile(dataFileDir);
answer.setUseJmx(false);
answer.addConnector(bindAddress);
return answer;
}
}