git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@901188 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-01-20 13:27:05 +00:00
parent 0a3c0e5300
commit ee55abb921
20 changed files with 1495 additions and 106 deletions

View File

@ -32,7 +32,7 @@ import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store; import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.Usage;
/** /**
@ -266,7 +266,7 @@ public interface Broker extends Region, Service {
/** /**
* @return the temp data store * @return the temp data store
*/ */
Store getTempDataStore(); PListStore getTempDataStore();
/** /**
* @return the URI that can be used to connect to the local Broker * @return the URI that can be used to connect to the local Broker

View File

@ -38,7 +38,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store; import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.Usage;
/** /**
@ -230,7 +230,7 @@ public class BrokerFilter implements Broker {
next.setAdminConnectionContext(adminConnectionContext); next.setAdminConnectionContext(adminConnectionContext);
} }
public Store getTempDataStore() { public PListStore getTempDataStore() {
return next.getTempDataStore(); return next.getTempDataStore();
} }

View File

@ -64,8 +64,6 @@ import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.broker.scheduler.SchedulerBroker; import org.apache.activemq.broker.scheduler.SchedulerBroker;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.network.ConnectionFilter; import org.apache.activemq.network.ConnectionFilter;
import org.apache.activemq.network.DiscoveryNetworkConnector; import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.network.NetworkConnector;
@ -77,6 +75,7 @@ import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterFactory; import org.apache.activemq.store.PersistenceAdapterFactory;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory; import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter; import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportFactory;
@ -160,7 +159,7 @@ public class BrokerService implements Service {
private BrokerId brokerId; private BrokerId brokerId;
private DestinationInterceptor[] destinationInterceptors; private DestinationInterceptor[] destinationInterceptors;
private ActiveMQDestination[] destinations; private ActiveMQDestination[] destinations;
private Store tempDataStore; private PListStore tempDataStore;
private int persistenceThreadPriority = Thread.MAX_PRIORITY; private int persistenceThreadPriority = Thread.MAX_PRIORITY;
private boolean useLocalHostBrokerName; private boolean useLocalHostBrokerName;
private final CountDownLatch stoppedLatch = new CountDownLatch(1); private final CountDownLatch stoppedLatch = new CountDownLatch(1);
@ -538,7 +537,7 @@ public class BrokerService implements Service {
stopper.stop(broker); stopper.stop(broker);
} }
if (tempDataStore != null) { if (tempDataStore != null) {
tempDataStore.close(); tempDataStore.stop();
} }
stopper.stop(persistenceAdapter); stopper.stop(persistenceAdapter);
if (isUseJmx()) { if (isUseJmx()) {
@ -1330,7 +1329,7 @@ public class BrokerService implements Service {
/** /**
* @return the tempDataStore * @return the tempDataStore
*/ */
public synchronized Store getTempDataStore() { public synchronized PListStore getTempDataStore() {
if (tempDataStore == null) { if (tempDataStore == null) {
if (!isPersistent()) { if (!isPersistent()) {
return null; return null;
@ -1355,8 +1354,10 @@ public class BrokerService implements Service {
String str = result ? "Successfully deleted" : "Failed to delete"; String str = result ? "Successfully deleted" : "Failed to delete";
LOG.info(str + " temporary storage"); LOG.info(str + " temporary storage");
} }
tempDataStore = StoreFactory.open(getTmpDataDirectory(), "rw"); this.tempDataStore = new PListStore();
} catch (IOException e) { this.tempDataStore.setDirectory(getTmpDataDirectory());
this.tempDataStore.start();
} catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
@ -1367,7 +1368,7 @@ public class BrokerService implements Service {
* @param tempDataStore * @param tempDataStore
* the tempDataStore to set * the tempDataStore to set
*/ */
public void setTempDataStore(Store tempDataStore) { public void setTempDataStore(PListStore tempDataStore) {
this.tempDataStore = tempDataStore; this.tempDataStore = tempDataStore;
} }

View File

@ -39,7 +39,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store; import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.Usage;
/** /**
@ -225,7 +225,7 @@ public class EmptyBroker implements Broker {
return null; return null;
} }
public Store getTempDataStore() { public PListStore getTempDataStore() {
return null; return null;
} }

View File

@ -39,7 +39,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store; import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.Usage;
/** /**
@ -233,7 +233,7 @@ public class ErrorBroker implements Broker {
throw new BrokerStoppedException(this.message); throw new BrokerStoppedException(this.message);
} }
public Store getTempDataStore() { public PListStore getTempDataStore() {
throw new BrokerStoppedException(this.message); throw new BrokerStoppedException(this.message);
} }

View File

@ -20,7 +20,6 @@ import java.net.URI;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
@ -40,7 +39,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store; import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.Usage;
/** /**
@ -243,7 +242,7 @@ public class MutableBrokerFilter implements Broker {
return getNext().messagePull(context, pull); return getNext().messagePull(context, pull);
} }
public Store getTempDataStore() { public PListStore getTempDataStore() {
return getNext().getTempDataStore(); return getNext().getTempDataStore();
} }

View File

@ -25,7 +25,6 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.InvalidClientIDException; import javax.jms.InvalidClientIDException;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
@ -53,8 +52,8 @@ import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.state.ConnectionState; import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.BrokerSupport; import org.apache.activemq.util.BrokerSupport;
@ -112,12 +111,14 @@ public class RegionBroker extends EmptyBroker {
tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory); tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
} }
@Override
public Map<ActiveMQDestination, Destination> getDestinationMap() { public Map<ActiveMQDestination, Destination> getDestinationMap() {
Map<ActiveMQDestination, Destination> answer = getQueueRegion().getDestinationMap(); Map<ActiveMQDestination, Destination> answer = getQueueRegion().getDestinationMap();
answer.putAll(getTopicRegion().getDestinationMap()); answer.putAll(getTopicRegion().getDestinationMap());
return answer; return answer;
} }
@Override
public Set <Destination> getDestinations(ActiveMQDestination destination) { public Set <Destination> getDestinations(ActiveMQDestination destination) {
switch (destination.getDestinationType()) { switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE: case ActiveMQDestination.QUEUE_TYPE:
@ -133,6 +134,7 @@ public class RegionBroker extends EmptyBroker {
} }
} }
@Override
public Broker getAdaptor(Class type) { public Broker getAdaptor(Class type) {
if (type.isInstance(this)) { if (type.isInstance(this)) {
return this; return this;
@ -172,6 +174,7 @@ public class RegionBroker extends EmptyBroker {
return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
} }
@Override
public void start() throws Exception { public void start() throws Exception {
((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive); ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
started = true; started = true;
@ -181,6 +184,7 @@ public class RegionBroker extends EmptyBroker {
tempTopicRegion.start(); tempTopicRegion.start();
} }
@Override
public void stop() throws Exception { public void stop() throws Exception {
started = false; started = false;
ServiceStopper ss = new ServiceStopper(); ServiceStopper ss = new ServiceStopper();
@ -197,6 +201,7 @@ public class RegionBroker extends EmptyBroker {
return brokerService != null ? brokerService.getDestinationPolicy() : null; return brokerService != null ? brokerService.getDestinationPolicy() : null;
} }
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
String clientId = info.getClientId(); String clientId = info.getClientId();
if (clientId == null) { if (clientId == null) {
@ -224,6 +229,7 @@ public class RegionBroker extends EmptyBroker {
connections.add(context.getConnection()); connections.add(context.getConnection());
} }
@Override
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
String clientId = info.getClientId(); String clientId = info.getClientId();
if (clientId == null) { if (clientId == null) {
@ -247,6 +253,7 @@ public class RegionBroker extends EmptyBroker {
return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2)); return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2));
} }
@Override
public Connection[] getClients() throws Exception { public Connection[] getClients() throws Exception {
ArrayList<Connection> l = new ArrayList<Connection>(connections); ArrayList<Connection> l = new ArrayList<Connection>(connections);
Connection rc[] = new Connection[l.size()]; Connection rc[] = new Connection[l.size()];
@ -254,6 +261,7 @@ public class RegionBroker extends EmptyBroker {
return rc; return rc;
} }
@Override
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
Destination answer; Destination answer;
@ -285,6 +293,7 @@ public class RegionBroker extends EmptyBroker {
} }
@Override
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
if (destinations.containsKey(destination)) { if (destinations.containsKey(destination)) {
@ -309,16 +318,19 @@ public class RegionBroker extends EmptyBroker {
} }
@Override
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
addDestination(context, info.getDestination()); addDestination(context, info.getDestination());
} }
@Override
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
removeDestination(context, info.getDestination(), info.getTimeout()); removeDestination(context, info.getDestination(), info.getTimeout());
} }
@Override
public ActiveMQDestination[] getDestinations() throws Exception { public ActiveMQDestination[] getDestinations() throws Exception {
ArrayList<ActiveMQDestination> l; ArrayList<ActiveMQDestination> l;
@ -329,6 +341,7 @@ public class RegionBroker extends EmptyBroker {
return rc; return rc;
} }
@Override
public void addProducer(ConnectionContext context, ProducerInfo info) public void addProducer(ConnectionContext context, ProducerInfo info)
throws Exception { throws Exception {
ActiveMQDestination destination = info.getDestination(); ActiveMQDestination destination = info.getDestination();
@ -353,6 +366,7 @@ public class RegionBroker extends EmptyBroker {
} }
} }
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
ActiveMQDestination destination = info.getDestination(); ActiveMQDestination destination = info.getDestination();
if (destination != null) { if (destination != null) {
@ -373,6 +387,7 @@ public class RegionBroker extends EmptyBroker {
} }
} }
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
ActiveMQDestination destination = info.getDestination(); ActiveMQDestination destination = info.getDestination();
switch (destination.getDestinationType()) { switch (destination.getDestinationType()) {
@ -393,6 +408,7 @@ public class RegionBroker extends EmptyBroker {
} }
} }
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
ActiveMQDestination destination = info.getDestination(); ActiveMQDestination destination = info.getDestination();
switch (destination.getDestinationType()) { switch (destination.getDestinationType()) {
@ -413,10 +429,12 @@ public class RegionBroker extends EmptyBroker {
} }
} }
@Override
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
topicRegion.removeSubscription(context, info); topicRegion.removeSubscription(context, info);
} }
@Override
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
message.setBrokerInTime(System.currentTimeMillis()); message.setBrokerInTime(System.currentTimeMillis());
if (producerExchange.isMutable() || producerExchange.getRegion() == null) { if (producerExchange.isMutable() || producerExchange.getRegion() == null) {
@ -445,6 +463,7 @@ public class RegionBroker extends EmptyBroker {
producerExchange.getRegion().send(producerExchange, message); producerExchange.getRegion().send(producerExchange, message);
} }
@Override
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) { if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) {
ActiveMQDestination destination = ack.getDestination(); ActiveMQDestination destination = ack.getDestination();
@ -470,6 +489,7 @@ public class RegionBroker extends EmptyBroker {
consumerExchange.getRegion().acknowledge(consumerExchange, ack); consumerExchange.getRegion().acknowledge(consumerExchange, ack);
} }
@Override
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
ActiveMQDestination destination = pull.getDestination(); ActiveMQDestination destination = pull.getDestination();
switch (destination.getDestinationType()) { switch (destination.getDestinationType()) {
@ -489,35 +509,43 @@ public class RegionBroker extends EmptyBroker {
} }
} }
@Override
public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
throw new IllegalAccessException("Transaction operation not implemented by this broker."); throw new IllegalAccessException("Transaction operation not implemented by this broker.");
} }
@Override
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
throw new IllegalAccessException("Transaction operation not implemented by this broker."); throw new IllegalAccessException("Transaction operation not implemented by this broker.");
} }
@Override
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
throw new IllegalAccessException("Transaction operation not implemented by this broker."); throw new IllegalAccessException("Transaction operation not implemented by this broker.");
} }
@Override
public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
throw new IllegalAccessException("Transaction operation not implemented by this broker."); throw new IllegalAccessException("Transaction operation not implemented by this broker.");
} }
@Override
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
throw new IllegalAccessException("Transaction operation not implemented by this broker."); throw new IllegalAccessException("Transaction operation not implemented by this broker.");
} }
@Override
public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
throw new IllegalAccessException("Transaction operation not implemented by this broker."); throw new IllegalAccessException("Transaction operation not implemented by this broker.");
} }
@Override
public void gc() { public void gc() {
queueRegion.gc(); queueRegion.gc();
topicRegion.gc(); topicRegion.gc();
} }
@Override
public BrokerId getBrokerId() { public BrokerId getBrokerId() {
if (brokerId == null) { if (brokerId == null) {
brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId()); brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
@ -529,6 +557,7 @@ public class RegionBroker extends EmptyBroker {
this.brokerId = brokerId; this.brokerId = brokerId;
} }
@Override
public String getBrokerName() { public String getBrokerName() {
if (brokerName == null) { if (brokerName == null) {
try { try {
@ -552,22 +581,26 @@ public class RegionBroker extends EmptyBroker {
return new JMSException("Unknown destination type: " + destination.getDestinationType()); return new JMSException("Unknown destination type: " + destination.getDestinationType());
} }
@Override
public synchronized void addBroker(Connection connection, BrokerInfo info) { public synchronized void addBroker(Connection connection, BrokerInfo info) {
brokerInfos.add(info); brokerInfos.add(info);
} }
@Override
public synchronized void removeBroker(Connection connection, BrokerInfo info) { public synchronized void removeBroker(Connection connection, BrokerInfo info) {
if (info != null) { if (info != null) {
brokerInfos.remove(info); brokerInfos.remove(info);
} }
} }
@Override
public synchronized BrokerInfo[] getPeerBrokerInfos() { public synchronized BrokerInfo[] getPeerBrokerInfos() {
BrokerInfo[] result = new BrokerInfo[brokerInfos.size()]; BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
result = brokerInfos.toArray(result); result = brokerInfos.toArray(result);
return result; return result;
} }
@Override
public void preProcessDispatch(MessageDispatch messageDispatch) { public void preProcessDispatch(MessageDispatch messageDispatch) {
Message message = messageDispatch.getMessage(); Message message = messageDispatch.getMessage();
if (message != null) { if (message != null) {
@ -580,9 +613,11 @@ public class RegionBroker extends EmptyBroker {
} }
} }
@Override
public void postProcessDispatch(MessageDispatch messageDispatch) { public void postProcessDispatch(MessageDispatch messageDispatch) {
} }
@Override
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
ActiveMQDestination destination = messageDispatchNotification.getDestination(); ActiveMQDestination destination = messageDispatchNotification.getDestination();
switch (destination.getDestinationType()) { switch (destination.getDestinationType()) {
@ -607,10 +642,12 @@ public class RegionBroker extends EmptyBroker {
return brokerService.isSlave(); return brokerService.isSlave();
} }
@Override
public boolean isStopped() { public boolean isStopped() {
return !started; return !started;
} }
@Override
public Set<ActiveMQDestination> getDurableDestinations() { public Set<ActiveMQDestination> getDurableDestinations() {
return destinationFactory.getDestinations(); return destinationFactory.getDestinations();
} }
@ -634,10 +671,12 @@ public class RegionBroker extends EmptyBroker {
return destinationInterceptor; return destinationInterceptor;
} }
@Override
public ConnectionContext getAdminConnectionContext() { public ConnectionContext getAdminConnectionContext() {
return adminConnectionContext; return adminConnectionContext;
} }
@Override
public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
this.adminConnectionContext = adminConnectionContext; this.adminConnectionContext = adminConnectionContext;
} }
@ -646,21 +685,26 @@ public class RegionBroker extends EmptyBroker {
return connectionStates; return connectionStates;
} }
public Store getTempDataStore() { @Override
public PListStore getTempDataStore() {
return brokerService.getTempDataStore(); return brokerService.getTempDataStore();
} }
@Override
public URI getVmConnectorURI() { public URI getVmConnectorURI() {
return brokerService.getVmConnectorURI(); return brokerService.getVmConnectorURI();
} }
@Override
public void brokerServiceStarted() { public void brokerServiceStarted() {
} }
@Override
public BrokerService getBrokerService() { public BrokerService getBrokerService() {
return brokerService; return brokerService;
} }
@Override
public boolean isExpired(MessageReference messageReference) { public boolean isExpired(MessageReference messageReference) {
boolean expired = false; boolean expired = false;
if (messageReference.isExpired()) { if (messageReference.isExpired()) {
@ -688,6 +732,7 @@ public class RegionBroker extends EmptyBroker {
} }
@Override
public void messageExpired(ConnectionContext context, MessageReference node) { public void messageExpired(ConnectionContext context, MessageReference node) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Message expired " + node); LOG.debug("Message expired " + node);
@ -695,6 +740,7 @@ public class RegionBroker extends EmptyBroker {
getRoot().sendToDeadLetterQueue(context, node); getRoot().sendToDeadLetterQueue(context, node);
} }
@Override
public void sendToDeadLetterQueue(ConnectionContext context, public void sendToDeadLetterQueue(ConnectionContext context,
MessageReference node){ MessageReference node){
try{ try{
@ -739,6 +785,7 @@ public class RegionBroker extends EmptyBroker {
} }
} }
@Override
public Broker getRoot() { public Broker getRoot() {
try { try {
return getBrokerService().getBroker(); return getBrokerService().getBroker();
@ -751,6 +798,7 @@ public class RegionBroker extends EmptyBroker {
/** /**
* @return the broker sequence id * @return the broker sequence id
*/ */
@Override
public long getBrokerSequenceId() { public long getBrokerSequenceId() {
synchronized(sequenceGenerator) { synchronized(sequenceGenerator) {
return sequenceGenerator.getNextSequenceId(); return sequenceGenerator.getNextSequenceId();

View File

@ -21,7 +21,6 @@ import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
@ -29,15 +28,17 @@ import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.QueueMessageReference; import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.kahadb.plist.PList;
import org.apache.activemq.store.kahadb.plist.PListEntry;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener; import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.util.ByteSequence;
/** /**
* persist pending messages pending message (messages awaiting dispatch to a * persist pending messages pending message (messages awaiting dispatch to a
@ -46,31 +47,34 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision$ * @version $Revision$
*/ */
public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener { public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
private static final Log LOG = LogFactory.getLog(FilePendingMessageCursor.class); static final Log LOG = LogFactory.getLog(FilePendingMessageCursor.class);
private static final AtomicLong NAME_COUNT = new AtomicLong(); private static final AtomicLong NAME_COUNT = new AtomicLong();
protected Broker broker; protected Broker broker;
private Store store; private final PListStore store;
private String name; private final String name;
private LinkedList<MessageReference> memoryList = new LinkedList<MessageReference>(); private LinkedList<MessageReference> memoryList = new LinkedList<MessageReference>();
private ListContainer<MessageReference> diskList; private PList diskList;
private Iterator<MessageReference> iter; private Iterator<MessageReference> iter;
private Destination regionDestination; private Destination regionDestination;
private boolean iterating; private boolean iterating;
private boolean flushRequired; private boolean flushRequired;
private AtomicBoolean started = new AtomicBoolean(); private final AtomicBoolean started = new AtomicBoolean();
private final WireFormat wireFormat = new OpenWireFormat();
/** /**
* @param broker
* @param name * @param name
* @param store * @param store
*/ */
public FilePendingMessageCursor(Broker broker,String name) { public FilePendingMessageCursor(Broker broker, String name) {
this.useCache=false; this.useCache = false;
this.broker = broker; this.broker = broker;
//the store can be null if the BrokerService has persistence // the store can be null if the BrokerService has persistence
//turned off // turned off
this.store= broker.getTempDataStore(); this.store = broker.getTempDataStore();
this.name = NAME_COUNT.incrementAndGet() + "_" + name; this.name = NAME_COUNT.incrementAndGet() + "_" + name;
} }
@Override
public void start() throws Exception { public void start() throws Exception {
if (started.compareAndSet(false, true)) { if (started.compareAndSet(false, true)) {
super.start(); super.start();
@ -80,6 +84,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
} }
} }
@Override
public void stop() throws Exception { public void stop() throws Exception {
if (started.compareAndSet(true, false)) { if (started.compareAndSet(true, false)) {
super.stop(); super.stop();
@ -92,13 +97,14 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
/** /**
* @return true if there are no pending messages * @return true if there are no pending messages
*/ */
@Override
public synchronized boolean isEmpty() { public synchronized boolean isEmpty() {
if(memoryList.isEmpty() && isDiskListEmpty()){ if (memoryList.isEmpty() && isDiskListEmpty()) {
return true; return true;
} }
for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
MessageReference node = iterator.next(); MessageReference node = iterator.next();
if (node== QueueMessageReference.NULL_MESSAGE){ if (node == QueueMessageReference.NULL_MESSAGE) {
continue; continue;
} }
if (!node.isDropped()) { if (!node.isDropped()) {
@ -109,18 +115,22 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
} }
return isDiskListEmpty(); return isDiskListEmpty();
} }
/** /**
* reset the cursor * reset the cursor
*/ */
@Override
public synchronized void reset() { public synchronized void reset() {
iterating = true; iterating = true;
last = null; last = null;
iter = isDiskListEmpty() ? memoryList.iterator() : getDiskList().listIterator(); if (isDiskListEmpty()) {
this.iter = this.memoryList.iterator();
} else {
this.iter = new DiskIterator();
}
} }
@Override
public synchronized void release() { public synchronized void release() {
iterating = false; iterating = false;
if (flushRequired) { if (flushRequired) {
@ -129,10 +139,11 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
} }
} }
@Override
public synchronized void destroy() throws Exception { public synchronized void destroy() throws Exception {
stop(); stop();
for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) { for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
Message node = (Message)i.next(); Message node = (Message) i.next();
node.decrementReferenceCount(); node.decrementReferenceCount();
} }
memoryList.clear(); memoryList.clear();
@ -141,26 +152,22 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
private void destroyDiskList() throws Exception { private void destroyDiskList() throws Exception {
if (!isDiskListEmpty()) { if (!isDiskListEmpty()) {
Iterator<MessageReference> iterator = diskList.iterator(); store.removePList(name);
while (iterator.hasNext()) { }
iterator.next();
iterator.remove();
}
diskList.clear();
}
store.deleteListContainer(name, "TopicSubscription");
} }
@Override
public synchronized LinkedList<MessageReference> pageInList(int maxItems) { public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
LinkedList<MessageReference> result = new LinkedList<MessageReference>(); LinkedList<MessageReference> result = new LinkedList<MessageReference>();
int count = 0; int count = 0;
for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) { for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
result.add(i.next()); MessageReference ref = i.next();
result.add(ref);
count++; count++;
} }
if (count < maxItems && !isDiskListEmpty()) { if (count < maxItems && !isDiskListEmpty()) {
for (Iterator<MessageReference> i = getDiskList().iterator(); i.hasNext() && count < maxItems;) { for (Iterator<MessageReference> i = new DiskIterator(); i.hasNext() && count < maxItems;) {
Message message = (Message)i.next(); Message message = (Message) i.next();
message.setRegionDestination(regionDestination); message.setRegionDestination(regionDestination);
message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
message.incrementReferenceCount(); message.incrementReferenceCount();
@ -176,12 +183,13 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
* *
* @param node * @param node
*/ */
@Override
public synchronized void addMessageLast(MessageReference node) { public synchronized void addMessageLast(MessageReference node) {
if (!node.isExpired()) { if (!node.isExpired()) {
try { try {
regionDestination = node.getMessage().getRegionDestination(); regionDestination = node.getMessage().getRegionDestination();
if (isDiskListEmpty()) { if (isDiskListEmpty()) {
if (hasSpace() || this.store==null) { if (hasSpace() || this.store == null) {
memoryList.add(node); memoryList.add(node);
node.incrementReferenceCount(); node.incrementReferenceCount();
return; return;
@ -200,11 +208,11 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
} }
} }
systemUsage.getTempUsage().waitForSpace(); systemUsage.getTempUsage().waitForSpace();
getDiskList().add(node); ByteSequence bs = getByteSequence(node.getMessage());
getDiskList().addLast(node.getMessageId().toString(), bs);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Caught an Exception adding a message: " + node LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
+ " first to FilePendingMessageCursor ", e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} else { } else {
@ -217,6 +225,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
* *
* @param node * @param node
*/ */
@Override
public synchronized void addMessageFirst(MessageReference node) { public synchronized void addMessageFirst(MessageReference node) {
if (!node.isExpired()) { if (!node.isExpired()) {
try { try {
@ -242,11 +251,11 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
} }
systemUsage.getTempUsage().waitForSpace(); systemUsage.getTempUsage().waitForSpace();
node.decrementReferenceCount(); node.decrementReferenceCount();
getDiskList().addFirst(node); ByteSequence bs = getByteSequence(node.getMessage());
getDiskList().addFirst(node.getMessageId().toString(), bs);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Caught an Exception adding a message: " + node LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
+ " first to FilePendingMessageCursor ", e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} else { } else {
@ -257,6 +266,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
/** /**
* @return true if there pending messages to dispatch * @return true if there pending messages to dispatch
*/ */
@Override
public synchronized boolean hasNext() { public synchronized boolean hasNext() {
return iter.hasNext(); return iter.hasNext();
} }
@ -264,8 +274,9 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
/** /**
* @return the next pending message * @return the next pending message
*/ */
@Override
public synchronized MessageReference next() { public synchronized MessageReference next() {
Message message = (Message)iter.next(); Message message = (Message) iter.next();
last = message; last = message;
if (!isDiskListEmpty()) { if (!isDiskListEmpty()) {
// got from disk // got from disk
@ -279,10 +290,11 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
/** /**
* remove the message at the cursor position * remove the message at the cursor position
*/ */
@Override
public synchronized void remove() { public synchronized void remove() {
iter.remove(); iter.remove();
if (last != null) { if (last != null) {
last.decrementReferenceCount(); last.decrementReferenceCount();
} }
} }
@ -290,18 +302,24 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
* @param node * @param node
* @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference) * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
*/ */
@Override
public synchronized void remove(MessageReference node) { public synchronized void remove(MessageReference node) {
if (memoryList.remove(node)) { if (memoryList.remove(node)) {
node.decrementReferenceCount(); node.decrementReferenceCount();
} }
if (!isDiskListEmpty()) { if (!isDiskListEmpty()) {
getDiskList().remove(node); try {
getDiskList().remove(node.getMessageId().toString());
} catch (IOException e) {
throw new RuntimeException(e);
}
} }
} }
/** /**
* @return the number of pending messages * @return the number of pending messages
*/ */
@Override
public synchronized int size() { public synchronized int size() {
return memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size()); return memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size());
} }
@ -309,31 +327,37 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
/** /**
* clear all pending messages * clear all pending messages
*/ */
@Override
public synchronized void clear() { public synchronized void clear() {
memoryList.clear(); memoryList.clear();
if (!isDiskListEmpty()) { if (!isDiskListEmpty()) {
getDiskList().clear(); try {
getDiskList().destroy();
} catch (IOException e) {
throw new RuntimeException(e);
}
} }
last=null; last = null;
} }
public synchronized boolean isFull() { @Override
public synchronized boolean isFull() {
return super.isFull() return super.isFull() || (systemUsage != null && systemUsage.getTempUsage().isFull());
|| (systemUsage != null && systemUsage.getTempUsage().isFull());
} }
@Override
public boolean hasMessagesBufferedToDeliver() { public boolean hasMessagesBufferedToDeliver() {
return !isEmpty(); return !isEmpty();
} }
@Override
public void setSystemUsage(SystemUsage usageManager) { public void setSystemUsage(SystemUsage usageManager) {
super.setSystemUsage(usageManager); super.setSystemUsage(usageManager);
} }
public void onUsageChanged(Usage usage, int oldPercentUsage, public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
int newPercentUsage) {
if (newPercentUsage >= getMemoryUsageHighWaterMark()) { if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
synchronized (this) { synchronized (this) {
flushRequired = true; flushRequired = true;
@ -347,7 +371,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
} }
} }
} }
@Override
public boolean isTransient() { public boolean isTransient() {
return true; return true;
} }
@ -355,7 +380,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
protected boolean isSpaceInMemoryList() { protected boolean isSpaceInMemoryList() {
return hasSpace() && isDiskListEmpty(); return hasSpace() && isDiskListEmpty();
} }
protected synchronized void expireOldMessages() { protected synchronized void expireOldMessages() {
if (!memoryList.isEmpty()) { if (!memoryList.isEmpty()) {
LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList); LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList);
@ -364,21 +389,29 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
MessageReference node = tmpList.removeFirst(); MessageReference node = tmpList.removeFirst();
if (node.isExpired()) { if (node.isExpired()) {
discard(node); discard(node);
}else { } else {
memoryList.add(node); memoryList.add(node);
} }
} }
} }
} }
protected synchronized void flushToDisk() { protected synchronized void flushToDisk() {
if (!memoryList.isEmpty()) { if (!memoryList.isEmpty()) {
while (!memoryList.isEmpty()) { while (!memoryList.isEmpty()) {
MessageReference node = memoryList.removeFirst(); MessageReference node = memoryList.removeFirst();
node.decrementReferenceCount(); node.decrementReferenceCount();
getDiskList().addLast(node); ByteSequence bs;
try {
bs = getByteSequence(node.getMessage());
getDiskList().addLast(node.getMessageId().toString(), bs);
} catch (IOException e) {
LOG.error("Failed to write to disk list", e);
throw new RuntimeException(e);
}
} }
memoryList.clear(); memoryList.clear();
} }
@ -388,19 +421,18 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
return diskList == null || diskList.isEmpty(); return diskList == null || diskList.isEmpty();
} }
protected ListContainer<MessageReference> getDiskList() { protected PList getDiskList() {
if (diskList == null) { if (diskList == null) {
try { try {
diskList = store.getListContainer(name, "TopicSubscription", true); diskList = store.getPList(name);
diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat())); } catch (Exception e) {
} catch (IOException e) {
LOG.error("Caught an IO Exception getting the DiskList " + name, e); LOG.error("Caught an IO Exception getting the DiskList " + name, e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
return diskList; return diskList;
} }
protected void discard(MessageReference message) { protected void discard(MessageReference message) {
message.decrementReferenceCount(); message.decrementReferenceCount();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -408,4 +440,68 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
} }
broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(new NonCachedMessageEvaluationContext()), message); broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(new NonCachedMessageEvaluationContext()), message);
} }
protected ByteSequence getByteSequence(Message message) throws IOException {
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
return new ByteSequence(packet.data, packet.offset, packet.length);
}
protected Message getMessage(ByteSequence bs) throws IOException {
org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(bs.getData(), bs
.getOffset(), bs.getLength());
return (Message) this.wireFormat.unmarshal(packet);
}
final class DiskIterator implements Iterator<MessageReference> {
private PListEntry next = null;
private PListEntry current = null;
PList list;
DiskIterator() {
try {
this.list = getDiskList();
synchronized (this.list) {
this.current = this.list.getFirst();
this.next = this.current;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public boolean hasNext() {
return this.next != null;
}
public MessageReference next() {
this.current = next;
try {
ByteSequence bs = this.current.getByteSequence();
synchronized (this.list) {
this.current = this.list.refresh(this.current);
this.next = this.list.getNext(this.current);
}
return getMessage(bs);
} catch (IOException e) {
LOG.error("I/O error", e);
throw new RuntimeException(e);
}
}
public void remove() {
try {
synchronized (this.list) {
this.current = this.list.refresh(this.current);
this.list.remove(this.current);
}
} catch (IOException e) {
LOG.error("I/O error", e);
throw new RuntimeException(e);
}
}
}
} }

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb.plist;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.util.VariableMarshaller;
class EntryLocation {
static final long NOT_SET = -1;
private String id;
private Page<EntryLocation> page;
private long next;
private long prev;
private Location location;
static class EntryLocationMarshaller extends VariableMarshaller<EntryLocation> {
static final EntryLocationMarshaller INSTANCE = new EntryLocationMarshaller();
public EntryLocation readPayload(DataInput dataIn) throws IOException {
EntryLocation result = new EntryLocation();
result.readExternal(dataIn);
return result;
}
public void writePayload(EntryLocation value, DataOutput dataOut) throws IOException {
value.writeExternal(dataOut);
}
}
EntryLocation(Location location) {
this.location = location;
}
EntryLocation() {
}
EntryLocation copy() {
EntryLocation result = new EntryLocation();
result.id=this.id;
result.location=this.location;
result.next=this.next;
result.prev=this.prev;
result.page=this.page;
return result;
}
void reset() {
this.id = "";
this.next = NOT_SET;
this.prev = NOT_SET;
}
public void readExternal(DataInput in) throws IOException {
this.id = in.readUTF();
this.prev = in.readLong();
this.next = in.readLong();
if (this.location == null) {
this.location = new Location();
}
this.location.readExternal(in);
}
public void writeExternal(DataOutput out) throws IOException {
out.writeUTF(this.id);
out.writeLong(this.prev);
out.writeLong(this.next);
if (this.location == null) {
this.location = new Location();
}
this.location.writeExternal(out);
}
/**
* @return the jobId
*/
String getId() {
return this.id;
}
/**
* @param id
* the id to set
*/
void setId(String id) {
this.id = id;
}
Location getLocation() {
return this.location;
}
/**
* @param location
* the location to set
*/
void setLocation(Location location) {
this.location = location;
}
/**
* @return the next
*/
long getNext() {
return this.next;
}
/**
* @param next
* the next to set
*/
void setNext(long next) {
this.next = next;
}
/**
* @return the prev
*/
long getPrev() {
return this.prev;
}
/**
* @param prev
* the prev to set
*/
void setPrev(long prev) {
this.prev = prev;
}
/**
* @return the page
*/
Page<EntryLocation> getPage() {
return this.page;
}
/**
* @param page
* the page to set
*/
void setPage(Page<EntryLocation> page) {
this.page = page;
}
}

View File

@ -0,0 +1,468 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb.plist;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.store.kahadb.plist.EntryLocation.EntryLocationMarshaller;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.ByteSequence;
public class PList {
final PListStore store;
private String name;
private long rootId = EntryLocation.NOT_SET;
private long lastId = EntryLocation.NOT_SET;
private final AtomicBoolean loaded = new AtomicBoolean();
private int size = 0;
PList(PListStore store) {
this.store = store;
}
public void setName(String name) {
this.name = name;
}
/*
* (non-Javadoc)
* @see org.apache.activemq.beanstalk.JobScheduler#getName()
*/
public String getName() {
return this.name;
}
public synchronized int size() {
return this.size;
}
public synchronized boolean isEmpty() {
return size == 0;
}
/**
* @return the rootId
*/
public long getRootId() {
return this.rootId;
}
/**
* @param rootId
* the rootId to set
*/
public void setRootId(long rootId) {
this.rootId = rootId;
}
/**
* @return the lastId
*/
public long getLastId() {
return this.lastId;
}
/**
* @param lastId
* the lastId to set
*/
public void setLastId(long lastId) {
this.lastId = lastId;
}
/**
* @return the loaded
*/
public boolean isLoaded() {
return this.loaded.get();
}
void read(DataInput in) throws IOException {
this.rootId = in.readLong();
this.name = in.readUTF();
}
public void write(DataOutput out) throws IOException {
out.writeLong(this.rootId);
out.writeUTF(name);
}
public synchronized void destroy() throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
destroy(tx);
}
});
}
void destroy(Transaction tx) throws IOException {
// start from the first
EntryLocation entry = loadEntry(tx, getRoot(tx).getNext());
while (entry != null) {
EntryLocation toRemove = entry.copy();
entry = loadEntry(tx, entry.getNext());
doRemove(tx, toRemove);
}
}
synchronized void load(Transaction tx) throws IOException {
if (loaded.compareAndSet(false, true)) {
final Page<EntryLocation> p = tx.load(this.rootId, null);
if (p.getType() == Page.PAGE_FREE_TYPE) {
// Need to initialize it..
EntryLocation root = createEntry(p, "root", EntryLocation.NOT_SET, EntryLocation.NOT_SET);
storeEntry(tx, root);
this.lastId = root.getPage().getPageId();
} else {
// find last id
long nextId = this.rootId;
while (nextId != EntryLocation.NOT_SET) {
EntryLocation next = getNext(tx, nextId);
if (next != null) {
this.lastId = next.getPage().getPageId();
nextId = next.getNext();
this.size++;
}
}
}
}
}
synchronized public void unload() {
if (loaded.compareAndSet(true, false)) {
}
}
synchronized public void addLast(final String id, final ByteSequence bs) throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
addLast(tx, id, bs);
}
});
}
void addLast(Transaction tx, String id, ByteSequence bs) throws IOException {
Location location = this.store.write(bs, false);
EntryLocation entry = createEntry(tx, id, this.lastId, EntryLocation.NOT_SET);
entry.setLocation(location);
storeEntry(tx, entry);
this.store.incrementJournalCount(tx, location);
EntryLocation last = loadEntry(tx, this.lastId);
last.setNext(entry.getPage().getPageId());
storeEntry(tx, last);
this.lastId = entry.getPage().getPageId();
this.size++;
}
synchronized public void addFirst(final String id, final ByteSequence bs) throws IOException {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
addFirst(tx, id, bs);
}
});
}
void addFirst(Transaction tx, String id, ByteSequence bs) throws IOException {
Location location = this.store.write(bs, false);
EntryLocation entry = createEntry(tx, id, EntryLocation.NOT_SET, EntryLocation.NOT_SET);
entry.setLocation(location);
EntryLocation oldFirst = getFirst(tx);
if (oldFirst != null) {
oldFirst.setPrev(entry.getPage().getPageId());
storeEntry(tx, oldFirst);
entry.setNext(oldFirst.getPage().getPageId());
}
EntryLocation root = getRoot(tx);
root.setNext(entry.getPage().getPageId());
storeEntry(tx, root);
storeEntry(tx, entry);
this.store.incrementJournalCount(tx, location);
this.size++;
}
synchronized public boolean remove(final String id) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
result.set(remove(tx, id));
}
});
return result.get();
}
synchronized public boolean remove(final int position) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
result.set(remove(tx, position));
}
});
return result.get();
}
synchronized public boolean remove(final PListEntry entry) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
result.set(doRemove(tx, entry.getEntry()));
}
});
return result.get();
}
synchronized public PListEntry get(final int position) throws IOException {
PListEntry result = null;
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(get(tx, position));
}
});
if (ref.get() != null) {
ByteSequence bs = this.store.getPayload(ref.get().getLocation());
result = new PListEntry(ref.get(), bs);
}
return result;
}
synchronized public PListEntry getFirst() throws IOException {
PListEntry result = null;
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(getFirst(tx));
}
});
if (ref.get() != null) {
ByteSequence bs = this.store.getPayload(ref.get().getLocation());
result = new PListEntry(ref.get(), bs);
}
return result;
}
synchronized public PListEntry getLast() throws IOException {
PListEntry result = null;
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(getLast(tx));
}
});
if (ref.get() != null) {
ByteSequence bs = this.store.getPayload(ref.get().getLocation());
result = new PListEntry(ref.get(), bs);
}
return result;
}
synchronized public PListEntry getNext(PListEntry entry) throws IOException {
PListEntry result = null;
final long nextId = entry != null ? entry.getEntry().getNext() : this.rootId;
if (nextId != EntryLocation.NOT_SET) {
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(getNext(tx, nextId));
}
});
if (ref.get() != null) {
ByteSequence bs = this.store.getPayload(ref.get().getLocation());
result = new PListEntry(ref.get(), bs);
}
}
return result;
}
synchronized public PListEntry refresh(final PListEntry entry) throws IOException {
PListEntry result = null;
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(loadEntry(tx, entry.getEntry().getPage().getPageId()));
}
});
if (ref.get() != null) {
result = new PListEntry(ref.get(), entry.getByteSequence());
}
return result;
}
boolean remove(Transaction tx, String id) throws IOException {
boolean result = false;
long nextId = this.rootId;
while (nextId != EntryLocation.NOT_SET) {
EntryLocation entry = getNext(tx, nextId);
if (entry != null) {
if (entry.getId().equals(id)) {
result = doRemove(tx, entry);
break;
}
}
}
return result;
}
boolean remove(Transaction tx, int position) throws IOException {
boolean result = false;
long nextId = this.rootId;
int count = 0;
while (nextId != EntryLocation.NOT_SET) {
EntryLocation entry = getNext(tx, nextId);
if (entry != null) {
if (count == position) {
result = doRemove(tx, entry);
break;
}
}
count++;
}
return result;
}
EntryLocation get(Transaction tx, int position) throws IOException {
EntryLocation result = null;
long nextId = this.rootId;
int count = -1;
while (nextId != EntryLocation.NOT_SET) {
EntryLocation entry = getNext(tx, nextId);
if (entry != null) {
if (count == position) {
result = entry;
break;
}
nextId = entry.getNext();
} else {
break;
}
count++;
}
return result;
}
EntryLocation getFirst(Transaction tx) throws IOException {
long offset = getRoot(tx).getNext();
if (offset != EntryLocation.NOT_SET) {
return loadEntry(tx, offset);
}
return null;
}
EntryLocation getLast(Transaction tx) throws IOException {
if (this.lastId != EntryLocation.NOT_SET) {
return loadEntry(tx, this.lastId);
}
return null;
}
boolean doRemove(Transaction tx, EntryLocation entry) throws IOException {
boolean result = false;
if (entry != null) {
EntryLocation prev = getPrevious(tx, entry.getPrev());
EntryLocation next = getNext(tx, entry.getNext());
long prevId = prev != null ? prev.getPage().getPageId() : this.rootId;
long nextId = next != null ? next.getPage().getPageId() : EntryLocation.NOT_SET;
if (next != null) {
next.setPrev(prevId);
storeEntry(tx, next);
} else {
// we are deleting the last one in the list
this.lastId = prevId;
}
if (prev != null) {
prev.setNext(nextId);
storeEntry(tx, prev);
}
this.store.decrementJournalCount(tx, entry.getLocation());
entry.reset();
storeEntry(tx, entry);
tx.free(entry.getPage().getPageId());
result = true;
this.size--;
}
return result;
}
private EntryLocation createEntry(Transaction tx, String id, long previous, long next) throws IOException {
Page<EntryLocation> p = tx.allocate();
EntryLocation result = new EntryLocation();
result.setPage(p);
p.set(result);
result.setId(id);
result.setPrev(previous);
result.setNext(next);
return result;
}
private EntryLocation createEntry(Page<EntryLocation> p, String id, long previous, long next) throws IOException {
EntryLocation result = new EntryLocation();
result.setPage(p);
p.set(result);
result.setId(id);
result.setPrev(previous);
result.setNext(next);
return result;
}
EntryLocation loadEntry(Transaction tx, long pageId) throws IOException {
Page<EntryLocation> page = tx.load(pageId, EntryLocationMarshaller.INSTANCE);
EntryLocation entry = page.get();
entry.setPage(page);
return entry;
}
private void storeEntry(Transaction tx, EntryLocation entry) throws IOException {
tx.store(entry.getPage(), EntryLocationMarshaller.INSTANCE, true);
}
EntryLocation getNext(Transaction tx, long next) throws IOException {
EntryLocation result = null;
if (next != EntryLocation.NOT_SET) {
result = loadEntry(tx, next);
}
return result;
}
private EntryLocation getPrevious(Transaction tx, long previous) throws IOException {
EntryLocation result = null;
if (previous != EntryLocation.NOT_SET) {
result = loadEntry(tx, previous);
}
return result;
}
private EntryLocation getRoot(Transaction tx) throws IOException {
EntryLocation result = loadEntry(tx, this.rootId);
return result;
}
ByteSequence getPayload(EntryLocation entry) throws IOException {
return this.store.getPayload(entry.getLocation());
}
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb.plist;
import org.apache.kahadb.util.ByteSequence;
public class PListEntry {
private final ByteSequence byteSequence;
private final EntryLocation entry;
PListEntry(EntryLocation entry, ByteSequence bs) {
this.entry = entry;
this.byteSequence = bs;
}
/**
* @return the byteSequence
*/
public ByteSequence getByteSequence() {
return this.byteSequence;
}
public String getId() {
return this.entry.getId();
}
/**
* @return the entry
*/
EntryLocation getEntry() {
return this.entry;
}
public PListEntry copy() {
return new PListEntry(this.entry, this.byteSequence);
}
@Override
public String toString() {
return this.entry.getId() + "[pageId=" + this.entry.getPage().getPageId() + ",next=" + this.entry.getNext()
+ "]";
}
}

