changed pending linked list to use a PendingMessageCursor interface instead

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@439552 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-09-02 07:03:30 +00:00
parent 51902f45f5
commit 7d1e6bcdee
17 changed files with 609 additions and 87 deletions

View File

@ -33,6 +33,7 @@ import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
/**
* The Message Broker which routes messages,
@ -251,4 +252,12 @@ public interface Broker extends Region, Service {
* Sets the default administration connection context used when configuring the broker on startup or via JMX
*/
public abstract void setAdminConnectionContext(ConnectionContext adminConnectionContext);
/**
* @return the broker's temp data store
* @throws Exception
*/
public Store getTempDataStore();
}

View File

@ -35,6 +35,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
import java.util.Map;
import java.util.Set;
@ -231,5 +232,10 @@ public class BrokerFilter implements Broker {
public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
next.setAdminConnectionContext(adminConnectionContext);
}
public Store getTempDataStore() {
return next.getTempDataStore();
}
}

View File

@ -17,9 +17,19 @@
*/
package org.apache.activemq.broker;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
@ -34,15 +44,19 @@ import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.NetworkConnectorView;
import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
import org.apache.activemq.broker.jmx.ProxyConnectorView;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.DestinationFactory;
import org.apache.activemq.broker.region.DestinationFactoryImpl;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.virtual.*;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.network.ConnectionFilter;
import org.apache.activemq.network.DiscoveryNetworkConnector;
@ -64,24 +78,8 @@ import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.management.InstanceNotFoundException;
import javax.management.JMException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a number of transport
@ -105,6 +103,7 @@ public class BrokerService implements Service, Serializable {
private boolean shutdownOnMasterFailure = false;
private String brokerName = "localhost";
private File dataDirectory;
private File tmpDataDirectory;
private Broker broker;
private BrokerView adminView;
private ManagementContext managementContext;
@ -139,6 +138,7 @@ public class BrokerService implements Service, Serializable {
private BrokerId brokerId;
private DestinationInterceptor[] destinationInterceptors;
private ActiveMQDestination[] destinations;
private Store tempDataStore;
/**
* Adds a new transport connector for the given bind address
@ -530,6 +530,24 @@ public class BrokerService implements Service, Serializable {
public void setDataDirectory(File dataDirectory) {
this.dataDirectory = dataDirectory;
}
/**
* @return the tmpDataDirectory
*/
public File getTmpDataDirectory(){
if (tmpDataDirectory == null) {
tmpDataDirectory = new File(getDataDirectory(), "tmp_storage");
}
return tmpDataDirectory;
}
/**
* @param tmpDataDirectory the tmpDataDirectory to set
*/
public void setTmpDataDirectory(File tmpDataDirectory){
this.tmpDataDirectory=tmpDataDirectory;
}
public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
this.persistenceFactory = persistenceFactory;
@ -906,6 +924,29 @@ public class BrokerService implements Service, Serializable {
public void setDestinations(ActiveMQDestination[] destinations) {
this.destinations = destinations;
}
/**
* @return the tempDataStore
*/
public Store getTempDataStore() {
if (tempDataStore == null){
String name = getTmpDataDirectory().getPath();
try {
StoreFactory.delete(name);
tempDataStore = StoreFactory.open(name,"rw");
}catch(IOException e){
throw new RuntimeException(e);
}
}
return tempDataStore;
}
/**
* @param tempDataStore the tempDataStore to set
*/
public void setTempDataStore(Store tempDataStore){
this.tempDataStore=tempDataStore;
}
// Implementation methods
// -------------------------------------------------------------------------
@ -1386,5 +1427,6 @@ public class BrokerService implements Service, Serializable {
masterConnector = (MasterConnector) service;
}
}
}

View File

@ -35,6 +35,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
import java.util.Collections;
import java.util.Map;
@ -229,5 +230,9 @@ public class EmptyBroker implements Broker {
public Response messagePull(ConnectionContext context, MessagePull pull) {
return null;
}
public Store getTempDataStore() {
return null;
}
}

View File

@ -39,6 +39,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
/**
* Implementation of the broker where all it's methods throw an
@ -229,5 +230,9 @@ public class ErrorBroker implements Broker {
public Response messagePull(ConnectionContext context, MessagePull pull) {
throw new BrokerStoppedException(this.message);
}
public Store getTempDataStore() {
throw new BrokerStoppedException(this.message);
}
}

View File

@ -35,6 +35,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
import java.util.Map;
import java.util.Set;
@ -243,5 +244,9 @@ public class MutableBrokerFilter implements Broker {
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
return getNext().messagePull(context, pull);
}
public Store getTempDataStore() {
return getNext().getTempDataStore();
}
}

View File

@ -19,17 +19,16 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.Iterator;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.util.SubscriptionKey;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
public class DurableTopicSubscription extends PrefetchSubscription {
@ -41,7 +40,8 @@ public class DurableTopicSubscription extends PrefetchSubscription {
private boolean active=false;
public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException {
super(broker,context, info);
//super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore()));
super(broker,context,info);
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
}
@ -102,7 +102,7 @@ public class DurableTopicSubscription extends PrefetchSubscription {
}
if( keepDurableSubsActive ) {
synchronized(pending) {
pending.addFirst(node);
pending.addMessageFirst(node);
}
} else {
node.decrementReferenceCount();
@ -112,10 +112,11 @@ public class DurableTopicSubscription extends PrefetchSubscription {
if( !keepDurableSubsActive ) {
synchronized(pending) {
for (Iterator iter = pending.iterator(); iter.hasNext();) {
MessageReference node = (MessageReference) iter.next();
pending.reset();
while(pending.hasNext()) {
MessageReference node = pending.next();
node.decrementReferenceCount();
iter.remove();
pending.remove();
}
}
}
@ -189,8 +190,9 @@ public class DurableTopicSubscription extends PrefetchSubscription {
synchronized public void destroy() {
synchronized(pending) {
for (Iterator iter = pending.iterator(); iter.hasNext();) {
MessageReference node = (MessageReference) iter.next();
pending.reset();
while(pending.hasNext()) {
MessageReference node = pending.next();
node.decrementReferenceCount();
}
pending.clear();

View File

@ -20,12 +20,14 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerControl;
@ -50,7 +52,7 @@ import org.apache.commons.logging.LogFactory;
abstract public class PrefetchSubscription extends AbstractSubscription{
static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
final protected LinkedList pending=new LinkedList();
final protected PendingMessageCursor pending;
final protected LinkedList dispatched=new LinkedList();
protected int prefetchExtension=0;
@ -59,10 +61,16 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
long enqueueCounter;
long dispatchCounter;
long dequeueCounter;
public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
throws InvalidSelectorException{
public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info, PendingMessageCursor cursor)
throws InvalidSelectorException{
super(broker,context,info);
pending = cursor;
}
public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
throws InvalidSelectorException{
this(broker,context,info,new VMPendingMessageCursor());
}
@ -77,8 +85,8 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
prefetchExtension++;
final long dispatchCounterBeforePull = dispatchCounter;
dispatchMatched();
dispatchMatched();
// If there was nothing dispatched.. we may need to setup a timeout.
if( dispatchCounterBeforePull == dispatchCounter ) {
// imediate timeout used by receiveNoWait()
@ -86,7 +94,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
// Send a NULL message.
add(QueueMessageReference.NULL_MESSAGE);
dispatchMatched();
}
}
if( pull.getTimeout() > 0 ) {
Scheduler.executeAfterDelay(new Runnable(){
public void run() {
@ -124,17 +132,18 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
if( pending.isEmpty() ) {
log.debug("Prefetch limit.");
}
pending.addLast(node);
pending.addMessageLast(node);
}
}
}
synchronized public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
synchronized(pending){
for(Iterator i=pending.iterator();i.hasNext();){
MessageReference node=(MessageReference) i.next();
pending.reset();
while(pending.hasNext()){
MessageReference node=pending.next();
if(node.getMessageId().equals(mdn.getMessageId())){
i.remove();
pending.remove();
createMessageDispatch(node,node.getMessage());
dispatched.addLast(node);
return;
@ -329,9 +338,10 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
if(!dispatching){
dispatching=true;
try{
for(Iterator iter=pending.iterator();iter.hasNext()&&!isFull();){
MessageReference node=(MessageReference) iter.next();
iter.remove();
pending.reset();
while(pending.hasNext()&&!isFull()){
MessageReference node=pending.next();
pending.remove();
dispatch(node);
}
}finally{
@ -352,8 +362,8 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
// NULL messages don't count... they don't get Acked.
if( node != QueueMessageReference.NULL_MESSAGE ) {
dispatchCounter++;
dispatched.addLast(node);
dispatchCounter++;
dispatched.addLast(node);
} else {
prefetchExtension=Math.max(0,prefetchExtension-1);
}
@ -380,8 +390,8 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
synchronized protected void onDispatch(final MessageReference node,final Message message){
if(node.getRegionDestination()!=null){
if( node != QueueMessageReference.NULL_MESSAGE ) {
node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
context.getConnection().getStatistics().onMessageDequeue(message);
node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
context.getConnection().getStatistics().onMessageDequeue(message);
}
try{
dispatchMatched();
@ -412,19 +422,19 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
*/
protected MessageDispatch createMessageDispatch(MessageReference node,Message message){
if( node == QueueMessageReference.NULL_MESSAGE ) {
MessageDispatch md = new MessageDispatch();
MessageDispatch md=new MessageDispatch();
md.setMessage(null);
md.setConsumerId( info.getConsumerId() );
md.setConsumerId(info.getConsumerId());
md.setDestination( null );
return md;
} else {
MessageDispatch md=new MessageDispatch();
md.setConsumerId(info.getConsumerId());
md.setDestination(node.getRegionDestination().getActiveMQDestination());
md.setMessage(message);
md.setRedeliveryCounter(node.getRedeliveryCounter());
return md;
}
md.setDestination(node.getRegionDestination().getActiveMQDestination());
md.setMessage(message);
md.setRedeliveryCounter(node.getRedeliveryCounter());
return md;
}
}
/**

View File

@ -18,13 +18,13 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.filter.MessageEvaluationContext;
public class QueueBrowserSubscription extends QueueSubscription {

View File

@ -17,6 +17,9 @@
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.group.MessageGroupMap;
@ -29,11 +32,6 @@ import org.apache.activemq.transaction.Synchronization;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import java.io.IOException;
public class QueueSubscription extends PrefetchSubscription implements LockOwner {
private static final Log log = LogFactory.getLog(QueueSubscription.class);

View File

@ -42,6 +42,7 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
@ -573,4 +574,8 @@ public class RegionBroker implements Broker {
public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
this.adminConnectionContext = adminConnectionContext;
}
public Store getTempDataStore() {
return brokerService.getTempDataStore();
}
}

View File

@ -0,0 +1,132 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.util.*;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.*;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.kahadaptor.CommandMarshaller;
/**
* perist pending messages pending message (messages awaiting disptach to a consumer) cursor
*
* @version $Revision$
*/
public class FilePendingMessageCursor implements PendingMessageCursor{
private ListContainer list;
private Iterator iter = null;
private Destination regionDestination;
/**
* @param name
* @param store
* @throws IOException
*/
public FilePendingMessageCursor(String name, Store store) {
try{
list = store.getListContainer(name);
list.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
list.setMaximumCacheSize(0);
}catch(IOException e){
throw new RuntimeException(e);
}
}
/**
* @return true if there are no pending messages
*/
public boolean isEmpty(){
return list.isEmpty();
}
/**
* reset the cursor
*
*/
public void reset(){
iter = list.listIterator();
}
/**
* add message to await dispatch
*
* @param node
*/
public void addMessageLast(MessageReference node){
try{
regionDestination = node.getMessage().getRegionDestination();
node.decrementReferenceCount();
}catch(IOException e){
throw new RuntimeException(e);
}
list.addLast(node);
}
/**
* add message to await dispatch
* @param position
* @param node
*/
public void addMessageFirst(MessageReference node){
try{
regionDestination = node.getMessage().getRegionDestination();
node.decrementReferenceCount();
}catch(IOException e){
throw new RuntimeException(e);
}
list.addFirst(node);
}
/**
* @return true if there pending messages to dispatch
*/
public boolean hasNext(){
return iter.hasNext();
}
/**
* @return the next pending message
*/
public MessageReference next(){
Message message = (Message) iter.next();
message.setRegionDestination(regionDestination);
message.incrementReferenceCount();
return message;
}
/**
* remove the message at the cursor position
*
*/
public void remove(){
iter.remove();
}
/**
* @return the number of pending messages
*/
public int size(){
return list.size();
}
/**
* clear all pending messages
*
*/
public void clear(){
list.clear();
}
}

