Ensure messages inform usage manager when they hop in and out of ram ...

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@454471 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-10-09 19:38:17 +00:00
parent 78035ea26b
commit 7a6b944e6b
3 changed files with 57 additions and 12 deletions

View File

@ -134,23 +134,26 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
}
public synchronized void addMessageLast(MessageReference node) throws Exception{
if(started){
if(node!=null){
Message msg=node.getMessage();
if(node!=null){
Message msg=node.getMessage();
if(started){
pendingCount++;
if(!msg.isPersistent()){
nonPersistent.addMessageLast(node);
}else{
Destination dest=msg.getRegionDestination();
TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(dest);
if(tsp!=null){
tsp.addMessageLast(node);
}
}
if(msg.isPersistent()){
Destination dest=msg.getRegionDestination();
TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(dest);
if(tsp!=null){
tsp.addMessageLast(node);
if(started){
// if the store has been empty - then this message is next to dispatch
if((pendingCount-nonPersistent.size())<=0){
tsp.nextToDispatch(node.getMessageId());
}
}
}
pendingCount++;
}
}
}

View File

@ -84,6 +84,12 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements
throw new RuntimeException(e);
}
}
public synchronized void addMessageLast(MessageReference node) throws Exception{
if(node!=null){
node.decrementReferenceCount();
}
}
public synchronized boolean hasNext(){
if(isEmpty()){
@ -112,6 +118,7 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements
public void recoverMessage(Message message) throws Exception{
message.setRegionDestination(regionDestination);
message.incrementReferenceCount();
batchList.addLast(message);
}

View File

@ -34,6 +34,7 @@ import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
* @version $Revision$
*/
public class InactiveDurableTopicTest extends TestCase{
private static final int MESSAGE_COUNT = 100000;
private static final String DEFAULT_PASSWORD="";
private static final String USERNAME="testuser";
private static final String CLIENTID="mytestclient";
@ -53,7 +54,7 @@ public class InactiveDurableTopicTest extends TestCase{
super.setUp();
broker=new BrokerService();
//broker.setPersistenceAdapter(new KahaPersistenceAdapter(new File ("TEST_STUFD")));
broker.setPersistenceAdapter(new KahaPersistenceAdapter(new File ("TEST_STUFD")));
/*
DefaultPersistenceAdapterFactory factory = new DefaultPersistenceAdapterFactory();
factory.setDataDirectoryFile(broker.getDataDirectory());
@ -84,6 +85,7 @@ public class InactiveDurableTopicTest extends TestCase{
connection=connectionFactory.createConnection(USERNAME,DEFAULT_PASSWORD);
assertNotNull(connection);
connection.setClientID(CLIENTID);
connection.start();
session=connection.createSession(false,javax.jms.Session.CLIENT_ACKNOWLEDGE);
assertNotNull(session);
topic=session.createTopic(TOPIC_NAME);
@ -119,14 +121,14 @@ public class InactiveDurableTopicTest extends TestCase{
assertNotNull(msg);
msg.setString("key1","value1");
int loop;
for(loop=0;loop<100000;loop++){
for(loop=0;loop<MESSAGE_COUNT;loop++){
msg.setInt("key2",loop);
publisher.send(msg,deliveryMode,deliveryPriority,Message.DEFAULT_TIME_TO_LIVE);
if (loop%500==0){
System.out.println("Sent " + loop + " messages");
}
}
this.assertEquals(loop,100000);
this.assertEquals(loop,MESSAGE_COUNT);
publisher.close();
session.close();
connection.stop();
@ -138,4 +140,37 @@ public class InactiveDurableTopicTest extends TestCase{
throw new AssertionFailedError("Create Subscription caught: "+ex);
}
}
public void test3CreateSubscription() throws Exception{
try{
/*
* Step 1 - Establish a connection with a client id and create a durable subscription
*/
connection=connectionFactory.createConnection(USERNAME,DEFAULT_PASSWORD);
assertNotNull(connection);
connection.setClientID(CLIENTID);
connection.start();
session=connection.createSession(false,javax.jms.Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
topic=session.createTopic(TOPIC_NAME);
assertNotNull(topic);
subscriber=session.createDurableSubscriber(topic,SUBID,"",false);
assertNotNull(subscriber);
int loop;
for(loop=0;loop<MESSAGE_COUNT;loop++){
Message msg = subscriber.receive();
if (loop%500==0){
System.out.println("Received " + loop + " messages");
}
}
this.assertEquals(loop,MESSAGE_COUNT);
subscriber.close();
session.close();
connection.close();
}catch(JMSException ex){
try{
connection.close();
}catch(Exception ignore){}
throw new AssertionFailedError("Create Subscription caught: "+ex);
}
}
}