View File

@ -0,0 +1,387 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb.plist;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.index.BTreeIndex;
import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.IntegerMarshaller;
import org.apache.kahadb.util.LockFile;
import org.apache.kahadb.util.StringMarshaller;
import org.apache.kahadb.util.VariableMarshaller;
public class PListStore extends ServiceSupport {
static final Log LOG = LogFactory.getLog(PListStore.class);
private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
public static final int CLOSED_STATE = 1;
public static final int OPEN_STATE = 2;
private File directory;
PageFile pageFile;
private Journal journal;
private LockFile lockFile;
private boolean failIfDatabaseIsLocked;
private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
private boolean enableIndexWriteAsync = false;
// private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
MetaData metaData = new MetaData(this);
final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
Map<String, PList> persistentLists = new HashMap<String, PList>();
protected class MetaData {
protected MetaData(PListStore store) {
this.store = store;
LinkedList list = new LinkedList();
}
private final PListStore store;
Page<MetaData> page;
BTreeIndex<Integer, Integer> journalRC;
BTreeIndex<String, PList> storedSchedulers;
void createIndexes(Transaction tx) throws IOException {
this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId());
this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
}
void load(Transaction tx) throws IOException {
this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
this.storedSchedulers.load(tx);
this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
this.journalRC.load(tx);
}
void loadLists(Transaction tx, Map<String, PList> schedulers) throws IOException {
for (Iterator<Entry<String, PList>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
Entry<String, PList> entry = i.next();
entry.getValue().load(tx);
schedulers.put(entry.getKey(), entry.getValue());
}
}
public void read(DataInput is) throws IOException {
this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, is.readLong());
this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
}
public void write(DataOutput os) throws IOException {
os.writeLong(this.storedSchedulers.getPageId());
os.writeLong(this.journalRC.getPageId());
}
}
class MetaDataMarshaller extends VariableMarshaller<MetaData> {
private final PListStore store;
MetaDataMarshaller(PListStore store) {
this.store = store;
}
public MetaData readPayload(DataInput dataIn) throws IOException {
MetaData rc = new MetaData(this.store);
rc.read(dataIn);
return rc;
}
public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
object.write(dataOut);
}
}
class ValueMarshaller extends VariableMarshaller<List<EntryLocation>> {
public List<EntryLocation> readPayload(DataInput dataIn) throws IOException {
List<EntryLocation> result = new ArrayList<EntryLocation>();
int size = dataIn.readInt();
for (int i = 0; i < size; i++) {
EntryLocation jobLocation = new EntryLocation();
jobLocation.readExternal(dataIn);
result.add(jobLocation);
}
return result;
}
public void writePayload(List<EntryLocation> value, DataOutput dataOut) throws IOException {
dataOut.writeInt(value.size());
for (EntryLocation jobLocation : value) {
jobLocation.writeExternal(dataOut);
}
}
}
class JobSchedulerMarshaller extends VariableMarshaller<PList> {
private final PListStore store;
JobSchedulerMarshaller(PListStore store) {
this.store = store;
}
public PList readPayload(DataInput dataIn) throws IOException {
PList result = new PList(this.store);
result.read(dataIn);
return result;
}
public void writePayload(PList js, DataOutput dataOut) throws IOException {
js.write(dataOut);
}
}
public File getDirectory() {
return directory;
}
public void setDirectory(File directory) {
this.directory = directory;
}
public long size() {
if ( !isStarted() ) {
return 0;
}
try {
return journal.getDiskSize() + pageFile.getDiskSize();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public PList getPList(final String name) throws Exception {
PList result = this.persistentLists.get(name);
if (result == null) {
final PList pl = new PList(this);
pl.setName(name);
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
pl.setRootId(tx.allocate().getPageId());
pl.load(tx);
metaData.storedSchedulers.put(tx, name, pl);
}
});
result = pl;
this.persistentLists.put(name, pl);
}
final PList load = result;
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
load.load(tx);
}
});
return result;
}
synchronized public boolean removePList(final String name) throws Exception {
boolean result = false;
final PList pl = this.persistentLists.remove(name);
result = pl != null;
if (result) {
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
metaData.storedSchedulers.remove(tx, name);
pl.destroy(tx);
}
});
}
return result;
}
@Override
protected synchronized void doStart() throws Exception {
if (this.directory == null) {
this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
}
IOHelper.mkdirs(this.directory);
lock();
this.journal = new Journal();
this.journal.setDirectory(directory);
this.journal.setMaxFileLength(getJournalMaxFileLength());
this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
this.journal.start();
this.pageFile = new PageFile(directory, "scheduleDB");
this.pageFile.load();
this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
if (pageFile.getPageCount() == 0) {
Page<MetaData> page = tx.allocate();
assert page.getPageId() == 0;
page.set(metaData);
metaData.page = page;
metaData.createIndexes(tx);
tx.store(metaData.page, metaDataMarshaller, true);
} else {
Page<MetaData> page = tx.load(0, metaDataMarshaller);
metaData = page.get();
metaData.page = page;
}
metaData.load(tx);
metaData.loadLists(tx, persistentLists);
}
});
this.pageFile.flush();
LOG.info(this + " started");
}
@Override
protected synchronized void doStop(ServiceStopper stopper) throws Exception {
for (PList pl : this.persistentLists.values()) {
pl.unload();
}
if (this.pageFile != null) {
this.pageFile.unload();
}
if (this.journal != null) {
journal.close();
}
if (this.lockFile != null) {
this.lockFile.unlock();
}
this.lockFile = null;
LOG.info(this + " stopped");
}
synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
int logId = location.getDataFileId();
Integer val = this.metaData.journalRC.get(tx, logId);
int refCount = val != null ? val.intValue() + 1 : 1;
this.metaData.journalRC.put(tx, logId, refCount);
}
synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
int logId = location.getDataFileId();
int refCount = this.metaData.journalRC.get(tx, logId);
refCount--;
if (refCount <= 0) {
this.metaData.journalRC.remove(tx, logId);
Set<Integer> set = new HashSet<Integer>();
set.add(logId);
this.journal.removeDataFiles(set);
} else {
this.metaData.journalRC.put(tx, logId, refCount);
}
}
synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
ByteSequence result = null;
result = this.journal.read(location);
return result;
}
synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
return this.journal.write(payload, sync);
}
private void lock() throws IOException {
if (lockFile == null) {
File lockFileName = new File(directory, "lock");
lockFile = new LockFile(lockFileName, true);
if (failIfDatabaseIsLocked) {
lockFile.lock();
} else {
while (true) {
try {
lockFile.lock();
break;
} catch (IOException e) {
LOG.info("Database " + lockFileName + " is locked... waiting "
+ (DATABASE_LOCKED_WAIT_DELAY / 1000)
+ " seconds for the database to be unlocked. Reason: " + e);
try {
Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
} catch (InterruptedException e1) {
}
}
}
}
}
}
PageFile getPageFile() {
this.pageFile.isLoaded();
return this.pageFile;
}
public boolean isFailIfDatabaseIsLocked() {
return failIfDatabaseIsLocked;
}
public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
}
public int getJournalMaxFileLength() {
return journalMaxFileLength;
}
public void setJournalMaxFileLength(int journalMaxFileLength) {
this.journalMaxFileLength = journalMaxFileLength;
}
public int getJournalMaxWriteBatchSize() {
return journalMaxWriteBatchSize;
}
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
}
public boolean isEnableIndexWriteAsync() {
return enableIndexWriteAsync;
}
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
this.enableIndexWriteAsync = enableIndexWriteAsync;
}
@Override
public String toString() {
return "JobSchedulerStore:" + this.directory;
}
}

