From 050c817ad5a377a2ab60a51c2f44dad6ce90e106 Mon Sep 17 00:00:00 2001
From: "James W. Carman"
* Buffer b = BufferUtils.synchronizedBuffer(myBuffer); * synchronized (b) { @@ -73,9 +72,9 @@ public class BufferUtils { /** * Returns a synchronized buffer backed by the given buffer that will * block on {@link Buffer#get()} and {@link Buffer#remove()} operations. - * If the buffer is empty, then the {@link Buffer#get()} and + * If the buffer is empty, then the {@link Buffer#get()} and * {@link Buffer#remove()} operations will block until new elements - * are added to the buffer, rather than immediately throwing a + * are added to the buffer, rather than immediately throwing a *BufferUnderflowException
. * * @param buffer the buffer to synchronize, must not be null @@ -99,10 +98,10 @@ public class BufferUtils { * @throws IllegalArgumentException if the Buffer is null * @since Commons Collections 3.2 */ - public static Buffer timeoutBuffer(Buffer buffer, long timeout) { - return TimeoutBuffer.decorate(buffer, timeout); + public static Buffer blockingBuffer(Buffer buffer, long timeout) { + return BlockingBuffer.decorate(buffer, timeout); } - + /** * Returns an unmodifiable buffer backed by the given buffer. * diff --git a/src/java/org/apache/commons/collections/buffer/BlockingBuffer.java b/src/java/org/apache/commons/collections/buffer/BlockingBuffer.java index 048539529..de115bec1 100644 --- a/src/java/org/apache/commons/collections/buffer/BlockingBuffer.java +++ b/src/java/org/apache/commons/collections/buffer/BlockingBuffer.java @@ -15,41 +15,42 @@ */ package org.apache.commons.collections.buffer; +import org.apache.commons.collections.Buffer; +import org.apache.commons.collections.BufferUnderflowException; + import java.io.PrintWriter; import java.io.StringWriter; import java.util.Collection; -import org.apache.commons.collections.Buffer; -import org.apache.commons.collections.BufferUnderflowException; - /** - * Decorates anotherBuffer
to make {@link #get()} and - * {@link #remove()} block when theBuffer
is empty. - *- * If either
get
orremove
is called on an empty - *Buffer
, the calling thread waits for notification that - * anadd
oraddAll
operation has completed. - *- * When one or more entries are added to an empty
Buffer
, - * all threads blocked inget
orremove
are notified. - * There is no guarantee that concurrent blockedget
or - *remove
requests will be "unblocked" and receive data in the - * order that they arrive. - *+ * Decorates another
+ * If eitherBuffer
to make {@link #get()} and {@link #remove()} block when theBuffer
+ * is empty. + *get
orremove
is called on an emptyBuffer
, the calling thread waits + * for notification that anadd
oraddAll
operation has completed. + * + * When one or more entries are added to an emptyBuffer
, all threads blocked inget
or + *remove
are notified. There is no guarantee that concurrent blockedget
or + *remove
requests will be "unblocked" and receive data in the order that they arrive. + * * This class is Serializable from Commons Collections 3.1. * * @author Stephen Colebourne * @author Janek Bogucki * @author Phil Steitz + * @author James Carman * @version $Revision$ $Date$ * @since Commons Collections 3.0 */ public class BlockingBuffer extends SynchronizedBuffer { + /** * Serialization version */ private static final long serialVersionUID = 1719328905017860541L; + private long timeout; + /** * Factory method to create a blocking buffer. * @@ -57,53 +58,83 @@ public class BlockingBuffer extends SynchronizedBuffer { * @return a new blocking Buffer * @throws IllegalArgumentException if buffer is null */ - public static Buffer decorate(Buffer buffer) { - return new BlockingBuffer(buffer); + public static Buffer decorate( Buffer buffer ) { + return new BlockingBuffer( buffer ); } - //----------------------------------------------------------------------- + /** + * Factory method to create a blocking buffer with a timeout value. + * + * @param buffer the buffer to decorate, must not be null + * @param timeout the maximum amount of time to block + * @return a new blocking buffer + * @throws IllegalArgumentException if the buffer is null + */ + public static Buffer decorate( Buffer buffer, long timeout ) { + return new BlockingBuffer( buffer, timeout ); + } + + //----------------------------------------------------------------------- + /** * Constructor that wraps (not copies). * * @param buffer the buffer to decorate, must not be null * @throws IllegalArgumentException if the buffer is null */ - protected BlockingBuffer(Buffer buffer) { - super(buffer); + protected BlockingBuffer( Buffer buffer ) { + super( buffer ); + } + + /** + * Constructor that wraps (not copies). + * + * @param buffer the buffer to decorate, must not be null + * @param timeout the maximum amount of time to block + * @throws IllegalArgumentException if the buffer is null + */ + protected BlockingBuffer( Buffer buffer, long timeout ) { + super( buffer ); + this.timeout = timeout < 0 ? 0 : timeout; } //----------------------------------------------------------------------- - public boolean add(Object o) { - synchronized (lock) { - boolean result = collection.add(o); + public boolean add( Object o ) { + synchronized( lock ) { + boolean result = collection.add( o ); lock.notifyAll(); return result; } } - public boolean addAll(Collection c) { - synchronized (lock) { - boolean result = collection.addAll(c); + public boolean addAll( Collection c ) { + synchronized( lock ) { + boolean result = collection.addAll( c ); lock.notifyAll(); return result; } } /** - * Gets the next value from the buffer, waiting until an object is - * added if the buffer is empty. + * Gets the next value from the buffer, waiting until an object is added if the buffer is empty. * * @throws BufferUnderflowException if an interrupt is received */ public Object get() { - synchronized (lock) { - while (collection.isEmpty()) { + synchronized( lock ) { + while( collection.isEmpty() ) { try { - lock.wait(); - } catch (InterruptedException e) { - PrintWriter out = new PrintWriter(new StringWriter()); - e.printStackTrace(out); - throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString()); + if( timeout <= 0 ) { + lock.wait(); + } + else { + return get( timeout ); + } + } + catch( InterruptedException e ) { + PrintWriter out = new PrintWriter( new StringWriter() ); + e.printStackTrace( out ); + throw new BufferUnderflowException( "Caused by InterruptedException: " + out.toString() ); } } return getBuffer().get(); @@ -111,50 +142,56 @@ public class BlockingBuffer extends SynchronizedBuffer { } /** - * Gets the next value from the buffer, waiting until an object is - * added for up to the specified timeout value if the buffer is empty. + * Gets the next value from the buffer, waiting until an object is added for up to the specified timeout value if + * the buffer is empty. * - * @param timeout the timeout value in milliseconds + * @param timeout the timeout value in milliseconds * @throws BufferUnderflowException if an interrupt is received * @throws BufferUnderflowException if the timeout expires * @since Commons Collections 3.2 */ - public Object get(final long timeout) { - synchronized (lock) { + public Object get( final long timeout ) { + synchronized( lock ) { final long expiration = System.currentTimeMillis() + timeout; long timeLeft = expiration - System.currentTimeMillis(); - while (timeLeft > 0 && collection.isEmpty()) { + while( timeLeft > 0 && collection.isEmpty() ) { try { - lock.wait(timeLeft); + lock.wait( timeLeft ); timeLeft = expiration - System.currentTimeMillis(); - } catch(InterruptedException e) { - PrintWriter out = new PrintWriter(new StringWriter()); - e.printStackTrace(out); - throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString()); + } + catch( InterruptedException e ) { + PrintWriter out = new PrintWriter( new StringWriter() ); + e.printStackTrace( out ); + throw new BufferUnderflowException( "Caused by InterruptedException: " + out.toString() ); } } - if (collection.isEmpty()) { - throw new BufferUnderflowException("Timeout expired."); + if( collection.isEmpty() ) { + throw new BufferUnderflowException( "Timeout expired." ); } return getBuffer().get(); } } /** - * Removes the next value from the buffer, waiting until an object is - * added if the buffer is empty. + * Removes the next value from the buffer, waiting until an object is added if the buffer is empty. * * @throws BufferUnderflowException if an interrupt is received */ public Object remove() { - synchronized (lock) { - while (collection.isEmpty()) { + synchronized( lock ) { + while( collection.isEmpty() ) { try { - lock.wait(); - } catch (InterruptedException e) { - PrintWriter out = new PrintWriter(new StringWriter()); - e.printStackTrace(out); - throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString()); + if( timeout <= 0 ) { + lock.wait(); + } + else { + return remove( timeout ); + } + } + catch( InterruptedException e ) { + PrintWriter out = new PrintWriter( new StringWriter() ); + e.printStackTrace( out ); + throw new BufferUnderflowException( "Caused by InterruptedException: " + out.toString() ); } } return getBuffer().remove(); @@ -162,33 +199,33 @@ public class BlockingBuffer extends SynchronizedBuffer { } /** - * Removes the next value from the buffer, waiting until an object is - * added for up to the specified timeout value if the buffer is empty. + * Removes the next value from the buffer, waiting until an object is added for up to the specified timeout value if + * the buffer is empty. * - * @param timeout the timeout value in milliseconds + * @param timeout the timeout value in milliseconds * @throws BufferUnderflowException if an interrupt is received * @throws BufferUnderflowException if the timeout expires * @since Commons Collections 3.2 */ - public Object remove(final long timeout) { - synchronized (lock) { + public Object remove( final long timeout ) { + synchronized( lock ) { final long expiration = System.currentTimeMillis() + timeout; long timeLeft = expiration - System.currentTimeMillis(); - while (timeLeft > 0 && collection.isEmpty()) { + while( timeLeft > 0 && collection.isEmpty() ) { try { - lock.wait(timeLeft); + lock.wait( timeLeft ); timeLeft = expiration - System.currentTimeMillis(); - } catch(InterruptedException e) { - PrintWriter out = new PrintWriter(new StringWriter()); - e.printStackTrace(out); - throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString()); + } + catch( InterruptedException e ) { + PrintWriter out = new PrintWriter( new StringWriter() ); + e.printStackTrace( out ); + throw new BufferUnderflowException( "Caused by InterruptedException: " + out.toString() ); } } - if (collection.isEmpty()) { - throw new BufferUnderflowException("Timeout expired."); + if( collection.isEmpty() ) { + throw new BufferUnderflowException( "Timeout expired." ); } return getBuffer().remove(); } } - } diff --git a/src/test/org/apache/commons/collections/buffer/TestBlockingBuffer.java b/src/test/org/apache/commons/collections/buffer/TestBlockingBuffer.java index c51b24deb..2e0f8250e 100644 --- a/src/test/org/apache/commons/collections/buffer/TestBlockingBuffer.java +++ b/src/test/org/apache/commons/collections/buffer/TestBlockingBuffer.java @@ -15,46 +15,43 @@ */ package org.apache.commons.collections.buffer; +import junit.framework.Test; +import junit.framework.TestSuite; +import org.apache.commons.collections.AbstractTestObject; +import org.apache.commons.collections.Buffer; +import org.apache.commons.collections.BufferUnderflowException; + import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; import java.util.Set; -import junit.framework.Test; -import junit.framework.TestSuite; - -import org.apache.commons.collections.AbstractTestObject; -import org.apache.commons.collections.Buffer; -import org.apache.commons.collections.BufferUnderflowException; - /** - * Extension of {@link TestObject} for exercising the {@link BlockingBuffer} - * implementation. + * Extension of {@link AbstractTestObject} for exercising the {@link BlockingBuffer} implementation. * - * @since Commons Collections 3.0 - * @version $Revision$ - * * @author Janek Bogucki * @author Phil Steitz + * @version $Revision$ + * @since Commons Collections 3.0 */ public class TestBlockingBuffer extends AbstractTestObject { - public TestBlockingBuffer(String testName) { - super(testName); + public TestBlockingBuffer( String testName ) { + super( testName ); } public static Test suite() { - return new TestSuite(TestBlockingBuffer.class); + return new TestSuite( TestBlockingBuffer.class ); } - public static void main(String args[]) { - String[] testCaseName = { TestBlockingBuffer.class.getName()}; - junit.textui.TestRunner.main(testCaseName); + public static void main( String args[] ) { + String[] testCaseName = {TestBlockingBuffer.class.getName()}; + junit.textui.TestRunner.main( testCaseName ); } public Object makeObject() { - return BlockingBuffer.decorate(new MyBuffer()); + return BlockingBuffer.decorate( new MyBuffer() ); } public boolean isEqualsCheckable() { @@ -62,446 +59,499 @@ public class TestBlockingBuffer extends AbstractTestObject { } //----------------------------------------------------------------------- + /** - * Tests {@link BlockingBuffer#get()} in combination with {@link BlockingBuffer#add()}. + * Tests {@link BlockingBuffer#get()} in combination with {@link BlockingBuffer#add(Object)}. */ public void testGetWithAdd() { - - Buffer blockingBuffer = BlockingBuffer.decorate(new MyBuffer()); + Buffer blockingBuffer = BlockingBuffer.decorate( new MyBuffer() ); Object obj = new Object(); - - new DelayedAdd(blockingBuffer, obj).start(); + new DelayedAdd( blockingBuffer, obj ).start(); // verify does not throw BufferUnderflowException; should block until other thread has added to the buffer . - assertSame(obj, blockingBuffer.get()); + assertSame( obj, blockingBuffer.get() ); + } + + public void testGetWithAddTimeout() { + Buffer blockingBuffer = BlockingBuffer.decorate( new MyBuffer(), 500 ); + Object obj = new Object(); + new DelayedAdd( blockingBuffer, obj, 100 ).start(); + + // verify does not throw BufferUnderflowException; should block until other thread has added to the buffer . + assertSame( obj, blockingBuffer.get() ); } //----------------------------------------------------------------------- + /** - * Tests {@link BlockingBuffer#get()} in combination with {@link BlockingBuffer#addAll()}. + * Tests {@link BlockingBuffer#get()} in combination with {@link BlockingBuffer#addAll(java.util.Collection)}. */ public void testGetWithAddAll() { - - Buffer blockingBuffer = BlockingBuffer.decorate(new MyBuffer()); + Buffer blockingBuffer = BlockingBuffer.decorate( new MyBuffer() ); Object obj = new Object(); - - new DelayedAddAll(blockingBuffer, obj).start(); + new DelayedAddAll( blockingBuffer, obj ).start(); // verify does not throw BufferUnderflowException; should block until other thread has added to the buffer . - assertSame(obj, blockingBuffer.get()); + assertSame( obj, blockingBuffer.get() ); + } + + public void testGetWithAddAllTimeout() { + Buffer blockingBuffer = BlockingBuffer.decorate( new MyBuffer(), 500 ); + Object obj = new Object(); + new DelayedAddAll( blockingBuffer, obj, 100 ).start(); + + // verify does not throw BufferUnderflowException; should block until other thread has added to the buffer . + assertSame( obj, blockingBuffer.get() ); } //----------------------------------------------------------------------- + /** - * Tests {@link BlockingBuffer#remove()} in combination with {@link BlockingBuffer#add()}. + * Tests {@link BlockingBuffer#remove()} in combination with {@link BlockingBuffer#add(Object)}. */ public void testRemoveWithAdd() { - - Buffer blockingBuffer = BlockingBuffer.decorate(new MyBuffer()); + Buffer blockingBuffer = BlockingBuffer.decorate( new MyBuffer() ); Object obj = new Object(); - - new DelayedAdd(blockingBuffer, obj).start(); + new DelayedAdd( blockingBuffer, obj ).start(); // verify does not throw BufferUnderflowException; should block until other thread has added to the buffer . - assertSame(obj, blockingBuffer.remove()); + assertSame( obj, blockingBuffer.remove() ); } + public void testRemoveWithAddTimeout() { + Buffer blockingBuffer = BlockingBuffer.decorate( new MyBuffer(), 100 ); + Object obj = new Object(); + new DelayedAdd( blockingBuffer, obj, 500 ).start(); + try { + blockingBuffer.remove(); + } + catch( BufferUnderflowException e ) { + } + } //----------------------------------------------------------------------- + /** - * Tests {@link BlockingBuffer#remove()} in combination with {@link BlockingBuffer#addAll()}. + * Tests {@link BlockingBuffer#remove()} in combination with {@link BlockingBuffer#addAll(java.util.Collection)}. */ public void testRemoveWithAddAll() { - - Buffer blockingBuffer = BlockingBuffer.decorate(new MyBuffer()); + Buffer blockingBuffer = BlockingBuffer.decorate( new MyBuffer() ); Object obj = new Object(); - - new DelayedAddAll(blockingBuffer, obj).start(); + new DelayedAddAll( blockingBuffer, obj ).start(); // verify does not throw BufferUnderflowException; should block until other thread has added to the buffer . - assertSame(obj, blockingBuffer.remove()); + assertSame( obj, blockingBuffer.remove() ); } + public void testRemoveWithAddAllTimeout() { + Buffer blockingBuffer = BlockingBuffer.decorate( new MyBuffer(), 100 ); + Object obj = new Object(); + new DelayedAddAll( blockingBuffer, obj, 500 ).start(); + try { + blockingBuffer.remove(); + } + catch( BufferUnderflowException e ) { + } + } //----------------------------------------------------------------------- + /** - * Tests {@link BlockingBuffer#get()} in combination with {@link BlockingBuffer#add()} using multiple read threads. - * - * Two read threads should block on an empty buffer until one object - * is added then both threads should complete. + * Tests {@link BlockingBuffer#get()} in combination with {@link BlockingBuffer#add(Object)} using multiple read + * threads. + * + * Two read threads should block on an empty buffer until one object is added then both threads should complete. */ public void testBlockedGetWithAdd() { - - Buffer blockingBuffer = BlockingBuffer.decorate(new MyBuffer()); + Buffer blockingBuffer = BlockingBuffer.decorate( new MyBuffer() ); Object obj = new Object(); - + // run methods will get and compare -- must wait for add - Thread thread1 = new ReadThread(blockingBuffer, obj); - Thread thread2 = new ReadThread(blockingBuffer, obj); + Thread thread1 = new ReadThread( blockingBuffer, obj ); + Thread thread2 = new ReadThread( blockingBuffer, obj ); thread1.start(); thread2.start(); - + // give hungry read threads ample time to hang delay(); - + // notifyAll should allow both read threads to complete - blockingBuffer.add(obj); - + blockingBuffer.add( obj ); + // allow notified threads to complete delay(); - + // There should not be any threads waiting. - if (thread1.isAlive() || thread2.isAlive()) - fail("Live thread(s) when both should be dead."); + if( thread1.isAlive() || thread2.isAlive() ) { + fail( "Live thread(s) when both should be dead." ); + } } - + //----------------------------------------------------------------------- + /** - * Tests {@link BlockingBuffer#get()} in combination with {@link BlockingBuffer#addAll()} using multiple read threads. - * - * Two read threads should block on an empty buffer until a - * singleton is added then both threads should complete. + * Tests {@link BlockingBuffer#get()} in combination with {@link BlockingBuffer#addAll(java.util.Collection)} using + * multiple read threads. + * + * Two read threads should block on an empty buffer until a singleton is added then both threads should complete. */ public void testBlockedGetWithAddAll() { - - Buffer blockingBuffer = BlockingBuffer.decorate(new MyBuffer()); + Buffer blockingBuffer = BlockingBuffer.decorate( new MyBuffer() ); Object obj = new Object(); - + // run methods will get and compare -- must wait for addAll - Thread thread1 = new ReadThread(blockingBuffer, obj); - Thread thread2 = new ReadThread(blockingBuffer, obj); + Thread thread1 = new ReadThread( blockingBuffer, obj ); + Thread thread2 = new ReadThread( blockingBuffer, obj ); thread1.start(); thread2.start(); - + // give hungry read threads ample time to hang delay(); - + // notifyAll should allow both read threads to complete - blockingBuffer.addAll(Collections.singleton(obj)); - + blockingBuffer.addAll( Collections.singleton( obj ) ); + // allow notified threads to complete delay(); - + // There should not be any threads waiting. - if (thread1.isAlive() || thread2.isAlive()) - fail("Live thread(s) when both should be dead."); + if( thread1.isAlive() || thread2.isAlive() ) { + fail( "Live thread(s) when both should be dead." ); + } } - + //----------------------------------------------------------------------- + /** - * Tests interrupted {@link BlockingBuffer#get()}. + * Tests interrupted {@link BlockingBuffer#get()}. */ public void testInterruptedGet() { - - Buffer blockingBuffer = BlockingBuffer.decorate(new MyBuffer()); + Buffer blockingBuffer = BlockingBuffer.decorate( new MyBuffer() ); Object obj = new Object(); - + // spawn a read thread to wait on the empty buffer ArrayList exceptionList = new ArrayList(); - Thread thread = new ReadThread(blockingBuffer, obj, exceptionList); + Thread thread = new ReadThread( blockingBuffer, obj, exceptionList ); thread.start(); - + // Interrupting the thread should cause it to throw BufferUnderflowException thread.interrupt(); - + // Chill, so thread can throw and add message to exceptionList delay(); - - assertTrue("Thread interrupt should have led to underflow", - exceptionList.contains("BufferUnderFlow")); - - if (thread.isAlive()) { - fail("Read thread has hung."); + assertTrue( "Thread interrupt should have led to underflow", + exceptionList.contains( "BufferUnderFlow" ) ); + if( thread.isAlive() ) { + fail( "Read thread has hung." ); } - + } - + //----------------------------------------------------------------------- + /** - * Tests {@link BlockingBuffer#remove()} in combination with {@link BlockingBuffer#add()} using multiple read threads. - * - * Two read threads should block on an empty buffer until one - * object is added then one thread should complete. The remaining - * thread should complete after the addition of a second object. + * Tests {@link BlockingBuffer#remove()} in combination with {@link BlockingBuffer#add(Object)} using multiple read + * threads. + * + * Two read threads should block on an empty buffer until one object is added then one thread should complete. The + * remaining thread should complete after the addition of a second object. */ public void testBlockedRemoveWithAdd() { - - Buffer blockingBuffer = BlockingBuffer.decorate(new MyBuffer()); + Buffer blockingBuffer = BlockingBuffer.decorate( new MyBuffer() ); Object obj = new Object(); - + // run methods will remove and compare -- must wait for add - Thread thread1 = new ReadThread(blockingBuffer, obj, null, "remove"); - Thread thread2 = new ReadThread(blockingBuffer, obj, null, "remove"); + Thread thread1 = new ReadThread( blockingBuffer, obj, null, "remove" ); + Thread thread2 = new ReadThread( blockingBuffer, obj, null, "remove" ); thread1.start(); thread2.start(); - + // give hungry read threads ample time to hang delay(); - - blockingBuffer.add(obj); - + blockingBuffer.add( obj ); + // allow notified threads to complete delay(); - + // There should be one thread waiting. - assertTrue ("There is one thread waiting", thread1.isAlive() ^ thread2.isAlive()); - - blockingBuffer.add(obj); - + assertTrue( "There is one thread waiting", thread1.isAlive() ^ thread2.isAlive() ); + blockingBuffer.add( obj ); + // allow notified thread to complete delay(); // There should not be any threads waiting. - if(thread1.isAlive() || thread2.isAlive()) - fail("Live thread(s) when both should be dead."); + if( thread1.isAlive() || thread2.isAlive() ) { + fail( "Live thread(s) when both should be dead." ); + } } //----------------------------------------------------------------------- + /** - * Tests {@link BlockingBuffer#remove()} in combination with {@link BlockingBuffer#addAll()} using multiple read threads. - * - * Two read threads should block on an empty buffer until a - * singleton collection is added then one thread should - * complete. The remaining thread should complete after the - * addition of a second singleton. + * Tests {@link BlockingBuffer#remove()} in combination with {@link BlockingBuffer#addAll(java.util.Collection)} + * using multiple read threads. + * + * Two read threads should block on an empty buffer until a singleton collection is added then one thread should + * complete. The remaining thread should complete after the addition of a second singleton. */ public void testBlockedRemoveWithAddAll1() { - - Buffer blockingBuffer = BlockingBuffer.decorate(new MyBuffer()); + Buffer blockingBuffer = BlockingBuffer.decorate( new MyBuffer() ); Object obj = new Object(); - + // run methods will remove and compare -- must wait for addAll - Thread thread1 = new ReadThread(blockingBuffer, obj, null, "remove"); - Thread thread2 = new ReadThread(blockingBuffer, obj, null, "remove"); + Thread thread1 = new ReadThread( blockingBuffer, obj, null, "remove" ); + Thread thread2 = new ReadThread( blockingBuffer, obj, null, "remove" ); thread1.start(); thread2.start(); - + // give hungry read threads ample time to hang delay(); - - blockingBuffer.addAll(Collections.singleton(obj)); - + blockingBuffer.addAll( Collections.singleton( obj ) ); + // allow notified threads to complete delay(); - + // There should be one thread waiting. - assertTrue ("There is one thread waiting", thread1.isAlive() ^ thread2.isAlive()); - - blockingBuffer.addAll(Collections.singleton(obj)); - + assertTrue( "There is one thread waiting", thread1.isAlive() ^ thread2.isAlive() ); + blockingBuffer.addAll( Collections.singleton( obj ) ); + // allow notified thread to complete delay(); // There should not be any threads waiting. - if(thread1.isAlive() || thread2.isAlive()) - fail("Live thread(s) when both should be dead."); + if( thread1.isAlive() || thread2.isAlive() ) { + fail( "Live thread(s) when both should be dead." ); + } } - //----------------------------------------------------------------------- + /** - * Tests {@link BlockingBuffer#remove()} in combination with {@link BlockingBuffer#addAll()} using multiple read threads. - * - * Two read threads should block on an empty buffer until a - * collection with two distinct objects is added then both - * threads should complete. Each thread should have read a - * different object. + * Tests {@link BlockingBuffer#remove()} in combination with {@link BlockingBuffer#addAll(java.util.Collection)} + * using multiple read threads. + * + * Two read threads should block on an empty buffer until a collection with two distinct objects is added then both + * threads should complete. Each thread should have read a different object. */ public void testBlockedRemoveWithAddAll2() { - - Buffer blockingBuffer = BlockingBuffer.decorate(new MyBuffer()); + Buffer blockingBuffer = BlockingBuffer.decorate( new MyBuffer() ); Object obj1 = new Object(); Object obj2 = new Object(); - - Set objs = Collections.synchronizedSet(new HashSet()); - objs.add(obj1); - objs.add(obj2); + Set objs = Collections.synchronizedSet( new HashSet() ); + objs.add( obj1 ); + objs.add( obj2 ); // run methods will remove and compare -- must wait for addAll - Thread thread1 = new ReadThread(blockingBuffer, objs, "remove"); - Thread thread2 = new ReadThread(blockingBuffer, objs, "remove"); + Thread thread1 = new ReadThread( blockingBuffer, objs, "remove" ); + Thread thread2 = new ReadThread( blockingBuffer, objs, "remove" ); thread1.start(); thread2.start(); - + // give hungry read threads ample time to hang delay(); - - blockingBuffer.addAll(objs); - + blockingBuffer.addAll( objs ); + // allow notified threads to complete delay(); - - assertEquals("Both objects were removed", 0, objs.size()); + assertEquals( "Both objects were removed", 0, objs.size() ); // There should not be any threads waiting. - if(thread1.isAlive() || thread2.isAlive()) - fail("Live thread(s) when both should be dead."); + if( thread1.isAlive() || thread2.isAlive() ) { + fail( "Live thread(s) when both should be dead." ); + } } //----------------------------------------------------------------------- + /** - * Tests interrupted remove. + * Tests interrupted remove. */ public void testInterruptedRemove() { - - Buffer blockingBuffer = BlockingBuffer.decorate(new MyBuffer()); + Buffer blockingBuffer = BlockingBuffer.decorate( new MyBuffer() ); Object obj = new Object(); - + // spawn a read thread to wait on the empty buffer ArrayList exceptionList = new ArrayList(); - Thread thread = new ReadThread(blockingBuffer, obj, exceptionList, "remove"); + Thread thread = new ReadThread( blockingBuffer, obj, exceptionList, "remove" ); thread.start(); - + // Interrupting the thread should cause it to throw BufferUnderflowException thread.interrupt(); - + // Chill, so thread can throw and add message to exceptionList delay(); - - assertTrue("Thread interrupt should have led to underflow", - exceptionList.contains("BufferUnderFlow")); - - if (thread.isAlive()) { - fail("Read thread has hung."); + assertTrue( "Thread interrupt should have led to underflow", + exceptionList.contains( "BufferUnderFlow" ) ); + if( thread.isAlive() ) { + fail( "Read thread has hung." ); } - + } public void testTimeoutGet() { - final BlockingBuffer buffer = new BlockingBuffer(new MyBuffer()); + final BlockingBuffer buffer = new BlockingBuffer( new MyBuffer() ); try { buffer.get( 100 ); fail( "Get should have timed out." ); } - catch( BufferUnderflowException e ){ + catch( BufferUnderflowException e ) { } } public void testTimeoutRemove() { - final BlockingBuffer buffer = new BlockingBuffer(new MyBuffer()); + final BlockingBuffer buffer = new BlockingBuffer( new MyBuffer() ); try { buffer.remove( 100 ); fail( "Get should have timed out." ); } - catch( BufferUnderflowException e ){ + catch( BufferUnderflowException e ) { } } + protected static class DelayedAdd extends Thread { Buffer buffer; + Object obj; - DelayedAdd (Buffer buffer, Object obj) { + long delay = 1000; + + public DelayedAdd( Buffer buffer, Object obj, long delay ) { + this.buffer = buffer; + this.obj = obj; + this.delay = delay; + } + + DelayedAdd( Buffer buffer, Object obj ) { super(); this.buffer = buffer; this.obj = obj; } - - public void run() { + public void run() { try { // wait for other thread to block on get() or remove() - Thread.sleep(100); + Thread.sleep( delay ); } - catch (InterruptedException e) {} - - buffer.add(obj); + catch( InterruptedException e ) { + } + buffer.add( obj ); } } - + protected static class DelayedAddAll extends Thread { Buffer buffer; + Object obj; - DelayedAddAll (Buffer buffer, Object obj) { + long delay = 100; + + public DelayedAddAll( Buffer buffer, Object obj, long delay ) { + this.buffer = buffer; + this.obj = obj; + this.delay = delay; + } + + DelayedAddAll( Buffer buffer, Object obj ) { super(); this.buffer = buffer; this.obj = obj; } - - public void run() { + public void run() { try { // wait for other thread to block on get() or remove() - Thread.sleep(100); + Thread.sleep( delay ); } - catch (InterruptedException e) {} - - buffer.addAll(Collections.singleton(obj)); + catch( InterruptedException e ) { + } + buffer.addAll( Collections.singleton( obj ) ); } } - + protected static class ReadThread extends Thread { Buffer buffer; + Object obj; + ArrayList exceptionList = null; + String action = "get"; + Set objs; - - ReadThread (Buffer buffer, Object obj) { + + ReadThread( Buffer buffer, Object obj ) { super(); this.buffer = buffer; this.obj = obj; } - ReadThread (Buffer buffer, Object obj, ArrayList exceptionList) { + ReadThread( Buffer buffer, Object obj, ArrayList exceptionList ) { super(); this.buffer = buffer; this.obj = obj; this.exceptionList = exceptionList; } - - ReadThread (Buffer buffer, Object obj, ArrayList exceptionList, String action) { + + ReadThread( Buffer buffer, Object obj, ArrayList exceptionList, String action ) { super(); this.buffer = buffer; this.obj = obj; this.exceptionList = exceptionList; this.action = action; } - - ReadThread (Buffer buffer, Set objs, String action) { + + ReadThread( Buffer buffer, Set objs, String action ) { super(); this.buffer = buffer; this.objs = objs; this.action = action; } - - public void run() { + + public void run() { try { - if (action == "get") { - assertSame(obj, buffer.get()); - } else { - if (null != obj) - assertSame(obj, buffer.remove()); - else - assertTrue(objs.remove(buffer.remove())); + if( action == "get" ) { + assertSame( obj, buffer.get() ); } - } catch (BufferUnderflowException ex) { - exceptionList.add("BufferUnderFlow"); + else { + if( null != obj ) { + assertSame( obj, buffer.remove() ); + } + else { + assertTrue( objs.remove( buffer.remove() ) ); + } + } + } + catch( BufferUnderflowException ex ) { + exceptionList.add( "BufferUnderFlow" ); } } } - protected static class MyBuffer extends LinkedList implements Buffer { public Object get() { - if(isEmpty()) + if( isEmpty() ) { throw new BufferUnderflowException(); - return get(0); + } + return get( 0 ); } public Object remove() { - if(isEmpty()) + if( isEmpty() ) { throw new BufferUnderflowException(); - return remove(0); + } + return remove( 0 ); } } - private void delay(){ + private void delay() { try { - Thread.sleep(100); - } catch (InterruptedException e) {} + Thread.sleep( 100 ); + } + catch( InterruptedException e ) { + } } public String getCompatibilityVersion() { @@ -510,12 +560,13 @@ public class TestBlockingBuffer extends AbstractTestObject { // public void testCreate() throws Exception { // Buffer buffer = BlockingBuffer.decorate(new UnboundedFifoBuffer()); -// writeExternalFormToDisk((java.io.Serializable) buffer, "D:/dev/collections/data/test/BlockingBuffer.emptyCollection.version3.1.obj"); +// writeExternalFormToDisk((java.io.Serializable) buffer, +// "D:/dev/collections/data/test/BlockingBuffer.emptyCollection.version3.1.obj"); // buffer = BlockingBuffer.decorate(new UnboundedFifoBuffer()); // buffer.add("A"); // buffer.add("B"); // buffer.add("C"); -// writeExternalFormToDisk((java.io.Serializable) buffer, "D:/dev/collections/data/test/BlockingBuffer.fullCollection.version3.1.obj"); +// writeExternalFormToDisk((java.io.Serializable) buffer, +// "D:/dev/collections/data/test/BlockingBuffer.fullCollection.version3.1.obj"); // } - }