git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1302607 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-03-19 19:23:12 +00:00
parent d6917ea4bc
commit 0af0021d0b
2 changed files with 370 additions and 4 deletions

View File

@ -18,9 +18,33 @@ package org.apache.activemq.store.kahadb;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.*;
import java.util.*;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.Stack;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -32,7 +56,18 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.kahadb.data.*;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
@ -46,7 +81,17 @@ import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.*;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.LocationMarshaller;
import org.apache.kahadb.util.LockFile;
import org.apache.kahadb.util.LongMarshaller;
import org.apache.kahadb.util.Marshaller;
import org.apache.kahadb.util.Sequence;
import org.apache.kahadb.util.SequenceSet;
import org.apache.kahadb.util.StringMarshaller;
import org.apache.kahadb.util.VariableMarshaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -1284,6 +1329,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.subscriptionAcks.remove(tx, subscriptionKey);
sd.subscriptionCache.remove(subscriptionKey);
removeAckLocationsForSub(tx, sd, subscriptionKey);
if (sd.subscriptions.isEmpty(tx)) {
sd.messageIdIndex.clear(tx);
sd.locationIndex.clear(tx);
}
}
}

View File

@ -0,0 +1,316 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR ONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DurableSubSelectorDelayTest {
private static final Logger LOG = LoggerFactory.getLogger(DurableSubSelectorDelayTest.class);
public static final long RUNTIME = 3 * 60 * 1000;
private BrokerService broker;
private ActiveMQTopic topic;
@Test
public void testProcess() throws Exception {
MsgProducer msgProducer = new MsgProducer();
msgProducer.start();
DurableSubscriber subscribers[] = new DurableSubscriber[10];
for (int i = 0; i < subscribers.length; i++) {
subscribers[i] = new DurableSubscriber(i);
subscribers[i].process();
}
// wait for server to finish
msgProducer.join();
for (int j = 0; j < subscribers.length; j++) {
LOG.info("Unsubscribing subscriber " + subscribers[j]);
// broker.getAdminView().destroyDurableSubscriber(clientID,
// Client.SUBSCRIPTION_NAME);
subscribers[j].unsubscribe();
}
// allow the clean up thread time to run
TimeUnit.MINUTES.sleep(2);
final KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
assertTrue("only one journal file should be left ", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return pa.getStore().getJournal().getFileMap().size() == 1;
}
}, TimeUnit.MINUTES.toMillis(2)));
LOG.info("DONE.");
}
/**
* Message Producer
*/
final class MsgProducer extends Thread {
final String url = "vm://"
+ DurableSubSelectorDelayTest.getName();
final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
int transRover = 0;
int messageRover = 0;
public MsgProducer() {
super("MsgProducer");
setDaemon(true);
}
@Override
public void run() {
long endTime = RUNTIME + System.currentTimeMillis();
try {
while (endTime > System.currentTimeMillis()) {
Thread.sleep(400);
send();
}
} catch (Throwable e) {
e.printStackTrace(System.out);
throw new RuntimeException(e);
}
}
public void send() throws JMSException {
int trans = ++transRover;
boolean relevantTrans = true;
int count = 40;
LOG.info("Sending Trans[id=" + trans + ", count="
+ count + "]");
Connection con = cf.createConnection();
Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sess.createProducer(null);
for (int i = 0; i < count; i++) {
Message message = sess.createMessage();
message.setIntProperty("ID", ++messageRover);
message.setIntProperty("TRANS", trans);
message.setBooleanProperty("RELEVANT", false);
prod.send(topic, message);
}
Message message = sess.createMessage();
message.setIntProperty("ID", ++messageRover);
message.setIntProperty("TRANS", trans);
message.setBooleanProperty("COMMIT", true);
message.setBooleanProperty("RELEVANT", relevantTrans);
prod.send(topic, message);
LOG.info("Committed Trans[id=" + trans + ", count="
+ count + "], ID=" + messageRover);
sess.close();
con.close();
}
}
/**
* Consumes massages from a durable subscription. Goes online/offline
* periodically. Checks the incoming messages against the sent messages of
* the server.
*/
private final class DurableSubscriber {
String url = "tcp://localhost:61656";
final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
private final String subName ;
private final int id;
private final String conClientId;
private final String selector;
public DurableSubscriber(int id) throws JMSException {
this.id = id;
conClientId = "cli" + id;
subName = "subscription"+ id;
selector ="RELEVANT = true";
}
private void process() throws JMSException {
long end = System.currentTimeMillis() + 20000;
int transCount = 0;
LOG.info(toString() + " ONLINE.");
Connection con = openConnection();
Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = sess.createDurableSubscriber(topic,
subName, selector, false);
//MessageConsumer consumer = sess.createDurableSubscriber(topic,SUBSCRIPTION_NAME);
try {
do {
long max = end - System.currentTimeMillis();
if (max <= 0) {
break;
}
Message message = consumer.receive(max);
if (message == null)
continue;
LOG.info("Received Trans[id="
+ message.getIntProperty("TRANS") + ", count="
+ transCount + "] in " + this + ".");
} while (true);
} finally {
sess.close();
con.close();
LOG.info(toString() + " OFFLINE.");
}
}
private Connection openConnection() throws JMSException {
Connection con = cf.createConnection();
con.setClientID(conClientId);
con.start();
return con;
}
private void unsubscribe() throws JMSException {
Connection con = openConnection();
Session session = con
.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.unsubscribe(subName);
session.close();
con.close();
}
@Override
public String toString() {
return "DurableSubscriber[id=" + id + "]";
}
}
@Before
public void setUp() throws Exception {
topic = new ActiveMQTopic("TopicT");
startBroker();
}
@After
public void tearDown() throws Exception {
destroyBroker();
}
private void startBroker() throws Exception {
startBroker(true);
}
private void startBroker(boolean deleteAllMessages) throws Exception {
if (broker != null)
return;
broker = BrokerFactory.createBroker("broker:(vm://" + getName() + ")");
broker.setBrokerName(getName());
broker.setAdvisorySupport(false);
broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
File kahadbData = new File("activemq-data/" + getName() + "-kahadb");
if (deleteAllMessages)
delete(kahadbData);
broker.setPersistent(true);
KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
kahadb.setDirectory(kahadbData);
kahadb.setJournalMaxFileLength( 500 * 1024);
broker.setPersistenceAdapter(kahadb);
broker.addConnector("tcp://localhost:61656");
broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024);
broker.getSystemUsage().getTempUsage().setLimit(256 * 1024 * 1024);
broker.getSystemUsage().getStoreUsage().setLimit(256 * 1024 * 1024);
broker.start();
}
protected static String getName() {
return "DurableSubSelectorDelayTest";
}
private static boolean delete(File path) {
if (path == null)
return true;
if (path.isDirectory()) {
for (File file : path.listFiles()) {
delete(file);
}
}
return path.delete();
}
private void destroyBroker() throws Exception {
if (broker == null)
return;
broker.stop();
broker = null;
}
}