View File

@ -19,8 +19,8 @@ package org.apache.activemq.usage;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.plist.PListStore;
/** /**
* Holder for Usage instances for memory, store and temp files Main use case is * Holder for Usage instances for memory, store and temp files Main use case is
@ -43,13 +43,13 @@ public class SystemUsage implements Service {
*/ */
private boolean sendFailIfNoSpaceExplicitySet; private boolean sendFailIfNoSpaceExplicitySet;
private boolean sendFailIfNoSpace; private boolean sendFailIfNoSpace;
private List<SystemUsage> children = new CopyOnWriteArrayList<SystemUsage>(); private final List<SystemUsage> children = new CopyOnWriteArrayList<SystemUsage>();
public SystemUsage() { public SystemUsage() {
this("default", null, null); this("default", null, null);
} }
public SystemUsage(String name, PersistenceAdapter adapter, Store tempStore) { public SystemUsage(String name, PersistenceAdapter adapter, PListStore tempStore) {
this.parent = null; this.parent = null;
this.name = name; this.name = name;
this.memoryUsage = new MemoryUsage(name + ":memory"); this.memoryUsage = new MemoryUsage(name + ":memory");
@ -90,6 +90,7 @@ public class SystemUsage implements Service {
return this.tempUsage; return this.tempUsage;
} }
@Override
public String toString() { public String toString() {
return "UsageManager(" + getName() + ")"; return "UsageManager(" + getName() + ")";
} }

View File

@ -16,7 +16,8 @@
*/ */
package org.apache.activemq.usage; package org.apache.activemq.usage;
import org.apache.activemq.kaha.Store; import org.apache.activemq.store.kahadb.plist.PListStore;
/** /**
* Used to keep track of how much of something is being used so that a * Used to keep track of how much of something is being used so that a
@ -28,13 +29,13 @@ import org.apache.activemq.kaha.Store;
*/ */
public class TempUsage extends Usage<TempUsage> { public class TempUsage extends Usage<TempUsage> {
private Store store; private PListStore store;
public TempUsage() { public TempUsage() {
super(null, null, 1.0f); super(null, null, 1.0f);
} }
public TempUsage(String name, Store store) { public TempUsage(String name, PListStore store) {
super(null, name, 1.0f); super(null, name, 1.0f);
this.store = store; this.store = store;
} }
@ -44,6 +45,7 @@ public class TempUsage extends Usage<TempUsage> {
this.store = parent.store; this.store = parent.store;
} }
@Override
protected long retrieveUsage() { protected long retrieveUsage() {
if (store == null) { if (store == null) {
return 0; return 0;
@ -51,11 +53,11 @@ public class TempUsage extends Usage<TempUsage> {
return store.size(); return store.size();
} }
public Store getStore() { public PListStore getStore() {
return store; return store;
} }
public void setStore(Store store) { public void setStore(PListStore store) {
this.store = store; this.store = store;
onLimitChange(); onLimitChange();
} }

View File

@ -17,7 +17,6 @@
package org.apache.activemq.bugs; package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection; import javax.jms.Connection;
@ -33,8 +32,8 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.store.amq.AMQPersistenceAdapter; import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.StoreUsage; import org.apache.activemq.usage.StoreUsage;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;
@ -82,6 +81,7 @@ public class TempStorageBlockedBrokerTest {
producerConnection.start(); producerConnection.start();
Thread producingThread = new Thread("Producing thread") { Thread producingThread = new Thread("Producing thread") {
@Override
public void run() { public void run() {
try { try {
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -180,7 +180,9 @@ public class TempStorageBlockedBrokerTest {
persistence.setDirectory(directory); persistence.setDirectory(directory);
File tmpDir = new File(directory, "tmp"); File tmpDir = new File(directory, "tmp");
IOHelper.deleteChildren(tmpDir); IOHelper.deleteChildren(tmpDir);
Store tempStore = new org.apache.activemq.kaha.impl.KahaStore(tmpDir, "rw"); PListStore tempStore = new PListStore();
tempStore.setDirectory(tmpDir);
tempStore.start();
SystemUsage sysUsage = new SystemUsage("mySysUsage", persistence, tempStore); SystemUsage sysUsage = new SystemUsage("mySysUsage", persistence, tempStore);
MemoryUsage memUsage = new MemoryUsage(); MemoryUsage memUsage = new MemoryUsage();

View File

@ -17,7 +17,6 @@
package org.apache.activemq.perf; package org.apache.activemq.perf;
import java.io.File; import java.io.File;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBStore; import org.apache.activemq.store.kahadb.KahaDBStore;
@ -26,19 +25,22 @@ import org.apache.activemq.store.kahadb.KahaDBStore;
*/ */
public class KahaDBQueueTest extends SimpleQueueTest { public class KahaDBQueueTest extends SimpleQueueTest {
@Override
protected void configureBroker(BrokerService answer,String uri) throws Exception { protected void configureBroker(BrokerService answer,String uri) throws Exception {
File dataFileDir = new File("target/test-amq-data/perfTest/kahadb"); File dataFileDir = new File("target/test-amq-data/perfTest/kahadb");
File archiveDir = new File(dataFileDir,"archive");
KahaDBStore kaha = new KahaDBStore(); KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(dataFileDir); kaha.setDirectory(dataFileDir);
kaha.setDirectoryArchive(archiveDir);
kaha.setArchiveDataLogs(true);
// The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified // The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified
// what happens if the index is updated but a journal update is lost. // what happens if the index is updated but a journal update is lost.
// Index is going to be in consistent, but can it be repaired? // Index is going to be in consistent, but can it be repaired?
kaha.setEnableJournalDiskSyncs(false); kaha.setEnableJournalDiskSyncs(false);
// Using a bigger journal file size makes he take fewer spikes as it is not switching files as often. // Using a bigger journal file size makes he take fewer spikes as it is not switching files as often.
kaha.setJournalMaxFileLength(1024*100); //kaha.setJournalMaxFileLength(1024*1024*100);
// small batch means more frequent and smaller writes // small batch means more frequent and smaller writes
kaha.setIndexWriteBatchSize(100); kaha.setIndexWriteBatchSize(100);

View File

@ -25,7 +25,6 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener; import javax.jms.MessageListener;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.Topic; import javax.jms.Topic;
import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -42,6 +41,7 @@ public class PerfConsumer implements MessageListener {
protected boolean enableAudit = false; protected boolean enableAudit = false;
protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit(16 * 1024,20); protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit(16 * 1024,20);
protected boolean firstMessage =true; protected boolean firstMessage =true;
protected String lastMsgId;
protected PerfRate rate = new PerfRate(); protected PerfRate rate = new PerfRate();
@ -91,11 +91,12 @@ public class PerfConsumer implements MessageListener {
rate.increment(); rate.increment();
try { try {
if (enableAudit && !this.audit.isInOrder(msg.getJMSMessageID())) { if (enableAudit && !this.audit.isInOrder(msg.getJMSMessageID())) {
LOG.error("Message out of order!!" + msg); LOG.error("Message out of order!!" + msg.getJMSMessageID() + " LAST = " + lastMsgId);
} }
if (enableAudit && this.audit.isDuplicate(msg)){ if (enableAudit && this.audit.isDuplicate(msg)){
LOG.error("Duplicate Message!" + msg); LOG.error("Duplicate Message!" + msg);
} }
lastMsgId=msg.getJMSMessageID();
} catch (JMSException e1) { } catch (JMSException e1) {
// TODO Auto-generated catch block // TODO Auto-generated catch block
e1.printStackTrace(); e1.printStackTrace();

View File

@ -31,27 +31,35 @@ import org.apache.activemq.broker.region.policy.PolicyMap;
*/ */
public class SimpleNonPersistentQueueTest extends SimpleQueueTest { public class SimpleNonPersistentQueueTest extends SimpleQueueTest {
@Override
protected void setUp() throws Exception { protected void setUp() throws Exception {
numberOfConsumers = 1; numberOfConsumers = 1;
numberofProducers = 1; numberofProducers = 1;
super.setUp(); super.setUp();
} }
@Override
protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte[] payload) throws JMSException { protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte[] payload) throws JMSException {
PerfProducer pp = new PerfProducer(fac, dest, payload); PerfProducer pp = new PerfProducer(fac, dest, payload);
pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT); pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
pp.setTimeToLive(100); //pp.setTimeToLive(100);
return pp; return pp;
} }
@Override
protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException { protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
PerfConsumer result = new PerfConsumer(fac, dest); PerfConsumer result = new PerfConsumer(fac, dest);
result.setInitialDelay(20*1000); result.setInitialDelay(10*1000);
boolean enableAudit = numberOfConsumers <= 1;
System.err.println("Enable Audit = " + enableAudit);
result.setEnableAudit(enableAudit);
return result; return result;
} }
/*
@Override
protected void configureBroker(BrokerService answer,String uri) throws Exception { protected void configureBroker(BrokerService answer,String uri) throws Exception {
answer.setPersistent(false); // answer.setPersistent(false);
final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>(); final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
final PolicyEntry entry = new PolicyEntry(); final PolicyEntry entry = new PolicyEntry();
entry.setQueue(">"); entry.setQueue(">");
@ -66,5 +74,5 @@ public class SimpleNonPersistentQueueTest extends SimpleQueueTest {
answer.setDestinationPolicy(policyMap); answer.setDestinationPolicy(policyMap);
super.configureBroker(answer, uri); super.configureBroker(answer, uri);
} }
*/
} }

View File

@ -26,21 +26,24 @@ import javax.jms.Session;
*/ */
public class SimpleQueueTest extends SimpleTopicTest { public class SimpleQueueTest extends SimpleTopicTest {
@Override
protected Destination createDestination(Session s, String destinationName) throws JMSException { protected Destination createDestination(Session s, String destinationName) throws JMSException {
return s.createQueue(destinationName); return s.createQueue(destinationName);
} }
@Override
protected void setUp() throws Exception { protected void setUp() throws Exception {
numberOfConsumers = 1; numberOfConsumers = 1;
super.setUp(); super.setUp();
} }
@Override
protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException { protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
PerfConsumer consumer = new PerfConsumer(fac, dest); PerfConsumer consumer = new PerfConsumer(fac, dest);
//consumer.setInitialDelay(2000); consumer.setInitialDelay(10000);
//consumer.setSleepDuration(10); //consumer.setSleepDuration(10);
boolean enableAudit = numberOfConsumers <= 1; boolean enableAudit = numberOfConsumers <= 1;
System.out.println("Enable Audit = " + enableAudit); System.err.println("Enable Audit = " + enableAudit);
consumer.setEnableAudit(enableAudit); consumer.setEnableAudit(enableAudit);
return consumer; return consumer;
} }

View File

@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb.plist;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.activemq.util.IOHelper;
import org.apache.kahadb.util.ByteSequence;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class PListTest {
private PListStore store;
private PList plist;
@Test
public void testIterator() throws Exception {
final int COUNT = 10;
Map<String, ByteSequence> map = new LinkedHashMap<String, ByteSequence>();
for (int i = 0; i < COUNT; i++) {
String test = new String("test" + i);
ByteSequence bs = new ByteSequence(test.getBytes());
map.put(test, bs);
plist.addLast(test, bs);
}
assertEquals(plist.size(), COUNT);
int number = 0;
PListEntry entry = plist.getFirst();
while (entry != null) {
entry = plist.getNext(entry);
number++;
if (entry != null) {
plist.remove(entry.copy());
}
}
assertEquals(COUNT, number);
}
@Test
public void testAddLast() throws Exception {
final int COUNT = 100;
Map<String, ByteSequence> map = new LinkedHashMap<String, ByteSequence>();
for (int i = 0; i < COUNT; i++) {
String test = new String("test" + i);
ByteSequence bs = new ByteSequence(test.getBytes());
map.put(test, bs);
plist.addLast(test, bs);
}
assertEquals(plist.size(), COUNT);
int count = 0;
for (ByteSequence bs : map.values()) {
String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength());
PListEntry entry = plist.get(count);
String plistString = new String(entry.getByteSequence().getData(), entry.getByteSequence().getOffset(),
entry.getByteSequence().getLength());
assertEquals(origStr, plistString);
count++;
}
}
@Test
public void testAddFirst() throws Exception {
final int COUNT = 100;
Map<String, ByteSequence> map = new LinkedHashMap<String, ByteSequence>();
for (int i = 0; i < COUNT; i++) {
String test = new String("test" + i);
ByteSequence bs = new ByteSequence(test.getBytes());
map.put(test, bs);
plist.addFirst(test, bs);
}
assertEquals(plist.size(), COUNT);
int count = plist.size() - 1;
for (ByteSequence bs : map.values()) {
String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength());
PListEntry entry = plist.get(count);
String plistString = new String(entry.getByteSequence().getData(), entry.getByteSequence().getOffset(),
entry.getByteSequence().getLength());
assertEquals(origStr, plistString);
count--;
}
}
@Test
public void testRemove() throws IOException {
final int COUNT = 200;
Map<String, ByteSequence> map = new LinkedHashMap<String, ByteSequence>();
for (int i = 0; i < COUNT; i++) {
String test = new String("test" + i);
ByteSequence bs = new ByteSequence(test.getBytes());
map.put(test, bs);
plist.addLast(test, bs);
}
assertEquals(plist.size(), COUNT);
PListEntry entry = plist.getFirst();
while (entry != null) {
plist.remove(entry.copy());
entry = plist.getFirst();
}
assertEquals(0,plist.size());
}
//@Test
public void testDestroy() {
fail("Not yet implemented");
}
@Before
public void setUp() throws Exception {
File directory = new File("target/test/PlistDB");
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
startStore(directory);
}
protected void startStore(File directory) throws Exception {
store = new PListStore();
store.setDirectory(directory);
store.start();
plist = store.getPList("test");
}
@After
public void tearDown() throws Exception {
store.stop();
}
}