Merge branch 'jetty-9' into jetty-9-oneconnector

Conflicts:
	jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java
	jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java
	jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java
	jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java
	jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java
This commit is contained in:
Greg Wilkins 2012-08-03 12:39:21 +10:00
commit 29833f000b
12 changed files with 813 additions and 215 deletions

View File

@ -21,6 +21,16 @@
<artifactId>jetty-test-helper</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/* ------------------------------------------------------------ */
@ -35,6 +37,7 @@ import org.eclipse.jetty.util.StringUtil;
*/
public class ByteArrayEndPoint extends AbstractEndPoint
{
static final Logger LOG = Log.getLogger(ByteArrayEndPoint.class);
public final static InetSocketAddress NOIP=new InetSocketAddress(0);
private final AtomicReference<Future<?>> _timeout = new AtomicReference<>();
@ -354,7 +357,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
public void reset()
{
_readInterest.close();
_writeFlusher.close();
_writeFlusher.onClose();
_ishut=false;
_oshut=false;
_closed=false;
@ -423,19 +426,21 @@ public class ByteArrayEndPoint extends AbstractEndPoint
long idleElapsed = System.currentTimeMillis() - idleTimestamp;
long idleLeft = idleTimeout - idleElapsed;
if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting())
if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isInProgress())
{
if (idleTimestamp != 0 && idleTimeout > 0)
{
if (idleLeft < 0)
if (idleLeft <= 0)
{
if (isOutputShutdown())
close();
notIdle();
LOG.debug("{} idle timeout expired", this);
TimeoutException timeout = new TimeoutException("Idle timeout expired: " + idleElapsed + "/" + idleTimeout + " ms");
_readInterest.failed(timeout);
_writeFlusher.failed(timeout);
_writeFlusher.onFail(timeout);
if (isOutputShutdown())
close();
notIdle();
}
}
}

View File

@ -194,7 +194,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
LOG.debug("{} idle timeout check, elapsed: {} ms, remaining: {} ms", this, idleElapsed, idleLeft);
if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting())
if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isInProgress())
{
if (idleTimestamp != 0 && idleTimeout > 0)
{
@ -202,13 +202,13 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
{
LOG.debug("{} idle timeout expired", this);
TimeoutException timeout = new TimeoutException("Idle timeout expired: " + idleElapsed + "/" + idleTimeout + " ms");
_readInterest.failed(timeout);
_writeFlusher.onFail(timeout);
if (isOutputShutdown())
close();
notIdle();
TimeoutException timeout = new TimeoutException("Idle timeout expired: " + idleElapsed + "/" + idleTimeout + " ms");
_readInterest.failed(timeout);
_writeFlusher.failed(timeout);
}
}
}
@ -268,7 +268,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
public void onClose()
{
super.onClose();
_writeFlusher.close();
_writeFlusher.onClose();
_readInterest.close();
}

View File