View File

@ -0,0 +1,73 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import org.apache.activemq.broker.region.MessageReference;
/**
* Interface to pending message (messages awaiting disptach to a consumer) cursor
*
* @version $Revision$
*/
public interface PendingMessageCursor{
/**
* @return true if there are no pending messages
*/
public boolean isEmpty();
/**
* reset the cursor
*
*/
public void reset();
/**
* add message to await dispatch
* @param node
*/
public void addMessageLast(MessageReference node);
/**
* add message to await dispatch
* @param node
*/
public void addMessageFirst(MessageReference node);
/**
* @return true if there pending messages to dispatch
*/
public boolean hasNext();
/**
* @return the next pending message
*/
public MessageReference next();
/**
* remove the message at the cursor position
*
*/
public void remove();
/**
* @return the number of pending messages
*/
public int size();
/**
* clear all pending messages
*
*/
public void clear();
}

View File

@ -0,0 +1,96 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.activemq.broker.region.MessageReference;
/**
* hold pending messages in a linked list (messages awaiting disptach to a consumer) cursor
*
* @version $Revision$
*/
public class VMPendingMessageCursor implements PendingMessageCursor{
private LinkedList list = new LinkedList();
private Iterator iter = null;
/**
* @return true if there are no pending messages
*/
public boolean isEmpty(){
return list.isEmpty();
}
/**
* reset the cursor
*
*/
public void reset(){
iter = list.listIterator();
}
/**
* add message to await dispatch
*
* @param node
*/
public void addMessageLast(MessageReference node){
list.addLast(node);
}
/**
* add message to await dispatch
* @param position
* @param node
*/
public void addMessageFirst(MessageReference node){
list.addFirst(node);
}
/**
* @return true if there pending messages to dispatch
*/
public boolean hasNext(){
return iter.hasNext();
}
/**
* @return the next pending message
*/
public MessageReference next(){
return (MessageReference) iter.next();
}
/**
* remove the message at the cursor position
*
*/
public void remove(){
iter.remove();
}
/**
* @return the number of pending messages
*/
public int size(){
return list.size();
}
/**
* clear all pending messages
*
*/
public void clear(){
list.clear();
}
}

