git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1375140 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-08-20 17:50:10 +00:00
parent 2027c87791
commit 030d09b816
2 changed files with 125 additions and 23 deletions

View File

@ -25,6 +25,7 @@ import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kahadb.index.ListIndex;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Transaction;
@ -201,7 +202,9 @@ public class PList extends ListIndex<String, Location> {
@Override
public boolean hasNext() {
return iterator.hasNext();
synchronized (indexLock) {
return iterator.hasNext();
}
}
@Override

View File

@ -154,7 +154,7 @@ public class PListTest {
Iterator<PListEntry> iterator = plist.iterator();
while (iterator.hasNext()) {
PListEntry v = iterator.next();
iterator.next();
iterator.remove();
}
}
@ -209,7 +209,7 @@ public class PListTest {
Thread.currentThread().setName("ALRF:"+candidate.getName());
synchronized (plistLocks(candidate)) {
candidate.addLast(String.valueOf(i), payload);
PListEntry entry = candidate.getFirst();
candidate.getFirst();
assertTrue(candidate.remove(String.valueOf(i)));
}
}
@ -233,7 +233,7 @@ public class PListTest {
Thread.currentThread().setName("ALRF:"+candidate.getName());
synchronized (plistLocks(candidate)) {
candidate.addLast(String.valueOf(i), payload);
PListEntry entry = candidate.getFirst();
candidate.getFirst();
assertTrue(candidate.remove(String.valueOf(i)));
}
}
@ -321,6 +321,7 @@ public class PListTest {
store.setDirectory(directory);
store.setJournalMaxFileLength(1024*5);
store.setCleanupInterval(5000);
store.setIndexWriteBatchSize(500);
store.start();
final int iterations = 500;
@ -441,6 +442,87 @@ public class PListTest {
assertTrue("test did not timeout ", shutdown);
}
@Test
public void testSerialAddIterate() throws Exception {
File directory = store.getDirectory();
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store = new PListStore();
store.setIndexPageSize(512);
store.setJournalMaxFileLength(100*1024);
store.setDirectory(directory);
store.setCleanupInterval(-1);
store.setIndexEnablePageCaching(false);
store.setIndexWriteBatchSize(2000);
store.setEnableIndexWriteAsync(false);
store.start();
final int iterations = 1000;
final int numLists = 1;
LOG.info("create");
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.CREATE, iterations).run();
}
LOG.info("serial add and iterate");
for (int i=0; i<iterations; i++) {
new Job(0, TaskType.ADD, i).run();
new Job(0, TaskType.ITERATE, 0).run();
}
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
LOG.info("Num dataFiles:" + store.getJournal().getFiles().size());
}
@Test
public void testConcurrentAddIterate() throws Exception {
File directory = store.getDirectory();
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store = new PListStore();
store.setIndexPageSize(2*1024);
store.setJournalMaxFileLength(1024*1024);
store.setDirectory(directory);
store.setCleanupInterval(-1);
store.setIndexEnablePageCaching(false);
store.setIndexWriteBatchSize(100);
store.start();
final int iterations = 250;
final int numLists = 10;
LOG.info("create");
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.CREATE, iterations).run();
}
LOG.info("parallel add and iterate");
// We want a lot of adds occurring so that new free pages get created along
// with overlapping seeks from the iterators so that we are likely to seek into
// some bad area in the page file.
executor = Executors.newFixedThreadPool(400);
final int numProducer = 300;
final int numConsumer = 100;
for (int i=0; i<numLists; i++) {
for (int j=0; j<numProducer; j++) {
executor.execute(new Job(i, PListTest.TaskType.ADD, iterations));
}
for (int k=0;k<numConsumer; k++) {
executor.execute(new Job(i, TaskType.ITERATE, iterations*2));
}
}
executor.shutdown();
LOG.info("wait for parallel work to complete");
boolean shutdown = executor.awaitTermination(60*60, TimeUnit.SECONDS);
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
assertTrue("test did not timeout ", shutdown);
LOG.info("Num dataFiles:" + store.getJournal().getFiles().size());
}
enum TaskType {CREATE, DELETE, ADD, REMOVE, ITERATE, ITERATE_REMOVE}
class Job implements Runnable {
@ -476,10 +558,17 @@ public class PListTest {
for (int j = 0; j < iterations; j++) {
synchronized (plistLocks(plist)) {
plist.addLast ("PL>" + id + idSeed + "-" + j, payload);
if (exceptions.isEmpty()) {
plist.addLast ("PL>" + id + idSeed + "-" + j, payload);
} else {
break;
}
}
}
LOG.info("Job-" + id + ", Add, done: " + iterations);
if (exceptions.isEmpty()) {
LOG.info("Job-" + id + ", Add, done: " + iterations);
}
break;
case REMOVE:
Thread.currentThread().setName("R:"+id);
@ -497,12 +586,20 @@ public class PListTest {
case ITERATE:
Thread.currentThread().setName("I:"+id);
plist = store.getPList(String.valueOf(id));
int iterateCount = 0;
synchronized (plistLocks(plist)) {
Iterator<PListEntry> iterator = plist.iterator();
PListEntry element = null;
while (iterator.hasNext()) {
element = iterator.next();
if (exceptions.isEmpty()) {
Iterator<PListEntry> iterator = plist.iterator();
while (iterator.hasNext() && exceptions.isEmpty()) {
iterator.next();
iterateCount++;
}
//LOG.info("Job-" + id + " Done iterate: it=" + iterator + ", count:" + iterateCount + ", size:" + plist.size());
if (plist.size() != iterateCount) {
System.err.println("Count Wrong: " + iterator);
}
assertEquals("iterate got all " + id + " iterator:" + iterator , plist.size(), iterateCount);
}
}
break;
@ -515,10 +612,9 @@ public class PListTest {
synchronized (plistLocks(plist)) {
Iterator<PListEntry> removeIterator = plist.iterator();
PListEntry v = null;
while (removeIterator.hasNext()) {
v = removeIterator.next();
removeIterator.next();
removeIterator.remove();
if (removeCount++ > iterations) {
break;
@ -532,9 +628,12 @@ public class PListTest {
}
} catch (Exception e) {
LOG.warn("Job["+id+"] caught exception: " + e.getMessage());
e.printStackTrace();
exceptions.add(e);
executor.shutdownNow();
if (executor != null) {
executor.shutdownNow();
}
} finally {
Thread.currentThread().setName(threadName);
}
@ -557,7 +656,7 @@ public class PListTest {
@Before
public void setUp() throws Exception {
File directory = new File("target/test/PlistDB");
File directory = new File("/tmp/target/test/PlistDB");
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
startStore(directory);