git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@438917 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-08-31 13:42:14 +00:00
parent f48b4eb3a3
commit bfaff9b37c
35 changed files with 1203 additions and 342 deletions

View File

@ -22,22 +22,21 @@ import javax.management.ObjectName;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.QueueRegion;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
public class ManagedQueueRegion extends QueueRegion {
private final ManagedRegionBroker regionBroker;
public ManagedQueueRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter);
public ManagedQueueRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
regionBroker = broker;
}

View File

@ -42,6 +42,8 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
import org.apache.activemq.broker.region.DestinationFactoryImpl;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Region;
@ -89,9 +91,9 @@ public class ManagedRegionBroker extends RegionBroker {
private Broker contextBroker;
public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer,ObjectName brokerObjectName,
TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter, DestinationInterceptor destinationInterceptor)
TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor)
throws IOException{
super(brokerService,taskRunnerFactory,memoryManager,adapter, destinationInterceptor);
super(brokerService,taskRunnerFactory,memoryManager, destinationFactory, destinationInterceptor);
this.mbeanServer=mbeanServer;
this.brokerObjectName=brokerObjectName;
}
@ -119,33 +121,39 @@ public class ManagedRegionBroker extends RegionBroker {
}
protected Region createQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory,
PersistenceAdapter adapter){
return new ManagedQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory,adapter);
DestinationFactory destinationFactory){
return new ManagedQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory,destinationFactory);
}
protected Region createTempQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory){
return new ManagedTempQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory);
protected Region createTempQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory){
return new ManagedTempQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory, destinationFactory);
}
protected Region createTempTopicRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory){
return new ManagedTempTopicRegion(this,destinationStatistics,memoryManager,taskRunnerFactory);
protected Region createTempTopicRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory){
return new ManagedTempTopicRegion(this,destinationStatistics,memoryManager,taskRunnerFactory, destinationFactory);
}
protected Region createTopicRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory,
PersistenceAdapter adapter){
return new ManagedTopicRegion(this,destinationStatistics,memoryManager,taskRunnerFactory,adapter);
DestinationFactory destinationFactory){
return new ManagedTopicRegion(this,destinationStatistics,memoryManager,taskRunnerFactory, destinationFactory);
}
public void register(ActiveMQDestination destName,Destination destination){
// TODO refactor to allow views for custom destinations
try{
ObjectName objectName=createObjectName(destName);
DestinationView view;
if(destination instanceof Queue){
if (destination instanceof Queue) {
view=new QueueView(this,(Queue) destination);
}else{
} else if (destination instanceof Topic){
view=new TopicView(this,(Topic) destination);
} else {
view = null;
log.warn("JMX View is not supported for custom destination: " + destination);
}
if (view != null) {
registerDestination(objectName,destName,view);
}
registerDestination(objectName,destName,view);
}catch(Exception e){
log.error("Failed to register destination "+destName,e);
}
@ -288,13 +296,12 @@ public class ManagedRegionBroker extends RegionBroker {
protected void buildExistingSubscriptions() throws Exception{
Map subscriptions=new HashMap();
Set destinations=adaptor.getDestinations();
Set destinations=destinationFactory.getDestinations();
if(destinations!=null){
for(Iterator iter=destinations.iterator();iter.hasNext();){
ActiveMQDestination dest=(ActiveMQDestination) iter.next();
if(dest.isTopic()){
TopicMessageStore store=adaptor.createTopicMessageStore((ActiveMQTopic) dest);
SubscriptionInfo[] infos=store.getAllSubscriptions();
SubscriptionInfo[] infos= destinationFactory.getAllDurableSubscriptions((ActiveMQTopic) dest);
if(infos!=null){
for(int i=0;i<infos.length;i++){
SubscriptionInfo info=infos[i];
@ -356,10 +363,15 @@ public class ManagedRegionBroker extends RegionBroker {
}
protected List getSubscriberMessages(SubscriptionView view){
//TODO It is very dangerous operation for big backlogs
if (!(destinationFactory instanceof DestinationFactoryImpl)) {
throw new RuntimeException("unsupported by " + destinationFactory);
}
PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter();
final List result=new ArrayList();
try{
ActiveMQTopic topic=new ActiveMQTopic(view.getDestinationName());
TopicMessageStore store=adaptor.createTopicMessageStore(topic);
TopicMessageStore store=adapter.createTopicMessageStore(topic);
store.recover(new MessageRecoveryListener(){
public void recoverMessage(Message message) throws Exception{
result.add(message);
@ -373,6 +385,7 @@ public class ManagedRegionBroker extends RegionBroker {
log.error("Failed to browse messages for Subscription "+view,e);
}
return result;
}
protected ObjectName[] getTopics(){

View File

@ -22,6 +22,7 @@ import javax.management.ObjectName;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TempQueueRegion;
@ -34,8 +35,8 @@ public class ManagedTempQueueRegion extends TempQueueRegion {
private final ManagedRegionBroker regionBroker;
public ManagedTempQueueRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
super(regionBroker,destinationStatistics, memoryManager, taskRunnerFactory);
public ManagedTempQueueRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
super(regionBroker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
this.regionBroker = regionBroker;
}

View File

@ -22,6 +22,7 @@ import javax.management.ObjectName;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TempTopicRegion;
@ -34,8 +35,8 @@ public class ManagedTempTopicRegion extends TempTopicRegion {
private final ManagedRegionBroker regionBroker;
public ManagedTempTopicRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
super(regionBroker,destinationStatistics, memoryManager, taskRunnerFactory);
public ManagedTempTopicRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
super(regionBroker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
this.regionBroker = regionBroker;
}

View File

@ -22,21 +22,21 @@ import javax.management.ObjectName;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
public class ManagedTopicRegion extends TopicRegion {
private final ManagedRegionBroker regionBroker;
public ManagedTopicRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter);
public ManagedTopicRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
regionBroker = broker;
}

View File

@ -35,7 +35,6 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -54,7 +53,7 @@ abstract public class AbstractRegion implements Region {
protected final DestinationMap destinationMap = new DestinationMap();
protected final ConcurrentHashMap subscriptions = new ConcurrentHashMap();
protected final UsageManager memoryManager;
protected final PersistenceAdapter persistenceAdapter;
protected final DestinationFactory destinationFactory;
protected final DestinationStatistics destinationStatistics;
protected final RegionBroker broker;
protected boolean autoCreateDestinations=true;
@ -62,12 +61,18 @@ abstract public class AbstractRegion implements Region {
protected final Object destinationsMutex = new Object();
protected final Map consumerChangeMutexMap = new HashMap();
public AbstractRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
public AbstractRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
if (broker == null) {
throw new IllegalArgumentException("null broker");
}
this.broker = broker;
this.destinationStatistics = destinationStatistics;
this.memoryManager = memoryManager;
this.taskRunnerFactory = taskRunnerFactory;
this.persistenceAdapter = persistenceAdapter;
if (broker == null) {
throw new IllegalArgumentException("null destinationFactory");
}
this.destinationFactory = destinationFactory;
}
public void start() throws Exception {
@ -205,15 +210,14 @@ abstract public class AbstractRegion implements Region {
// eagerly load all destinations into the broker but have an inactive state for the
// destination which has reduced memory usage.
//
if( persistenceAdapter!=null ) {
Set inactiveDests = getInactiveDestinations();
for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) {
ActiveMQDestination dest = (ActiveMQDestination) iter.next();
if( sub.matches(dest) ) {
context.getBroker().addDestination(context, dest);
}
}
Set inactiveDests = getInactiveDestinations();
for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) {
ActiveMQDestination dest = (ActiveMQDestination) iter.next();
if( sub.matches(dest) ) {
context.getBroker().addDestination(context, dest);
}
}
subscriptions.put(info.getConsumerId(), sub);
@ -243,14 +247,14 @@ abstract public class AbstractRegion implements Region {
* @return Set of all stored destinations
*/
public Set getDurableDestinations(){
return persistenceAdapter.getDestinations();
return destinationFactory.getDestinations();
}
/**
* @return all Destinations that don't have active consumers
*/
protected Set getInactiveDestinations() {
Set inactiveDests = persistenceAdapter.getDestinations();
Set inactiveDests = destinationFactory.getDestinations();
inactiveDests.removeAll( destinations.keySet() );
return inactiveDests;
}
@ -341,7 +345,10 @@ abstract public class AbstractRegion implements Region {
}
protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception;
abstract protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception;
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
return destinationFactory.createDestination(context, destination, destinationStatistics);
}
public boolean isAutoCreateDestinations() {
return autoCreateDestinations;

View File

@ -25,9 +25,7 @@ import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageStore;
/**
*
@ -43,7 +41,6 @@ public interface Destination extends Service {
void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException;
void gc();
Message loadMessage(MessageId messageId) throws IOException;
ActiveMQDestination getActiveMQDestination();
UsageManager getUsageManager();
@ -51,7 +48,6 @@ public interface Destination extends Service {
void dispose(ConnectionContext context) throws IOException;
DestinationStatistics getDestinationStatistics();
MessageStore getMessageStore();
DeadLetterStrategy getDeadLetterStrategy();
public Message[] browse();

View File

@ -0,0 +1,56 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.Set;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.SubscriptionInfo;
/**
* Used to create Destinations. One instance of DestinationFactory is used per BrokerService.
*
* @author fateev@amazon.com
* @version $Revision$
*/
public abstract class DestinationFactory {
/**
* Create destination implementation.
*/
abstract public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception;
/**
* Returns a set of all the {@link org.apache.activemq.command.ActiveMQDestination}
* objects that the persistence store is aware exist.
*/
abstract public Set getDestinations();
/**
* Lists all the durable subscirptions for a given destination.
*/
abstract public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException;
abstract public long getLastMessageBrokerSequenceId() throws IOException;
abstract public void setRegionBroker(RegionBroker regionBroker);
}

View File

@ -0,0 +1,158 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.Set;
import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
/**
* Creates standard ActiveMQ implementations of {@link org.apache.activemq.broker.region.Destination}.
*
* @author fateev@amazon.com
* @version $Revision$
*/
public class DestinationFactoryImpl extends DestinationFactory {
protected final UsageManager memoryManager;
protected final TaskRunnerFactory taskRunnerFactory;
protected final PersistenceAdapter persistenceAdapter;
protected RegionBroker broker;
public DestinationFactoryImpl(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
PersistenceAdapter persistenceAdapter) {
this.memoryManager = memoryManager;
this.taskRunnerFactory = taskRunnerFactory;
if (persistenceAdapter == null) {
throw new IllegalArgumentException("null persistenceAdapter");
}
this.persistenceAdapter = persistenceAdapter;
}
public void setRegionBroker(RegionBroker broker) {
if (broker == null) {
throw new IllegalArgumentException("null broker");
}
this.broker = broker;
}
public Set getDestinations() {
return persistenceAdapter.getDestinations();
}
/**
* @return instance of {@link Queue} or {@link Topic}
*/
public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception {
if (destination.isQueue()) {
if (destination.isTemporary()) {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory) {
public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
// Only consumers on the same connection can consume from
// the temporary destination
if( !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest);
}
super.addSubscription(context, sub);
};
};
} else {
MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory);
configureQueue(queue, destination);
return queue;
}
} else if (destination.isTemporary()){
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
return new Topic(destination, null, memoryManager, destinationStatistics, taskRunnerFactory) {
public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
// Only consumers on the same connection can consume from
// the temporary destination
if( !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest);
}
super.addSubscription(context, sub);
};
};
} else {
TopicMessageStore store = null;
if (!AdvisorySupport.isAdvisoryTopic(destination)) {
store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
}
Topic topic = new Topic(destination, store, memoryManager, destinationStatistics, taskRunnerFactory);
configureTopic(topic, destination);
return topic;
}
}
protected void configureQueue(Queue queue, ActiveMQDestination destination) {
if (broker == null) {
throw new IllegalStateException("broker property is not set");
}
if (broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) {
entry.configure(queue);
}
}
}
protected void configureTopic(Topic topic, ActiveMQDestination destination) {
if (broker == null) {
throw new IllegalStateException("broker property is not set");
}
if (broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) {
entry.configure(topic);
}
}
}
public long getLastMessageBrokerSequenceId() throws IOException {
return persistenceAdapter.getLastMessageBrokerSequenceId();
}
public PersistenceAdapter getPersistenceAdapter() {
return persistenceAdapter;
}
public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException {
return persistenceAdapter.createTopicMessageStore(topic).getAllSubscriptions();
}
}

View File

@ -23,9 +23,7 @@ import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageStore;
import java.io.IOException;
import java.util.Iterator;
@ -99,10 +97,6 @@ public class DestinationFilter implements Destination {
return next.getMessagesCached();
}
public MessageStore getMessageStore() {
return next.getMessageStore();
}
public String getName() {
return next.getName();
}
@ -115,10 +109,6 @@ public class DestinationFilter implements Destination {
return next.getUsageManager();
}
public Message loadMessage(MessageId messageId) throws IOException {
return next.loadMessage(messageId);
}
public boolean lock(MessageReference node, LockOwner lockOwner) {
return next.lock(node, lockOwner);
}

View File

@ -0,0 +1,128 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
/**
* Only used by the {@link QueueMessageReference#END_OF_BROWSE_MARKER}
*/
final class EndOfBrowseMarkerQueueMessageReference implements
QueueMessageReference {
private ActiveMQMessage message = new ActiveMQMessage();
private volatile int references;
public void drop() {
throw new RuntimeException("not implemented");
}
public LockOwner getLockOwner() {
throw new RuntimeException("not implemented");
}
public boolean isAcked() {
return false;
}
public boolean isDropped() {
throw new RuntimeException("not implemented");
}
public boolean lock(LockOwner subscription) {
throw new RuntimeException("not implemented");
}
public void setAcked(boolean b) {
throw new RuntimeException("not implemented");
}
public void unlock() {
throw new RuntimeException("not implemented");
}
public int decrementReferenceCount() {
return --references;
}
public long getExpiration() {
throw new RuntimeException("not implemented");
}
public String getGroupID() {
throw new RuntimeException("not implemented");
}
public int getGroupSequence() {
throw new RuntimeException("not implemented");
}
public Message getMessage() throws IOException {
return message;
}
public Message getMessageHardRef() {
throw new RuntimeException("not implemented");
}
public MessageId getMessageId() {
return message.getMessageId();
}
public int getRedeliveryCounter() {
throw new RuntimeException("not implemented");
}
public int getReferenceCount() {
return references;
}
public Destination getRegionDestination() {
return null;
}
public int getSize() {
throw new RuntimeException("not implemented");
}
public ConsumerId getTargetConsumerId() {
throw new RuntimeException("not implemented");
}
public void incrementRedeliveryCounter() {
throw new RuntimeException("not implemented");
}
public int incrementReferenceCount() {
return ++references;
}
public boolean isExpired() {
throw new RuntimeException("not implemented");
}
public boolean isPersistent() {
throw new RuntimeException("not implemented");
}
}

View File

@ -19,10 +19,10 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageStore;
/**
* Keeps track of a message that is flowing through the Broker. This
@ -31,13 +31,13 @@ import org.apache.activemq.command.MessageId;
*
* @version $Revision: 1.15 $
*/
public class IndirectMessageReference implements MessageReference {
public static final ActiveMQMessage END_OF_BROWSE_MARKER_MESSAGE = new ActiveMQMessage();
public static final IndirectMessageReference END_OF_BROWSE_MARKER = new IndirectMessageReference(END_OF_BROWSE_MARKER_MESSAGE);
public class IndirectMessageReference implements QueueMessageReference {
/** The destination that is managing the message */
private final Destination regionDestination;
private final MessageStore destinationStore;
/** The id of the message is always valid */
private final MessageId messageId;
/** Is the message persistent? */
@ -63,23 +63,9 @@ public class IndirectMessageReference implements MessageReference {
/** the expiration time of the message */
private long expiration;
/**
* Only used by the END_OF_BROWSE_MARKER singleton
*/
private IndirectMessageReference(ActiveMQMessage message) {
this.regionDestination=null;
this.message = message;
this.messageId=null;
this.persistent=false;
this.groupID = null;
this.groupSequence = 0;
this.targetConsumerId=null;
this.expiration = message.getExpiration();
this.cachedSize = message != null ? message.getSize() : 0;
}
public IndirectMessageReference(Destination destination, Message message) {
public IndirectMessageReference(Queue destination, MessageStore destinationStore, Message message) {
this.regionDestination=destination;
this.destinationStore = destinationStore;
this.message = message;
this.messageId=message.getMessageId();
this.persistent=message.isPersistent() && destination.getMessageStore()!=null;
@ -106,10 +92,11 @@ public class IndirectMessageReference implements MessageReference {
if( persistent && rc==1 ) {
assert message == null;
try {
message = regionDestination.loadMessage(messageId);
message = destinationStore.getMessage(messageId);
if( message == null ) {
dropped = true;
} else {
message.setRegionDestination(regionDestination);
message.incrementReferenceCount();
}
} catch (IOException e) {

View File

@ -49,6 +49,8 @@ public interface MessageReference {
public ConsumerId getTargetConsumerId();
public int getSize();
public long getExpiration();
public String getGroupID();
public int getGroupSequence();
/**
* Returns true if this message is expired

View File

@ -160,7 +160,7 @@ public class Queue implements Destination {
// subscription.
for (Iterator iter = messages.iterator(); iter.hasNext();) {
IndirectMessageReference node = (IndirectMessageReference) iter.next();
QueueMessageReference node = (QueueMessageReference) iter.next();
if (node.isDropped()) {
continue;
}
@ -220,7 +220,7 @@ public class Queue implements Destination {
List messagesToDispatch = new ArrayList();
synchronized (messages) {
for (Iterator iter = messages.iterator(); iter.hasNext();) {
IndirectMessageReference node = (IndirectMessageReference) iter.next();
QueueMessageReference node = (QueueMessageReference) iter.next();
if (node.isDropped()) {
continue;
}
@ -237,7 +237,7 @@ public class Queue implements Destination {
// now lets dispatch from the copy of the collection to
// avoid deadlocks
for (Iterator iter = messagesToDispatch.iterator(); iter.hasNext();) {
IndirectMessageReference node = (IndirectMessageReference) iter.next();
QueueMessageReference node = (QueueMessageReference) iter.next();
node.incrementRedeliveryCounter();
node.unlock();
msgContext.setMessageReference(node);
@ -316,7 +316,7 @@ public class Queue implements Destination {
synchronized (messages) {
for (Iterator iter = messages.iterator(); iter.hasNext();) {
// Remove dropped messages from the queue.
IndirectMessageReference node = (IndirectMessageReference) iter.next();
QueueMessageReference node = (QueueMessageReference) iter.next();
if (node.isDropped()) {
garbageSize--;
iter.remove();
@ -345,7 +345,7 @@ public class Queue implements Destination {
}
}
public Message loadMessage(MessageId messageId) throws IOException {
Message loadMessage(MessageId messageId) throws IOException {
Message msg = store.getMessage(messageId);
if (msg != null) {
msg.setRegionDestination(this);
@ -460,7 +460,7 @@ public class Queue implements Destination {
// Implementation methods
// -------------------------------------------------------------------------
private MessageReference createMessageReference(Message message) {
return new IndirectMessageReference(this, message);
return new IndirectMessageReference(this, store, message);
}
private void dispatch(ConnectionContext context, MessageReference node, Message message) throws Exception {
@ -504,7 +504,7 @@ public class Queue implements Destination {
return rc;
}
public MessageStore getMessageStore() {
MessageStore getMessageStore() {
return store;
}
@ -565,7 +565,7 @@ public class Queue implements Destination {
ConnectionContext c = createConnectionContext();
for (Iterator iter = messages.iterator(); iter.hasNext();) {
try {
IndirectMessageReference r = (IndirectMessageReference) iter.next();
QueueMessageReference r = (QueueMessageReference) iter.next();
// We should only delete messages that can be locked.
if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {

View File

@ -38,7 +38,7 @@ public class QueueBrowserSubscription extends QueueSubscription {
}
protected boolean canDispatch(MessageReference node) {
return !((IndirectMessageReference)node).isAcked();
return !((QueueMessageReference)node).isAcked();
}
public String toString() {
@ -53,11 +53,11 @@ public class QueueBrowserSubscription extends QueueSubscription {
public void browseDone() throws Exception {
browseDone = true;
add(IndirectMessageReference.END_OF_BROWSE_MARKER);
add(QueueMessageReference.END_OF_BROWSE_MARKER);
}
protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
if( node == IndirectMessageReference.END_OF_BROWSE_MARKER ) {
if( node == QueueMessageReference.END_OF_BROWSE_MARKER ) {
MessageDispatch md = new MessageDispatch();
md.setMessage(null);
md.setConsumerId( info.getConsumerId() );

View File

@ -0,0 +1,43 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
/**
* Queue specific MessageReference.
*
* @author fateev@amazon.com
* @version $Revision$
*/
public interface QueueMessageReference extends MessageReference {
public static final QueueMessageReference END_OF_BROWSE_MARKER = new EndOfBrowseMarkerQueueMessageReference();
public boolean isAcked();
public void setAcked(boolean b);
public void drop();
public boolean isDropped();
public boolean lock(LockOwner subscription);
public void unlock();
public LockOwner getLockOwner();
}

View File

@ -17,16 +17,10 @@
*/
package org.apache.activemq.broker.region;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
import javax.jms.InvalidSelectorException;
@ -43,8 +37,8 @@ public class QueueRegion extends AbstractRegion {
public QueueRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
PersistenceAdapter persistenceAdapter) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter);
DestinationFactory destinationFactory) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
public String toString() {
@ -52,24 +46,6 @@ public class QueueRegion extends AbstractRegion {
+ "%";
}
// Implementation methods
// -------------------------------------------------------------------------
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory);
configureQueue(queue, destination);
return queue;
}
protected void configureQueue(Queue queue, ActiveMQDestination destination) {
if (broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) {
entry.configure(queue);
}
}
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
if (info.isBrowser()) {
return new QueueBrowserSubscription(broker,context, info);

View File

@ -49,11 +49,13 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
*/
protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException {
final IndirectMessageReference node = (IndirectMessageReference) n;
final Queue queue = (Queue)node.getRegionDestination();
queue.acknowledge(context, this, ack, node);
final Destination q = n.getRegionDestination();
q.acknowledge(context, this, ack, n);
final QueueMessageReference node = (QueueMessageReference) n;
final Queue queue = (Queue)q;
if( !ack.isInTransaction() ) {
node.drop();
queue.dropEvent();
@ -72,10 +74,9 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
}
protected boolean canDispatch(MessageReference n) throws IOException {
IndirectMessageReference node = (IndirectMessageReference) n;
QueueMessageReference node = (QueueMessageReference) n;
if( node.isAcked() )
return false;
// Keep message groups together.
String groupId = node.getGroupID();
int sequence = node.getGroupSequence();
@ -121,7 +122,6 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
} else {
return node.lock(this);
}
}
/**

View File

@ -87,20 +87,22 @@ public class RegionBroker implements Broker {
private BrokerId brokerId;
private String brokerName;
private Map clientIdSet = new HashMap(); // we will synchronize access
protected PersistenceAdapter adaptor;
private final DestinationInterceptor destinationInterceptor;
private ConnectionContext adminConnectionContext;
protected DestinationFactory destinationFactory;
public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter, DestinationInterceptor destinationInterceptor) throws IOException {
public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException {
this.brokerService = brokerService;
if (destinationFactory == null) {
throw new IllegalArgumentException("null destinationFactory");
}
this.sequenceGenerator.setLastSequenceId( destinationFactory.getLastMessageBrokerSequenceId() );
this.destinationFactory = destinationFactory;
queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
this.destinationInterceptor = destinationInterceptor;
this.sequenceGenerator.setLastSequenceId( adapter.getLastMessageBrokerSequenceId() );
this.adaptor = adapter;//weird - both are valid spellings ...
queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, adapter);
topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, adapter);
tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory);
tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory);
tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
}
public Map getDestinationMap() {
@ -147,20 +149,20 @@ public class RegionBroker implements Broker {
return topicRegion;
}
protected Region createTempTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
return new TempTopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory);
protected Region createTempTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new TempTopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
protected Region createTempQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
return new TempQueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory);
protected Region createTempQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new TempQueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
protected Region createTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter adapter) {
return new TopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, adapter);
protected Region createTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new TopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
protected Region createQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter adapter) {
return new QueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, adapter);
protected Region createQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new QueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
private static PersistenceAdapter createDefaultPersistenceAdapter(UsageManager memoryManager) throws IOException {
@ -537,7 +539,7 @@ public class RegionBroker implements Broker {
}
public Set getDurableDestinations(){
return adaptor != null ? adaptor.getDestinations() : Collections.EMPTY_SET;
return destinationFactory.getDestinations();
}
public boolean isFaultTolerantConfiguration(){

View File

@ -18,12 +18,7 @@
package org.apache.activemq.broker.region;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -34,27 +29,11 @@ import org.apache.activemq.thread.TaskRunnerFactory;
*/
public class TempQueueRegion extends AbstractRegion {
public TempQueueRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, null);
public TempQueueRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
setAutoCreateDestinations(false);
}
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory) {
public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
// Only consumers on the same connection can consume from
// the temporary destination
if( !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest);
}
super.addSubscription(context, sub);
};
};
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
if( info.isBrowser() ) {
return new QueueBrowserSubscription(broker,context, info);

View File

@ -19,10 +19,7 @@ package org.apache.activemq.broker.region;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -33,27 +30,11 @@ import org.apache.activemq.thread.TaskRunnerFactory;
*/
public class TempTopicRegion extends AbstractRegion {
public TempTopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, null);
public TempTopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
setAutoCreateDestinations(false);
}
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
return new Topic(destination, null, memoryManager, destinationStatistics, taskRunnerFactory) {
public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
// Only consumers on the same connection can consume from
// the temporary destination
if( !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest);
}
super.addSubscription(context, sub);
};
};
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
if( info.isDurable() ) {
throw new JMSException("A durable subscription cannot be created for a temporary topic.");

View File

@ -17,17 +17,16 @@
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
@ -35,7 +34,6 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.LongSequenceGenerator;
@ -57,8 +55,8 @@ public class TopicRegion extends AbstractRegion {
private boolean keepDurableSubsActive=false;
public TopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
PersistenceAdapter persistenceAdapter) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter);
DestinationFactory destinationFactory) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
@ -160,14 +158,15 @@ public class TopicRegion extends AbstractRegion {
// Implementation methods
// -------------------------------------------------------------------------
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
TopicMessageStore store = null;
if (!AdvisorySupport.isAdvisoryTopic(destination)){
store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
}
Topic topic = new Topic(destination, store, memoryManager, destinationStatistics, taskRunnerFactory);
configureTopic(topic, destination);
Topic topic = (Topic) super.createDestination(context, destination);
recoverDurableSubscriptions(context, topic);
return topic;
}
private void recoverDurableSubscriptions(ConnectionContext context, Topic topic) throws IOException, JMSException, Exception {
TopicMessageStore store = (TopicMessageStore) topic.getMessageStore();
// Eagerly recover the durable subscriptions
if (store != null) {
SubscriptionInfo[] infos = store.getAllSubscriptions();
@ -191,8 +190,6 @@ public class TopicRegion extends AbstractRegion {
topic.addSubscription(context, sub);
}
}
return topic;
}
private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {

View File

@ -100,6 +100,20 @@ public interface ListContainer extends List{
* @return true if successful
*/
public boolean doRemove(int position);
/**
* @return the maximumCacheSize
*/
public int getMaximumCacheSize();
/**
* @param maximumCacheSize the maximumCacheSize to set
*/
public void setMaximumCacheSize(int maximumCacheSize);
/**
* clear any cached values
*/
public void clearCache();
}

View File

@ -25,6 +25,12 @@ import java.util.Set;
* @version $Revision: 1.2 $
*/
public interface Store{
public final static Marshaller BytesMarshaller = new BytesMarshaller();
public final static Marshaller ObjectMarshaller = new ObjectMarshaller();
public final static Marshaller StringMarshaller = new StringMarshaller();
/**
* close the store
*

View File

@ -31,7 +31,7 @@ import org.apache.commons.logging.LogFactory;
public abstract class BaseContainerImpl{
private static final Log log=LogFactory.getLog(BaseContainerImpl.class);
protected IndexItem root;
protected IndexLinkedList list;
protected IndexLinkedList indexList;
protected IndexManager rootIndexManager; // IndexManager that contains the root
protected IndexManager indexManager;
protected DataManager dataManager;
@ -59,8 +59,8 @@ public abstract class BaseContainerImpl{
synchronized(mutex){
if (!initialized){
initialized= true;
if (this.list == null){
this.list=new DiskIndexLinkedList(indexManager,root);
if (this.indexList == null){
this.indexList=new DiskIndexLinkedList(indexManager,root);
}
}
}
@ -68,23 +68,23 @@ public abstract class BaseContainerImpl{
}
protected void clear(){
if (list != null){
list.clear();
if (indexList != null){
indexList.clear();
}
}
/**
* @return the list
* @return the indexList
*/
public IndexLinkedList getList(){
return list;
return indexList;
}
/**
* @param list the list to set
* @param indexList the indexList to set
*/
public void setList(IndexLinkedList list){
this.list=list;
public void setList(IndexLinkedList indexList){
this.indexList=indexList;
}
public abstract void unload();
@ -99,7 +99,7 @@ public abstract class BaseContainerImpl{
protected abstract void remove(IndexItem currentItem);
protected final IndexLinkedList getInternalList(){
return list;
return indexList;
}
public final void close(){
@ -143,24 +143,24 @@ public abstract class BaseContainerImpl{
synchronized(mutex){
loaded=true;
synchronized(mutex){
List list=new ArrayList();
List indexList=new ArrayList();
try{
long nextItem=root.getNextItem();
while(nextItem!=Item.POSITION_NOT_SET){
IndexItem item=new IndexItem();
item.setOffset(nextItem);
list.add(item);
indexList.add(item);
nextItem=item.getNextItem();
}
root.setNextItem(Item.POSITION_NOT_SET);
updateIndex(root);
for(int i=0;i<list.size();i++){
IndexItem item=(IndexItem) list.get(i);
for(int i=0;i<indexList.size();i++){
IndexItem item=(IndexItem) indexList.get(i);
dataManager.removeInterestInFile(item.getKeyFile());
dataManager.removeInterestInFile(item.getValueFile());
indexManager.freeIndex(item);
}
list.clear();
indexList.clear();
}catch(IOException e){
log.error("Failed to clear Container "+getId(),e);
throw new RuntimeStoreException(e);

View File

@ -0,0 +1,114 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.util.ListIterator;
/**
* @version $Revision$
*/
public class CachedContainerListIterator implements ListIterator{
protected ListContainerImpl container;
protected IndexLinkedList list;
protected int pos = 0;
protected int nextPos =0;
protected IndexItem currentItem;
protected CachedContainerListIterator(ListContainerImpl container,int start){
this.container=container;
this.list=list;
this.pos=start;
this.nextPos = this.pos;
}
public boolean hasNext(){
return nextPos >= 0 && nextPos < container.size();
}
public Object next(){
pos = nextPos;
Object result = container.getCachedItem(pos);
nextPos++;
return result;
}
public void remove(){
container.remove(pos);
nextPos--;
}
/*
* (non-Javadoc)
*
* @see java.util.ListIterator#hasPrevious()
*/
public boolean hasPrevious(){
return pos >= 0 && pos < container.size();
}
/*
* (non-Javadoc)
*
* @see java.util.ListIterator#previous()
*/
public Object previous(){
Object result = container.getCachedItem(pos);
pos--;
return result;
}
/*
* (non-Javadoc)
*
* @see java.util.ListIterator#nextIndex()
*/
public int nextIndex(){
return pos +1;
}
/*
* (non-Javadoc)
*
* @see java.util.ListIterator#previousIndex()
*/
public int previousIndex(){
return pos -1;
}
/*
* (non-Javadoc)
*
* @see java.util.ListIterator#set(E)
*/
public void set(Object o){
container.internalSet(pos,o);
}
/*
* (non-Javadoc)
*
* @see java.util.ListIterator#add(E)
*/
public void add(Object o){
container.internalAdd(previousIndex()+1,o);
}
}

View File

@ -103,7 +103,7 @@ public class ContainerListIterator extends ContainerValueCollectionIterator impl
* @see java.util.ListIterator#add(E)
*/
public void add(Object o){
IndexItem item=((ListContainerImpl) container).internalSet(previousIndex()+1,o);
IndexItem item=((ListContainerImpl) container).internalAdd(previousIndex()+1,o);
nextItem=item;
}
}

View File

@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.ObjectMarshaller;
import org.apache.activemq.kaha.Store;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
@ -35,7 +36,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
class IndexRootContainer {
private static final Log log=LogFactory.getLog(IndexRootContainer.class);
protected static final Marshaller rootMarshaller = new ObjectMarshaller();
protected static final Marshaller rootMarshaller = Store.ObjectMarshaller;
protected IndexItem root;
protected IndexManager indexManager;
protected DataManager dataManager;

View File

@ -40,14 +40,14 @@ public class KahaStore implements Store{
private static final Log log=LogFactory.getLog(KahaStore.class);
private File directory;
private IndexRootContainer mapsContainer;
private IndexRootContainer listsContainer;
protected IndexRootContainer mapsContainer;
protected IndexRootContainer listsContainer;
private Map lists=new ConcurrentHashMap();
private Map maps=new ConcurrentHashMap();
private Map dataManagers = new ConcurrentHashMap();
private Map indexManagers = new ConcurrentHashMap();
private IndexManager rootIndexManager; //contains all the root indexes
protected IndexManager rootIndexManager; //contains all the root indexes
private boolean closed=false;
private String name;
@ -221,7 +221,8 @@ public class KahaStore implements Store{
}
return result;
}
public void deleteListContainer(Object id) throws IOException{
initialize();
ListContainerImpl container=(ListContainerImpl) lists.remove(id);

View File

@ -1,19 +1,15 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha.impl;
@ -21,12 +17,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.ObjectMarshaller;
import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.kaha.Store;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
@ -34,12 +31,16 @@ import org.apache.commons.logging.LogFactory;
*
* @version $Revision: 1.2 $
*/
public final class ListContainerImpl extends BaseContainerImpl implements ListContainer{
public class ListContainerImpl extends BaseContainerImpl implements ListContainer{
private static final Log log=LogFactory.getLog(ListContainerImpl.class);
protected Marshaller marshaller=new ObjectMarshaller();
protected Marshaller marshaller=Store.ObjectMarshaller;
protected LinkedList cacheList=new LinkedList();
protected int offset=0;
protected int maximumCacheSize=100;
protected IndexItem lastCached;
protected ListContainerImpl(ContainerId id,IndexItem root,IndexManager rootIndexManager,IndexManager indexManager,DataManager dataManager)
throws IOException{
protected ListContainerImpl(ContainerId id,IndexItem root,IndexManager rootIndexManager,IndexManager indexManager,
DataManager dataManager) throws IOException{
super(id,root,rootIndexManager,indexManager,dataManager);
}
@ -59,7 +60,8 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
long nextItem=root.getNextItem();
while(nextItem!=Item.POSITION_NOT_SET){
IndexItem item=indexManager.getIndex(nextItem);
list.add(item);
indexList.add(item);
itemAdded(item,indexList.size()-1,getValue(item));
nextItem=item.getNextItem();
}
}catch(IOException e){
@ -80,7 +82,8 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
checkClosed();
if(loaded){
loaded=false;
list.clear();
indexList.clear();
clearCache();
}
}
@ -102,7 +105,7 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
synchronized(mutex){
result=other.size()==size();
if(result){
for(int i=0;i<list.size();i++){
for(int i=0;i<indexList.size();i++){
Object o1=other.get(i);
Object o2=get(i);
result=o1==o2||(o1!=null&&o2!=null&&o1.equals(o2));
@ -123,7 +126,7 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
*/
public int size(){
load();
return list.size();
return indexList.size();
}
/*
@ -135,7 +138,8 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
load();
IndexItem item=writeFirst(o);
synchronized(mutex){
list.addFirst(item);
indexList.addFirst(item);
itemAdded(item,0,o);
}
}
@ -148,7 +152,8 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
load();
IndexItem item=writeLast(o);
synchronized(mutex){
list.addLast(item);
indexList.addLast(item);
itemAdded(item,indexList.size()-1,o);
}
}
@ -161,13 +166,14 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
load();
Object result=null;
synchronized(mutex){
IndexItem item=(IndexItem) list.getFirst();
IndexItem item=(IndexItem) indexList.getFirst();
if(item!=null){
itemRemoved(0);
result=getValue(item);
int index=list.indexOf(item);
IndexItem prev=index>0?(IndexItem) list.get(index-1):root;
IndexItem next=index<(list.size()-1)?(IndexItem) list.get(index+1):null;
list.removeFirst();
int index=indexList.indexOf(item);
IndexItem prev=index>0?(IndexItem) indexList.get(index-1):root;
IndexItem next=index<(indexList.size()-1)?(IndexItem) indexList.get(index+1):null;
indexList.removeFirst();
delete(item,prev,next);
item=null;
}
@ -184,12 +190,13 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
load();
Object result=null;
synchronized(mutex){
IndexItem last=list.getLast();
IndexItem last=indexList.getLast();
if(last!=null){
itemRemoved(indexList.size()-1);
result=getValue(last);
IndexItem prev=list.getPrevEntry(last);
IndexItem prev=indexList.getPrevEntry(last);
IndexItem next=null;
list.removeLast();
indexList.removeLast();
delete(last,prev,next);
}
}
@ -203,7 +210,7 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
*/
public boolean isEmpty(){
load();
return list.isEmpty();
return indexList.isEmpty();
}
/*
@ -216,14 +223,14 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
boolean result=false;
if(o!=null){
synchronized(mutex){
IndexItem next=list.getFirst();
IndexItem next=indexList.getFirst();
while(next!=null){
Object value=getValue(next);
if(value!=null&&value.equals(o)){
result=true;
break;
}
next=list.getNextEntry(next);
next=indexList.getNextEntry(next);
}
}
}
@ -247,13 +254,13 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
*/
public Object[] toArray(){
load();
List tmp=new ArrayList(list.size());
List tmp=new ArrayList(indexList.size());
synchronized(mutex){
IndexItem next=list.getFirst();
IndexItem next=indexList.getFirst();
while(next!=null){
Object value=getValue(next);
tmp.add(value);
next=list.getNextEntry(next);
next=indexList.getNextEntry(next);
}
}
return tmp.toArray();
@ -266,13 +273,13 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
*/
public Object[] toArray(Object[] a){
load();
List tmp=new ArrayList(list.size());
List tmp=new ArrayList(indexList.size());
synchronized(mutex){
IndexItem next=list.getFirst();
IndexItem next=indexList.getFirst();
while(next!=null){
Object value=getValue(next);
tmp.add(value);
next=list.getNextEntry(next);
next=indexList.getNextEntry(next);
}
}
return tmp.toArray(a);
@ -298,15 +305,18 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
load();
boolean result=false;
synchronized(mutex){
IndexItem next=list.getFirst();
int pos=0;
IndexItem next=indexList.getFirst();
while(next!=null){
Object value=getValue(next);
if(value!=null&&value.equals(o)){
remove(next);
itemRemoved(pos);
result=true;
break;
}
next=list.getNextEntry(next);
next=indexList.getNextEntry(next);
pos++;
}
}
return result;
@ -314,9 +324,9 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
protected void remove(IndexItem item){
synchronized(mutex){
IndexItem prev = list.getPrevEntry(item);
IndexItem next = list.getNextEntry(item);
list.remove(item);
IndexItem prev=indexList.getPrevEntry(item);
IndexItem next=indexList.getNextEntry(item);
indexList.remove(item);
delete(item,prev,next);
}
}
@ -396,13 +406,13 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
load();
List tmpList=new ArrayList();
synchronized(mutex){
IndexItem next=list.getFirst();
IndexItem next=indexList.getFirst();
while(next!=null){
Object o=getValue(next);
if(!c.contains(o)){
tmpList.add(o);
}
next=list.getNextEntry(next);
next=indexList.getNextEntry(next);
}
}
for(Iterator i=tmpList.iterator();i.hasNext();){
@ -421,6 +431,7 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
synchronized(mutex){
super.clear();
doClear();
clearCache();
}
}
@ -431,12 +442,7 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
*/
public Object get(int index){
load();
Object result=null;
IndexItem item=(IndexItem) list.get(index);
if(item!=null){
result=getValue(item);
}
return result;
return getCachedItem(index);
}
/*
@ -448,12 +454,13 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
load();
Object result=null;
synchronized(mutex){
IndexItem replace=list.isEmpty()?null:(IndexItem) list.get(index);
IndexItem prev=(list.isEmpty()||(index-1)<0)?null:(IndexItem) list.get(index-1);
IndexItem next=(list.isEmpty()||(index+1)>=size())?null:(IndexItem) list.get(index+1);
IndexItem replace=indexList.isEmpty()?null:(IndexItem) indexList.get(index);
IndexItem prev=(indexList.isEmpty()||(index-1)<0)?null:(IndexItem) indexList.get(index-1);
IndexItem next=(indexList.isEmpty()||(index+1)>=size())?null:(IndexItem) indexList.get(index+1);
result=getValue(replace);
list.remove(index);
indexList.remove(index);
delete(replace,prev,next);
itemRemoved(index);
add(index,element);
}
return result;
@ -461,11 +468,12 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
protected IndexItem internalSet(int index,Object element){
synchronized(mutex){
IndexItem replace=list.isEmpty()?null:(IndexItem) list.get(index);
IndexItem prev=(list.isEmpty()||(index-1)<0)?null:(IndexItem) list.get(index-1);
IndexItem next=(list.isEmpty()||(index+1)>=size())?null:(IndexItem) list.get(index+1);
list.remove(index);
IndexItem replace=indexList.isEmpty()?null:(IndexItem) indexList.get(index);
IndexItem prev=(indexList.isEmpty()||(index-1)<0)?null:(IndexItem) indexList.get(index-1);
IndexItem next=(indexList.isEmpty()||(index+1)>=size())?null:(IndexItem) indexList.get(index+1);
indexList.remove(index);
delete(replace,prev,next);
itemRemoved(index);
return internalAdd(index,element);
}
}
@ -479,22 +487,24 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
load();
synchronized(mutex){
IndexItem item=insert(index,element);
list.add(index,item);
indexList.add(index,item);
itemAdded(item,index,element);
}
}
protected IndexItem internalAdd(int index,Object element){
synchronized(mutex){
IndexItem item=insert(index,element);
list.add(index,item);
indexList.add(index,item);
itemAdded(item,index,element);
return item;
}
}
protected IndexItem internalGet(int index){
synchronized(mutex){
if(index>=0&&index<list.size()){
return list.get(index);
if(index>=0&&index<indexList.size()){
return indexList.get(index);
}
}
return null;
@ -509,13 +519,14 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
load();
boolean result=false;
synchronized(mutex){
IndexItem item=list.get(index);
IndexItem item=indexList.get(index);
if(item!=null){
result=true;
IndexItem prev=list.getPrevEntry(item);
IndexItem prev=indexList.getPrevEntry(item);
prev=prev!=null?prev:root;
IndexItem next=list.getNextEntry(prev);
list.remove(index);
IndexItem next=indexList.getNextEntry(prev);
indexList.remove(index);
itemRemoved(index);
delete(item,prev,next);
}
}
@ -531,13 +542,14 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
load();
Object result=null;
synchronized(mutex){
IndexItem item=list.get(index);
IndexItem item=indexList.get(index);
if(item!=null){
itemRemoved(index);
result=getValue(item);
IndexItem prev=list.getPrevEntry(item);
IndexItem prev=indexList.getPrevEntry(item);
prev=prev!=null?prev:root;
IndexItem next=list.getNextEntry(item);
list.remove(index);
IndexItem next=indexList.getNextEntry(item);
indexList.remove(index);
delete(item,prev,next);
}
}
@ -555,7 +567,7 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
if(o!=null){
synchronized(mutex){
int count=0;
IndexItem next=list.getFirst();
IndexItem next=indexList.getFirst();
while(next!=null){
Object value=getValue(next);
if(value!=null&&value.equals(o)){
@ -563,7 +575,7 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
break;
}
count++;
next=list.getNextEntry(next);
next=indexList.getNextEntry(next);
}
}
}
@ -580,8 +592,8 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
int result=-1;
if(o!=null){
synchronized(mutex){
int count=list.size()-1;
IndexItem next=list.getLast();
int count=indexList.size()-1;
IndexItem next=indexList.getLast();
while(next!=null){
Object value=getValue(next);
if(value!=null&&value.equals(o)){
@ -589,7 +601,7 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
break;
}
count--;
next=list.getPrevEntry(next);
next=indexList.getPrevEntry(next);
}
}
}
@ -603,7 +615,7 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
*/
public ListIterator listIterator(){
load();
return new ContainerListIterator(this,list,list.getRoot());
return new CachedContainerListIterator(this,0);
}
/*
@ -613,14 +625,7 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
*/
public ListIterator listIterator(int index){
load();
IndexItem start=list.get(index);
if(start!=null){
start=list.getPrevEntry(start);
}
if(start==null){
start=root;
}
return new ContainerListIterator(this,list,start);
return new CachedContainerListIterator(this,index);
}
/*
@ -632,10 +637,10 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
load();
List result=new ArrayList();
int count=fromIndex;
IndexItem next=list.get(fromIndex);
IndexItem next=indexList.get(fromIndex);
while(next!=null&&count++<toIndex){
result.add(getValue(next));
next=list.getNextEntry(next);
next=indexList.getNextEntry(next);
}
return result;
}
@ -647,9 +652,9 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
DataItem data=dataManager.storeDataItem(marshaller,value);
index=indexManager.createNewIndex();
index.setValueData(data);
IndexItem prev=list.getLast();
IndexItem prev=indexList.getLast();
prev=prev!=null?prev:root;
IndexItem next=list.getNextEntry(prev);
IndexItem next=indexList.getNextEntry(prev);
prev.setNextItem(index.getOffset());
index.setPreviousItem(prev.getOffset());
updateIndex(prev);
@ -675,7 +680,7 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
index=indexManager.createNewIndex();
index.setValueData(data);
IndexItem prev=root;
IndexItem next=list.getNextEntry(prev);
IndexItem next=indexList.getNextEntry(prev);
prev.setNextItem(index.getOffset());
index.setPreviousItem(prev.getOffset());
updateIndex(prev);
@ -705,14 +710,14 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
IndexItem next=null;
if(insertPos<=0){
prev=root;
next=list.getNextEntry(root);
}else if(insertPos>=list.size()){
prev=list.getLast();
next=indexList.getNextEntry(root);
}else if(insertPos>=indexList.size()){
prev=indexList.getLast();
next=null;
}else{
prev=list.get(insertPos);
prev=indexList.get(insertPos);
prev=prev!=null?prev:root;
next=list.getNextEntry(prev);
next=indexList.getNextEntry(prev);
}
prev.setNextItem(index.getOffset());
index.setPreviousItem(prev.getOffset());
@ -763,4 +768,145 @@ public final class ListContainerImpl extends BaseContainerImpl implements ListCo
result.append("]");
return result.toString();
}
protected void itemAdded(IndexItem item,int pos,Object value){
int cachePosition=pos-offset;
// if pos is before the cache offset
// we need to clear the cache
if(pos<offset){
clearCache();
}
if(cacheList.isEmpty()){
offset=pos;
cacheList.add(value);
lastCached=item;
}else if(cachePosition==cacheList.size()&&cachePosition<maximumCacheSize){
cacheList.add(value);
lastCached=item;
}else if(cachePosition>=0&&cachePosition<=cacheList.size()){
cacheList.add(cachePosition,value);
if(cacheList.size()>maximumCacheSize){
itemRemoved(cacheList.size()-1);
}
}
}
protected void itemRemoved(int pos){
int lastPosition=offset+cacheList.size()-1;
int cachePosition=pos-offset;
if(cachePosition>=0&&cachePosition<cacheList.size()){
if(cachePosition==lastPosition){
if(lastCached!=null){
lastCached=indexList.getPrevEntry(lastCached);
}
}
cacheList.remove(pos);
if(cacheList.isEmpty()){
clearCache();
}
}
}
protected Object getCachedItem(int pos){
int cachePosition=pos-offset;
Object result=null;
if(cachePosition>=0&&cachePosition<cacheList.size()){
result=cacheList.get(cachePosition);
}
if(result==null){
if(cachePosition==cacheList.size()&&lastCached!=null){
IndexItem item=indexList.getNextEntry(lastCached);
if(item!=null){
result=getValue(item);
cacheList.add(result);
lastCached=item;
if(cacheList.size()>maximumCacheSize){
itemRemoved(0);
}
}
}else{
IndexItem item=indexList.get(pos);
if(item!=null){
result=getValue(item);
if(result!=null){
// outside the cache window - so clear
if(!cacheList.isEmpty()){
clearCache();
}
offset=pos;
cacheList.add(result);
lastCached=item;
}
}
}
}
return result;
}
/**
* clear any cached values
*/
public void clearCache(){
cacheList.clear();
offset=0;
lastCached=null;
}
/**
* @return the cacheList
*/
public LinkedList getCacheList(){
return cacheList;
}
/**
* @param cacheList the cacheList to set
*/
public void setCacheList(LinkedList cacheList){
this.cacheList=cacheList;
}
/**
* @return the lastCached
*/
public IndexItem getLastCached(){
return lastCached;
}
/**
* @param lastCached the lastCached to set
*/
public void setLastCached(IndexItem lastCached){
this.lastCached=lastCached;
}
/**
* @return the maximumCacheSize
*/
public int getMaximumCacheSize(){
return maximumCacheSize;
}
/**
* @param maximumCacheSize the maximumCacheSize to set
*/
public void setMaximumCacheSize(int maximumCacheSize){
this.maximumCacheSize=maximumCacheSize;
}
/**
* @return the offset
*/
public int getOffset(){
return offset;
}
/**
* @param offset the offset to set
*/
public void setOffset(int offset){
this.offset=offset;
}
}

View File

@ -28,6 +28,7 @@ import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.ObjectMarshaller;
import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.kaha.Store;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
@ -39,8 +40,8 @@ final class MapContainerImpl extends BaseContainerImpl implements MapContainer{
private static final Log log=LogFactory.getLog(MapContainerImpl.class);
protected Map map=new HashMap();
protected Map valueToKeyMap=new HashMap();
protected Marshaller keyMarshaller=new ObjectMarshaller();
protected Marshaller valueMarshaller=new ObjectMarshaller();
protected Marshaller keyMarshaller= Store.ObjectMarshaller;
protected Marshaller valueMarshaller=Store.ObjectMarshaller;
protected MapContainerImpl(ContainerId id,IndexItem root,IndexManager rootIndexManager,IndexManager indexManager,DataManager dataManager){
super(id,root,rootIndexManager,indexManager,dataManager);
@ -66,7 +67,7 @@ final class MapContainerImpl extends BaseContainerImpl implements MapContainer{
Object key=dataManager.readItem(keyMarshaller,data);
map.put(key,item);
valueToKeyMap.put(item,key);
list.add(item);
indexList.add(item);
nextItem=item.getNextItem();
}
}catch(IOException e){
@ -91,7 +92,7 @@ final class MapContainerImpl extends BaseContainerImpl implements MapContainer{
synchronized(mutex){
map.clear();
valueToKeyMap.clear();
list.clear();
indexList.clear();
}
}
}
@ -165,15 +166,15 @@ final class MapContainerImpl extends BaseContainerImpl implements MapContainer{
load();
boolean result=false;
if(o!=null){
synchronized(list){
IndexItem item=list.getFirst();
synchronized(indexList){
IndexItem item=indexList.getFirst();
while(item!=null){
Object value=getValue(item);
if(value!=null&&value.equals(o)){
result=true;
break;
}
item=list.getNextEntry(item);
item=indexList.getNextEntry(item);
}
}
}
@ -242,7 +243,7 @@ final class MapContainerImpl extends BaseContainerImpl implements MapContainer{
IndexItem item=write(key,value);
map.put(key,item);
valueToKeyMap.put(item,key);
list.add(item);
indexList.add(item);
}
return result;
}
@ -261,11 +262,11 @@ final class MapContainerImpl extends BaseContainerImpl implements MapContainer{
map.remove(key);
valueToKeyMap.remove(item);
// ensure we have the upto date item
item=list.getEntry(item);
item=indexList.getEntry(item);
result=getValue(item);
IndexItem prev=list.getPrevEntry(item);
IndexItem next=list.getNextEntry(item);
list.remove(item);
IndexItem prev=indexList.getPrevEntry(item);
IndexItem next=indexList.getNextEntry(item);
indexList.remove(item);
delete(item,prev,next);
}
}
@ -277,7 +278,7 @@ final class MapContainerImpl extends BaseContainerImpl implements MapContainer{
boolean result=false;
if(o!=null){
synchronized(mutex){
IndexItem item=list.getFirst();
IndexItem item=indexList.getFirst();
while(item!=null){
Object value=getValue(item);
if(value!=null&&value.equals(o)){
@ -289,7 +290,7 @@ final class MapContainerImpl extends BaseContainerImpl implements MapContainer{
}
break;
}
item=list.getNextEntry(item);
item=indexList.getNextEntry(item);
}
}
}
@ -326,7 +327,7 @@ final class MapContainerImpl extends BaseContainerImpl implements MapContainer{
}
protected IndexLinkedList getItemList(){
return list;
return indexList;
}
protected Object getValue(IndexItem item){
@ -342,6 +343,22 @@ final class MapContainerImpl extends BaseContainerImpl implements MapContainer{
}
return result;
}
protected Object getKey(IndexItem item){
Object result=null;
if(item!=null){
try{
DataItem data=item.getKeyDataItem();
result=dataManager.readItem(keyMarshaller,data);
}catch(IOException e){
log.error("Failed to get key for "+item,e);
throw new RuntimeStoreException(e);
}
}
return result;
}
protected IndexItem write(Object key,Object value){
IndexItem index=null;
@ -355,9 +372,9 @@ final class MapContainerImpl extends BaseContainerImpl implements MapContainer{
DataItem data=dataManager.storeDataItem(valueMarshaller,value);
index.setValueData(data);
}
IndexItem prev=list.getLast();
prev=prev!=null?prev:list.getRoot();
IndexItem next=list.getNextEntry(prev);
IndexItem prev=indexList.getLast();
prev=prev!=null?prev:indexList.getRoot();
IndexItem next=indexList.getNextEntry(prev);
prev.setNextItem(index.getOffset());
index.setPreviousItem(prev.getOffset());
updateIndex(prev);

View File

@ -0,0 +1,82 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import junit.framework.TestCase;
public class VolumeTest extends TestCase{
protected Store store;
protected String name;
protected static final int NUMBER=1;
/*
* dump a large number of messages into a list - then retreive them
*/
public void testListVolume() throws Exception{
ListContainer container=store.getListContainer("volume");
container.setMarshaller(Store.BytesMarshaller);
byte[] data = new byte[10];
for (int i =0; i< NUMBER; i++){
container.add(data);
if(i%100000==0){
System.err.println("persisted " + i);
}
}
int count = 0;
for (Iterator i = container.iterator(); i.hasNext();){
assertNotNull(i.next());
count++;
if (count%100000==0){
System.err.println("retrived " + count);
}
}
assertEquals("Different retrieved to stored",NUMBER,count);
}
protected Store getStore() throws IOException{
return StoreFactory.open(name,"rw");
}
protected void setUp() throws Exception{
super.setUp();
name = System.getProperty("basedir", ".")+"/target/activemq-data/volume-container.db";
StoreFactory.delete(name);
store=StoreFactory.open(name,"rw");
}
protected void tearDown() throws Exception{
super.tearDown();
if(store!=null){
store.close();
}
assertTrue(StoreFactory.delete(name));
}
}

View File

@ -0,0 +1,164 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
import org.apache.activemq.kaha.StoreFactory;
/**
* Junit tests for CachedListContainerImpl
*
* @version $Revision$
*/
public class CachedListContainerImplTest extends TestCase{
protected String name;
protected KahaStore store;
protected int MAX_CACHE_SIZE=10;
protected KahaStore getStore() throws IOException{
return new KahaStore(name,"rw");
}
public void testAdds() throws Exception{
ListContainerImpl list=getStoreList("test");
List data=getDataList(100);
list.addAll(data);
assertEquals(MAX_CACHE_SIZE,list.getCacheList().size());
List cached=getCachedList(MAX_CACHE_SIZE);
for(int i=0;i<cached.size();i++){
list.add(i,cached.get(i));
}
assertEquals(MAX_CACHE_SIZE,list.getCacheList().size());
for(int i=0;i<cached.size();i++){
assertEquals(cached.get(i),list.getCacheList().get(i));
}
}
public void testAddsIntoCacheSpace() throws Exception{
ListContainerImpl list=getStoreList("test");
int initialDataSize=50;
List data=getDataList(initialDataSize);
list.addAll(data);
assertEquals(MAX_CACHE_SIZE,list.getCacheList().size());
List cached=getCachedList(MAX_CACHE_SIZE);
for(int i=MAX_CACHE_SIZE/2;i<cached.size();i++){
list.add(i,cached.get(i));
}
assertEquals(MAX_CACHE_SIZE,list.getCacheList().size());
for(int i=0;i<MAX_CACHE_SIZE/2;i++){
assertEquals(data.get(i),list.getCacheList().get(i));
}
for(int i=MAX_CACHE_SIZE/2;i<MAX_CACHE_SIZE;i++){
assertEquals(cached.get(i),list.getCacheList().get(i));
}
}
public void testRemoves() throws Exception{
ListContainerImpl list=getStoreList("test");
int initialDataSize=10;
List data=getDataList(initialDataSize);
list.addAll(data);
assertEquals(MAX_CACHE_SIZE,list.getCacheList().size());
List cached=getCachedList(MAX_CACHE_SIZE);
list.addAll(cached);
assertEquals(MAX_CACHE_SIZE,list.getCacheList().size());
for(int i=0;i<cached.size();i++){
assertNotSame(cached.get(i),list.getCacheList().get(i));
}
for(int i=0;i<initialDataSize;i++){
list.remove(0);
}
assertEquals(0,list.getCacheList().size());
// repopulate the cache
for(int i=0;i<MAX_CACHE_SIZE;i++){
list.get(i);
}
assertEquals(MAX_CACHE_SIZE,list.getCacheList().size());
for(int i=0;i<cached.size();i++){
assertEquals(cached.get(i),list.getCacheList().get(i));
}
}
public void testCacheSize() throws Exception{
ListContainerImpl list=getStoreList("test");
List data=getDataList(100);
list.addAll(data);
assertEquals(MAX_CACHE_SIZE,list.getCacheList().size());
}
public void testInserts() throws Exception{
ListContainerImpl list=getStoreList("test");
List data=getDataList(100);
list.addAll(data);
assertEquals(MAX_CACHE_SIZE,list.getCacheList().size());
List cached=getCachedList(MAX_CACHE_SIZE);
for(int i=0;i<cached.size();i++){
list.set(i,cached.get(i));
}
assertEquals(MAX_CACHE_SIZE,list.getCacheList().size());
for(int i=0;i<cached.size();i++){
assertEquals(cached.get(i),list.getCacheList().get(i));
}
}
protected ListContainerImpl getStoreList(Object id) throws Exception{
String containerName="test";
DataManager dm=store.getDataManager(containerName);
IndexManager im=store.getIndexManager(dm,containerName);
ContainerId containerId=new ContainerId();
containerId.setKey(id);
containerId.setDataContainerName(containerName);
IndexItem root=store.listsContainer.addRoot(containerId);
ListContainerImpl result=new ListContainerImpl(containerId,root,store.rootIndexManager,im,dm);
result.expressDataInterest();
result.setMaximumCacheSize(MAX_CACHE_SIZE);
return result;
}
protected List getDataList(int num){
List result=new ArrayList();
for(int i=0;i<num;i++){
result.add("data:"+i);
}
return result;
}
protected List getCachedList(int num){
List result=new ArrayList();
for(int i=0;i<num;i++){
result.add("cached:"+i);
}
return result;
}
protected void setUp() throws Exception{
super.setUp();
name=System.getProperty("basedir",".")+"/target/activemq-data/store-test.db";
store=getStore();
}
protected void tearDown() throws Exception{
super.tearDown();
if(store!=null){
store.close();
store=null;
}
boolean rc=StoreFactory.delete(name);
assertTrue(rc);
}
}

View File

@ -48,7 +48,6 @@ public class ForwardingBridgeTest extends NetworkTestSupport {
} );
}
public void testAddConsumerThenSend() throws Exception {
// Start a producer on local broker
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
@ -68,11 +67,12 @@ public class ForwardingBridgeTest extends NetworkTestSupport {
connection2.send(sessionInfo2);
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo2, destination);
connection2.send(consumerInfo);
Thread.sleep(1000);
// Send the message to the local boker.
connection1.send(createMessage(producerInfo, destination, deliveryMode));
// Make sure the message was delivered via the remote.
Message m = receiveMessage(connection2);
assertNotNull(m);
}

View File

@ -24,7 +24,7 @@
</transportConnectors>
<persistenceAdapter>
<kahaPersistenceAdapter dir = "${basedir}/target/activemq-data/kaha-broker.db"/>
<kahaPersistenceAdapter dir = "${basedir}/target/activemq-data/kaha-broker.db" maxDataFileLength = "1024"/>
</persistenceAdapter>
</broker>