Bug 37472: Implement a TimeoutBuffer class

git-svn-id: https://svn.apache.org/repos/asf/jakarta/commons/proper/collections/trunk@332643 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James W. Carman 2005-11-11 20:39:13 +00:00
parent 5abcddb1d5
commit 8e3968cd11
6 changed files with 365 additions and 3 deletions

Binary file not shown.

Binary file not shown.

View File

@ -21,6 +21,7 @@ import org.apache.commons.collections.buffer.SynchronizedBuffer;
import org.apache.commons.collections.buffer.TransformedBuffer;
import org.apache.commons.collections.buffer.TypedBuffer;
import org.apache.commons.collections.buffer.UnmodifiableBuffer;
import org.apache.commons.collections.buffer.TimeoutBuffer;
/**
* Provides utility methods and decorators for {@link Buffer} instances.
@ -85,6 +86,22 @@ public class BufferUtils {
return BlockingBuffer.decorate(buffer);
}
/**
* Returns a synchronized buffer backed by the given buffer that will
* block on {@link Buffer#get()} and {@link Buffer#remove()} operations until
* <code>timeout</code> expires. If the buffer is empty, then the
* {@link Buffer#get()} and {@link Buffer#remove()} operations will block
* until new elements are added to the buffer, rather than immediately throwing a
* <code>BufferUnderflowException</code>.
*
* @param buffer the buffer to synchronize, must not be null
* @return a blocking buffer backed by that buffer
* @throws IllegalArgumentException if the Buffer is null
*/
public static Buffer timeoutBuffer(Buffer buffer, long timeout) {
return TimeoutBuffer.decorate(buffer, timeout);
}
/**
* Returns an unmodifiable buffer backed by the given buffer.
*

View File

@ -0,0 +1,103 @@
/*
* Copyright 2001-2005 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.commons.collections.buffer;
import org.apache.commons.collections.Buffer;
/**
* Decorates another <code>Buffer</code> to make {@link #get()} and
* {@link #remove()} block (until timeout expires) when the <code>Buffer</code> is empty.
* <p>
* If either <code>get</code> or <code>remove</code> is called on an empty
* <code>Buffer</code>, the calling thread waits (until timeout expires) for notification that
* an <code>add</code> or <code>addAll</code> operation has completed.
* <p>
* When one or more entries are added to an empty <code>Buffer</code>,
* all threads blocked in <code>get</code> or <code>remove</code> are notified.
* There is no guarantee that concurrent blocked <code>get</code> or
* <code>remove</code> requests will be "unblocked" and receive data in the
* order that they arrive.
* <p>
* This class is Serializable from Commons Collections 3.2.
*
* @author James Carman
* @version $Revision$ $Date$
* @since Commons Collections 3.2
*/
public class TimeoutBuffer extends BlockingBuffer {
//----------------------------------------------------------------------------------------------------------------------
// Fields
//----------------------------------------------------------------------------------------------------------------------
private static final long serialVersionUID = 1719328905017860541L;
private final long timeout;
//----------------------------------------------------------------------------------------------------------------------
// Static Methods
//----------------------------------------------------------------------------------------------------------------------
public static Buffer decorate( Buffer buffer, long timeout ) {
return new TimeoutBuffer( buffer, timeout );
}
//----------------------------------------------------------------------------------------------------------------------
// Constructors
//----------------------------------------------------------------------------------------------------------------------
public TimeoutBuffer( Buffer buffer, long timeout ) {
super( buffer );
this.timeout = timeout;
}
public long getTimeout() {
return timeout;
}
//----------------------------------------------------------------------------------------------------------------------
// Buffer Implementation
//----------------------------------------------------------------------------------------------------------------------
public Object get() {
return get( timeout );
}
public Object remove() {
return remove( timeout );
}
public boolean equals( Object o ) {
if( this == o ) {
return true;
}
if( o == null || getClass() != o.getClass() ) {
return false;
}
if( !super.equals( o ) ) {
return false;
}
final TimeoutBuffer that = ( TimeoutBuffer ) o;
if( timeout != that.timeout ) {
return false;
}
return true;
}
public int hashCode() {
int result = super.hashCode();
result = 29 * result + ( int ) ( timeout ^ ( timeout >>> 32 ) );
return result;
}
}

View File

@ -52,7 +52,7 @@ public class TestAll extends TestCase {
suite.addTest(TestSynchronizedBuffer.suite());
suite.addTest(TestTransformedBuffer.suite());
suite.addTest(TestUnmodifiableBuffer.suite());
suite.addTest(TestTimeoutBuffer.suite());
return suite;
}

View File

