37473: Implement a BoundedBuffer class

git-svn-id: https://svn.apache.org/repos/asf/jakarta/commons/proper/collections/trunk@348429 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James W. Carman 2005-11-23 13:16:43 +00:00
parent 050c817ad5
commit 420a1017eb
6 changed files with 371 additions and 0 deletions

View File

@ -58,6 +58,7 @@ If this causes major headaches to anyone please contact commons-dev at jakarta.a
<li>DefaultedMap - Returns a default value when the key is not found, without adding the default value to the map itself [30911]</li>
<li>GrowthList - Decorator that causes set and indexed add to expand the list rather than throw IndexOutOfBoundsException [34171]</li>
<li>LoopingListIterator - When the end of the list is reached the iteration continues from the start [30166]</li>
<li>BoundedBuffer - A new wrapper class which can make any buffer bounded [37473]</li>
</ul>
<center><h3>ENHANCEMENTS</h3></center>

Binary file not shown.

Binary file not shown.

View File

@ -21,6 +21,7 @@ import org.apache.commons.collections.buffer.SynchronizedBuffer;
import org.apache.commons.collections.buffer.TransformedBuffer;
import org.apache.commons.collections.buffer.TypedBuffer;
import org.apache.commons.collections.buffer.UnmodifiableBuffer;
import org.apache.commons.collections.buffer.BoundedBuffer;
/**
* Provides utility methods and decorators for {@link Buffer} instances.
@ -102,6 +103,33 @@ public class BufferUtils {
return BlockingBuffer.decorate(buffer, timeout);
}
/**
* Returns a synchronized buffer backed by the given buffer that will block on {@link Buffer#add(Object)} and
* {@link Buffer#addAll(java.util.Collection)} until enough object(s) are removed from the buffer to allow
* the object(s) to be added and still maintain the maximum size.
* @param buffer the buffer to make bounded
* @param maximumSize the maximum size
* @return a bounded buffer backed by the given buffer
* @throws IllegalArgumentException if the given buffer is null
*/
public static Buffer boundedBuffer( Buffer buffer, int maximumSize ) {
return BoundedBuffer.decorate( buffer, maximumSize );
}
/**
* Returns a synchronized buffer backed by the given buffer that will block on {@link Buffer#add(Object)} and
* {@link Buffer#addAll(java.util.Collection)} until enough object(s) are removed from the buffer to allow
* the object(s) to be added and still maintain the maximum size or the timeout expires.
* @param buffer the buffer to make bounded
* @param maximumSize the maximum size
* @param timeout the maximum time to wait
* @return a bounded buffer backed by the given buffer
* @throws IllegalArgumentException if the given buffer is null
*/
public static Buffer boundedBuffer( Buffer buffer, int maximumSize, long timeout ) {
return BoundedBuffer.decorate( buffer, maximumSize, timeout );
}
/**
* Returns an unmodifiable buffer backed by the given buffer.
*

View File

@ -0,0 +1,161 @@
/*
* Copyright 2001-2005 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.commons.collections.buffer;
import org.apache.commons.collections.Buffer;
import org.apache.commons.collections.BufferOverflowException;
import org.apache.commons.collections.BufferUnderflowException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collection;
import java.util.Iterator;
/**
* A wrapper class for buffers which makes them bounded.
* @author James Carman
* @since 3.2
*/
public class BoundedBuffer extends SynchronizedBuffer {
private static final long serialVersionUID = 1536432911093974264L;
private final int maximumSize;
private final long timeout;
/**
* Factory method to create a bounded buffer.
* @param buffer the buffer to decorate, must not be null
* @param maximumSize the maximum size
* @return a new bounded buffer
* @throws IllegalArgumentException if the buffer is null
*/
public static Buffer decorate( Buffer buffer, int maximumSize ) {
return new BoundedBuffer( buffer, maximumSize );
}
/**
* Factory method to create a bounded buffer that blocks for a maximum
* amount of time.
* @param buffer the buffer to decorate, must not be null
* @param maximumSize the maximum size
* @param timeout the maximum amount of time to wait.
* @return a new bounded buffer
* @throws IllegalArgumentException if the buffer is null
*/
public static Buffer decorate( Buffer buffer, int maximumSize, long timeout ) {
return new BoundedBuffer( buffer, maximumSize, timeout );
}
/**
* Constructor that wraps (not copies) another buffer, making it bounded.
* @param buffer the buffer to wrap, must not be null
* @param maximumSize the maximum size of the buffer
* @throws IllegalArgumentException if the buffer is null
*/
protected BoundedBuffer( Buffer buffer, int maximumSize ) {
this( buffer, maximumSize, -1 );
}
/**
* Constructor that wraps (not copies) another buffer, making it bounded waiting only up to
* a maximum amount of time.
* @param buffer the buffer to wrap, must not be null
* @param maximumSize the maximum size of the buffer
* @param timeout the maximum amount of time to wait
* @throws IllegalArgumentException if the buffer is null
*/
protected BoundedBuffer( Buffer buffer, int maximumSize, long timeout ) {
super( buffer );
this.maximumSize = maximumSize;
this.timeout = timeout;
}
public Object remove() {
synchronized( lock ) {
Object returnValue = getBuffer().remove();
lock.notifyAll();
return returnValue;
}
}
public boolean add( Object o ) {
synchronized( lock ) {
timeoutWait( 1 );
return getBuffer().add( o );
}
}
public boolean addAll( final Collection c ) {
synchronized( lock ) {
timeoutWait( c.size() );
return getBuffer().addAll( c );
}
}
public Iterator iterator() {
return new NotifyingIterator( collection.iterator() );
}
private void timeoutWait( final int nAdditions ) {
synchronized( lock ) {
if( timeout < 0 && getBuffer().size() + nAdditions > maximumSize ) {
throw new BufferOverflowException( "Buffer size cannot exceed " + maximumSize + "." );
}
final long expiration = System.currentTimeMillis() + timeout;
long timeLeft = expiration - System.currentTimeMillis();
while( timeLeft > 0 && getBuffer().size() + nAdditions > maximumSize ) {
try {
lock.wait( timeLeft );
timeLeft = expiration - System.currentTimeMillis();
}
catch( InterruptedException e ) {
PrintWriter out = new PrintWriter( new StringWriter() );
e.printStackTrace( out );
throw new BufferUnderflowException( "Caused by InterruptedException: " + out.toString() );
}
}
if( getBuffer().size() + nAdditions > maximumSize ) {
throw new BufferOverflowException( "Timeout expired." );
}
}
}
private class NotifyingIterator implements Iterator {
private final Iterator i;
public NotifyingIterator( Iterator i ) {
this.i = i;
}
public void remove() {
synchronized( lock ) {
i.remove();
lock.notifyAll();
}
}
public boolean hasNext() {
return i.hasNext();
}
public Object next() {
return i.next();
}
}
}