@ -1,212 +1,421 @@
// ========================================================================
// Copyright (c) 2012-2012 Mort Bay Consulting Pty. Ltd.
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.WritePendingException;
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/* ------------------------------------------------------------ */
/**
* A Utility class to help implement {@link EndPoint#write(Object, Callback, ByteBuffer...)}
* by calling {@link EndPoint#flush(ByteBuffer...)} until all content is written.
* The abstract method {@link #onIncompleteFlushed()} is called when not all content has been
* written after a call to flush and should organise for the {@link #completeWrite()}
* method to be called when a subsequent call to flush should be able to make more progress.
*
* TODO remove synchronisation
* A Utility class to help implement {@link EndPoint#write(Object, Callback, ByteBuffer...)} by calling
* {@link EndPoint#flush(ByteBuffer...)} until all content is written.
* The abstract method {@link #onIncompleteFlushed()} is called when not all content has been written after a call to
* flush and should organise for the {@link #completeWrite()} method to be called when a subsequent call to flush
* should be able to make more progress.
* <p>
*/
abstract public class WriteFlusher
{
private final static ByteBuffer[] NO_BUFFERS= new ByteBuffer[0];
private final AtomicBoolean _writing = new AtomicBoolean(false);
private final EndPoint _endp;
private static final Logger LOG = Log.getLogger(WriteFlusher.class);
private static final EnumMap<StateType, Set<StateType>> __stateTransitions = new EnumMap<>(StateType.class);
private static final State __IDLE = new IdleState();
private static final State __WRITING = new WritingState();
private static final State __COMPLETING = new CompletingState();
private final EndPoint _endPoint;
private final AtomicReference<State> _state = new AtomicReference<>();
private ByteBuffer[] _buffers;
private Object _context;
private Callback<Object> _callback;
protected WriteFlusher(EndPoint endp)
static
{
_endp=endp;
// fill the state machine
__stateTransitions.put(StateType.IDLE, EnumSet.of(StateType.WRITING));
__stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
__stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING,StateType.IDLE));
__stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
__stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE));
}
/* ------------------------------------------------------------ */
public synchronized <C> void write(C context, Callback<C> callback, ByteBuffer... buffers)
// A write operation may either complete immediately:
// IDLE-->WRITING-->IDLE
// Or it may not completely flush and go via the PENDING state
// IDLE-->WRITING-->PENDING-->COMPLETING-->IDLE
// Or it may take several cycles to complete
// IDLE-->WRITING-->PENDING-->COMPLETING-->PENDING-->COMPLETING-->IDLE
//
// If a failure happens while in IDLE, it is a noop since there is no operation to tell of the failure.
// If a failure happens in PENDING state, then the fail method calls the pending callback and moves to IDLE state
// Otherwise if a fail happens, the state is set to FAIL, so that a subsequent attempt to move out of WRITING or COMPLETING
// will discover the failure and call the callbacks before returning to IDLE
// Thus the possible paths for a failure are:
//
// IDLE--(fail)-->IDLE
// IDLE-->WRITING--(fail)-->FAILED-->IDLE
// IDLE-->WRITING-->PENDING--(fail)-->IDLE
// IDLE-->WRITING-->PENDING-->COMPLETING--(fail)-->FAILED-->IDLE
//
protected WriteFlusher(EndPoint endPoint)
{
_state.set(__IDLE);
_endPoint = endPoint;
}
private enum StateType
{
IDLE,
WRITING,
PENDING,
COMPLETING,
FAILED
}
/**
* Tries to update the current state to the given new state.
* @param nextState the desired new state
* @return the previous state or null if the state transition failed
* @throws WritePendingException if currentState is WRITING and new state is WRITING (api usage error)
*/
private boolean updateState(State previous,State next)
{
if (!isTransitionAllowed(previous,next))
throw new IllegalStateException();
return _state.compareAndSet(previous,next);
}
private void pendingFail(PendingState<?> pending)
{
State current = _state.get();
if (current.getType()==StateType.FAILED)
{
FailedState failed=(FailedState)current;
if (updateState(failed,__IDLE))
{
pending.fail(failed.getCause());
return;
}
}
throw new IllegalStateException();
}
private boolean isTransitionAllowed(State currentState, State newState)
{
Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType());
if (currentState.getType() == StateType.WRITING && newState.getType() == StateType.WRITING)
{
if (callback==null)
throw new IllegalArgumentException();
if (!_writing.compareAndSet(false,true))
throw new WritePendingException();
}
if (!allowedNewStateTypes.contains(newState.getType()))
{
LOG.debug("StateType update: {} -> {} not allowed", currentState, newState);
return false;
}
return true;
}
/**
* State represents a State of WriteFlusher.
*/
private static class State
{
private final StateType _type;
private State(StateType stateType)
{
_type = stateType;
}
public StateType getType()
{
return _type;
}
@Override
public String toString()
{
return String.format("%s", _type);
}
}
/**
* In IdleState WriteFlusher is idle and accepts new writes
*/
private static class IdleState extends State
{
private IdleState()
{
super(StateType.IDLE);
}
}
/**
* In WritingState WriteFlusher is currently writing.
*/
private static class WritingState extends State
{
private WritingState()
{
super(StateType.WRITING);
}
}
/**
* In FailedState no more operations are allowed. The current implementation will never recover from this state.
*/
private static class FailedState extends State
{
private final Throwable _cause;
private FailedState(Throwable cause)
{
super(StateType.FAILED);
_cause=cause;
}
public Throwable getCause()
{
return _cause;
}
}
/**
* In CompletingState WriteFlusher is flushing buffers that have not been fully written in write(). If write()
* didn't flush all buffers in one go, it'll switch the State to PendingState. completeWrite() will then switch to
* this state and try to flush the remaining buffers.
*/
private static class CompletingState extends State
{
private CompletingState()
{
super(StateType.COMPLETING);
}
}
/**
* In PendingState not all buffers could be written in one go. Then write() will switch to PendingState() and
* preserve the state by creating a new PendingState object with the given parameters.
*
* @param <C>
*/
private class PendingState<C> extends State
{
private final C _context;
private final Callback<C> _callback;
private ByteBuffer[] _buffers;
private PendingState(ByteBuffer[] buffers, C context, Callback<C> callback)
{
super(StateType.PENDING);
_buffers = buffers;
_context = context;
_callback = callback;
}
public ByteBuffer[] getBuffers()
{
return _buffers;
}
protected void fail(Throwable cause)
{
_callback.failed(_context, cause);
}
protected void complete()
{
_callback.completed(_context);
}
}
/**
* Abstract call to be implemented by specific WriteFlushers. It should schedule a call to {@link #completeWrite()}
* or {@link #onFail(Throwable)} when appropriate.
*/
abstract protected void onIncompleteFlushed();
/**
* Tries to switch state to WRITING. If successful it writes the given buffers to the EndPoint. If state transition
* fails it'll fail the callback.
*
* If not all buffers can be written in one go it creates a new {@link PendingState} object to preserve the state
* and then calls {@link #onIncompleteFlushed()}. The remaining buffers will be written in {@link #completeWrite()}.
*
* If all buffers have been written it calls callback.complete().
*
* @param context context to pass to the callback
* @param callback the callback to call on either failed or complete
* @param buffers the buffers to flush to the endpoint
* @param <C> type of the context
*/
public <C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws WritePendingException
{
if (callback == null)
throw new IllegalArgumentException();
LOG.debug("write: {}", this);
if (!updateState(__IDLE,__WRITING))
throw new WritePendingException();
try
{
_endp.flush(buffers);
_endPoint.flush(buffers);
// Are we complete?
for (ByteBuffer b : buffers)
{
if (b.hasRemaining())
{
_buffers=buffers;
_context=context;
_callback=(Callback<Object>)callback;
_writing.set(true); // Needed as memory barrier
PendingState<?> pending=new PendingState<>(buffers, context, callback);
if (updateState(__WRITING,pending))
onIncompleteFlushed();
return;
}
}
if (!_writing.compareAndSet(true,false))
throw new ConcurrentModificationException();
// If updateState didn't succeed, we don't care as our buffers have been written
if (updateState(__WRITING,__IDLE))
callback.completed(context);
else
pendingFail(new PendingState<>(buffers, context, callback));
}
catch (IOException e)
{
if (!_writing.compareAndSet(true,false))
throw new ConcurrentModificationException(e);
callback.failed(context,e);
if (updateState(__WRITING,__IDLE))
callback.failed(context, e);
else
pendingFail(new PendingState<>(buffers, context, callback));
}
}
/* ------------------------------------------------------------ */
/**
* Abstract call to be implemented by specific WriteFlushers.
* It should schedule a call to {@link #completeWrite()} or
* {@link #failed(Throwable)} when appropriate.
* @return true if a flush can proceed.
* Complete a write that has not completed and that called {@link #onIncompleteFlushed()} to request a call to this
* method when a call to {@link EndPoint#flush(ByteBuffer...)} is likely to be able to progress.
*
* It tries to switch from PENDING to COMPLETING. If state transition fails, then it does nothing as the callback
* should have been already failed. That's because the only way to switch from PENDING outside this method is
* {@link #onFail(Throwable)} or {@link #onClose()}
*/
abstract protected void onIncompleteFlushed();
/* ------------------------------------------------------------ */
/* Remove empty buffers from the start of a multi buffer array
*/
private synchronized ByteBuffer[] compact(ByteBuffer[] buffers)
public void completeWrite()
{
if (buffers.length<2)
return buffers;
int b=0;
while (b<buffers.length && BufferUtil.isEmpty(buffers[b]))
b++;
if (b==0)
return buffers;
if (b==buffers.length)
return NO_BUFFERS;
State previous = _state.get();
PendingState<?> pending=null;
ByteBuffer[] compact=new ByteBuffer[buffers.length-b];
System.arraycopy(buffers,b,compact,0,compact.length);
return compact;
}
if (previous.getType()!=StateType.PENDING)
return; // failure already handled.
/* ------------------------------------------------------------ */
/**
* Complete a write that has not completed and that called
* {@link #onIncompleteFlushed()} to request a call to this
* method when a call to {@link EndPoint#flush(ByteBuffer...)}
* is likely to be able to progress.
*/
public synchronized void completeWrite()
{
if (!isWriting())
return; // TODO throw?
pending=(PendingState<?>)previous;
if (!updateState(pending,__COMPLETING))
return; // failure already handled.
try
{
while(true)
{
_buffers=compact(_buffers);
_endp.flush(_buffers);
ByteBuffer[] buffers = pending.getBuffers();
_endPoint.flush(buffers);
// Are we complete?
for (ByteBuffer b : _buffers)
for (ByteBuffer b : buffers)
{
if (b.hasRemaining())
{
if (updateState(__COMPLETING,pending))
onIncompleteFlushed();
return;
}
}
break;
}
// we are complete and ready
Callback<Object> callback=_callback;
Object context=_context;
_buffers=null;
_callback=null;
_context=null;
if (!_writing.compareAndSet(true,false))
throw new ConcurrentModificationException();
callback.completed(context);
// If updateState didn't succeed, we don't care as our buffers have been written
if(updateState(__COMPLETING,__IDLE))
pending.complete();
else
pendingFail(pending);
}
catch (IOException e)
{
Callback<Object> callback=_callback;
Object context=_context;
_buffers=null;
_callback=null;
_context=null;
if (!_writing.compareAndSet(true,false))
throw new ConcurrentModificationException();
callback.failed(context,e);
if(updateState(__COMPLETING,__IDLE))
pending.fail(e);
else
pendingFail(pending);
}
}
public void onFail(Throwable cause)
{
LOG.debug("failed: " + this, cause);
// Keep trying to handle the failure until we get to IDLE or FAILED state
while(true)
{
State current=_state.get();
switch(current.getType())
{
case IDLE:
return;
case PENDING:
PendingState<?> pending = (PendingState<?>)current;
if (updateState(pending,__IDLE))
{
pending.fail(cause);
return;
}
break;
/* ------------------------------------------------------------ */
/**
* Fail the write in progress and cause any calls to get to throw
* the cause wrapped as an execution exception.
* @return true if a write was in progress
*/
public synchronized boolean failed(Throwable cause)
default:
if (updateState(current,new FailedState(cause)))
return;
break;
}
}
}
public void onClose()
{
if (!_writing.compareAndSet(true,false))
return false;
Callback<Object> callback=_callback;
Object context=_context;
_buffers=null;
_callback=null;
_context=null;
callback.failed(context,cause);
if (_state.get()==__IDLE)
return;
onFail(new ClosedChannelException());
}
public boolean isIdle()
{
return _state.get().getType() == StateType.IDLE;
}
public boolean isInProgress()
{
switch(_state.get().getType())
{
case WRITING:
case PENDING:
case COMPLETING:
return true;
}
/* ------------------------------------------------------------ */
/**
* Fail the write with a {@link ClosedChannelException}. This is similar
* to a call to {@link #failed(Throwable)}, except that the exception is
* not instantiated unless a write was in progress.
* @return true if a write was in progress
*/
public synchronized boolean close()
{
if (!_writing.compareAndSet(true,false))
default:
return false;
Callback<Object> callback=_callback;
Object context=_context;
_buffers=null;
_callback=null;
_context=null;
callback.failed(context,new ClosedChannelException());
return true;
}
}
/* ------------------------------------------------------------ */
public synchronized boolean isWriting()
{
return _writing.get();
}
/* ------------------------------------------------------------ */
@Override
public String toString()
{
return String.format("WriteFlusher@%x{%b,%s,%s}",hashCode(),isWriting(),_callback,_context);
return String.format("WriteFlusher@%x{%s}", hashCode(), _state.get());
}
}

