Extract out a PListStore interface so that the broker can be decoupled from the KahaDB store implementation.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1406370 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-11-06 22:04:50 +00:00
parent 1aab71b2a7
commit d5dd937b81
24 changed files with 308 additions and 246 deletions

View File

@ -33,8 +33,7 @@ import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage;

View File

@ -40,7 +40,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage;

View File

@ -16,64 +16,14 @@
*/
package org.apache.activemq.broker;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.ConfigurationException;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
import org.apache.activemq.broker.ft.MasterConnector;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.ConnectorView;
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.FTConnectorView;
import org.apache.activemq.broker.jmx.JmsConnectorView;
import org.apache.activemq.broker.jmx.JobSchedulerView;
import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.NetworkConnectorView;
import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
import org.apache.activemq.broker.jmx.ProxyConnectorView;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
import org.apache.activemq.broker.region.DestinationFactoryImpl;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.jmx.*;
import org.apache.activemq.broker.region.*;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.virtual.MirroredQueue;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
@ -91,11 +41,10 @@ import org.apache.activemq.network.jms.JmsConnector;
import org.apache.activemq.proxy.ProxyConnector;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.JournaledStore;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterFactory;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -104,21 +53,23 @@ import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.stomp.ProtocolConverter;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.BrokerSupport;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.apache.activemq.util.IOExceptionHandler;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.util.TimeUtils;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
* number of transport connectors, network connectors and a bunch of properties
@ -133,6 +84,8 @@ public class BrokerService implements Service {
public static final String LOCAL_HOST_NAME;
public static final String BROKER_VERSION;
public static final String DEFAULT_BROKER_NAME = "localhost";
public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
private static final long serialVersionUID = 7353129142305630237L;
private boolean useJmx = true;
@ -1660,7 +1613,9 @@ public class BrokerService implements Service {
String str = result ? "Successfully deleted" : "Failed to delete";
LOG.info(str + " temporary storage");
}
this.tempDataStore = new PListStore();
String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl";
this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(clazz).newInstance();
this.tempDataStore.setDirectory(getTmpDataDirectory());
configureService(tempDataStore);
this.tempDataStore.start();
@ -1892,12 +1847,8 @@ public class BrokerService implements Service {
long maxJournalFileSize = 0;
long storeLimit = usage.getStoreUsage().getLimit();
if (adapter instanceof KahaDBPersistenceAdapter) {
KahaDBPersistenceAdapter kahaDB = (KahaDBPersistenceAdapter) adapter;
maxJournalFileSize = kahaDB.getJournalMaxFileLength();
} else if (adapter instanceof AMQPersistenceAdapter) {
AMQPersistenceAdapter amqAdapter = (AMQPersistenceAdapter) adapter;
maxJournalFileSize = amqAdapter.getMaxFileLength();
if (adapter instanceof JournaledStore) {
maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength();
}
if (storeLimit < maxJournalFileSize) {
@ -1930,10 +1881,11 @@ public class BrokerService implements Service {
if (isPersistent()) {
long maxJournalFileSize;
if (usage.getTempUsage().getStore() != null) {
maxJournalFileSize = usage.getTempUsage().getStore().getJournalMaxFileLength();
PListStore store = usage.getTempUsage().getStore();
if (store != null && store instanceof JournaledStore) {
maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength();
} else {
maxJournalFileSize = org.apache.activemq.store.kahadb.disk.journal.Journal.DEFAULT_MAX_FILE_LENGTH;
maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH;
}
if (storeLimit < maxJournalFileSize) {
@ -2225,11 +2177,16 @@ public class BrokerService implements Service {
PersistenceAdapterFactory fac = getPersistenceFactory();
if (fac != null) {
return fac.createPersistenceAdapter();
}else {
KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
File dir = new File(getBrokerDataDirectory(),"KahaDB");
adaptor.setDirectory(dir);
return adaptor;
} else {
try {
String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
File dir = new File(getBrokerDataDirectory(),"KahaDB");
adaptor.setDirectory(dir);
return adaptor;
} catch (Throwable e) {
throw IOExceptionSupport.create(e);
}
}
} else {
return new MemoryPersistenceAdapter();

View File

@ -41,7 +41,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage;

View File

@ -41,7 +41,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage;

View File

@ -41,7 +41,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage;

View File

@ -28,7 +28,7 @@ import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.*;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;

View File

@ -30,9 +30,9 @@ import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.kahadb.plist.PList;
import org.apache.activemq.store.kahadb.plist.PListEntry;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.store.PList;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.store.PListEntry;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
@ -454,7 +454,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
return diskList == null || diskList.isEmpty();
}
protected PList getDiskList() {
public PList getDiskList() {
if (diskList == null) {
try {
diskList = store.getPList(name);

View File

@ -0,0 +1,8 @@
package org.apache.activemq.store;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public interface JournaledStore {
int getJournalMaxFileLength();
}

View File

@ -0,0 +1,41 @@
package org.apache.activemq.store;
import org.apache.activemq.util.ByteSequence;
import java.io.IOException;
import java.util.Iterator;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public interface PList {
void setName(String name);
String getName();
void destroy() throws IOException;
void addLast(String id, ByteSequence bs) throws IOException;
void addFirst(String id, ByteSequence bs) throws IOException;
boolean remove(String id) throws IOException;
boolean remove(long position) throws IOException;
PListEntry get(long position) throws IOException;
PListEntry getFirst() throws IOException;
PListEntry getLast() throws IOException;
boolean isEmpty();
PListIterator iterator() throws IOException;
long size();
public interface PListIterator extends Iterator<PListEntry> {
void release();
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb.plist;
package org.apache.activemq.store;
import org.apache.activemq.util.ByteSequence;
@ -23,7 +23,7 @@ public class PListEntry {
private final ByteSequence byteSequence;
private final String entry;
PListEntry(String entry, ByteSequence bs) {
public PListEntry(String entry, ByteSequence bs) {
this.entry = entry;
this.byteSequence = bs;
}

View File

@ -0,0 +1,21 @@
package org.apache.activemq.store;
import org.apache.activemq.Service;
import org.apache.activemq.store.kahadb.plist.PListImpl;
import java.io.File;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public interface PListStore extends Service {
File getDirectory();
void setDirectory(File directory);
PListImpl getPList(String name) throws Exception;
boolean removePList(String name) throws Exception;
long size();
}

View File

@ -51,13 +51,7 @@ import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.kaha.impl.index.hash.HashIndex;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStoreAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.*;
import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
@ -82,7 +76,7 @@ import org.slf4j.LoggerFactory;
* @org.apache.xbean.XBean element="amqPersistenceAdapter"
*
*/
public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {
public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware, JournaledStore {
private static final Logger LOG = LoggerFactory.getLogger(AMQPersistenceAdapter.class);
private Scheduler scheduler;
@ -1117,4 +1111,9 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
// reference store send has adequate duplicate suppression
return -1;
}
@Override
public int getJournalMaxFileLength() {
return getMaxFileLength();
}
}

View File

@ -28,11 +28,7 @@ import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.SharedFileLocker;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.*;
import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
@ -50,7 +46,7 @@ import java.util.Set;
* @org.apache.xbean.XBean element="kahaDB"
*
*/
public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter {
public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, JournaledStore {
private final KahaDBStore letter = new KahaDBStore();
/**

View File

@ -26,6 +26,8 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.store.PList;
import org.apache.activemq.store.PListEntry;
import org.apache.activemq.store.kahadb.disk.index.ListIndex;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
@ -35,13 +37,13 @@ import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PList extends ListIndex<String, Location> {
static final Logger LOG = LoggerFactory.getLogger(PList.class);
final PListStore store;
public class PListImpl extends ListIndex<String, Location> implements PList {
static final Logger LOG = LoggerFactory.getLogger(PListImpl.class);
final PListStoreImpl store;
private String name;
Object indexLock;
PList(PListStore store) {
PListImpl(PListStoreImpl store) {
this.store = store;
this.indexLock = store.getIndexLock();
setPageFile(store.getPageFile());
@ -49,10 +51,12 @@ public class PList extends ListIndex<String, Location> {
setValueMarshaller(LocationMarshaller.INSTANCE);
}
@Override
public void setName(String name) {
this.name = name;
}
@Override
public String getName() {
return this.name;
}
@ -65,6 +69,7 @@ public class PList extends ListIndex<String, Location> {
out.writeLong(getHeadPageId());
}
@Override
public synchronized void destroy() throws IOException {
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@ -76,6 +81,7 @@ public class PList extends ListIndex<String, Location> {
}
}
@Override
public void addLast(final String id, final ByteSequence bs) throws IOException {
final Location location = this.store.write(bs, false);
synchronized (indexLock) {
@ -87,6 +93,7 @@ public class PList extends ListIndex<String, Location> {
}
}
@Override
public void addFirst(final String id, final ByteSequence bs) throws IOException {
final Location location = this.store.write(bs, false);
synchronized (indexLock) {
@ -98,6 +105,7 @@ public class PList extends ListIndex<String, Location> {
}
}
@Override
public boolean remove(final String id) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
synchronized (indexLock) {
@ -110,6 +118,7 @@ public class PList extends ListIndex<String, Location> {
return result.get();
}
@Override
public boolean remove(final long position) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
synchronized (indexLock) {
@ -129,6 +138,7 @@ public class PList extends ListIndex<String, Location> {
return result.get();
}
@Override
public PListEntry get(final long position) throws IOException {
PListEntry result = null;
final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
@ -147,6 +157,7 @@ public class PList extends ListIndex<String, Location> {
return result;
}
@Override
public PListEntry getFirst() throws IOException {
PListEntry result = null;
final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
@ -164,6 +175,7 @@ public class PList extends ListIndex<String, Location> {
return result;
}
@Override
public PListEntry getLast() throws IOException {
PListEntry result = null;
final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
@ -181,19 +193,21 @@ public class PList extends ListIndex<String, Location> {
return result;
}
@Override
public boolean isEmpty() {
return size() == 0;
}
@Override
public PListIterator iterator() throws IOException {
return new PListIterator();
return new PListIteratorImpl();
}
public final class PListIterator implements Iterator<PListEntry> {
final class PListIteratorImpl implements PListIterator {
final Iterator<Map.Entry<String, Location>> iterator;
final Transaction tx;
PListIterator() throws IOException {
PListIteratorImpl() throws IOException {
tx = store.pageFile.tx();
synchronized (indexLock) {
this.iterator = iterator(tx);

View File

@ -16,42 +16,36 @@
*/
package org.apache.activemq.store.kahadb.plist;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.store.JournaledStore;
import org.apache.activemq.store.PList;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Page;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.LockFile;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
/**
* @org.apache.xbean.XBean
*/
public class PListStore extends ServiceSupport implements BrokerServiceAware, Runnable {
static final Logger LOG = LoggerFactory.getLogger(PListStore.class);
public class PListStoreImpl extends ServiceSupport implements BrokerServiceAware, Runnable, PListStore, JournaledStore {
static final Logger LOG = LoggerFactory.getLogger(PListStoreImpl.class);
private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
static final int CLOSED_STATE = 1;
@ -70,7 +64,7 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
// private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
MetaData metaData = new MetaData(this);
final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
Map<String, PList> persistentLists = new HashMap<String, PList>();
Map<String, PListImpl> persistentLists = new HashMap<String, PListImpl>();
final Object indexLock = new Object();
private Scheduler scheduler;
private long cleanupInterval = 30000;
@ -122,16 +116,16 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
}
protected class MetaData {
protected MetaData(PListStore store) {
protected MetaData(PListStoreImpl store) {
this.store = store;
}
private final PListStore store;
private final PListStoreImpl store;
Page<MetaData> page;
BTreeIndex<String, PList> lists;
BTreeIndex<String, PListImpl> lists;
void createIndexes(Transaction tx) throws IOException {
this.lists = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId());
this.lists = new BTreeIndex<String, PListImpl>(pageFile, tx.allocate().getPageId());
}
void load(Transaction tx) throws IOException {
@ -140,16 +134,16 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
this.lists.load(tx);
}
void loadLists(Transaction tx, Map<String, PList> lists) throws IOException {
for (Iterator<Entry<String, PList>> i = this.lists.iterator(tx); i.hasNext();) {
Entry<String, PList> entry = i.next();
void loadLists(Transaction tx, Map<String, PListImpl> lists) throws IOException {
for (Iterator<Entry<String, PListImpl>> i = this.lists.iterator(tx); i.hasNext();) {
Entry<String, PListImpl> entry = i.next();
entry.getValue().load(tx);
lists.put(entry.getKey(), entry.getValue());
}
}
public void read(DataInput is) throws IOException {
this.lists = new BTreeIndex<String, PList>(pageFile, is.readLong());
this.lists = new BTreeIndex<String, PListImpl>(pageFile, is.readLong());
this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
this.lists.setValueMarshaller(new PListMarshaller(this.store));
}
@ -160,9 +154,9 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
}
class MetaDataMarshaller extends VariableMarshaller<MetaData> {
private final PListStore store;
private final PListStoreImpl store;
MetaDataMarshaller(PListStore store) {
MetaDataMarshaller(PListStoreImpl store) {
this.store = store;
}
public MetaData readPayload(DataInput dataIn) throws IOException {
@ -176,18 +170,18 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
}
}
class PListMarshaller extends VariableMarshaller<PList> {
private final PListStore store;
PListMarshaller(PListStore store) {
class PListMarshaller extends VariableMarshaller<PListImpl> {
private final PListStoreImpl store;
PListMarshaller(PListStoreImpl store) {
this.store = store;
}
public PList readPayload(DataInput dataIn) throws IOException {
PList result = new PList(this.store);
public PListImpl readPayload(DataInput dataIn) throws IOException {
PListImpl result = new PListImpl(this.store);
result.read(dataIn);
return result;
}
public void writePayload(PList list, DataOutput dataOut) throws IOException {
public void writePayload(PListImpl list, DataOutput dataOut) throws IOException {
list.write(dataOut);
}
}
@ -196,10 +190,12 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
return this.journal;
}
@Override
public File getDirectory() {
return directory;
}
@Override
public void setDirectory(File directory) {
this.directory = directory;
}
@ -217,16 +213,17 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
}
}
public PList getPList(final String name) throws Exception {
@Override
public PListImpl getPList(final String name) throws Exception {
if (!isStarted()) {
throw new IllegalStateException("Not started");
}
intialize();
synchronized (indexLock) {
synchronized (this) {
PList result = this.persistentLists.get(name);
PListImpl result = this.persistentLists.get(name);
if (result == null) {
final PList pl = new PList(this);
final PListImpl pl = new PListImpl(this);
pl.setName(name);
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
@ -238,7 +235,7 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
result = pl;
this.persistentLists.put(name, pl);
}
final PList toLoad = result;
final PListImpl toLoad = result;
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
toLoad.load(tx);
@ -250,6 +247,7 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
}
}
@Override
public boolean removePList(final String name) throws Exception {
boolean result = false;
synchronized (indexLock) {
@ -312,7 +310,7 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
if (cleanupInterval > 0) {
if (scheduler == null) {
scheduler = new Scheduler(PListStore.class.getSimpleName());
scheduler = new Scheduler(PListStoreImpl.class.getSimpleName());
scheduler.start();
}
scheduler.executePeriodically(this, cleanupInterval);
@ -334,12 +332,12 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
@Override
protected synchronized void doStop(ServiceStopper stopper) throws Exception {
if (scheduler != null) {
if (PListStore.class.getSimpleName().equals(scheduler.getName())) {
if (PListStoreImpl.class.getSimpleName().equals(scheduler.getName())) {
scheduler.stop();
scheduler = null;
}
}
for (PList pl : this.persistentLists.values()) {
for (PListImpl pl : this.persistentLists.values()) {
pl.unload(null);
}
if (this.pageFile != null) {
@ -372,13 +370,13 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
iterator.remove();
}
}
List<PList> plists = null;
List<PListImpl> plists = null;
synchronized (indexLock) {
synchronized (this) {
plists = new ArrayList<PList>(persistentLists.values());
plists = new ArrayList<PListImpl>(persistentLists.values());
}
}
for (PList list : plists) {
for (PListImpl list : plists) {
list.claimFileLocations(candidates);
if (isStopping()) {
return;

View File

@ -20,8 +20,8 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.Service;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.plist.PListStore;
/**
* Holder for Usage instances for memory, store and temp files Main use case is

View File

@ -16,8 +16,8 @@
*/
package org.apache.activemq.usage;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.store.PListStore;
/**
* Used to keep track of how much of something is being used so that a

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.IndirectMessageReference;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.PList;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.util.ByteSequence;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class FilePendingMessageCursorTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursorTestSupport.class);
protected BrokerService brokerService;
protected FilePendingMessageCursor underTest;
@After
public void stopBroker() throws Exception {
if (brokerService != null) {
brokerService.getTempDataStore().stop();
}
}
private void createBrokerWithTempStoreLimit() throws Exception {
brokerService = new BrokerService();
SystemUsage usage = brokerService.getSystemUsage();
usage.getTempUsage().setLimit(1025*1024*15);
// put something in the temp store to on demand initialise it
PList dud = brokerService.getTempDataStore().getPList("dud");
dud.addFirst("A", new ByteSequence("A".getBytes()));
}
@Test
public void testAddToEmptyCursorWhenTempStoreIsFull() throws Exception {
createBrokerWithTempStoreLimit();
SystemUsage usage = brokerService.getSystemUsage();
assertTrue("temp store is full: %" + usage.getTempUsage().getPercentUsage(), usage.getTempUsage().isFull());
underTest = new FilePendingMessageCursor(brokerService.getBroker(), "test", false);
underTest.setSystemUsage(usage);
// ok to add
underTest.addMessageLast(QueueMessageReference.NULL_MESSAGE);
assertFalse("cursor is not full", underTest.isFull());
}
}

View File

@ -37,6 +37,7 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.StoreUsage;
import org.apache.activemq.usage.SystemUsage;
@ -157,7 +158,7 @@ public class TempStorageBlockedBrokerTest extends TestSupport {
+ broker.getSystemUsage().getTempUsage().getUsage());
// do a cleanup
broker.getTempDataStore().run();
((PListStoreImpl)broker.getTempDataStore()).run();
LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: "
+ broker.getSystemUsage().getTempUsage().getUsage());

View File

@ -38,6 +38,7 @@ import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
@ -164,7 +165,7 @@ public class TempStorageConfigBrokerTest {
broker.getSystemUsage().setSendFailIfNoSpace(true);
broker.getSystemUsage().getMemoryUsage().setLimit(1048576);
broker.getSystemUsage().getTempUsage().setLimit(2*1048576);
broker.getSystemUsage().getTempUsage().getStore().setJournalMaxFileLength(2*1048576);
((PListStoreImpl)broker.getSystemUsage().getTempUsage().getStore()).setJournalMaxFileLength(2 * 1048576);
broker.getSystemUsage().getStoreUsage().setLimit(20*1048576);
PolicyEntry defaultPolicy = new PolicyEntry();

View File

@ -41,7 +41,7 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
@ -148,7 +148,7 @@ public class TempStoreDataCleanupTest {
LOG.info("MemoryUseage before awaiting temp store cleanup = " + broker.getAdminView().getMemoryPercentUsage());
final PListStore pa = broker.getTempDataStore();
final PListStoreImpl pa = (PListStoreImpl) broker.getTempDataStore();
assertTrue("only one journal file should be left: " + pa.getJournal().getFileMap().size(),
Wait.waitFor(new Wait.Condition() {

View File

@ -1,81 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
package org.apache.activemq.store.kahadb.plist;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.IndirectMessageReference;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.broker.region.*;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursorTestSupport;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.kahadb.plist.PList;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.util.ByteSequence;
import org.junit.After;
import org.apache.activemq.usage.SystemUsage;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class FilePendingMessageCursorTest {
private static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursorTest.class);
BrokerService brokerService;
FilePendingMessageCursor underTest;
@After
public void stopBroker() throws Exception {
if (brokerService != null) {
brokerService.getTempDataStore().stop();
}
}
private void createBrokerWithTempStoreLimit() throws Exception {
brokerService = new BrokerService();
SystemUsage usage = brokerService.getSystemUsage();
usage.getTempUsage().setLimit(1025*1024*15);
// put something in the temp store to on demand initialise it
PList dud = brokerService.getTempDataStore().getPList("dud");
dud.addFirst("A", new ByteSequence("A".getBytes()));
}
@Test
public void testAddToEmptyCursorWhenTempStoreIsFull() throws Exception {
createBrokerWithTempStoreLimit();
SystemUsage usage = brokerService.getSystemUsage();
assertTrue("temp store is full: %" + usage.getTempUsage().getPercentUsage(), usage.getTempUsage().isFull());
underTest = new FilePendingMessageCursor(brokerService.getBroker(), "test", false);
underTest.setSystemUsage(usage);
// ok to add
underTest.addMessageLast(QueueMessageReference.NULL_MESSAGE);
assertFalse("cursor is not full", underTest.isFull());
}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class KahaDBFilePendingMessageCursorTest extends FilePendingMessageCursorTestSupport {
@Test
public void testAddRemoveAddIndexSize() throws Exception {
@ -89,7 +31,7 @@ public class FilePendingMessageCursorTest {
underTest.setSystemUsage(usage);
LOG.info("start");
final PageFile pageFile = underTest.getDiskList().getPageFile();
final PageFile pageFile = ((PListImpl)underTest.getDiskList()).getPageFile();
LOG.info("page count: " +pageFile.getPageCount());
LOG.info("free count: " + pageFile.getFreePageCount());
LOG.info("content size: " +pageFile.getPageContentSize());
@ -132,4 +74,6 @@ public class FilePendingMessageCursorTest {
LOG.info("content size: " + pageFile.getPageContentSize());
assertEquals("expected page usage", initialPageCount -1, pageFile.getPageCount() - pageFile.getFreePageCount() );
}
}

View File

@ -31,6 +31,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.store.PList;
import org.apache.activemq.store.PListEntry;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ByteSequence;
import org.junit.After;
@ -41,8 +43,8 @@ import org.slf4j.LoggerFactory;
public class PListTest {
static final Logger LOG = LoggerFactory.getLogger(PListTest.class);
private PListStore store;
private PList plist;
private PListStoreImpl store;
private PListImpl plist;
final ByteSequence payload = new ByteSequence(new byte[400]);
final String idSeed = new String("Seed" + new byte[1024]);
final Vector<Throwable> exceptions = new Vector<Throwable>();
@ -173,7 +175,7 @@ public class PListTest {
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store = new PListStore();
store = new PListStoreImpl();
store.setCleanupInterval(400);
store.setDirectory(directory);
store.setJournalMaxFileLength(1024*5);
@ -263,7 +265,7 @@ public class PListTest {
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store = new PListStore();
store = new PListStoreImpl();
store.setDirectory(directory);
store.start();
@ -294,7 +296,7 @@ public class PListTest {
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store = new PListStore();
store = new PListStoreImpl();
store.setDirectory(directory);
store.start();
@ -312,7 +314,7 @@ public class PListTest {
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store = new PListStore();
store = new PListStoreImpl();
store.setDirectory(directory);
store.setJournalMaxFileLength(1024*5);
store.setCleanupInterval(5000);
@ -394,7 +396,7 @@ public class PListTest {
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store = new PListStore();
store = new PListStoreImpl();
store.setIndexEnablePageCaching(enablePageCache);
store.setIndexPageSize(2*1024);
store.setDirectory(directory);
@ -443,7 +445,7 @@ public class PListTest {
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store = new PListStore();
store = new PListStoreImpl();
store.setIndexPageSize(2*1024);
store.setJournalMaxFileLength(1024*1024);
store.setDirectory(directory);
@ -502,7 +504,7 @@ public class PListTest {
public void run() {
final String threadName = Thread.currentThread().getName();
try {
PList plist = null;
PListImpl plist = null;
switch (task) {
case CREATE:
Thread.currentThread().setName("C:"+id);
@ -625,7 +627,7 @@ public class PListTest {
}
protected void startStore(File directory) throws Exception {
store = new PListStore();
store = new PListStoreImpl();
store.setDirectory(directory);
store.start();
plist = store.getPList("main");