View File

@ -0,0 +1,181 @@
/*
* Copyright 2001-2005 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.commons.collections.buffer;
import org.apache.commons.collections.AbstractTestObject;
import org.apache.commons.collections.Buffer;
import org.apache.commons.collections.BufferOverflowException;
import java.util.Iterator;
import java.util.Collections;
import java.util.Arrays;
public class TestBoundedBuffer extends AbstractTestObject {
public TestBoundedBuffer( String testName ) {
super( testName );
}
public String getCompatibilityVersion() {
return "3.2";
}
public boolean isEqualsCheckable() {
return false;
}
public Object makeObject() {
return BoundedBuffer.decorate( new UnboundedFifoBuffer(), 1 );
}
public void testAddToFullBufferNoTimeout() {
final Buffer bounded = BoundedBuffer.decorate( new UnboundedFifoBuffer(), 1 );
bounded.add( "Hello" );
try {
bounded.add( "World" );
fail();
}
catch( BufferOverflowException e ) {
}
}
public void testAddAllToFullBufferNoTimeout() {
final Buffer bounded = BoundedBuffer.decorate( new UnboundedFifoBuffer(), 1 );
bounded.add( "Hello" );
try {
bounded.addAll( Collections.singleton( "World" ) );
fail();
}
catch( BufferOverflowException e ) {
}
}
public void testAddToFullBufferRemoveViaIterator() {
final Buffer bounded = BoundedBuffer.decorate( new UnboundedFifoBuffer(), 1, 500 );
bounded.add( "Hello" );
new DelayedIteratorRemove( bounded, 200 ).start();
bounded.add( "World" );
assertEquals( 1, bounded.size() );
assertEquals( "World", bounded.get() );
}
public void testAddAllToFullBufferRemoveViaIterator() {
final Buffer bounded = BoundedBuffer.decorate( new UnboundedFifoBuffer(), 2, 500 );
bounded.add( "Hello" );
bounded.add( "World" );
new DelayedIteratorRemove( bounded, 200, 2 ).start();
bounded.addAll( Arrays.asList( new String[] { "Foo", "Bar" } ) );
assertEquals( 2, bounded.size() );
assertEquals( "Foo", bounded.remove() );
assertEquals( "Bar", bounded.remove() );
}
public void testAddToFullBufferWithTimeout() {
final Buffer bounded = BoundedBuffer.decorate( new UnboundedFifoBuffer(), 1, 500 );
bounded.add( "Hello" );
new DelayedRemove( bounded, 200 ).start();
bounded.add( "World" );
assertEquals( 1, bounded.size() );
assertEquals( "World", bounded.get() );
try {
bounded.add( "!" );
fail();
}
catch( BufferOverflowException e ) {
}
}
public void testAddAllToFullBufferWithTimeout() {
final Buffer bounded = BoundedBuffer.decorate( new UnboundedFifoBuffer(), 2, 500 );
bounded.add( "Hello" );
bounded.add( "World" );
new DelayedRemove( bounded, 200, 2 ).start();
bounded.addAll( Arrays.asList( new String[] { "Foo", "Bar" } ) );
assertEquals( 2, bounded.size() );
assertEquals( "Foo", bounded.get() );
try {
bounded.add( "!" );
fail();
}
catch( BufferOverflowException e ) {
}
}
private class DelayedIteratorRemove extends Thread {
private final Buffer buffer;
private final long delay;
private final int nToRemove;
public DelayedIteratorRemove( Buffer buffer, long delay, int nToRemove ) {
this.buffer = buffer;
this.delay = delay;
this.nToRemove = nToRemove;
}
public DelayedIteratorRemove( Buffer buffer, long delay ) {
this( buffer, delay, 1 );
}
public void run() {
try {
Thread.sleep( delay );
Iterator iter = buffer.iterator();
for( int i = 0; i < nToRemove; ++i ) {
iter.next();
iter.remove();
}
}
catch( InterruptedException e ) {
}
}
}
private class DelayedRemove extends Thread {
private final Buffer buffer;
private final long delay;
private final int nToRemove;
public DelayedRemove( Buffer buffer, long delay, int nToRemove ) {
this.buffer = buffer;
this.delay = delay;
this.nToRemove = nToRemove;
}
public DelayedRemove( Buffer buffer, long delay ) {
this( buffer, delay, 1 );
}
public void run() {
try {
Thread.sleep( delay );
for( int i = 0; i < nToRemove; ++i ) {
buffer.remove();
}
}
catch( InterruptedException e ) {
}
}
}
}