View File

@ -27,8 +27,8 @@ import javax.net.ssl.SSLException;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.ReadInterest;
import org.eclipse.jetty.io.RuntimeIOException;
@ -170,7 +170,7 @@ public class SslConnection extends AbstractConnection
if (_decryptedEndPoint._flushRequiresFillToProgress)
{
_decryptedEndPoint._flushRequiresFillToProgress = false;
_decryptedEndPoint._writeFlusher.failed(cause);
_decryptedEndPoint._writeFlusher.onFail(cause);
}
}
}
@ -183,7 +183,7 @@ public class SslConnection extends AbstractConnection
hashCode(),
_sslEngine.getHandshakeStatus(),
_decryptedEndPoint._readInterest.isInterested() ? "R" : "",
_decryptedEndPoint._writeFlusher.isWriting() ? "W" : "");
_decryptedEndPoint._writeFlusher.isInProgress() ? "W" : "");
}
/* ------------------------------------------------------------ */
@ -218,7 +218,7 @@ public class SslConnection extends AbstractConnection
_readInterest.readable();
}
if (_writeFlusher.isWriting())
if (_writeFlusher.isInProgress())
_writeFlusher.completeWrite();
}
}
@ -244,8 +244,8 @@ public class SslConnection extends AbstractConnection
_readInterest.failed(x);
}
if (_writeFlusher.isWriting())
_writeFlusher.failed(x);
if (_writeFlusher.isInProgress())
_writeFlusher.onFail(x);
// TODO release all buffers??? or may in onClose
}
@ -706,7 +706,7 @@ public class SslConnection extends AbstractConnection
@Override
public String toString()
{
return String.format("%s{%s%s%s}", super.toString(), _readInterest.isInterested() ? "R" : "", _writeFlusher.isWriting() ? "W" : "", _cannotAcceptMoreAppDataToFlush ? "w" : "");
return String.format("%s{%s%s%s}", super.toString(), _readInterest.isInterested() ? "R" : "", _writeFlusher.isInProgress() ? "W" : "", _cannotAcceptMoreAppDataToFlush ? "w" : "");
}
}