@ -0,0 +1,242 @@
/*
* Copyright 2001-2005 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.commons.collections.buffer;
import junit.framework.Test;
import junit.framework.TestSuite;
import org.apache.commons.collections.AbstractTestObject;
import org.apache.commons.collections.ArrayStack;
import org.apache.commons.collections.Buffer;
import org.apache.commons.collections.BufferUnderflowException;
import org.apache.commons.collections.BufferUtils;
/**
* @author James Carman
* @version 1.0
*/
public class TestTimeoutBuffer extends AbstractTestObject {
//----------------------------------------------------------------------------------------------------------------------
// Fields
//----------------------------------------------------------------------------------------------------------------------
private static final int FULL_SIZE = 100;
private static final int TIMEOUT = 100;
//----------------------------------------------------------------------------------------------------------------------
// Static Methods
//----------------------------------------------------------------------------------------------------------------------
public static Test suite() {
return new TestSuite( TestTimeoutBuffer.class );
}
//----------------------------------------------------------------------------------------------------------------------
// Constructors
//----------------------------------------------------------------------------------------------------------------------
public TestTimeoutBuffer( String testName ) {
super( testName );
}
//----------------------------------------------------------------------------------------------------------------------
// Other Methods
//----------------------------------------------------------------------------------------------------------------------
public String getCompatibilityVersion() {
return "3.2";
}
public boolean isEqualsCheckable() {
return false;
}
public Object makeObject() {
return BufferUtils.timeoutBuffer( new ArrayStack(), TIMEOUT );
}
public void testEmptySerialization() {
try {
final TimeoutBuffer b = ( TimeoutBuffer ) readExternalFormFromDisk(
getCanonicalEmptyCollectionName( makeObject() ) );
assertTrue( b.isEmpty() );
}
catch( Exception e ) {
fail( "Could not read object from disk." );
}
}
public void testFullSerialization() {
try {
final TimeoutBuffer b = ( TimeoutBuffer ) readExternalFormFromDisk(
getCanonicalFullCollectionName( makeObject() ) );
assertEquals( FULL_SIZE, b.size() );
}
catch( Exception e ) {
fail( "Could not read object from disk." );
}
}
public void testSuccessfulWaitOnGet() {
Buffer b = ( Buffer ) makeObject();
executeAsynchronously( new Getter( b ) );
executeAsynchronously( new Adder( b, "Hello" ) );
}
private static void executeAsynchronously( Runnable r ) {
new Thread( r ).start();
}
public void testSuccessfulWaitOnRemove() {
Buffer b = ( Buffer ) makeObject();
executeAsynchronously( new Remover( b ) );
executeAsynchronously( new Adder( b, "Hello" ) );
}
public void testTimeoutOnGet() {
final Buffer buffer = makeBuffer();
try {
Getter remover = new Getter( buffer );
executeAsynchronously( remover );
executeAsynchronously( new Adder( buffer, "Howdy" ), TIMEOUT * 2 );
assertFalse( remover.isSuccesful() );
}
catch( BufferUnderflowException e ) {
}
}
private TimeoutBuffer makeBuffer() {
return ( TimeoutBuffer ) makeObject();
}
private static void executeAsynchronously( Runnable r, long delay ) {
new Thread( new DelayedRunnable( r, delay ) ).start();
}
public void testTimeoutOnRemove() {
final Buffer buffer = makeBuffer();
try {
Remover remover = new Remover( buffer );
executeAsynchronously( remover );
executeAsynchronously( new Adder( buffer, "Howdy" ), TIMEOUT * 2 );
assertFalse( remover.isSuccesful() );
}
catch( BufferUnderflowException e ) {
}
}
//----------------------------------------------------------------------------------------------------------------------
// Inner Classes
//----------------------------------------------------------------------------------------------------------------------
private static class DelayedRunnable implements Runnable {
private final Runnable r;
private final long delay;
public DelayedRunnable( Runnable r, long delay ) {
this.r = r;
this.delay = delay;
}
public void run() {
try {
Thread.sleep( delay );
}
catch( InterruptedException e ) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
r.run();
}
}
private static class Adder implements Runnable {
private final Buffer b;
private final Object o;
public Adder( Buffer b, Object o ) {
this.b = b;
this.o = o;
}
public void run() {
b.add( o );
}
}
private static class Remover extends BufferReader {
public Remover( Buffer b ) {
super( b );
}
protected void performOperation() {
b.remove();
}
}
private static abstract class BufferReader implements Runnable {
protected final Buffer b;
private Boolean succesful;
protected BufferReader( Buffer b ) {
this.b = b;
}
protected abstract void performOperation();
public final synchronized void run() {
try {
performOperation();
succesful = Boolean.TRUE;
}
catch( BufferUnderflowException e ) {
succesful = Boolean.FALSE;
}
notifyAll();
}
public synchronized boolean isSuccesful() {
while( succesful == null ) {
try {
wait();
}
catch( InterruptedException e ) {
}
}
return succesful.booleanValue();
}
}
private static class Getter extends BufferReader {
public Getter( Buffer b ) {
super( b );
}
protected void performOperation() {
b.get();
}
}
//----------------------------------------------------------------------------------------------------------------------
// main() method
//----------------------------------------------------------------------------------------------------------------------
public static void main( String args[] ) {
String[] testCaseName = {TestTimeoutBuffer.class.getName()};
junit.textui.TestRunner.main( testCaseName );
}
}