diff --git a/src/java/org/apache/commons/collections/buffer/BlockingBuffer.java b/src/java/org/apache/commons/collections/buffer/BlockingBuffer.java index be4fcbab7..e92579f97 100644 --- a/src/java/org/apache/commons/collections/buffer/BlockingBuffer.java +++ b/src/java/org/apache/commons/collections/buffer/BlockingBuffer.java @@ -25,35 +25,35 @@ import org.apache.commons.collections.BufferUnderflowException; /** * Decorates another Buffer to make {@link #get()} and * {@link #remove()} block when the Buffer is empty. - *

+ *

* If either get or remove is called on an empty * Buffer, the calling thread waits for notification that * an add or addAll operation has completed. - *

+ *

* When one or more entries are added to an empty Buffer, * all threads blocked in get or remove are notified. - * There is no guarantee that concurrent blocked get or - * remove requests will be "unblocked" and receive data in the + * There is no guarantee that concurrent blocked get or + * remove requests will be "unblocked" and receive data in the * order that they arrive. - *

+ *

* This class is Serializable from Commons Collections 3.1. * - * @since Commons Collections 3.0 - * @version $Revision$ $Date$ - * * @author Stephen Colebourne * @author Janek Bogucki * @author Phil Steitz + * @version $Revision$ $Date$ + * @since Commons Collections 3.0 */ public class BlockingBuffer extends SynchronizedBuffer { - - /** Serialization version */ + /** + * Serialization version + */ private static final long serialVersionUID = 1719328905017860541L; /** * Factory method to create a blocking buffer. - * - * @param buffer the buffer to decorate, must not be null + * + * @param buffer the buffer to decorate, must not be null * @return a new blocking Buffer * @throws IllegalArgumentException if buffer is null */ @@ -64,8 +64,8 @@ public class BlockingBuffer extends SynchronizedBuffer { //----------------------------------------------------------------------- /** * Constructor that wraps (not copies). - * - * @param buffer the buffer to decorate, must not be null + * + * @param buffer the buffer to decorate, must not be null * @throws IllegalArgumentException if the buffer is null */ protected BlockingBuffer(Buffer buffer) { @@ -74,7 +74,7 @@ public class BlockingBuffer extends SynchronizedBuffer { //----------------------------------------------------------------------- public boolean add(Object o) { - synchronized (lock) { + synchronized(lock) { boolean result = collection.add(o); notifyAll(); return result; @@ -82,7 +82,7 @@ public class BlockingBuffer extends SynchronizedBuffer { } public boolean addAll(Collection c) { - synchronized (lock) { + synchronized(lock) { boolean result = collection.addAll(c); notifyAll(); return result; @@ -90,11 +90,12 @@ public class BlockingBuffer extends SynchronizedBuffer { } public Object get() { - synchronized (lock) { - while (collection.isEmpty()) { + synchronized(lock) { + while(collection.isEmpty()) { try { wait(); - } catch (InterruptedException e) { + } + catch(InterruptedException e) { PrintWriter out = new PrintWriter(new StringWriter()); e.printStackTrace(out); throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString()); @@ -104,12 +105,35 @@ public class BlockingBuffer extends SynchronizedBuffer { } } + public Object get(final long timeout) { + synchronized(lock) { + final long expiration = System.currentTimeMillis() + timeout; + long timeLeft = expiration - System.currentTimeMillis(); + while(timeLeft > 0 && collection.isEmpty()) { + try { + wait(timeLeft); + timeLeft = expiration - System.currentTimeMillis(); + } + catch(InterruptedException e) { + PrintWriter out = new PrintWriter(new StringWriter()); + e.printStackTrace(out); + throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString()); + } + } + if(collection.isEmpty()) { + throw new BufferUnderflowException("Timeout expired."); + } + return getBuffer().get(); + } + } + public Object remove() { - synchronized (lock) { - while (collection.isEmpty()) { + synchronized(lock) { + while(collection.isEmpty()) { try { wait(); - } catch (InterruptedException e) { + } + catch(InterruptedException e) { PrintWriter out = new PrintWriter(new StringWriter()); e.printStackTrace(out); throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString()); @@ -119,4 +143,25 @@ public class BlockingBuffer extends SynchronizedBuffer { } } + public Object remove(final long timeout) { + synchronized(lock) { + final long expiration = System.currentTimeMillis() + timeout; + long timeLeft = expiration - System.currentTimeMillis(); + while(timeLeft > 0 && collection.isEmpty()) { + try { + wait(timeLeft); + timeLeft = expiration - System.currentTimeMillis(); + } + catch(InterruptedException e) { + PrintWriter out = new PrintWriter(new StringWriter()); + e.printStackTrace(out); + throw new BufferUnderflowException("Caused by InterruptedException: " + out.toString()); + } + } + if(collection.isEmpty()) { + throw new BufferUnderflowException("Timeout expired."); + } + return getBuffer().remove(); + } + } } diff --git a/src/test/org/apache/commons/collections/buffer/TestBlockingBuffer.java b/src/test/org/apache/commons/collections/buffer/TestBlockingBuffer.java index 484667b52..c51b24deb 100644 --- a/src/test/org/apache/commons/collections/buffer/TestBlockingBuffer.java +++ b/src/test/org/apache/commons/collections/buffer/TestBlockingBuffer.java @@ -364,7 +364,26 @@ public class TestBlockingBuffer extends AbstractTestObject { } } - + + public void testTimeoutGet() { + final BlockingBuffer buffer = new BlockingBuffer(new MyBuffer()); + try { + buffer.get( 100 ); + fail( "Get should have timed out." ); + } + catch( BufferUnderflowException e ){ + } + } + + public void testTimeoutRemove() { + final BlockingBuffer buffer = new BlockingBuffer(new MyBuffer()); + try { + buffer.remove( 100 ); + fail( "Get should have timed out." ); + } + catch( BufferUnderflowException e ){ + } + } protected static class DelayedAdd extends Thread { Buffer buffer;