View File

@ -469,12 +469,11 @@ public class SelectChannelEndPointTest
}
// But endpoint is still open.
assertTrue(_lastEndPoint.isOpen());
if(_lastEndPoint.isOpen())
// Wait for another idle callback
Thread.sleep(idleTimeout * 2);
// endpoint is closed.
// endpoint is closed.
assertFalse(_lastEndPoint.isOpen());
}

View File

@ -1,29 +1,54 @@
package org.eclipse.jetty.io;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertFalse;
import static junit.framework.Assert.assertTrue;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.security.SecureRandom;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertFalse;
import static junit.framework.Assert.assertTrue;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class WriteFlusherTest
{
ByteArrayEndPoint _endp;
final AtomicBoolean _flushIncomplete = new AtomicBoolean(false);
WriteFlusher _flusher;
final String _context = new String("Context");
@Mock
private EndPoint _endPointMock;
private WriteFlusher _flusher;
private final AtomicBoolean _flushIncomplete = new AtomicBoolean(false);
private final String _context = new String("Context");
private ByteArrayEndPoint _endp;
@Before
public void before()
@ -40,10 +65,20 @@ public class WriteFlusherTest
};
}
@After
public void after()
@Test
public void testIgnorePreviousFailures() throws Exception
{
_endp.setGrowOutput(true);
FutureCallback<String> callback = new FutureCallback<>();
_flusher.onFail(new IOException("Ignored because no operation in progress"));
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
assertCallbackIsDone(callback);
assertFlushIsComplete();
assertThat("context and callback.get() are equal", _context, equalTo(callback.get()));
assertThat("string in endpoint matches expected string", "How now brown cow!",
equalTo(_endp.takeOutputString()));
assertTrue(_flusher.isIdle());
}
@Test
@ -53,10 +88,22 @@ public class WriteFlusherTest
FutureCallback<String> callback = new FutureCallback<>();
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
assertTrue(callback.isDone());
assertFalse(_flushIncomplete.get());
assertEquals(_context,callback.get());
assertEquals("How now brown cow!",_endp.takeOutputString());
assertCallbackIsDone(callback);
assertFlushIsComplete();
assertThat("context and callback.get() are equal", _context, equalTo(callback.get()));
assertThat("string in endpoint matches expected string", "How now brown cow!",
equalTo(_endp.takeOutputString()));
assertTrue(_flusher.isIdle());
}
private void assertFlushIsComplete()
{
assertThat("flush is complete", _flushIncomplete.get(), is(false));
}
private void assertCallbackIsDone(FutureCallback<String> callback)
{
assertThat("callback is done", callback.isDone(), is(true));
}
@Test
@ -66,8 +113,8 @@ public class WriteFlusherTest
FutureCallback<String> callback = new FutureCallback<>();
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
assertTrue(callback.isDone());
assertFalse(_flushIncomplete.get());
assertCallbackIsDone(callback);
assertFlushIsComplete();
try
{
assertEquals(_context,callback.get());
@ -80,6 +127,7 @@ public class WriteFlusherTest
Assert.assertThat(cause.getMessage(),Matchers.containsString("CLOSED"));
}
assertEquals("",_endp.takeOutputString());
assertTrue(_flusher.isIdle());
}
@ -104,10 +152,11 @@ public class WriteFlusherTest
assertEquals("How now br",_endp.takeOutputString());
_flusher.completeWrite();
assertTrue(callback.isDone());
assertCallbackIsDone(callback);
assertEquals(_context,callback.get());
assertEquals("own cow!",_endp.takeOutputString());
assertFalse(_flushIncomplete.get());
assertFlushIsComplete();
assertTrue(_flusher.isIdle());
}
@Test
@ -133,8 +182,8 @@ public class WriteFlusherTest
assertEquals("How now br",_endp.takeOutputString());
_endp.close();
_flusher.completeWrite();
assertTrue(callback.isDone());
assertFalse(_flushIncomplete.get());
assertCallbackIsDone(callback);
assertFlushIsComplete();
try
{
assertEquals(_context,callback.get());
@ -147,6 +196,7 @@ public class WriteFlusherTest
Assert.assertThat(cause.getMessage(),Matchers.containsString("CLOSED"));
}
assertEquals("",_endp.takeOutputString());
assertTrue(_flusher.isIdle());
}
@Test
@ -169,11 +219,11 @@ public class WriteFlusherTest
_flushIncomplete.set(false);
}
assertEquals("How now br",_endp.takeOutputString());
_flusher.failed(new IOException("Failure"));
assertEquals("How now br", _endp.takeOutputString());
_flusher.onFail(new IOException("Failure"));
_flusher.completeWrite();
assertTrue(callback.isDone());
assertFalse(_flushIncomplete.get());
assertCallbackIsDone(callback);
assertFlushIsComplete();
try
{
assertEquals(_context,callback.get());
@ -185,7 +235,330 @@ public class WriteFlusherTest
Assert.assertTrue(cause instanceof IOException);
Assert.assertThat(cause.getMessage(),Matchers.containsString("Failure"));
}
assertEquals("",_endp.takeOutputString());
assertEquals("", _endp.takeOutputString());
assertTrue(_flusher.isIdle());
}
private static class ConcurrentFlusher extends WriteFlusher implements Runnable
{
final ByteArrayEndPoint _endp;
final SecureRandom _random;
final ScheduledThreadPoolExecutor _scheduler;
final StringBuilder _content=new StringBuilder();
ConcurrentFlusher(ByteArrayEndPoint endp,SecureRandom random, ScheduledThreadPoolExecutor scheduler)
{
super(endp);
_endp=endp;
_random=random;
_scheduler=scheduler;
}
@Override
protected void onIncompleteFlushed()
{
_scheduler.schedule(this,1+_random.nextInt(9),TimeUnit.MILLISECONDS);
}
@Override
public void run()
{
_content.append(_endp.takeOutputString());
completeWrite();
}
public String toString()
{
_content.append(_endp.takeOutputString());
return _content.toString();
}
}
@Test
public void testConcurrent() throws Exception
{
final SecureRandom random = new SecureRandom();
final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(100);
ConcurrentFlusher[] flushers = new ConcurrentFlusher[50000];
FutureCallback<?>[] futures = new FutureCallback<?>[flushers.length];
for (int i=0;i<flushers.length;i++)
{
int size=5+random.nextInt(15);
ByteArrayEndPoint endp = new ByteArrayEndPoint(new byte[]{},size);
final ConcurrentFlusher flusher = new ConcurrentFlusher(endp,random,scheduler);
flushers[i]=flusher;
final FutureCallback<String> callback = new FutureCallback<>();
futures[i]=callback;
scheduler.schedule(new Runnable()
{
@Override
public void run()
{
flusher.onFail(new Throwable("THE CAUSE"));
}
}
,50,TimeUnit.MILLISECONDS);
flusher.write(_context,callback,BufferUtil.toBuffer("How Now Brown Cow."),BufferUtil.toBuffer(" The quick brown fox jumped over the lazy dog!"));
}
int completed=0;
int failed=0;
for (int i=0;i<flushers.length;i++)
{
try
{
futures[i].get();
assertEquals("How Now Brown Cow. The quick brown fox jumped over the lazy dog!",flushers[i].toString());
completed++;
}
catch (Exception e)
{
assertThat(e.getMessage(),Matchers.containsString("THE CAUSE"));
failed++;
}
}
assertThat(completed,Matchers.greaterThan(0));
assertThat(failed,Matchers.greaterThan(0));
scheduler.shutdown();
}
@Test
@Ignore
public void testConcurrentAccessToWriteAndFailed() throws IOException, InterruptedException, ExecutionException
{
ExecutorService executor = Executors.newFixedThreadPool(16);
final CountDownLatch failedCalledLatch = new CountDownLatch(1);
final CountDownLatch writeCalledLatch = new CountDownLatch(1);
final CountDownLatch writeCompleteLatch = new CountDownLatch(1);
final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock)
{
@Override
public <C> void write(C context, Callback<C> callback, ByteBuffer... buffers)
{
super.write(context, callback, buffers);
writeCompleteLatch.countDown();
}
@Override
protected void onIncompleteFlushed()
{
}
};
endPointFlushExpectation(writeCalledLatch, failedCalledLatch);
ExposingStateCallback callback = new ExposingStateCallback();
executor.submit(new Writer(writeFlusher, callback));
assertThat("Write has been called.", writeCalledLatch.await(5, TimeUnit.SECONDS), is(true));
executor.submit(new FailedCaller(writeFlusher, failedCalledLatch)).get();
// callback failed is NOT called because in WRITING state failed() doesn't know about the callback. However
// either the write succeeds or we get an IOException which will call callback.failed()
assertThat("callback failed", callback.isFailed(), is(false));
assertThat("write complete", writeCompleteLatch.await(5, TimeUnit.SECONDS), is(true));
// in this testcase we more or less emulate that the write has successfully finished and we return from
// EndPoint.flush() back to WriteFlusher.write(). Then someone calls failed. So the callback should have been
// completed.
assertThat("callback completed", callback.isCompleted(), is(true));
}
private class ExposingStateCallback extends FutureCallback
{
private boolean failed = false;
private boolean completed = false;
@Override
public void completed(Object context)
{
completed = true;
super.completed(context);
}
@Override
public void failed(Object context, Throwable cause)
{
failed = true;
super.failed(context, cause);
}
public boolean isFailed()
{
return failed;
}
public boolean isCompleted()
{
return completed;
}
}
@Ignore("Intermittent failures.") //TODO: fixme
@Test(expected = WritePendingException.class)
public void testConcurrentAccessToWrite() throws Throwable
{
ExecutorService executor = Executors.newFixedThreadPool(16);
final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock)
{
@Override
protected void onIncompleteFlushed()
{
}
};
// in this test we just want to make sure that we called write twice at the same time
when(_endPointMock.flush(any(ByteBuffer[].class))).thenAnswer(new Answer<Object>()
{
@Override
public Object answer(InvocationOnMock invocation) throws Throwable
{
// make sure we stay here, so write is called twice at the same time
Thread.sleep(5000);
return null;
}
});
executor.submit(new Writer(writeFlusher, new FutureCallback()));
try
{
executor.submit(new Writer(writeFlusher, new FutureCallback())).get();
}
catch (ExecutionException e)
{
throw e.getCause();
}
}
private void endPointFlushExpectation(final CountDownLatch writeCalledLatch,
final CountDownLatch failedCalledLatch) throws IOException
{
when(_endPointMock.flush(any(ByteBuffer[].class))).thenAnswer(new Answer<Object>()
{
int called = 0;
@Override
public Object answer(InvocationOnMock invocation) throws Throwable
{
called++;
Object[] arguments = invocation.getArguments();
ByteBuffer byteBuffer = (ByteBuffer)arguments[0];
BufferUtil.flipToFill(byteBuffer); // pretend everything has been written
writeCalledLatch.countDown();
failedCalledLatch.await(5, TimeUnit.SECONDS);
return null;
}
});
}
@Test
public void testConcurrentAccessToIncompleteWriteAndFailed() throws IOException, InterruptedException, ExecutionException
{
ExecutorService executor = Executors.newFixedThreadPool(16);
final CountDownLatch failedCalledLatch = new CountDownLatch(1);
final CountDownLatch onIncompleteFlushedCalledLatch = new CountDownLatch(1);
final CountDownLatch writeCalledLatch = new CountDownLatch(1);
final CountDownLatch completeWrite = new CountDownLatch(1);
final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock)
{
protected void onIncompleteFlushed()
{
onIncompleteFlushedCalledLatch.countDown();
completeWrite();
completeWrite.countDown();
}
};
endPointFlushExpectationPendingWrite(writeCalledLatch, failedCalledLatch);
ExposingStateCallback callback = new ExposingStateCallback();
executor.submit(new Writer(writeFlusher, callback));
assertThat("Write has been called.", writeCalledLatch.await(5, TimeUnit.SECONDS), is(true));
executor.submit(new FailedCaller(writeFlusher, failedCalledLatch));
assertThat("Failed has been called.", failedCalledLatch.await(5, TimeUnit.SECONDS), is(true));
writeFlusher.write(_context, new FutureCallback<String>(), BufferUtil.toBuffer("foobar"));
assertThat("completeWrite done", completeWrite.await(5, TimeUnit.SECONDS), is(true));
}
//TODO: combine with endPointFlushExpectation
private void endPointFlushExpectationPendingWrite(final CountDownLatch writeCalledLatch, final CountDownLatch
failedCalledLatch)
throws
IOException
{
when(_endPointMock.flush(any(ByteBuffer[].class))).thenAnswer(new Answer<Object>()
{
@Override
public Object answer(InvocationOnMock invocation) throws Throwable
{
writeCalledLatch.countDown();
Object[] arguments = invocation.getArguments();
ByteBuffer byteBuffer = (ByteBuffer)arguments[0];
int oldPos = byteBuffer.position();
if (byteBuffer.remaining() == 2)
{
// make sure failed is called before we go on
failedCalledLatch.await(5, TimeUnit.SECONDS);
BufferUtil.flipToFill(byteBuffer);
}
else if (byteBuffer.remaining() == 3)
{
byteBuffer.position(1); // pretend writing one byte
return 1;
}
else
{
byteBuffer.position(byteBuffer.limit());
}
return byteBuffer.limit() - oldPos;
}
});
}
private static class FailedCaller implements Callable
{
private final WriteFlusher writeFlusher;
private CountDownLatch failedCalledLatch;
public FailedCaller(WriteFlusher writeFlusher, CountDownLatch failedCalledLatch)
{
this.writeFlusher = writeFlusher;
this.failedCalledLatch = failedCalledLatch;
}
@Override
public FutureCallback call()
{
writeFlusher.onFail(new IllegalStateException());
failedCalledLatch.countDown();
return null;
}
}
private class Writer implements Callable
{
private final WriteFlusher writeFlusher;
private FutureCallback<String> callback;
public Writer(WriteFlusher writeFlusher, FutureCallback callback)
{
this.writeFlusher = writeFlusher;
this.callback = callback;
}
@Override
public FutureCallback call()
{
writeFlusher.write(_context, callback, BufferUtil.toBuffer("foo"));
return callback;
}
}
}

