mirror of https://github.com/apache/activemq.git
Fix failing test case, use the returned locator from addLast calls to perform the remove instead of trying with a String.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1430360 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9a3bd32e90
commit
3c83c17787
|
@ -33,8 +33,8 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
import org.apache.activemq.store.PList;
|
||||
import org.apache.activemq.store.PListEntry;
|
||||
import org.apache.activemq.util.IOHelper;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
import org.apache.activemq.util.IOHelper;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -53,12 +53,12 @@ public class PListTest {
|
|||
private PListEntry getFirst(PList plist) throws IOException {
|
||||
PList.PListIterator iterator = plist.iterator();
|
||||
try {
|
||||
if( iterator.hasNext() ) {
|
||||
if (iterator.hasNext()) {
|
||||
return iterator.next();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}finally {
|
||||
} finally {
|
||||
iterator.release();
|
||||
}
|
||||
}
|
||||
|
@ -78,14 +78,13 @@ public class PListTest {
|
|||
for (ByteSequence bs : map.values()) {
|
||||
String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength());
|
||||
PListEntry entry = plist.get(count);
|
||||
String plistString = new String(entry.getByteSequence().getData(), entry.getByteSequence().getOffset(),
|
||||
entry.getByteSequence().getLength());
|
||||
String plistString = new String(entry.getByteSequence().getData(), entry.getByteSequence().getOffset(), entry.getByteSequence().getLength());
|
||||
assertEquals(origStr, plistString);
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test
|
||||
public void testAddFirst() throws Exception {
|
||||
final int COUNT = 1000;
|
||||
Map<String, ByteSequence> map = new LinkedHashMap<String, ByteSequence>();
|
||||
|
@ -100,8 +99,7 @@ public class PListTest {
|
|||
for (ByteSequence bs : map.values()) {
|
||||
String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength());
|
||||
PListEntry entry = plist.get(count);
|
||||
String plistString = new String(entry.getByteSequence().getData(), entry.getByteSequence().getOffset(),
|
||||
entry.getByteSequence().getLength());
|
||||
String plistString = new String(entry.getByteSequence().getData(), entry.getByteSequence().getOffset(), entry.getByteSequence().getLength());
|
||||
assertEquals(origStr, plistString);
|
||||
count--;
|
||||
}
|
||||
|
@ -126,15 +124,14 @@ public class PListTest {
|
|||
plist.remove(entry.getId());
|
||||
entry = plist.getFirst();
|
||||
}
|
||||
assertEquals(0,plist.size());
|
||||
|
||||
assertEquals(0, plist.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDestroy() throws Exception {
|
||||
doTestRemove(1);
|
||||
plist.destroy();
|
||||
assertEquals(0,plist.size());
|
||||
assertEquals(0, plist.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -148,7 +145,7 @@ public class PListTest {
|
|||
plist.addLast(test, bs);
|
||||
}
|
||||
plist.destroy();
|
||||
assertEquals(0,plist.size());
|
||||
assertEquals(0, plist.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -191,12 +188,11 @@ public class PListTest {
|
|||
store = new PListStoreImpl();
|
||||
store.setCleanupInterval(400);
|
||||
store.setDirectory(directory);
|
||||
store.setJournalMaxFileLength(1024*5);
|
||||
store.setJournalMaxFileLength(1024 * 5);
|
||||
store.setLazyInit(false);
|
||||
store.start();
|
||||
|
||||
final ByteSequence payload = new ByteSequence(new byte[1024*2]);
|
||||
|
||||
final ByteSequence payload = new ByteSequence(new byte[1024 * 2]);
|
||||
|
||||
final Vector<Throwable> exceptions = new Vector<Throwable>();
|
||||
final int iterations = 1000;
|
||||
|
@ -204,8 +200,8 @@ public class PListTest {
|
|||
|
||||
final PList[] lists = new PList[numLists];
|
||||
String threadName = Thread.currentThread().getName();
|
||||
for (int i=0; i<numLists; i++) {
|
||||
Thread.currentThread().setName("C:"+String.valueOf(i));
|
||||
for (int i = 0; i < numLists; i++) {
|
||||
Thread.currentThread().setName("C:" + String.valueOf(i));
|
||||
lists[i] = store.getPList(String.valueOf(i));
|
||||
}
|
||||
Thread.currentThread().setName(threadName);
|
||||
|
@ -216,43 +212,43 @@ public class PListTest {
|
|||
public void run() {
|
||||
final String threadName = Thread.currentThread().getName();
|
||||
try {
|
||||
for (int i=0; i<iterations; i++) {
|
||||
PList candidate = lists[i%numLists];
|
||||
Thread.currentThread().setName("ALRF:"+candidate.getName());
|
||||
for (int i = 0; i < iterations; i++) {
|
||||
PList candidate = lists[i % numLists];
|
||||
Thread.currentThread().setName("ALRF:" + candidate.getName());
|
||||
synchronized (plistLocks(candidate)) {
|
||||
candidate.addLast(String.valueOf(i), payload);
|
||||
Object locator = candidate.addLast(String.valueOf(i), payload);
|
||||
getFirst(candidate);
|
||||
assertTrue(candidate.remove(String.valueOf(i)));
|
||||
assertTrue(candidate.remove(locator));
|
||||
}
|
||||
}
|
||||
} catch (Exception error) {
|
||||
LOG.error("Unexpcted ex", error);
|
||||
error.printStackTrace();
|
||||
exceptions.add(error);
|
||||
} finally {
|
||||
} finally {
|
||||
Thread.currentThread().setName(threadName);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
class B implements Runnable {
|
||||
class B implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
final String threadName = Thread.currentThread().getName();
|
||||
try {
|
||||
for (int i=0; i<iterations; i++) {
|
||||
PList candidate = lists[i%numLists];
|
||||
Thread.currentThread().setName("ALRF:"+candidate.getName());
|
||||
synchronized (plistLocks(candidate)) {
|
||||
candidate.addLast(String.valueOf(i), payload);
|
||||
for (int i = 0; i < iterations; i++) {
|
||||
PList candidate = lists[i % numLists];
|
||||
Thread.currentThread().setName("ALRF:" + candidate.getName());
|
||||
synchronized (plistLocks(candidate)) {
|
||||
Object locator = candidate.addLast(String.valueOf(i), payload);
|
||||
getFirst(candidate);
|
||||
assertTrue(candidate.remove(String.valueOf(i)));
|
||||
}
|
||||
assertTrue(candidate.remove(locator));
|
||||
}
|
||||
}
|
||||
} catch (Exception error) {
|
||||
error.printStackTrace();
|
||||
exceptions.add(error);
|
||||
} finally {
|
||||
} finally {
|
||||
Thread.currentThread().setName(threadName);
|
||||
}
|
||||
}
|
||||
|
@ -282,24 +278,23 @@ public class PListTest {
|
|||
store.setDirectory(directory);
|
||||
store.start();
|
||||
|
||||
|
||||
final int numThreads = 20;
|
||||
final int iterations = 1000;
|
||||
executor = Executors.newFixedThreadPool(100);
|
||||
for (int i=0; i<numThreads; i++) {
|
||||
for (int i = 0; i < numThreads; i++) {
|
||||
new Job(i, PListTest.TaskType.ADD, iterations).run();
|
||||
}
|
||||
|
||||
for (int i=0; i<numThreads; i++) {
|
||||
for (int i = 0; i < numThreads; i++) {
|
||||
executor.execute(new Job(i, PListTest.TaskType.ITERATE, iterations));
|
||||
}
|
||||
|
||||
for (int i=0; i<100; i++) {
|
||||
executor.execute(new Job(i+20, PListTest.TaskType.ADD, 100));
|
||||
for (int i = 0; i < 100; i++) {
|
||||
executor.execute(new Job(i + 20, PListTest.TaskType.ADD, 100));
|
||||
}
|
||||
|
||||
executor.shutdown();
|
||||
boolean finishedInTime = executor.awaitTermination(60*5, TimeUnit.SECONDS);
|
||||
boolean finishedInTime = executor.awaitTermination(60 * 5, TimeUnit.SECONDS);
|
||||
assertTrue("finished ok", finishedInTime);
|
||||
}
|
||||
|
||||
|
@ -313,7 +308,7 @@ public class PListTest {
|
|||
store.setDirectory(directory);
|
||||
store.start();
|
||||
|
||||
for (int i=0;i<2000; i++) {
|
||||
for (int i = 0; i < 2000; i++) {
|
||||
new Job(i, PListTest.TaskType.ADD, 5).run();
|
||||
|
||||
}
|
||||
|
@ -329,7 +324,7 @@ public class PListTest {
|
|||
IOHelper.deleteChildren(directory);
|
||||
store = new PListStoreImpl();
|
||||
store.setDirectory(directory);
|
||||
store.setJournalMaxFileLength(1024*5);
|
||||
store.setJournalMaxFileLength(1024 * 5);
|
||||
store.setCleanupInterval(5000);
|
||||
store.setIndexWriteBatchSize(500);
|
||||
store.start();
|
||||
|
@ -341,48 +336,48 @@ public class PListTest {
|
|||
|
||||
// create/delete
|
||||
LOG.info("create");
|
||||
for (int i=0; i<numLists;i++) {
|
||||
for (int i = 0; i < numLists; i++) {
|
||||
new Job(i, PListTest.TaskType.CREATE, iterations).run();
|
||||
}
|
||||
|
||||
LOG.info("delete");
|
||||
for (int i=0; i<numLists;i++) {
|
||||
for (int i = 0; i < numLists; i++) {
|
||||
new Job(i, PListTest.TaskType.DELETE, iterations).run();
|
||||
}
|
||||
|
||||
LOG.info("fill");
|
||||
for (int i=0; i<numLists;i++) {
|
||||
for (int i = 0; i < numLists; i++) {
|
||||
new Job(i, PListTest.TaskType.ADD, iterations).run();
|
||||
}
|
||||
LOG.info("remove");
|
||||
for (int i=0; i<numLists;i++) {
|
||||
for (int i = 0; i < numLists; i++) {
|
||||
new Job(i, PListTest.TaskType.REMOVE, iterations).run();
|
||||
}
|
||||
|
||||
LOG.info("check empty");
|
||||
for (int i=0; i<numLists;i++) {
|
||||
for (int i = 0; i < numLists; i++) {
|
||||
assertEquals("empty " + i, 0, store.getPList("List-" + i).size());
|
||||
}
|
||||
|
||||
LOG.info("delete again");
|
||||
for (int i=0; i<numLists;i++) {
|
||||
for (int i = 0; i < numLists; i++) {
|
||||
new Job(i, PListTest.TaskType.DELETE, iterations).run();
|
||||
}
|
||||
|
||||
LOG.info("fill again");
|
||||
for (int i=0; i<numLists;i++) {
|
||||
for (int i = 0; i < numLists; i++) {
|
||||
new Job(i, PListTest.TaskType.ADD, iterations).run();
|
||||
}
|
||||
|
||||
LOG.info("parallel add and remove");
|
||||
executor = Executors.newFixedThreadPool(numLists*2);
|
||||
for (int i=0; i<numLists*2; i++) {
|
||||
executor.execute(new Job(i, i>=numLists ? PListTest.TaskType.ADD : PListTest.TaskType.REMOVE, iterations));
|
||||
executor = Executors.newFixedThreadPool(numLists * 2);
|
||||
for (int i = 0; i < numLists * 2; i++) {
|
||||
executor.execute(new Job(i, i >= numLists ? PListTest.TaskType.ADD : PListTest.TaskType.REMOVE, iterations));
|
||||
}
|
||||
|
||||
executor.shutdown();
|
||||
LOG.info("wait for parallel work to complete");
|
||||
boolean finishedInTime = executor.awaitTermination(60*5, TimeUnit.SECONDS);
|
||||
boolean finishedInTime = executor.awaitTermination(60 * 5, TimeUnit.SECONDS);
|
||||
assertTrue("no exceptions", exceptions.isEmpty());
|
||||
assertTrue("finished ok", finishedInTime);
|
||||
}
|
||||
|
@ -392,14 +387,14 @@ public class PListTest {
|
|||
|
||||
@Test
|
||||
public void testRepeatStressWithCache() throws Exception {
|
||||
for (int i=0; i<numRepeats;i++) {
|
||||
for (int i = 0; i < numRepeats; i++) {
|
||||
do_testConcurrentAddIterateRemove(true);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRepeatStressWithOutCache() throws Exception {
|
||||
for (int i=0; i<numRepeats;i++) {
|
||||
for (int i = 0; i < numRepeats; i++) {
|
||||
do_testConcurrentAddIterateRemove(false);
|
||||
}
|
||||
}
|
||||
|
@ -411,7 +406,7 @@ public class PListTest {
|
|||
IOHelper.deleteChildren(directory);
|
||||
store = new PListStoreImpl();
|
||||
store.setIndexEnablePageCaching(enablePageCache);
|
||||
store.setIndexPageSize(2*1024);
|
||||
store.setIndexPageSize(2 * 1024);
|
||||
store.setDirectory(directory);
|
||||
store.start();
|
||||
|
||||
|
@ -419,12 +414,12 @@ public class PListTest {
|
|||
final int numLists = 10;
|
||||
|
||||
LOG.info("create");
|
||||
for (int i=0; i<numLists;i++) {
|
||||
for (int i = 0; i < numLists; i++) {
|
||||
new Job(i, PListTest.TaskType.CREATE, iterations).run();
|
||||
}
|
||||
|
||||
LOG.info("fill");
|
||||
for (int i=0; i<numLists;i++) {
|
||||
for (int i = 0; i < numLists; i++) {
|
||||
new Job(i, PListTest.TaskType.ADD, iterations).run();
|
||||
}
|
||||
|
||||
|
@ -432,22 +427,22 @@ public class PListTest {
|
|||
executor = Executors.newFixedThreadPool(400);
|
||||
final int numProducer = 5;
|
||||
final int numConsumer = 10;
|
||||
for (int i=0; i<numLists; i++) {
|
||||
for (int j=0; j<numProducer; j++) {
|
||||
executor.execute(new Job(i, PListTest.TaskType.ADD, iterations*2));
|
||||
for (int i = 0; i < numLists; i++) {
|
||||
for (int j = 0; j < numProducer; j++) {
|
||||
executor.execute(new Job(i, PListTest.TaskType.ADD, iterations * 2));
|
||||
}
|
||||
for (int k=0;k<numConsumer; k++) {
|
||||
executor.execute(new Job(i, TaskType.ITERATE_REMOVE, iterations/4));
|
||||
for (int k = 0; k < numConsumer; k++) {
|
||||
executor.execute(new Job(i, TaskType.ITERATE_REMOVE, iterations / 4));
|
||||
}
|
||||
}
|
||||
|
||||
for (int i=numLists; i<numLists*10; i++) {
|
||||
for (int i = numLists; i < numLists * 10; i++) {
|
||||
executor.execute(new Job(i, PListTest.TaskType.ADD, iterations));
|
||||
}
|
||||
}
|
||||
|
||||
executor.shutdown();
|
||||
LOG.info("wait for parallel work to complete");
|
||||
boolean shutdown = executor.awaitTermination(60*60, TimeUnit.SECONDS);
|
||||
boolean shutdown = executor.awaitTermination(60 * 60, TimeUnit.SECONDS);
|
||||
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
|
||||
assertTrue("test did not timeout ", shutdown);
|
||||
}
|
||||
|
@ -459,8 +454,8 @@ public class PListTest {
|
|||
IOHelper.mkdirs(directory);
|
||||
IOHelper.deleteChildren(directory);
|
||||
store = new PListStoreImpl();
|
||||
store.setIndexPageSize(2*1024);
|
||||
store.setJournalMaxFileLength(1024*1024);
|
||||
store.setIndexPageSize(2 * 1024);
|
||||
store.setJournalMaxFileLength(1024 * 1024);
|
||||
store.setDirectory(directory);
|
||||
store.setCleanupInterval(-1);
|
||||
store.setIndexEnablePageCaching(false);
|
||||
|
@ -471,35 +466,39 @@ public class PListTest {
|
|||
final int numLists = 10;
|
||||
|
||||
LOG.info("create");
|
||||
for (int i=0; i<numLists;i++) {
|
||||
for (int i = 0; i < numLists; i++) {
|
||||
new Job(i, PListTest.TaskType.CREATE, iterations).run();
|
||||
}
|
||||
|
||||
LOG.info("parallel add and iterate");
|
||||
// We want a lot of adds occurring so that new free pages get created along
|
||||
// with overlapping seeks from the iterators so that we are likely to seek into
|
||||
// We want a lot of adds occurring so that new free pages get created
|
||||
// along
|
||||
// with overlapping seeks from the iterators so that we are likely to
|
||||
// seek into
|
||||
// some bad area in the page file.
|
||||
executor = Executors.newFixedThreadPool(400);
|
||||
final int numProducer = 300;
|
||||
final int numConsumer = 100;
|
||||
for (int i=0; i<numLists; i++) {
|
||||
for (int j=0; j<numProducer; j++) {
|
||||
for (int i = 0; i < numLists; i++) {
|
||||
for (int j = 0; j < numProducer; j++) {
|
||||
executor.execute(new Job(i, PListTest.TaskType.ADD, iterations));
|
||||
}
|
||||
for (int k=0;k<numConsumer; k++) {
|
||||
executor.execute(new Job(i, TaskType.ITERATE, iterations*2));
|
||||
for (int k = 0; k < numConsumer; k++) {
|
||||
executor.execute(new Job(i, TaskType.ITERATE, iterations * 2));
|
||||
}
|
||||
}
|
||||
|
||||
executor.shutdown();
|
||||
LOG.info("wait for parallel work to complete");
|
||||
boolean shutdown = executor.awaitTermination(60*60, TimeUnit.SECONDS);
|
||||
boolean shutdown = executor.awaitTermination(60 * 60, TimeUnit.SECONDS);
|
||||
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
|
||||
assertTrue("test did not timeout ", shutdown);
|
||||
LOG.info("Num dataFiles:" + store.getJournal().getFiles().size());
|
||||
}
|
||||
|
||||
enum TaskType {CREATE, DELETE, ADD, REMOVE, ITERATE, ITERATE_REMOVE}
|
||||
enum TaskType {
|
||||
CREATE, DELETE, ADD, REMOVE, ITERATE, ITERATE_REMOVE
|
||||
}
|
||||
|
||||
class Job implements Runnable {
|
||||
|
||||
|
@ -520,22 +519,22 @@ public class PListTest {
|
|||
PListImpl plist = null;
|
||||
switch (task) {
|
||||
case CREATE:
|
||||
Thread.currentThread().setName("C:"+id);
|
||||
Thread.currentThread().setName("C:" + id);
|
||||
plist = store.getPList(String.valueOf(id));
|
||||
LOG.info("Job-" + id + ", CREATE");
|
||||
break;
|
||||
case DELETE:
|
||||
Thread.currentThread().setName("D:"+id);
|
||||
Thread.currentThread().setName("D:" + id);
|
||||
store.removePList(String.valueOf(id));
|
||||
break;
|
||||
case ADD:
|
||||
Thread.currentThread().setName("A:"+id);
|
||||
Thread.currentThread().setName("A:" + id);
|
||||
plist = store.getPList(String.valueOf(id));
|
||||
|
||||
for (int j = 0; j < iterations; j++) {
|
||||
synchronized (plistLocks(plist)) {
|
||||
if (exceptions.isEmpty()) {
|
||||
plist.addLast ("PL>" + id + idSeed + "-" + j, payload);
|
||||
plist.addLast("PL>" + id + idSeed + "-" + j, payload);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
@ -547,12 +546,12 @@ public class PListTest {
|
|||
}
|
||||
break;
|
||||
case REMOVE:
|
||||
Thread.currentThread().setName("R:"+id);
|
||||
Thread.currentThread().setName("R:" + id);
|
||||
plist = store.getPList(String.valueOf(id));
|
||||
synchronized (plistLocks(plist)) {
|
||||
|
||||
for (int j = iterations -1; j >= 0; j--) {
|
||||
plist.remove("PL>" + id + idSeed + "-" + j);
|
||||
for (int j = iterations - 1; j >= 0; j--) {
|
||||
plist.remove("PL>" + id + idSeed + "-" + j);
|
||||
if (j > 0 && j % (iterations / 2) == 0) {
|
||||
LOG.info("Job-" + id + " Done remove: " + j);
|
||||
}
|
||||
|
@ -560,7 +559,7 @@ public class PListTest {
|
|||
}
|
||||
break;
|
||||
case ITERATE:
|
||||
Thread.currentThread().setName("I:"+id);
|
||||
Thread.currentThread().setName("I:" + id);
|
||||
plist = store.getPList(String.valueOf(id));
|
||||
int iterateCount = 0;
|
||||
synchronized (plistLocks(plist)) {
|
||||
|
@ -571,17 +570,19 @@ public class PListTest {
|
|||
iterateCount++;
|
||||
}
|
||||
|
||||
//LOG.info("Job-" + id + " Done iterate: it=" + iterator + ", count:" + iterateCount + ", size:" + plist.size());
|
||||
// LOG.info("Job-" + id + " Done iterate: it=" +
|
||||
// iterator + ", count:" + iterateCount +
|
||||
// ", size:" + plist.size());
|
||||
if (plist.size() != iterateCount) {
|
||||
System.err.println("Count Wrong: " + iterator);
|
||||
}
|
||||
assertEquals("iterate got all " + id + " iterator:" + iterator , plist.size(), iterateCount);
|
||||
assertEquals("iterate got all " + id + " iterator:" + iterator, plist.size(), iterateCount);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case ITERATE_REMOVE:
|
||||
Thread.currentThread().setName("IRM:"+id);
|
||||
Thread.currentThread().setName("IRM:" + id);
|
||||
plist = store.getPList(String.valueOf(id));
|
||||
|
||||
int removeCount = 0;
|
||||
|
@ -604,7 +605,7 @@ public class PListTest {
|
|||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Job["+id+"] caught exception: " + e.getMessage());
|
||||
LOG.warn("Job[" + id + "] caught exception: " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
exceptions.add(e);
|
||||
if (executor != null) {
|
||||
|
@ -617,12 +618,13 @@ public class PListTest {
|
|||
}
|
||||
|
||||
Map<PList, Object> locks = new HashMap<PList, Object>();
|
||||
|
||||
private Object plistLocks(PList plist) {
|
||||
Object lock = null;
|
||||
synchronized (locks) {
|
||||
if (locks.containsKey(plist)) {
|
||||
lock = locks.get(plist);
|
||||
} else {
|
||||
} else {
|
||||
lock = new Object();
|
||||
locks.put(plist, lock);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue