diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
index 3d108e9500..d6e6de8a9a 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
@@ -33,8 +33,7 @@ import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.network.NetworkBridge;
-import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.store.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage;
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
index 9438e7bf04..0c842b3a1c 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
@@ -40,7 +40,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.store.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage;
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
index 15edbe87c7..ea473fb4f6 100644
--- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -16,64 +16,14 @@
*/
package org.apache.activemq.broker;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.ConfigurationException;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
import org.apache.activemq.broker.ft.MasterConnector;
-import org.apache.activemq.broker.jmx.AnnotatedMBean;
-import org.apache.activemq.broker.jmx.BrokerView;
-import org.apache.activemq.broker.jmx.ConnectorView;
-import org.apache.activemq.broker.jmx.ConnectorViewMBean;
-import org.apache.activemq.broker.jmx.FTConnectorView;
-import org.apache.activemq.broker.jmx.JmsConnectorView;
-import org.apache.activemq.broker.jmx.JobSchedulerView;
-import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
-import org.apache.activemq.broker.jmx.ManagedRegionBroker;
-import org.apache.activemq.broker.jmx.ManagementContext;
-import org.apache.activemq.broker.jmx.NetworkConnectorView;
-import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
-import org.apache.activemq.broker.jmx.ProxyConnectorView;
-import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DestinationFactory;
-import org.apache.activemq.broker.region.DestinationFactoryImpl;
-import org.apache.activemq.broker.region.DestinationInterceptor;
-import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.jmx.*;
+import org.apache.activemq.broker.region.*;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.virtual.MirroredQueue;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
@@ -91,11 +41,10 @@ import org.apache.activemq.network.jms.JmsConnector;
import org.apache.activemq.proxy.ProxyConnector;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.store.JournaledStore;
+import org.apache.activemq.store.PListStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterFactory;
-import org.apache.activemq.store.amq.AMQPersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -104,21 +53,23 @@ import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.stomp.ProtocolConverter;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.util.BrokerSupport;
-import org.apache.activemq.util.DefaultIOExceptionHandler;
-import org.apache.activemq.util.IOExceptionHandler;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.InetAddressUtil;
-import org.apache.activemq.util.JMXSupport;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ThreadPoolUtils;
-import org.apache.activemq.util.TimeUtils;
-import org.apache.activemq.util.URISupport;
+import org.apache.activemq.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
/**
* Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
* number of transport connectors, network connectors and a bunch of properties
@@ -133,6 +84,8 @@ public class BrokerService implements Service {
public static final String LOCAL_HOST_NAME;
public static final String BROKER_VERSION;
public static final String DEFAULT_BROKER_NAME = "localhost";
+ public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
+
private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
private static final long serialVersionUID = 7353129142305630237L;
private boolean useJmx = true;
@@ -1660,7 +1613,9 @@ public class BrokerService implements Service {
String str = result ? "Successfully deleted" : "Failed to delete";
LOG.info(str + " temporary storage");
}
- this.tempDataStore = new PListStore();
+
+ String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl";
+ this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(clazz).newInstance();
this.tempDataStore.setDirectory(getTmpDataDirectory());
configureService(tempDataStore);
this.tempDataStore.start();
@@ -1892,12 +1847,8 @@ public class BrokerService implements Service {
long maxJournalFileSize = 0;
long storeLimit = usage.getStoreUsage().getLimit();
- if (adapter instanceof KahaDBPersistenceAdapter) {
- KahaDBPersistenceAdapter kahaDB = (KahaDBPersistenceAdapter) adapter;
- maxJournalFileSize = kahaDB.getJournalMaxFileLength();
- } else if (adapter instanceof AMQPersistenceAdapter) {
- AMQPersistenceAdapter amqAdapter = (AMQPersistenceAdapter) adapter;
- maxJournalFileSize = amqAdapter.getMaxFileLength();
+ if (adapter instanceof JournaledStore) {
+ maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength();
}
if (storeLimit < maxJournalFileSize) {
@@ -1930,10 +1881,11 @@ public class BrokerService implements Service {
if (isPersistent()) {
long maxJournalFileSize;
- if (usage.getTempUsage().getStore() != null) {
- maxJournalFileSize = usage.getTempUsage().getStore().getJournalMaxFileLength();
+ PListStore store = usage.getTempUsage().getStore();
+ if (store != null && store instanceof JournaledStore) {
+ maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength();
} else {
- maxJournalFileSize = org.apache.activemq.store.kahadb.disk.journal.Journal.DEFAULT_MAX_FILE_LENGTH;
+ maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH;
}
if (storeLimit < maxJournalFileSize) {
@@ -2225,11 +2177,16 @@ public class BrokerService implements Service {
PersistenceAdapterFactory fac = getPersistenceFactory();
if (fac != null) {
return fac.createPersistenceAdapter();
- }else {
- KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
- File dir = new File(getBrokerDataDirectory(),"KahaDB");
- adaptor.setDirectory(dir);
- return adaptor;
+ } else {
+ try {
+ String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
+ PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
+ File dir = new File(getBrokerDataDirectory(),"KahaDB");
+ adaptor.setDirectory(dir);
+ return adaptor;
+ } catch (Throwable e) {
+ throw IOExceptionSupport.create(e);
+ }
}
} else {
return new MemoryPersistenceAdapter();
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
index 065a1bb889..a6b4f03b0d 100644
--- a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
@@ -41,7 +41,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.store.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage;
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
index 109d3abb45..573601438c 100644
--- a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
@@ -41,7 +41,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.store.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage;
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
index 20a2e8f01f..e4cd9b0ddb 100644
--- a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
@@ -41,7 +41,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.store.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage;
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index ab9357e4b2..e521d4b11f 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -28,7 +28,7 @@ import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.*;
import org.apache.activemq.state.ConnectionState;
-import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.store.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
index 864132eb45..6b1dc5a46c 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
@@ -30,9 +30,9 @@ import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.store.kahadb.plist.PList;
-import org.apache.activemq.store.kahadb.plist.PListEntry;
-import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.store.PList;
+import org.apache.activemq.store.PListStore;
+import org.apache.activemq.store.PListEntry;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
@@ -454,7 +454,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
return diskList == null || diskList.isEmpty();
}
- protected PList getDiskList() {
+ public PList getDiskList() {
if (diskList == null) {
try {
diskList = store.getPList(name);
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/JournaledStore.java b/activemq-core/src/main/java/org/apache/activemq/store/JournaledStore.java
new file mode 100644
index 0000000000..7ed783a046
--- /dev/null
+++ b/activemq-core/src/main/java/org/apache/activemq/store/JournaledStore.java
@@ -0,0 +1,8 @@
+package org.apache.activemq.store;
+
+/**
+ * @author Hiram Chirino
+ */
+public interface JournaledStore {
+ int getJournalMaxFileLength();
+}
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/PList.java b/activemq-core/src/main/java/org/apache/activemq/store/PList.java
new file mode 100644
index 0000000000..cc01d7848d
--- /dev/null
+++ b/activemq-core/src/main/java/org/apache/activemq/store/PList.java
@@ -0,0 +1,41 @@
+package org.apache.activemq.store;
+
+import org.apache.activemq.util.ByteSequence;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * @author Hiram Chirino
+ */
+public interface PList {
+ void setName(String name);
+
+ String getName();
+
+ void destroy() throws IOException;
+
+ void addLast(String id, ByteSequence bs) throws IOException;
+
+ void addFirst(String id, ByteSequence bs) throws IOException;
+
+ boolean remove(String id) throws IOException;
+
+ boolean remove(long position) throws IOException;
+
+ PListEntry get(long position) throws IOException;
+
+ PListEntry getFirst() throws IOException;
+
+ PListEntry getLast() throws IOException;
+
+ boolean isEmpty();
+
+ PListIterator iterator() throws IOException;
+
+ long size();
+
+ public interface PListIterator extends Iterator {
+ void release();
+ }
+}
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java b/activemq-core/src/main/java/org/apache/activemq/store/PListEntry.java
similarity index 92%
rename from activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java
rename to activemq-core/src/main/java/org/apache/activemq/store/PListEntry.java
index ca8ced555e..3a59a962d0 100644
--- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/PListEntry.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.store.kahadb.plist;
+package org.apache.activemq.store;
import org.apache.activemq.util.ByteSequence;
@@ -23,7 +23,7 @@ public class PListEntry {
private final ByteSequence byteSequence;
private final String entry;
- PListEntry(String entry, ByteSequence bs) {
+ public PListEntry(String entry, ByteSequence bs) {
this.entry = entry;
this.byteSequence = bs;
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java b/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java
new file mode 100644
index 0000000000..844d4b32a9
--- /dev/null
+++ b/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java
@@ -0,0 +1,21 @@
+package org.apache.activemq.store;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.store.kahadb.plist.PListImpl;
+
+import java.io.File;
+
+/**
+ * @author Hiram Chirino
+ */
+public interface PListStore extends Service {
+ File getDirectory();
+
+ void setDirectory(File directory);
+
+ PListImpl getPList(String name) throws Exception;
+
+ boolean removePList(String name) throws Exception;
+
+ long size();
+}
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
index fcc461e9cc..21402df81d 100644
--- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
@@ -51,13 +51,7 @@ import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.kaha.impl.index.hash.HashIndex;
import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.ReferenceStore;
-import org.apache.activemq.store.ReferenceStoreAdapter;
-import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TopicReferenceStore;
-import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.*;
import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
@@ -82,7 +76,7 @@ import org.slf4j.LoggerFactory;
* @org.apache.xbean.XBean element="amqPersistenceAdapter"
*
*/
-public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {
+public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware, JournaledStore {
private static final Logger LOG = LoggerFactory.getLogger(AMQPersistenceAdapter.class);
private Scheduler scheduler;
@@ -1117,4 +1111,9 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
// reference store send has adequate duplicate suppression
return -1;
}
+
+ @Override
+ public int getJournalMaxFileLength() {
+ return getMaxFileLength();
+ }
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index 564a290b89..2acdb0ee72 100644
--- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -28,11 +28,7 @@ import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.broker.Locker;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.SharedFileLocker;
-import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.*;
import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
@@ -50,7 +46,7 @@ import java.util.Set;
* @org.apache.xbean.XBean element="kahaDB"
*
*/
-public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter {
+public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, JournaledStore {
private final KahaDBStore letter = new KahaDBStore();
/**
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java
similarity index 93%
rename from activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
rename to activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java
index 6c080addc0..2e3a48e053 100644
--- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java
@@ -26,6 +26,8 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.activemq.store.PList;
+import org.apache.activemq.store.PListEntry;
import org.apache.activemq.store.kahadb.disk.index.ListIndex;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
@@ -35,13 +37,13 @@ import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PList extends ListIndex {
- static final Logger LOG = LoggerFactory.getLogger(PList.class);
- final PListStore store;
+public class PListImpl extends ListIndex implements PList {
+ static final Logger LOG = LoggerFactory.getLogger(PListImpl.class);
+ final PListStoreImpl store;
private String name;
Object indexLock;
- PList(PListStore store) {
+ PListImpl(PListStoreImpl store) {
this.store = store;
this.indexLock = store.getIndexLock();
setPageFile(store.getPageFile());
@@ -49,10 +51,12 @@ public class PList extends ListIndex {
setValueMarshaller(LocationMarshaller.INSTANCE);
}
+ @Override
public void setName(String name) {
this.name = name;
}
+ @Override
public String getName() {
return this.name;
}
@@ -65,6 +69,7 @@ public class PList extends ListIndex {
out.writeLong(getHeadPageId());
}
+ @Override
public synchronized void destroy() throws IOException {
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure() {
@@ -76,6 +81,7 @@ public class PList extends ListIndex {
}
}
+ @Override
public void addLast(final String id, final ByteSequence bs) throws IOException {
final Location location = this.store.write(bs, false);
synchronized (indexLock) {
@@ -87,6 +93,7 @@ public class PList extends ListIndex {
}
}
+ @Override
public void addFirst(final String id, final ByteSequence bs) throws IOException {
final Location location = this.store.write(bs, false);
synchronized (indexLock) {
@@ -98,6 +105,7 @@ public class PList extends ListIndex {
}
}
+ @Override
public boolean remove(final String id) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
synchronized (indexLock) {
@@ -110,6 +118,7 @@ public class PList extends ListIndex {
return result.get();
}
+ @Override
public boolean remove(final long position) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
synchronized (indexLock) {
@@ -129,6 +138,7 @@ public class PList extends ListIndex {
return result.get();
}
+ @Override
public PListEntry get(final long position) throws IOException {
PListEntry result = null;
final AtomicReference> ref = new AtomicReference>();
@@ -147,6 +157,7 @@ public class PList extends ListIndex {
return result;
}
+ @Override
public PListEntry getFirst() throws IOException {
PListEntry result = null;
final AtomicReference> ref = new AtomicReference>();
@@ -164,6 +175,7 @@ public class PList extends ListIndex {
return result;
}
+ @Override
public PListEntry getLast() throws IOException {
PListEntry result = null;
final AtomicReference> ref = new AtomicReference>();
@@ -181,19 +193,21 @@ public class PList extends ListIndex {
return result;
}
+ @Override
public boolean isEmpty() {
return size() == 0;
}
+ @Override
public PListIterator iterator() throws IOException {
- return new PListIterator();
+ return new PListIteratorImpl();
}
- public final class PListIterator implements Iterator {
+ final class PListIteratorImpl implements PListIterator {
final Iterator> iterator;
final Transaction tx;
- PListIterator() throws IOException {
+ PListIteratorImpl() throws IOException {
tx = store.pageFile.tx();
synchronized (indexLock) {
this.iterator = iterator(tx);
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStoreImpl.java
similarity index 87%
rename from activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
rename to activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStoreImpl.java
index f9f8d8eb9b..d1547b7615 100644
--- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStoreImpl.java
@@ -16,42 +16,36 @@
*/
package org.apache.activemq.store.kahadb.plist;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
-import org.apache.activemq.thread.Scheduler;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.store.JournaledStore;
+import org.apache.activemq.store.PList;
+import org.apache.activemq.store.PListStore;
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Page;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.LockFile;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
+import org.apache.activemq.thread.Scheduler;
+import org.apache.activemq.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+
/**
* @org.apache.xbean.XBean
*/
-public class PListStore extends ServiceSupport implements BrokerServiceAware, Runnable {
- static final Logger LOG = LoggerFactory.getLogger(PListStore.class);
+public class PListStoreImpl extends ServiceSupport implements BrokerServiceAware, Runnable, PListStore, JournaledStore {
+ static final Logger LOG = LoggerFactory.getLogger(PListStoreImpl.class);
private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
static final int CLOSED_STATE = 1;
@@ -70,7 +64,7 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
// private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
MetaData metaData = new MetaData(this);
final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
- Map persistentLists = new HashMap();
+ Map persistentLists = new HashMap();
final Object indexLock = new Object();
private Scheduler scheduler;
private long cleanupInterval = 30000;
@@ -122,16 +116,16 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
}
protected class MetaData {
- protected MetaData(PListStore store) {
+ protected MetaData(PListStoreImpl store) {
this.store = store;
}
- private final PListStore store;
+ private final PListStoreImpl store;
Page page;
- BTreeIndex lists;
+ BTreeIndex lists;
void createIndexes(Transaction tx) throws IOException {
- this.lists = new BTreeIndex(pageFile, tx.allocate().getPageId());
+ this.lists = new BTreeIndex(pageFile, tx.allocate().getPageId());
}
void load(Transaction tx) throws IOException {
@@ -140,16 +134,16 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
this.lists.load(tx);
}
- void loadLists(Transaction tx, Map lists) throws IOException {
- for (Iterator> i = this.lists.iterator(tx); i.hasNext();) {
- Entry entry = i.next();
+ void loadLists(Transaction tx, Map lists) throws IOException {
+ for (Iterator> i = this.lists.iterator(tx); i.hasNext();) {
+ Entry entry = i.next();
entry.getValue().load(tx);
lists.put(entry.getKey(), entry.getValue());
}
}
public void read(DataInput is) throws IOException {
- this.lists = new BTreeIndex(pageFile, is.readLong());
+ this.lists = new BTreeIndex(pageFile, is.readLong());
this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
this.lists.setValueMarshaller(new PListMarshaller(this.store));
}
@@ -160,9 +154,9 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
}
class MetaDataMarshaller extends VariableMarshaller {
- private final PListStore store;
+ private final PListStoreImpl store;
- MetaDataMarshaller(PListStore store) {
+ MetaDataMarshaller(PListStoreImpl store) {
this.store = store;
}
public MetaData readPayload(DataInput dataIn) throws IOException {
@@ -176,18 +170,18 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
}
}
- class PListMarshaller extends VariableMarshaller {
- private final PListStore store;
- PListMarshaller(PListStore store) {
+ class PListMarshaller extends VariableMarshaller {
+ private final PListStoreImpl store;
+ PListMarshaller(PListStoreImpl store) {
this.store = store;
}
- public PList readPayload(DataInput dataIn) throws IOException {
- PList result = new PList(this.store);
+ public PListImpl readPayload(DataInput dataIn) throws IOException {
+ PListImpl result = new PListImpl(this.store);
result.read(dataIn);
return result;
}
- public void writePayload(PList list, DataOutput dataOut) throws IOException {
+ public void writePayload(PListImpl list, DataOutput dataOut) throws IOException {
list.write(dataOut);
}
}
@@ -196,10 +190,12 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
return this.journal;
}
+ @Override
public File getDirectory() {
return directory;
}
+ @Override
public void setDirectory(File directory) {
this.directory = directory;
}
@@ -217,16 +213,17 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
}
}
- public PList getPList(final String name) throws Exception {
+ @Override
+ public PListImpl getPList(final String name) throws Exception {
if (!isStarted()) {
throw new IllegalStateException("Not started");
}
intialize();
synchronized (indexLock) {
synchronized (this) {
- PList result = this.persistentLists.get(name);
+ PListImpl result = this.persistentLists.get(name);
if (result == null) {
- final PList pl = new PList(this);
+ final PListImpl pl = new PListImpl(this);
pl.setName(name);
getPageFile().tx().execute(new Transaction.Closure() {
public void execute(Transaction tx) throws IOException {
@@ -238,7 +235,7 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
result = pl;
this.persistentLists.put(name, pl);
}
- final PList toLoad = result;
+ final PListImpl toLoad = result;
getPageFile().tx().execute(new Transaction.Closure() {
public void execute(Transaction tx) throws IOException {
toLoad.load(tx);
@@ -250,6 +247,7 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
}
}
+ @Override
public boolean removePList(final String name) throws Exception {
boolean result = false;
synchronized (indexLock) {
@@ -312,7 +310,7 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
if (cleanupInterval > 0) {
if (scheduler == null) {
- scheduler = new Scheduler(PListStore.class.getSimpleName());
+ scheduler = new Scheduler(PListStoreImpl.class.getSimpleName());
scheduler.start();
}
scheduler.executePeriodically(this, cleanupInterval);
@@ -334,12 +332,12 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
@Override
protected synchronized void doStop(ServiceStopper stopper) throws Exception {
if (scheduler != null) {
- if (PListStore.class.getSimpleName().equals(scheduler.getName())) {
+ if (PListStoreImpl.class.getSimpleName().equals(scheduler.getName())) {
scheduler.stop();
scheduler = null;
}
}
- for (PList pl : this.persistentLists.values()) {
+ for (PListImpl pl : this.persistentLists.values()) {
pl.unload(null);
}
if (this.pageFile != null) {
@@ -372,13 +370,13 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
iterator.remove();
}
}
- List plists = null;
+ List plists = null;
synchronized (indexLock) {
synchronized (this) {
- plists = new ArrayList(persistentLists.values());
+ plists = new ArrayList(persistentLists.values());
}
}
- for (PList list : plists) {
+ for (PListImpl list : plists) {
list.claimFileLocations(candidates);
if (isStopping()) {
return;
diff --git a/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java b/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
index ff80f2fec2..ae9f6fb514 100755
--- a/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
+++ b/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
@@ -20,8 +20,8 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.activemq.Service;
+import org.apache.activemq.store.PListStore;
import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.kahadb.plist.PListStore;
/**
* Holder for Usage instances for memory, store and temp files Main use case is
diff --git a/activemq-core/src/main/java/org/apache/activemq/usage/TempUsage.java b/activemq-core/src/main/java/org/apache/activemq/usage/TempUsage.java
index 7a8e24e24a..7ce91b391b 100755
--- a/activemq-core/src/main/java/org/apache/activemq/usage/TempUsage.java
+++ b/activemq-core/src/main/java/org/apache/activemq/usage/TempUsage.java
@@ -16,8 +16,8 @@
*/
package org.apache.activemq.usage;
-import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.store.PListStore;
/**
* Used to keep track of how much of something is being used so that a
diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java
new file mode 100644
index 0000000000..5a82b533c8
--- /dev/null
+++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.IndirectMessageReference;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.PList;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.store.kahadb.disk.page.PageFile;
+import org.apache.activemq.util.ByteSequence;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FilePendingMessageCursorTestSupport {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursorTestSupport.class);
+ protected BrokerService brokerService;
+ protected FilePendingMessageCursor underTest;
+
+ @After
+ public void stopBroker() throws Exception {
+ if (brokerService != null) {
+ brokerService.getTempDataStore().stop();
+ }
+ }
+
+ private void createBrokerWithTempStoreLimit() throws Exception {
+ brokerService = new BrokerService();
+ SystemUsage usage = brokerService.getSystemUsage();
+ usage.getTempUsage().setLimit(1025*1024*15);
+
+ // put something in the temp store to on demand initialise it
+ PList dud = brokerService.getTempDataStore().getPList("dud");
+ dud.addFirst("A", new ByteSequence("A".getBytes()));
+ }
+
+ @Test
+ public void testAddToEmptyCursorWhenTempStoreIsFull() throws Exception {
+ createBrokerWithTempStoreLimit();
+ SystemUsage usage = brokerService.getSystemUsage();
+ assertTrue("temp store is full: %" + usage.getTempUsage().getPercentUsage(), usage.getTempUsage().isFull());
+
+ underTest = new FilePendingMessageCursor(brokerService.getBroker(), "test", false);
+ underTest.setSystemUsage(usage);
+
+ // ok to add
+ underTest.addMessageLast(QueueMessageReference.NULL_MESSAGE);
+
+ assertFalse("cursor is not full", underTest.isFull());
+ }
+
+}
diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
index a6c0276ba3..066fb07d2b 100644
--- a/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
@@ -37,6 +37,7 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.StoreUsage;
import org.apache.activemq.usage.SystemUsage;
@@ -157,7 +158,7 @@ public class TempStorageBlockedBrokerTest extends TestSupport {
+ broker.getSystemUsage().getTempUsage().getUsage());
// do a cleanup
- broker.getTempDataStore().run();
+ ((PListStoreImpl)broker.getTempDataStore()).run();
LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: "
+ broker.getSystemUsage().getTempUsage().getUsage());
diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java
index d236ecf064..1061346c1a 100644
--- a/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java
@@ -38,6 +38,7 @@ import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
@@ -164,7 +165,7 @@ public class TempStorageConfigBrokerTest {
broker.getSystemUsage().setSendFailIfNoSpace(true);
broker.getSystemUsage().getMemoryUsage().setLimit(1048576);
broker.getSystemUsage().getTempUsage().setLimit(2*1048576);
- broker.getSystemUsage().getTempUsage().getStore().setJournalMaxFileLength(2*1048576);
+ ((PListStoreImpl)broker.getSystemUsage().getTempUsage().getStore()).setJournalMaxFileLength(2 * 1048576);
broker.getSystemUsage().getStoreUsage().setLimit(20*1048576);
PolicyEntry defaultPolicy = new PolicyEntry();
diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
index d1ba331232..36dafaffa1 100644
--- a/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
@@ -41,7 +41,7 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
@@ -148,7 +148,7 @@ public class TempStoreDataCleanupTest {
LOG.info("MemoryUseage before awaiting temp store cleanup = " + broker.getAdminView().getMemoryPercentUsage());
- final PListStore pa = broker.getTempDataStore();
+ final PListStoreImpl pa = (PListStoreImpl) broker.getTempDataStore();
assertTrue("only one journal file should be left: " + pa.getJournal().getFileMap().size(),
Wait.waitFor(new Wait.Condition() {
diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java b/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
similarity index 51%
rename from activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java
rename to activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
index c57cee7b91..139b36f4e8 100644
--- a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
@@ -1,81 +1,23 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.region.cursors;
+package org.apache.activemq.store.kahadb.plist;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DestinationStatistics;
-import org.apache.activemq.broker.region.IndirectMessageReference;
-import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.broker.region.Queue;
-import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.broker.region.*;
+import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.FilePendingMessageCursorTestSupport;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.MessageId;
-import org.apache.activemq.store.kahadb.plist.PList;
-import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
-import org.apache.activemq.util.ByteSequence;
-import org.junit.After;
+import org.apache.activemq.usage.SystemUsage;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-public class FilePendingMessageCursorTest {
- private static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursorTest.class);
- BrokerService brokerService;
- FilePendingMessageCursor underTest;
-
- @After
- public void stopBroker() throws Exception {
- if (brokerService != null) {
- brokerService.getTempDataStore().stop();
- }
- }
-
- private void createBrokerWithTempStoreLimit() throws Exception {
- brokerService = new BrokerService();
- SystemUsage usage = brokerService.getSystemUsage();
- usage.getTempUsage().setLimit(1025*1024*15);
-
- // put something in the temp store to on demand initialise it
- PList dud = brokerService.getTempDataStore().getPList("dud");
- dud.addFirst("A", new ByteSequence("A".getBytes()));
- }
-
- @Test
- public void testAddToEmptyCursorWhenTempStoreIsFull() throws Exception {
- createBrokerWithTempStoreLimit();
- SystemUsage usage = brokerService.getSystemUsage();
- assertTrue("temp store is full: %" + usage.getTempUsage().getPercentUsage(), usage.getTempUsage().isFull());
-
- underTest = new FilePendingMessageCursor(brokerService.getBroker(), "test", false);
- underTest.setSystemUsage(usage);
-
- // ok to add
- underTest.addMessageLast(QueueMessageReference.NULL_MESSAGE);
-
- assertFalse("cursor is not full", underTest.isFull());
- }
+/**
+ * @author Hiram Chirino
+ */
+public class KahaDBFilePendingMessageCursorTest extends FilePendingMessageCursorTestSupport {
@Test
public void testAddRemoveAddIndexSize() throws Exception {
@@ -89,7 +31,7 @@ public class FilePendingMessageCursorTest {
underTest.setSystemUsage(usage);
LOG.info("start");
- final PageFile pageFile = underTest.getDiskList().getPageFile();
+ final PageFile pageFile = ((PListImpl)underTest.getDiskList()).getPageFile();
LOG.info("page count: " +pageFile.getPageCount());
LOG.info("free count: " + pageFile.getFreePageCount());
LOG.info("content size: " +pageFile.getPageContentSize());
@@ -132,4 +74,6 @@ public class FilePendingMessageCursorTest {
LOG.info("content size: " + pageFile.getPageContentSize());
assertEquals("expected page usage", initialPageCount -1, pageFile.getPageCount() - pageFile.getFreePageCount() );
}
+
+
}
diff --git a/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java b/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
index 8fb60ed4ab..011a094ef5 100644
--- a/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
@@ -31,6 +31,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.store.PList;
+import org.apache.activemq.store.PListEntry;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ByteSequence;
import org.junit.After;
@@ -41,8 +43,8 @@ import org.slf4j.LoggerFactory;
public class PListTest {
static final Logger LOG = LoggerFactory.getLogger(PListTest.class);
- private PListStore store;
- private PList plist;
+ private PListStoreImpl store;
+ private PListImpl plist;
final ByteSequence payload = new ByteSequence(new byte[400]);
final String idSeed = new String("Seed" + new byte[1024]);
final Vector exceptions = new Vector();
@@ -173,7 +175,7 @@ public class PListTest {
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
- store = new PListStore();
+ store = new PListStoreImpl();
store.setCleanupInterval(400);
store.setDirectory(directory);
store.setJournalMaxFileLength(1024*5);
@@ -263,7 +265,7 @@ public class PListTest {
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
- store = new PListStore();
+ store = new PListStoreImpl();
store.setDirectory(directory);
store.start();
@@ -294,7 +296,7 @@ public class PListTest {
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
- store = new PListStore();
+ store = new PListStoreImpl();
store.setDirectory(directory);
store.start();
@@ -312,7 +314,7 @@ public class PListTest {
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
- store = new PListStore();
+ store = new PListStoreImpl();
store.setDirectory(directory);
store.setJournalMaxFileLength(1024*5);
store.setCleanupInterval(5000);
@@ -394,7 +396,7 @@ public class PListTest {
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
- store = new PListStore();
+ store = new PListStoreImpl();
store.setIndexEnablePageCaching(enablePageCache);
store.setIndexPageSize(2*1024);
store.setDirectory(directory);
@@ -443,7 +445,7 @@ public class PListTest {
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
- store = new PListStore();
+ store = new PListStoreImpl();
store.setIndexPageSize(2*1024);
store.setJournalMaxFileLength(1024*1024);
store.setDirectory(directory);
@@ -502,7 +504,7 @@ public class PListTest {
public void run() {
final String threadName = Thread.currentThread().getName();
try {
- PList plist = null;
+ PListImpl plist = null;
switch (task) {
case CREATE:
Thread.currentThread().setName("C:"+id);
@@ -625,7 +627,7 @@ public class PListTest {
}
protected void startStore(File directory) throws Exception {
- store = new PListStore();
+ store = new PListStoreImpl();
store.setDirectory(directory);
store.start();
plist = store.getPList("main");