View File

@ -0,0 +1,2 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.io.LEVEL=WARN

View File

@ -644,9 +644,8 @@ public class StandardSession implements ISession, Parser.Listener, Callback<Stan
notifyOnGoAway(listener,goAwayInfo);
flush();
// SPDY does not require to send back a response to a GO_AWAY.
// We notified the application of the last good stream id,
// tried our best to flush remaining data, and close.
close();
// We notified the application of the last good stream id and
// tried our best to flush remaining data.
}
}

View File

@ -32,7 +32,7 @@ public class DataFrameGenerator
{
ByteBuffer buffer = bufferPool.acquire(DataFrame.HEADER_LENGTH + length, true);
BufferUtil.clearToFill(buffer);
buffer.limit(length + DataFrame.HEADER_LENGTH); //TODO: thomas show Simone :)
buffer.limit(length + DataFrame.HEADER_LENGTH);
buffer.position(DataFrame.HEADER_LENGTH);
// Guaranteed to always be >= 0
int read = dataInfo.readInto(buffer);

View File

@ -150,11 +150,11 @@ public class ClosedStreamTest extends AbstractTest
clientReceivedDataLatch.countDown();
}
}).get();
assertThat("reply has been received by client",replyReceivedLatch.await(500,TimeUnit.SECONDS),is(true));
assertThat("reply has been received by client",replyReceivedLatch.await(5,TimeUnit.SECONDS),is(true));
assertThat("stream is half closed from server",stream.isHalfClosed(),is(true));
assertThat("client has not received any data sent after stream was half closed by server",clientReceivedDataLatch.await(1,TimeUnit.SECONDS),
is(false));
assertThat("sending data threw an exception",exceptionWhenSendingData.await(500,TimeUnit.SECONDS),is(true)); //thomas
assertThat("client has not received any data sent after stream was half closed by server",
clientReceivedDataLatch.await(1,TimeUnit.SECONDS), is(false));
assertThat("sending data threw an exception",exceptionWhenSendingData.await(5,TimeUnit.SECONDS), is(true));
}
@Test

View File

@ -10,6 +10,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
//TODO: Simplify, get rid of DOING. Probably replace states with AtomicBoolean
public class FutureCallback<C> implements Future<C>,Callback<C>
{
// TODO investigate use of a phasor