git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1437038 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-01-22 16:40:46 +00:00
parent 8bf0e43ed3
commit 144be7c277
1 changed files with 43 additions and 10 deletions

View File

@ -16,9 +16,19 @@
*/ */
package org.apache.activemq.store.kahadb; package org.apache.activemq.store.kahadb;
import static org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName;
import java.io.File;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.LockableServiceSupport; import org.apache.activemq.broker.LockableServiceSupport;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.broker.jmx.AnnotatedMBean; import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.PersistenceAdapterView; import org.apache.activemq.broker.jmx.PersistenceAdapterView;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
@ -29,22 +39,18 @@ import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId; import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.broker.Locker; import org.apache.activemq.store.JournaledStore;
import org.apache.activemq.store.*; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.SharedFileLocker;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId; import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId; import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import java.io.File;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.Callable;
import static org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName;
/** /**
* An implementation of {@link PersistenceAdapter} designed for use with * An implementation of {@link PersistenceAdapter} designed for use with
* KahaDB - Embedded Lightweight Non-Relational Database * KahaDB - Embedded Lightweight Non-Relational Database
@ -60,6 +66,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
* @throws IOException * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext) * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
*/ */
@Override
public void beginTransaction(ConnectionContext context) throws IOException { public void beginTransaction(ConnectionContext context) throws IOException {
this.letter.beginTransaction(context); this.letter.beginTransaction(context);
} }
@ -69,6 +76,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
* @throws IOException * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean) * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
*/ */
@Override
public void checkpoint(boolean sync) throws IOException { public void checkpoint(boolean sync) throws IOException {
this.letter.checkpoint(sync); this.letter.checkpoint(sync);
} }
@ -78,6 +86,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
* @throws IOException * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext) * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
*/ */
@Override
public void commitTransaction(ConnectionContext context) throws IOException { public void commitTransaction(ConnectionContext context) throws IOException {
this.letter.commitTransaction(context); this.letter.commitTransaction(context);
} }
@ -88,6 +97,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
* @throws IOException * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
*/ */
@Override
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
return this.letter.createQueueMessageStore(destination); return this.letter.createQueueMessageStore(destination);
} }
@ -98,6 +108,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
* @throws IOException * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic) * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
*/ */
@Override
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
return this.letter.createTopicMessageStore(destination); return this.letter.createTopicMessageStore(destination);
} }
@ -107,6 +118,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
* @throws IOException * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore() * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
*/ */
@Override
public TransactionStore createTransactionStore() throws IOException { public TransactionStore createTransactionStore() throws IOException {
return this.letter.createTransactionStore(); return this.letter.createTransactionStore();
} }
@ -115,6 +127,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
* @throws IOException * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages() * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
*/ */
@Override
public void deleteAllMessages() throws IOException { public void deleteAllMessages() throws IOException {
this.letter.deleteAllMessages(); this.letter.deleteAllMessages();
} }
@ -123,6 +136,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
* @return destinations * @return destinations
* @see org.apache.activemq.store.PersistenceAdapter#getDestinations() * @see org.apache.activemq.store.PersistenceAdapter#getDestinations()
*/ */
@Override
public Set<ActiveMQDestination> getDestinations() { public Set<ActiveMQDestination> getDestinations() {
return this.letter.getDestinations(); return this.letter.getDestinations();
} }
@ -132,10 +146,12 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
* @throws IOException * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId() * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
*/ */
@Override
public long getLastMessageBrokerSequenceId() throws IOException { public long getLastMessageBrokerSequenceId() throws IOException {
return this.letter.getLastMessageBrokerSequenceId(); return this.letter.getLastMessageBrokerSequenceId();
} }
@Override
public long getLastProducerSequenceId(ProducerId id) throws IOException { public long getLastProducerSequenceId(ProducerId id) throws IOException {
return this.letter.getLastProducerSequenceId(id); return this.letter.getLastProducerSequenceId(id);
} }
@ -144,6 +160,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
* @param destination * @param destination
* @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
*/ */
@Override
public void removeQueueMessageStore(ActiveMQQueue destination) { public void removeQueueMessageStore(ActiveMQQueue destination) {
this.letter.removeQueueMessageStore(destination); this.letter.removeQueueMessageStore(destination);
} }
@ -152,6 +169,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
* @param destination * @param destination
* @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic) * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
*/ */
@Override
public void removeTopicMessageStore(ActiveMQTopic destination) { public void removeTopicMessageStore(ActiveMQTopic destination) {
this.letter.removeTopicMessageStore(destination); this.letter.removeTopicMessageStore(destination);
} }
@ -161,6 +179,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
* @throws IOException * @throws IOException
* @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext) * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
*/ */
@Override
public void rollbackTransaction(ConnectionContext context) throws IOException { public void rollbackTransaction(ConnectionContext context) throws IOException {
this.letter.rollbackTransaction(context); this.letter.rollbackTransaction(context);
} }
@ -169,6 +188,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
* @param brokerName * @param brokerName
* @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String) * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
*/ */
@Override
public void setBrokerName(String brokerName) { public void setBrokerName(String brokerName) {
this.letter.setBrokerName(brokerName); this.letter.setBrokerName(brokerName);
} }
@ -177,6 +197,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
* @param usageManager * @param usageManager
* @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage) * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
*/ */
@Override
public void setUsageManager(SystemUsage usageManager) { public void setUsageManager(SystemUsage usageManager) {
this.letter.setUsageManager(usageManager); this.letter.setUsageManager(usageManager);
} }
@ -185,6 +206,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
* @return the size of the store * @return the size of the store
* @see org.apache.activemq.store.PersistenceAdapter#size() * @see org.apache.activemq.store.PersistenceAdapter#size()
*/ */
@Override
public long size() { public long size() {
return this.letter.size(); return this.letter.size();
} }
@ -193,6 +215,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
* @throws Exception * @throws Exception
* @see org.apache.activemq.Service#start() * @see org.apache.activemq.Service#start()
*/ */
@Override
public void doStart() throws Exception { public void doStart() throws Exception {
this.letter.start(); this.letter.start();
@ -219,8 +242,14 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
* @throws Exception * @throws Exception
* @see org.apache.activemq.Service#stop() * @see org.apache.activemq.Service#stop()
*/ */
@Override
public void doStop(ServiceStopper stopper) throws Exception { public void doStop(ServiceStopper stopper) throws Exception {
this.letter.stop(); this.letter.stop();
if (brokerService != null && brokerService.isUseJmx()) {
ObjectName brokerObjectName = brokerService.getBrokerObjectName();
brokerService.getManagementContext().unregisterMBean(createPersistenceAdapterName(brokerObjectName.toString(), toString()));
}
} }
/** /**
@ -228,6 +257,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
* *
* @return the journalMaxFileLength * @return the journalMaxFileLength
*/ */
@Override
public int getJournalMaxFileLength() { public int getJournalMaxFileLength() {
return this.letter.getJournalMaxFileLength(); return this.letter.getJournalMaxFileLength();
} }
@ -367,6 +397,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
* *
* @return the directory * @return the directory
*/ */
@Override
public File getDirectory() { public File getDirectory() {
return this.letter.getDirectory(); return this.letter.getDirectory();
} }
@ -375,6 +406,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
* @param dir * @param dir
* @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File) * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
*/ */
@Override
public void setDirectory(File dir) { public void setDirectory(File dir) {
this.letter.setDirectory(dir); this.letter.setDirectory(dir);
} }
@ -607,6 +639,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
return rc; return rc;
} }
@Override
public Locker createDefaultLocker() throws IOException { public Locker createDefaultLocker() throws IOException {
SharedFileLocker locker = new SharedFileLocker(); SharedFileLocker locker = new SharedFileLocker();
locker.configure(this); locker.configure(this);