View File

@ -59,7 +59,8 @@ public class KahaStore implements Store{
public KahaStore(String name,String mode) throws IOException{
this.name=name;
this.mode=mode;
initialize();
directory=new File(name);
directory.mkdirs();
}
public synchronized void close() throws IOException{
@ -113,22 +114,22 @@ public class KahaStore implements Store{
}
public synchronized boolean delete() throws IOException{
initialize();
clear();
boolean result=true;
for(Iterator iter=indexManagers.values().iterator();iter.hasNext();){
IndexManager im=(IndexManager) iter.next();
result&=im.delete();
iter.remove();
if (initialized){
clear();
for(Iterator iter=indexManagers.values().iterator();iter.hasNext();){
IndexManager im=(IndexManager) iter.next();
result&=im.delete();
iter.remove();
}
for(Iterator iter=dataManagers.values().iterator();iter.hasNext();){
DataManager dm=(DataManager) iter.next();
result&=dm.delete();
iter.remove();
}
}
for(Iterator iter=dataManagers.values().iterator();iter.hasNext();){
DataManager dm=(DataManager) iter.next();
result&=dm.delete();
iter.remove();
}
// now delete all the files - containers that don't use the standard DataManager
// and IndexManager will not have initialized the files - so these will be left around
// unless we do this
if(directory!=null&&directory.isDirectory()){
File[] files=directory.listFiles();
if(files!=null){
@ -248,10 +249,8 @@ public class KahaStore implements Store{
throw new IOException("Store has been closed.");
if(!initialized){
initialized=true;
directory=new File(name);
directory.mkdirs();
log.info("Kaha Store using data directory " + directory);
log.info("Kaha Store using data directory " + directory);
DataManager defaultDM = getDataManager(DEFAULT_CONTAINER_NAME);
rootIndexManager = getIndexManager(defaultDM, DEFAULT_CONTAINER_NAME);

View File

@ -31,7 +31,9 @@ public class CachedListContainerImplTest extends TestCase{
protected int MAX_CACHE_SIZE=10;
protected KahaStore getStore() throws IOException{
return new KahaStore(name,"rw");
KahaStore store = new KahaStore(name,"rw");
store.initialize();
return store;
}
public void testAdds() throws Exception{

View File

@ -0,0 +1,133 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.perf;
import java.io.File;
import java.net.URI;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import junit.framework.AssertionFailedError;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
/**
* @version $Revision$
*/
public class InactiveDurableTopicTest extends TestCase{
private static final String DEFAULT_PASSWORD="";
private static final String USERNAME="testuser";
private static final String CLIENTID="mytestclient";
private static final String TOPIC_NAME="testevent";
private static final String SUBID="subscription1";
private static final int deliveryMode=javax.jms.DeliveryMode.PERSISTENT;
private static final int deliveryPriority=javax.jms.Message.DEFAULT_PRIORITY;
private Connection connection=null;
private MessageProducer publisher=null;
private TopicSubscriber subscriber=null;
private Topic topic=null;
private Session session=null;
ActiveMQConnectionFactory connectionFactory=null;
BrokerService broker;
protected void setUp() throws Exception{
super.setUp();
broker=new BrokerService();
broker.setPersistenceAdapter(new KahaPersistenceAdapter(new File ("TEST_STUFD")));
broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
broker.start();
connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
/*
* Doesn't matter if you enable or disable these, so just leaving them out for this test case
* connectionFactory.setAlwaysSessionAsync(true); connectionFactory.setAsyncDispatch(true);
*/
connectionFactory.setUseAsyncSend(true);
}
protected void tearDown() throws Exception{
super.tearDown();
broker.stop();
}
public void test1CreateSubscription() throws Exception{
try{
/*
* Step 1 - Establish a connection with a client id and create a durable subscription
*/
connection=connectionFactory.createConnection(USERNAME,DEFAULT_PASSWORD);
assertNotNull(connection);
connection.setClientID(CLIENTID);
session=connection.createSession(false,javax.jms.Session.CLIENT_ACKNOWLEDGE);
assertNotNull(session);
topic=session.createTopic(TOPIC_NAME);
assertNotNull(topic);
subscriber=session.createDurableSubscriber(topic,SUBID,"",false);
assertNotNull(subscriber);
subscriber.close();
session.close();
connection.close();
}catch(JMSException ex){
try{
connection.close();
}catch(Exception ignore){}
throw new AssertionFailedError("Create Subscription caught: "+ex);
}
}
public void test2ProducerTestCase(){
/*
* Step 2 - Establish a connection without a client id and create a producer and start pumping messages. We will
* get hung
*/
try{
connection=connectionFactory.createConnection(USERNAME,DEFAULT_PASSWORD);
assertNotNull(connection);
session=connection.createSession(false,javax.jms.Session.CLIENT_ACKNOWLEDGE);
assertNotNull(session);
topic=session.createTopic(TOPIC_NAME);
assertNotNull(topic);
publisher=session.createProducer(topic);
assertNotNull(publisher);
MapMessage msg=session.createMapMessage();
assertNotNull(msg);
msg.setString("key1","value1");
int loop;
for(loop=0;loop<100000;loop++){
msg.setInt("key2",loop);
publisher.send(msg,deliveryMode,deliveryPriority,Message.DEFAULT_TIME_TO_LIVE);
if (loop%500==0){
System.out.println("Sent " + loop + " messages");
}
}
this.assertEquals(loop,100000);
publisher.close();
session.close();
connection.stop();
connection.stop();
}catch(JMSException ex){
try{
connection.close();
}catch(Exception ignore){}
throw new AssertionFailedError("Create Subscription caught: "+ex);
}
}
}