Modified BlockingBuffer add method to notifyAll

instead of notify.
Added tests to verify blocking behavior.
Patch submitted by: Janek Bogucki
Reviewed by: Phil Steitz
Pr #23232, 23159


git-svn-id: https://svn.apache.org/repos/asf/jakarta/commons/proper/collections/trunk@131161 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Phil Steitz 2003-09-18 03:28:28 +00:00
parent c3992caf1d
commit 9f3a1ecf83
2 changed files with 187 additions and 82 deletions

View File

@ -1,5 +1,5 @@
/*
* $Header: /home/jerenkrantz/tmp/commons/commons-convert/cvs/home/cvs/jakarta-commons//collections/src/java/org/apache/commons/collections/decorators/Attic/BlockingBuffer.java,v 1.2 2003/08/31 17:24:46 scolebourne Exp $
* $Header: /home/jerenkrantz/tmp/commons/commons-convert/cvs/home/cvs/jakarta-commons//collections/src/java/org/apache/commons/collections/decorators/Attic/BlockingBuffer.java,v 1.3 2003/09/18 03:28:28 psteitz Exp $
* ====================================================================
*
* The Apache Software License, Version 1.1
@ -64,13 +64,14 @@ import org.apache.commons.collections.BufferUnderflowException;
/**
* <code>BlockingBuffer</code> decorates another <code>Buffer</code>
* to block on calls to the get method to wait until entries are
* to block on calls to the get and remove methods to wait until entries are
* added to the buffer.
*
* @since Commons Collections 3.0
* @version $Revision: 1.2 $ $Date: 2003/08/31 17:24:46 $
* @version $Revision: 1.3 $ $Date: 2003/09/18 03:28:28 $
*
* @author Stephen Colebourne
* @author Janek Bogucki
*/
public class BlockingBuffer extends SynchronizedBuffer {
@ -98,7 +99,7 @@ public class BlockingBuffer extends SynchronizedBuffer {
public boolean add(Object o) {
synchronized (lock) {
boolean result = collection.add(o);
notify();
notifyAll();
return result;
}
}

View File

@ -1,5 +1,5 @@
/*
* $Header: /home/jerenkrantz/tmp/commons/commons-convert/cvs/home/cvs/jakarta-commons//collections/src/test/org/apache/commons/collections/decorators/Attic/TestBlockingBuffer.java,v 1.1 2003/09/15 03:50:41 psteitz Exp $
* $Header: /home/jerenkrantz/tmp/commons/commons-convert/cvs/home/cvs/jakarta-commons//collections/src/test/org/apache/commons/collections/decorators/Attic/TestBlockingBuffer.java,v 1.2 2003/09/18 03:28:28 psteitz Exp $
* ====================================================================
*
* The Apache Software License, Version 1.1
@ -57,15 +57,16 @@
*/
package org.apache.commons.collections.decorators;
import java.util.Collections;
import java.util.LinkedList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
import junit.framework.Test;
import junit.framework.TestSuite;
import org.apache.commons.collections.Buffer;
import org.apache.commons.collections.ArrayStack;
import org.apache.commons.collections.BufferUnderflowException;
import org.apache.commons.collections.decorators.BlockingBuffer;
@ -76,7 +77,7 @@ import org.apache.commons.collections.TestObject;
* implementation.
*
* @since Commons Collections 3.0
* @version $Revision: 1.1 $
* @version $Revision: 1.2 $
*
* @author Janek Bogucki
* @author Phil Steitz
@ -102,7 +103,7 @@ public class TestBlockingBuffer extends TestObject {
//-----------------------------------------------------------------------
/**
* Tests {@link BlockingBuffer#get()}.
* Tests {@link BlockingBuffer#get()} in combination with {@link BlockingBuffer#add()}.
*/
public void testGetWithAdd() {
@ -117,7 +118,7 @@ public class TestBlockingBuffer extends TestObject {
//-----------------------------------------------------------------------
/**
* Tests {@link BlockingBuffer#get()}.
* Tests {@link BlockingBuffer#get()} in combination with {@link BlockingBuffer#addAll()}.
*/
public void testGetWithAddAll() {
@ -132,7 +133,7 @@ public class TestBlockingBuffer extends TestObject {
//-----------------------------------------------------------------------
/**
* Tests {@link BlockingBuffer#remove()}.
* Tests {@link BlockingBuffer#remove()} in combination with {@link BlockingBuffer#add()}.
*/
public void testRemoveWithAdd() {
@ -147,7 +148,7 @@ public class TestBlockingBuffer extends TestObject {
//-----------------------------------------------------------------------
/**
* Tests {@link BlockingBuffer#remove()}.
* Tests {@link BlockingBuffer#remove()} in combination with {@link BlockingBuffer#addAll()}.
*/
public void testRemoveWithAddAll() {
@ -159,107 +160,74 @@ public class TestBlockingBuffer extends TestObject {
// verify does not throw BufferUnderflowException; should block until other thread has added to the buffer .
assertSame(obj, blockingBuffer.remove());
}
//-----------------------------------------------------------------------
/**
* Tests get using multiple read threads.
* Tests {@link BlockingBuffer#get()} in combination with {@link BlockingBuffer#add()} using multiple read threads.
*
* Verifies that multiple adds are required to allow gets by
* multiple threads on an empty buffer to complete.
* Two read threads should block on an empty buffer until one object
* is added then both threads should complete.
*/
public void testBlockedGetWithAdd() {
Buffer blockingBuffer = BlockingBuffer.decorate(new MyBuffer());
Object obj = new Object();
// run methods will get and compare -- must wait for adds
// run methods will get and compare -- must wait for add
Thread thread1 = new ReadThread(blockingBuffer, obj);
Thread thread2 = new ReadThread(blockingBuffer, obj);
thread1.start();
thread2.start();
// give hungry read threads ample time to hang
try {
Thread.currentThread().sleep(100);
} catch (InterruptedException e) {}
delay();
// notify should allow one read thread to complete
// notifyAll should allow both read threads to complete
blockingBuffer.add(obj);
// allow notified thread(s) to complete
try {
Thread.currentThread().sleep(100);
} catch (InterruptedException e) {}
// allow notified threads to complete
delay();
// There shoould still be one thread waiting. Verify this.
// This check will fail if add is changed to notifyAll.
assertTrue("One read thread should be waiting",
thread1.isAlive() || thread2.isAlive());
// now add again so the second thread will be notified
blockingBuffer.add(obj);
// wait to exit until both threads are dead, or appear to be hung
boolean finished = false;
for (int i = 1; i < 10; i++) {
if (thread1.isAlive() || thread2.isAlive()) {
try {
Thread.currentThread().sleep(100);
}
catch (InterruptedException e) {}
} else {
finished = true;
break;
}
}
if (!finished) {
fail("Read thread did not finish.");
}
// There should not be any threads waiting.
if (thread1.isAlive() || thread2.isAlive())
fail("Live thread(s) when both should be dead.");
}
//-----------------------------------------------------------------------
/**
* Tests get using multiple read threads.
* Shows that one addAll allows multiple gets to complete.
* Tests {@link BlockingBuffer#get()} in combination with {@link BlockingBuffer#addAll()} using multiple read threads.
*
* Two read threads should block on an empty buffer until a
* singleton is added then both threads should complete.
*/
public void testBlockedGetWithAddAll() {
Buffer blockingBuffer = BlockingBuffer.decorate(new MyBuffer());
Object obj = new Object();
// run methods will get and compare -- must wait for adds
// run methods will get and compare -- must wait for addAll
Thread thread1 = new ReadThread(blockingBuffer, obj);
Thread thread2 = new ReadThread(blockingBuffer, obj);
thread1.start();
thread2.start();
// give hungry read threads ample time to hang
try {
Thread.currentThread().sleep(100);
} catch (InterruptedException e) {}
delay();
// notifyAll should allow both read threads to complete
blockingBuffer.addAll(Collections.singleton(obj));
// wait to exit until both threads are dead, or appear to be hung
boolean finished = false;
for (int i = 1; i < 10; i++) {
if (thread1.isAlive() || thread2.isAlive()) {
try {
Thread.currentThread().sleep(100);
}
catch (InterruptedException e) {}
} else {
finished = true;
break;
}
}
if (!finished) {
fail("Read thread did not finish.");
}
// allow notified threads to complete
delay();
// There should not be any threads waiting.
if (thread1.isAlive() || thread2.isAlive())
fail("Live thread(s) when both should be dead.");
}
//-----------------------------------------------------------------------
/**
* Tests interrupted get.
* Tests interrupted {@link BlockingBuffer#get()}.
*/
public void testInterruptedGet() {
@ -275,19 +243,140 @@ public class TestBlockingBuffer extends TestObject {
thread.interrupt();
// Chill, so thread can throw and add message to exceptionList
try {
Thread.currentThread().sleep(100);
} catch (InterruptedException e) {}
delay();
assertTrue("Thread interrupt should have led to underflow",
exceptionList.contains("BufferUnderFlow"));
if (thread.isAlive()) {
fail("Hung read thread");
fail("Read thread has hung.");
}
}
//-----------------------------------------------------------------------
/**
* Tests {@link BlockingBuffer#remove()} in combination with {@link BlockingBuffer#add()} using multiple read threads.
*
* Two read threads should block on an empty buffer until one
* object is added then one thread should complete. The remaining
* thread should complete after the addition of a second object.
*/
public void testBlockedRemoveWithAdd() {
Buffer blockingBuffer = BlockingBuffer.decorate(new MyBuffer());
Object obj = new Object();
// run methods will remove and compare -- must wait for add
Thread thread1 = new ReadThread(blockingBuffer, obj, null, "remove");
Thread thread2 = new ReadThread(blockingBuffer, obj, null, "remove");
thread1.start();
thread2.start();
// give hungry read threads ample time to hang
delay();
blockingBuffer.add(obj);
// allow notified threads to complete
delay();
// There should be one thread waiting.
assertTrue ("There is one thread waiting", thread1.isAlive() ^ thread2.isAlive());
blockingBuffer.add(obj);
// allow notified thread to complete
delay();
// There should not be any threads waiting.
if(thread1.isAlive() || thread2.isAlive())
fail("Live thread(s) when both should be dead.");
}
//-----------------------------------------------------------------------
/**
* Tests {@link BlockingBuffer#remove()} in combination with {@link BlockingBuffer#addAll()} using multiple read threads.
*
* Two read threads should block on an empty buffer until a
* singleton collection is added then one thread should
* complete. The remaining thread should complete after the
* addition of a second singleton.
*/
public void testBlockedRemoveWithAddAll1() {
Buffer blockingBuffer = BlockingBuffer.decorate(new MyBuffer());
Object obj = new Object();
// run methods will remove and compare -- must wait for addAll
Thread thread1 = new ReadThread(blockingBuffer, obj, null, "remove");
Thread thread2 = new ReadThread(blockingBuffer, obj, null, "remove");
thread1.start();
thread2.start();
// give hungry read threads ample time to hang
delay();
blockingBuffer.addAll(Collections.singleton(obj));
// allow notified threads to complete
delay();
// There should be one thread waiting.
assertTrue ("There is one thread waiting", thread1.isAlive() ^ thread2.isAlive());
blockingBuffer.addAll(Collections.singleton(obj));
// allow notified thread to complete
delay();
// There should not be any threads waiting.
if(thread1.isAlive() || thread2.isAlive())
fail("Live thread(s) when both should be dead.");
}
//-----------------------------------------------------------------------
/**
* Tests {@link BlockingBuffer#remove()} in combination with {@link BlockingBuffer#addAll()} using multiple read threads.
*
* Two read threads should block on an empty buffer until a
* collection with two distinct objects is added then both
* threads should complete. Each thread should have read a
* different object.
*/
public void testBlockedRemoveWithAddAll2() {
Buffer blockingBuffer = BlockingBuffer.decorate(new MyBuffer());
Object obj1 = new Object();
Object obj2 = new Object();
Set objs = Collections.synchronizedSet(new HashSet());
objs.add(obj1);
objs.add(obj2);
// run methods will remove and compare -- must wait for addAll
Thread thread1 = new ReadThread(blockingBuffer, objs, "remove");
Thread thread2 = new ReadThread(blockingBuffer, objs, "remove");
thread1.start();
thread2.start();
// give hungry read threads ample time to hang
delay();
blockingBuffer.addAll(objs);
// allow notified threads to complete
delay();
assertEquals("Both objects were removed", 0, objs.size());
// There should not be any threads waiting.
if(thread1.isAlive() || thread2.isAlive())
fail("Live thread(s) when both should be dead.");
}
//-----------------------------------------------------------------------
/**
* Tests interrupted remove.
*/
@ -305,15 +394,13 @@ public class TestBlockingBuffer extends TestObject {
thread.interrupt();
// Chill, so thread can throw and add message to exceptionList
try {
Thread.currentThread().sleep(100);
} catch (InterruptedException e) {}
delay();
assertTrue("Thread interrupt should have led to underflow",
exceptionList.contains("BufferUnderFlow"));
if (thread.isAlive()) {
fail("Hung read thread");
fail("Read thread has hung.");
}
}
@ -370,6 +457,7 @@ public class TestBlockingBuffer extends TestObject {
Object obj;
ArrayList exceptionList = null;
String action = "get";
Set objs;
ReadThread (Buffer buffer, Object obj) {
super();
@ -392,12 +480,22 @@ public class TestBlockingBuffer extends TestObject {
this.action = action;
}
ReadThread (Buffer buffer, Set objs, String action) {
super();
this.buffer = buffer;
this.objs = objs;
this.action = action;
}
public void run() {
try {
if (action == "get") {
assertSame(obj, buffer.get());
} else {
assertSame(obj, buffer.remove());
if (null != obj)
assertSame(obj, buffer.remove());
else
assertTrue(objs.remove(buffer.remove()));
}
} catch (BufferUnderflowException ex) {
exceptionList.add("BufferUnderFlow");
@ -420,4 +518,10 @@ public class TestBlockingBuffer extends TestObject {
return remove(0);
}
}
private void delay(){
try {
Thread.currentThread().sleep(100);
} catch (InterruptedException e) {}
}
}