https://issues.apache.org/jira/browse/AMQ-3325 - PList temp store, chunk stream does not exist when broker under stress. Sync issue around temp store list creation for subs pending message cursor

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1104075 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-05-17 09:16:22 +00:00
parent fefedf5fb1
commit 128a8f7a8e
4 changed files with 250 additions and 42 deletions

View File

@ -19,6 +19,7 @@ package org.apache.activemq.store.kahadb.plist;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.store.kahadb.plist.EntryLocation.EntryLocationMarshaller;
@ -26,8 +27,11 @@ import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.ByteSequence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PList {
static final Logger LOG = LoggerFactory.getLogger(PList.class);
final PListStore store;
private String name;
private long rootId = EntryLocation.NOT_SET;
@ -334,6 +338,25 @@ public class PList {
return result;
}
synchronized public void claimFileLocations(final Set<Integer> candidates) throws IOException {
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
long nextId = rootId;
while (nextId != EntryLocation.NOT_SET) {
EntryLocation entry = getNext(tx, nextId);
if (entry != null) {
candidates.remove(entry.getLocation().getDataFileId());
nextId = entry.getNext();
} else {
break;
}
}
}
});
}
}
boolean remove(Transaction tx, String id) throws IOException {
boolean result = false;
long nextId = this.rootId;

View File

@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.thread.Scheduler;
@ -195,11 +194,13 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
}
}
synchronized public PList getPList(final String name) throws Exception {
public PList getPList(final String name) throws Exception {
if (!isStarted()) {
throw new IllegalStateException("Not started");
}
intialize();
synchronized (indexLock) {
synchronized (this) {
PList result = this.persistentLists.get(name);
if (result == null) {
final PList pl = new PList(this);
@ -223,9 +224,13 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
return result;
}
}
}
synchronized public boolean removePList(final String name) throws Exception {
public boolean removePList(final String name) throws Exception {
boolean result = false;
synchronized (indexLock) {
synchronized (this) {
final PList pl = this.persistentLists.remove(name);
result = pl != null;
if (result) {
@ -236,6 +241,8 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
}
});
}
}
}
return result;
}
@ -324,16 +331,21 @@ public class PListStore extends ServiceSupport implements BrokerServiceAware, Ru
try {
final Set<Integer> candidates = journal.getFileMap().keySet();
LOG.trace("Full gc candidate set:" + candidates);
for (PList list : persistentLists.values()) {
PListEntry entry = list.getFirst();
while (entry != null) {
claimCandidates(entry, candidates);
entry = list.getNext(entry);
if (candidates.size() > 1) {
List<PList> plists = null;
synchronized (this) {
plists = new ArrayList(persistentLists.values());
}
for (PList list : plists) {
list.claimFileLocations(candidates);
if (isStopping()) {
return;
}
LOG.trace("Remaining gc candidate set after refs from: " + list.getName() + ":" + candidates);
}
LOG.debug("GC Candidate set:" + candidates);
LOG.trace("GC Candidate set:" + candidates);
this.journal.removeDataFiles(candidates);
}
} catch (IOException e) {
LOG.error("Exception on periodic cleanup: " + e, e);
}

View File

@ -26,7 +26,6 @@ import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -36,11 +35,16 @@ import org.apache.kahadb.util.ByteSequence;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PListTest {
static final Logger LOG = LoggerFactory.getLogger(PListTest.class);
private PListStore store;
private PList plist;
final ByteSequence payload = new ByteSequence(new byte[400]);
final String idSeed = new String("Seed");
final Vector<Throwable> exceptions = new Vector<Throwable>();
@Test
@ -225,6 +229,175 @@ public class PListTest {
assertTrue("no exceptions", exceptions.isEmpty());
}
@Test
public void testConcurrentAddLast() throws Exception {
File directory = store.getDirectory();
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store = new PListStore();
store.setDirectory(directory);
//store.setJournalMaxFileLength(1024*5);
store.start();
final int numThreads = 20;
final int iterations = 2000;
ExecutorService executor = Executors.newFixedThreadPool(100);
for (int i=0; i<numThreads; i++) {
new Job(i, PListTest.TaskType.ADD, iterations).run();
}
for (int i=0; i<numThreads; i++) {
executor.execute(new Job(i, PListTest.TaskType.ITERATE, iterations));
}
for (int i=0; i<100; i++) {
executor.execute(new Job(i+20, PListTest.TaskType.ADD, 100));
}
executor.shutdown();
executor.awaitTermination(60*5, TimeUnit.SECONDS);
}
@Test
public void testOverFlow() throws Exception {
File directory = store.getDirectory();
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store = new PListStore();
store.setDirectory(directory);
store.start();
for (int i=0;i<2000; i++) {
new Job(i, PListTest.TaskType.ADD, 5).run();
}
LOG.info("After Load index file: " + store.pageFile.getFile().length());
LOG.info("After remove index file: " + store.pageFile.getFile().length());
}
@Test
public void testConcurrentAddRemoveWithPreload() throws Exception {
File directory = store.getDirectory();
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store = new PListStore();
store.setDirectory(directory);
store.setJournalMaxFileLength(1024*5);
store.setCleanupInterval(5000);
store.start();
final int iterations = 500;
final int numLists = 10;
// prime the store
// create/delete
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.CREATE, iterations).run();
}
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.DELETE, iterations).run();
}
// fill
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.ADD, iterations).run();
}
// empty
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.REMOVE, iterations).run();
}
// empty
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.DELETE, iterations).run();
}
// fill
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.ADD, iterations).run();
}
// parallel
ExecutorService executor = Executors.newFixedThreadPool(100);
for (int i=0; i<numLists*2; i++) {
executor.execute(new Job(i, i>=numLists ? PListTest.TaskType.ADD : PListTest.TaskType.REMOVE, iterations));
}
executor.shutdown();
executor.awaitTermination(60*5, TimeUnit.SECONDS);
assertTrue("no excepitons", exceptions.isEmpty());
}
enum TaskType {CREATE, DELETE, ADD, REMOVE, ITERATE}
class Job implements Runnable {
int id;
TaskType task;
int iterations;
public Job(int id, TaskType t, int iterations) {
this.id = id;
this.task = t;
this.iterations = iterations;
}
@Override
public void run() {
try {
PList plist = null;
switch (task) {
case CREATE:
plist = store.getPList("List-" + id);
break;
case DELETE:
store.removePList("List-" + id);
break;
case ADD:
plist = store.getPList("List-" + id);
for (int j = 0; j < iterations; j++) {
plist.addLast(idSeed + "id" + j, payload);
if (j > 0 && j % (iterations / 2) == 0) {
LOG.info("Job-" + id + ", Done: " + j);
}
}
break;
case REMOVE:
plist = store.getPList("List-" + id);
for (int j = iterations; j > 0; j--) {
plist.remove(idSeed + "id" + j);
if (j > 0 && j % (iterations / 2) == 0) {
LOG.info("Job-" + id + " Done remove: " + j);
}
}
break;
case ITERATE:
plist = store.getPList("List-" + id);
PListEntry element = plist.getFirst();
while (element != null) {
element = plist.getNext(element);
}
break;
default:
}
} catch (Exception e) {
e.printStackTrace();
exceptions.add(e);
}
}
}
@Before
public void setUp() throws Exception {
File directory = new File("target/test/PlistDB");
@ -244,6 +417,7 @@ public class PListTest {
@After
public void tearDown() throws Exception {
store.stop();
exceptions.clear();
}
}

View File

@ -97,7 +97,6 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>