Merge branch 'jetty-9.1' into jetty-9.1-altstart

This commit is contained in:
Joakim Erdfelt 2013-08-23 11:49:41 -07:00
commit 2e94149ea3
64 changed files with 848 additions and 478 deletions

View File

@ -26,7 +26,7 @@ import org.eclipse.jetty.util.log.Logger;
public abstract class HttpChannel
{
protected static final Logger LOG = Log.getLogger(new Object(){}.getClass().getEnclosingClass());
protected static final Logger LOG = Log.getLogger(HttpChannel.class);
private final AtomicReference<HttpExchange> exchange = new AtomicReference<>();
private final HttpDestination destination;

View File

@ -47,7 +47,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
public abstract class HttpDestination implements Destination, Closeable, Dumpable
{
protected static final Logger LOG = Log.getLogger(new Object(){}.getClass().getEnclosingClass());
protected static final Logger LOG = Log.getLogger(HttpDestination.class);
private final HttpClient client;
private final String scheme;

View File

@ -26,7 +26,6 @@ import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Response;
@ -65,7 +64,7 @@ import org.eclipse.jetty.util.log.Logger;
*/
public abstract class HttpReceiver
{
protected static final Logger LOG = Log.getLogger(new Object(){}.getClass().getEnclosingClass());
protected static final Logger LOG = Log.getLogger(HttpReceiver.class);
private final AtomicReference<ResponseState> responseState = new AtomicReference<>(ResponseState.IDLE);
private final HttpChannel channel;
@ -432,13 +431,6 @@ public abstract class HttpReceiver
responseState.set(ResponseState.FAILURE);
}
public void idleTimeout()
{
// If we cannot fail, it means a response arrived
// just when we were timeout idling, so we don't close
responseFailure(new TimeoutException());
}
public boolean abort(Throwable cause)
{
return responseFailure(cause);

View File

@ -57,7 +57,7 @@ import org.eclipse.jetty.util.log.Logger;
*/
public abstract class HttpSender implements AsyncContentProvider.Listener
{
protected static final Logger LOG = Log.getLogger(new Object(){}.getClass().getEnclosingClass());
protected static final Logger LOG = Log.getLogger(HttpSender.class);
private final AtomicReference<RequestState> requestState = new AtomicReference<>(RequestState.QUEUED);
private final AtomicReference<SenderState> senderState = new AtomicReference<>(SenderState.IDLE);
@ -737,6 +737,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
@Override
public void failed(Throwable failure)
{
super.failed(failure);
anyToFailure(failure);
}
}

View File

@ -103,11 +103,6 @@ public class HttpChannelOverHTTP extends HttpChannel
}
}
public void idleTimeout()
{
receiver.idleTimeout();
}
@Override
public String toString()
{

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.client.http;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
@ -93,21 +95,12 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
HttpExchange exchange = channel.getHttpExchange();
if (exchange != null)
idleTimeout();
else
getHttpDestination().remove(this);
return exchange.getRequest().abort(new TimeoutException());
getHttpDestination().remove(this);
return true;
}
protected void idleTimeout()
{
// TODO: we need to fail the exchange if we did not get an answer from the server
// TODO: however this mechanism does not seem to be available in SPDY if not subclassing SPDYConnection
// TODO: but the API (Session) does not have such facilities; perhaps we need to add a callback to ISession
channel.idleTimeout();
}
@Override
public void onFillable()
{

View File

@ -33,7 +33,7 @@ public class ArrayByteBufferPool implements ByteBufferPool
public ArrayByteBufferPool()
{
this(64,2048,64*1024);
this(0,1024,64*1024);
}
public ArrayByteBufferPool(int minSize, int increment, int maxSize)

View File

@ -39,6 +39,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.TypeUtil;
@ -309,6 +310,11 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
}
private enum State
{
CHANGES, MORE_CHANGES, SELECT, WAKEUP, PROCESS
}
/**
* <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
* <p>{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events
@ -317,13 +323,11 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*/
public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
{
private final AtomicReference<State> _state= new AtomicReference<>(State.PROCESS);
private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>();
private final int _id;
private Selector _selector;
private volatile Thread _thread;
private boolean _needsWakeup = true;
private boolean _runningChanges = false;
public ManagedSelector(int id)
{
@ -336,6 +340,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
{
super.doStart();
_selector = Selector.open();
_state.set(State.PROCESS);
}
@Override
@ -357,48 +362,65 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*/
public void submit(Runnable change)
{
// if we have been called by the selector thread we can directly run the change
if (_thread==Thread.currentThread())
if (isSelectorThread())
{
// If we are already iterating over the changes, just add this change to the list.
// No race here because it is this thread that is iterating over the changes.
if (_runningChanges)
_changes.offer(change);
if (_state.get() == State.PROCESS)
{
// We are processing, so lets handle existing changes
runChanges();
// and then directly run the passed change without queueing it
runChange(change);
}
else
{
// Otherwise we run the queued changes
runChanges();
// and then directly run the passed change
runChange(change);
// We must be iterating in CHANGES or MORE_CHANGES
// state, so just append to the queue to preserve order.
_changes.offer(change);
}
}
else
{
// otherwise we have to queue the change and wakeup the selector
// Otherwise we have to queue the change and possibly wakeup the selector
_changes.offer(change);
LOG.debug("Queued change {}", change);
boolean wakeup = _needsWakeup;
if (wakeup)
wakeup();
out: while (true)
{
switch (_state.get())
{
case SELECT:
// Avoid multiple wakeup() calls if we the CAS fails
if (!_state.compareAndSet(State.SELECT, State.WAKEUP))
continue;
wakeup();
break out;
case CHANGES:
// Tell the selector thread that we have more changes.
// If we fail to CAS, we possibly need to wakeup(), so loop.
if (_state.compareAndSet(State.CHANGES, State.MORE_CHANGES))
break out;
continue;
case WAKEUP:
// Do nothing, we have already a wakeup scheduled
break out;
case MORE_CHANGES:
// Do nothing, we already notified the selector thread of more changes
break out;
case PROCESS:
// Do nothing, the changes will be run after the processing
break out;
default:
throw new IllegalStateException();
}
}
}
}
private void runChanges()
{
try
{
if (_runningChanges)
throw new IllegalStateException();
_runningChanges=true;
Runnable change;
while ((change = _changes.poll()) != null)
runChange(change);
}
finally
{
_runningChanges=false;
}
Runnable change;
while ((change = _changes.poll()) != null)
runChange(change);
}
protected void runChange(Runnable change)
@ -418,7 +440,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
LOG.debug("Starting {} on {}", _thread, this);
while (isRunning())
select();
processChanges();
runChanges();
}
finally
{
@ -437,7 +459,30 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
boolean debug = LOG.isDebugEnabled();
try
{
processChanges();
_state.set(State.CHANGES);
// Run the changes, and only exit if we ran all changes
out: while(true)
{
switch (_state.get())
{
case CHANGES:
runChanges();
if (_state.compareAndSet(State.CHANGES, State.SELECT))
break out;
continue;
case MORE_CHANGES:
runChanges();
_state.set(State.CHANGES);
continue;
default:
throw new IllegalStateException();
}
}
// Must check first for SELECT and *then* for WAKEUP
// because we read the state twice in the assert, and
// it could change from SELECT to WAKEUP in between.
assert _state.get() == State.SELECT || _state.get() == State.WAKEUP;
if (debug)
LOG.debug("Selector loop waiting on select");
@ -445,7 +490,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
if (debug)
LOG.debug("Selector loop woken up from select, {}/{} selected", selected, _selector.keys().size());
_needsWakeup = false;
_state.set(State.PROCESS);
Set<SelectionKey> selectedKeys = _selector.selectedKeys();
for (SelectionKey key : selectedKeys)
@ -474,20 +519,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
}
}
private void processChanges()
{
runChanges();
// If tasks are submitted between these 2 statements, they will not
// wakeup the selector, therefore below we run again the tasks
_needsWakeup = true;
// Run again the tasks to avoid the race condition where a task is
// submitted but will not wake up the selector
runChanges();
}
private void processKey(SelectionKey key)
{
Object attachment = key.attachment();

View File

@ -74,6 +74,10 @@ import org.eclipse.jetty.util.IO;
*/
public class JettyRunForkedMojo extends AbstractMojo
{
public static final String DEFAULT_WEBAPP_SRC = "src"+File.separator+"main"+File.separator+"webapp";
public static final String FAKE_WEBAPP = "webapp-tmp";
public String PORT_SYSPROPERTY = "jetty.port";
/**
@ -421,8 +425,19 @@ public class JettyRunForkedMojo extends AbstractMojo
}
//sort out base dir of webapp
if (webAppSourceDirectory != null)
props.put("base.dir", webAppSourceDirectory.getAbsolutePath());
if (webAppSourceDirectory == null || !webAppSourceDirectory.exists())
{
webAppSourceDirectory = new File (project.getBasedir(), DEFAULT_WEBAPP_SRC);
if (!webAppSourceDirectory.exists())
{
//try last resort of making a fake empty dir
File target = new File(project.getBuild().getDirectory());
webAppSourceDirectory = new File(target, FAKE_WEBAPP);
if (!webAppSourceDirectory.exists())
webAppSourceDirectory.mkdirs();
}
}
props.put("base.dir", webAppSourceDirectory.getAbsolutePath());
//sort out the resource base directories of the webapp
StringBuilder builder = new StringBuilder();

View File

@ -68,6 +68,7 @@ import org.eclipse.jetty.webapp.WebAppContext;
public class JettyRunMojo extends AbstractJettyMojo
{
public static final String DEFAULT_WEBAPP_SRC = "src"+File.separator+"main"+File.separator+"webapp";
public static final String FAKE_WEBAPP = "webapp-tmp";
@ -181,10 +182,19 @@ public class JettyRunMojo extends AbstractJettyMojo
try
{
if ((webAppSourceDirectory == null) || !webAppSourceDirectory.exists())
{
File defaultWebAppSrcDir = new File (project.getBasedir(), DEFAULT_WEBAPP_SRC);
getLog().info("webAppSourceDirectory"+(webAppSourceDirectory == null ? " not set." : " does not exist.")+" Defaulting to "+defaultWebAppSrcDir.getAbsolutePath());
webAppSourceDirectory = defaultWebAppSrcDir;
{
getLog().info("webAppSourceDirectory"+(webAppSourceDirectory == null ? " not set." : (webAppSourceDirectory.getAbsolutePath()+" does not exist."))+" Trying "+DEFAULT_WEBAPP_SRC);
webAppSourceDirectory = new File (project.getBasedir(), DEFAULT_WEBAPP_SRC);
if (!webAppSourceDirectory.exists())
{
getLog().info("webAppSourceDirectory "+webAppSourceDirectory.getAbsolutePath()+" does not exist. Trying "+project.getBuild().getDirectory()+File.separator+FAKE_WEBAPP);
//try last resort of making a fake empty dir
File target = new File(project.getBuild().getDirectory());
webAppSourceDirectory = new File(target, FAKE_WEBAPP);
if (!webAppSourceDirectory.exists())
webAppSourceDirectory.mkdirs();
}
}
else
getLog().info( "Webapp source directory = " + webAppSourceDirectory.getCanonicalPath());
@ -504,7 +514,8 @@ public class JettyRunMojo extends AbstractJettyMojo
{
getLog().info("Reconfiguring scanner after change to pom.xml ...");
scanList.clear();
scanList.add(new File(webApp.getDescriptor()));
if (webApp.getDescriptor() != null)
scanList.add(new File(webApp.getDescriptor()));
if (webApp.getJettyEnvXml() != null)
scanList.add(new File(webApp.getJettyEnvXml()));
scanList.addAll(extraScanTargets);
@ -599,7 +610,7 @@ public class JettyRunMojo extends AbstractJettyMojo
for (Artifact a:warArtifacts)
{
if (overlayMatchesArtifact (o, a))
if (o.matchesArtifact (a.getGroupId(), a.getArtifactId(), a.getClassifier()))
{
return a;
}
@ -607,22 +618,4 @@ public class JettyRunMojo extends AbstractJettyMojo
return null;
}
/**
* @param o
* @param a
* @return
*/
protected boolean overlayMatchesArtifact(OverlayConfig o, Artifact a)
{
if (((o.getGroupId() == null && a.getGroupId() == null) || (o.getGroupId() != null && o.getGroupId().equals(a.getGroupId())))
&& ((o.getArtifactId() == null && a.getArtifactId() == null) || (o.getArtifactId() != null && o.getArtifactId().equals(a.getArtifactId())))
&& ((o.getClassifier() == null) || (o.getClassifier().equals(a.getClassifier()))))
return true;
return false;
}
}

View File

@ -169,18 +169,20 @@ public class MavenWebInfConfiguration extends WebInfConfiguration
for (Overlay o:jwac.getOverlays())
{
//can refer to the current project in list of overlays for ordering purposes
if (o.getConfig() != null && o.getConfig().isCurrentProject())
if (o.getConfig() != null && o.getConfig().isCurrentProject() && _originalResourceBase.exists())
{
resourceBaseCollection.add(_originalResourceBase);
LOG.info("Adding virtual project to resource base list");
continue;
}
Resource unpacked = unpackOverlay(jwac,o);
_unpackedOverlayResources.add(unpacked); //remember the unpacked overlays for later so we can delete the tmp files
resourceBaseCollection.add(unpacked); //add in the selectively unpacked overlay in the correct order to the webapps resource base
LOG.info("Adding "+unpacked+" to resource base list");
}
if (!resourceBaseCollection.contains(_originalResourceBase))
if (!resourceBaseCollection.contains(_originalResourceBase) && _originalResourceBase.exists())
{
if (jwac.getBaseAppFirst())
{
@ -273,33 +275,4 @@ public class MavenWebInfConfiguration extends WebInfConfiguration
LOG.info("Unpacked overlay: "+overlay+" to "+unpackedOverlay);
return unpackedOverlay;
}
protected Artifact getArtifactForOverlay (OverlayConfig o, List<Artifact> warArtifacts)
{
if (o == null || warArtifacts == null || warArtifacts.isEmpty())
return null;
for (Artifact a:warArtifacts)
{
if (overlayMatchesArtifact (o, a))
{
return a;
}
}
return null;
}
protected boolean overlayMatchesArtifact(OverlayConfig o, Artifact a)
{
if ((o.getGroupId() == null && a.getGroupId() == null) || (o.getGroupId() != null && o.getGroupId().equals(a.getGroupId())))
{
if ((o.getArtifactId() == null && a.getArtifactId() == null) || (o.getArtifactId() != null && o.getArtifactId().equals(a.getArtifactId())))
{
if ((o.getClassifier() == null) || (o.getClassifier().equals(a.getClassifier())))
return true;
}
}
return false;
}
}

View File

@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import org.codehaus.plexus.util.xml.Xpp3Dom;
import org.apache.maven.artifact.Artifact;
import java.util.Arrays;
@ -264,6 +265,44 @@ public class OverlayConfig
return false;
}
/**
* Check if this overlay configuration matches an Artifact's info
*
* @param gid Artifact groupId
* @param aid Artifact artifactId
* @param cls Artifact classifier
* @return
*/
public boolean matchesArtifact (String gid, String aid, String cls)
{
if (((getGroupId() == null && gid == null) || (getGroupId() != null && getGroupId().equals(gid)))
&&((getArtifactId() == null && aid == null) || (getArtifactId() != null && getArtifactId().equals(aid)))
&&((getClassifier() == null) || (getClassifier().equals(cls))))
return true;
return false;
}
/**
* Check if this overlay configuration matches an Artifact's info
*
* @param gid
* @param aid
* @return
*/
public boolean matchesArtifact (String gid, String aid)
{
if (((getGroupId() == null && gid == null) || (getGroupId() != null && getGroupId().equals(gid)))
&&((getArtifactId() == null && aid == null) || (getArtifactId() != null && getArtifactId().equals(aid))))
return true;
return false;
}
public String toString()
{
StringBuffer strbuff = new StringBuffer();

View File

@ -452,23 +452,19 @@ public class Starter
{
if (wars == null || wars.isEmpty() || c == null)
return null;
Artifact war = null;
Iterator<Artifact> itor = wars.iterator();
while(itor.hasNext() && war == null)
{
Artifact a = itor.next();
if (((c.getGroupId() == null && a.gid == null) || (c.getGroupId() != null && c.getGroupId().equals(a.gid)))
&& ((c.getArtifactId() == null && a.aid == null) || (c.getArtifactId() != null && c.getArtifactId().equals(a.aid)))
&& ((c.getClassifier() == null) || (c.getClassifier().equals(a.aid))))
{
if (c.matchesArtifact(a.gid, a.aid, null))
war = a;
}
}
return war;
}
/**
* @param csv
* @return

View File

@ -265,32 +265,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
onFillable();
}
@Override
public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException
{
try
{
if (info==null)
new ContentCallback(content,lastContent,_writeBlocker).iterate();
else
{
// If we are still expecting a 100 continues
if (_channel.isExpecting100Continue())
// then we can't be persistent
_generator.setPersistent(false);
new CommitCallback(info,content,lastContent,_writeBlocker).iterate();
}
_writeBlocker.block();
}
catch (ClosedChannelException e)
{
throw new EofException(e);
}
catch (IOException e)
{
throw e;
}
}
@Override
public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)

View File

@ -62,20 +62,17 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private volatile Throwable _onError;
/*
ACTION OPEN ASYNC READY PENDING UNREADY
-------------------------------------------------------------------------------
setWriteListener() READY->owp ise ise ise ise
write() OPEN ise PENDING wpe wpe
flush() OPEN ise PENDING wpe wpe
isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false
write completed - - - ASYNC READY->owp
*/
ACTION OPEN ASYNC READY PENDING UNREADY
-------------------------------------------------------------------------------
setWriteListener() READY->owp ise ise ise ise
write() OPEN ise PENDING wpe wpe
flush() OPEN ise PENDING wpe wpe
isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false
write completed - - - ASYNC READY->owp
*/
enum State { OPEN, ASYNC, READY, PENDING, UNREADY, CLOSED }
private final AtomicReference<State> _state=new AtomicReference<>(State.OPEN);
public HttpOutput(HttpChannel<?> channel)
{
_channel = channel;
@ -212,17 +209,17 @@ write completed - - - ASYNC READY->owp
@Override
public void write(byte[] b, int off, int len) throws IOException
{
_written+=len;
boolean complete=_channel.getResponse().isAllContentWritten(_written);
_written+=len;
boolean complete=_channel.getResponse().isAllContentWritten(_written);
// Async or Blocking ?
while(true)
{
switch(_state.get())
{
// Async or Blocking ?
while(true)
{
switch(_state.get())
{
case OPEN:
// process blocking below
break;
break;
case ASYNC:
throw new IllegalStateException("isReady() not called");
@ -242,7 +239,7 @@ write completed - - - ASYNC READY->owp
int filled = BufferUtil.fill(_aggregate, b, off, len);
// return if we are not complete, not full and filled all the content
if (!complete && filled==len && !BufferUtil.isFull(_aggregate))
if (filled==len && !BufferUtil.isFull(_aggregate))
{
if (!_state.compareAndSet(State.PENDING, State.ASYNC))
throw new IllegalStateException();
@ -258,61 +255,61 @@ write completed - - - ASYNC READY->owp
new AsyncWrite(b,off,len,complete).process();
return;
case PENDING:
case PENDING:
case UNREADY:
throw new WritePendingException();
throw new WritePendingException();
case CLOSED:
throw new EofException("Closed");
}
break;
}
}
break;
}
// handle blocking write
// handle blocking write
// Should we aggregate?
int capacity = getBufferSize();
if (!complete && len<=_commitSize)
{
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(capacity, false);
// Should we aggregate?
int capacity = getBufferSize();
if (!complete && len<=_commitSize)
{
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(capacity, false);
// YES - fill the aggregate with content from the buffer
int filled = BufferUtil.fill(_aggregate, b, off, len);
// YES - fill the aggregate with content from the buffer
int filled = BufferUtil.fill(_aggregate, b, off, len);
// return if we are not complete, not full and filled all the content
if (!complete && filled==len && !BufferUtil.isFull(_aggregate))
return;
// return if we are not complete, not full and filled all the content
if (filled==len && !BufferUtil.isFull(_aggregate))
return;
// adjust offset/length
off+=filled;
len-=filled;
}
// adjust offset/length
off+=filled;
len-=filled;
}
// flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate))
{
_channel.write(_aggregate, complete && len==0);
// flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate))
{
_channel.write(_aggregate, complete && len==0);
// should we fill aggregate again from the buffer?
if (len>0 && !complete && len<=_commitSize)
{
BufferUtil.append(_aggregate, b, off, len);
return;
}
}
// should we fill aggregate again from the buffer?
if (len>0 && !complete && len<=_commitSize)
{
BufferUtil.append(_aggregate, b, off, len);
return;
}
}
// write any remaining content in the buffer directly
if (len>0)
_channel.write(ByteBuffer.wrap(b, off, len), complete);
// write any remaining content in the buffer directly
if (len>0)
_channel.write(ByteBuffer.wrap(b, off, len), complete);
else if (complete)
_channel.write(BufferUtil.EMPTY_BUFFER,complete);
if (complete)
{
closed();
}
if (complete)
{
closed();
}
}
@ -763,7 +760,7 @@ write completed - - - ASYNC READY->owp
{
try
{
loop: while(true)
while(true)
{
State last=_state.get();
switch(last)
@ -786,8 +783,7 @@ write completed - - - ASYNC READY->owp
default:
throw new IllegalStateException();
}
break loop;
break;
}
}
catch (Exception e)
@ -800,6 +796,7 @@ write completed - - - ASYNC READY->owp
@Override
public void failed(Throwable e)
{
super.failed(e);
_onError=e;
_channel.getState().onWritePossible();
}
@ -818,8 +815,9 @@ write completed - - - ASYNC READY->owp
*/
private class InputStreamWritingCB extends IteratingNestedCallback
{
final InputStream _in;
final ByteBuffer _buffer;
private final InputStream _in;
private final ByteBuffer _buffer;
private boolean _eof;
public InputStreamWritingCB(InputStream in, Callback callback)
{
@ -831,12 +829,19 @@ write completed - - - ASYNC READY->owp
@Override
protected boolean process() throws Exception
{
boolean eof=false;
if (_eof)
{
// Handle EOF
closed();
_channel.getByteBufferPool().release(_buffer);
return true;
}
int len=_in.read(_buffer.array(),0,_buffer.capacity());
if (len<0)
{
eof=true;
_eof=true;
len=0;
_in.close();
}
@ -845,7 +850,7 @@ write completed - - - ASYNC READY->owp
// read ahead for EOF to try for single commit
int len2=_in.read(_buffer.array(),len,_buffer.capacity()-len);
if (len2<0)
eof=true;
_eof=true;
else
len+=len2;
}
@ -853,15 +858,7 @@ write completed - - - ASYNC READY->owp
// write what we have
_buffer.position(0);
_buffer.limit(len);
_channel.write(_buffer,eof,this);
// Handle EOF
if (eof)
{
closed();
_channel.getByteBufferPool().release(_buffer);
return true;
}
_channel.write(_buffer,_eof,this);
return false;
}
@ -894,8 +891,9 @@ write completed - - - ASYNC READY->owp
*/
private class ReadableByteChannelWritingCB extends IteratingNestedCallback
{
final ReadableByteChannel _in;
final ByteBuffer _buffer;
private final ReadableByteChannel _in;
private final ByteBuffer _buffer;
private boolean _eof;
public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
{
@ -907,14 +905,19 @@ write completed - - - ASYNC READY->owp
@Override
protected boolean process() throws Exception
{
if (_eof)
{
closed();
_channel.getByteBufferPool().release(_buffer);
return true;
}
_buffer.clear();
boolean eof=false;
int len=_in.read(_buffer);
if (len<0)
{
eof=true;
len=0;
_eof=true;
_in.close();
}
else if (len<_buffer.capacity())
@ -922,22 +925,14 @@ write completed - - - ASYNC READY->owp
// read ahead for EOF to try for single commit
int len2=_in.read(_buffer);
if (len2<0)
eof=true;
_eof=true;
else
len+=len2;
}
// write what we have
_buffer.flip();
_channel.write(_buffer,eof,this);
// Handle EOF
if (eof)
{
closed();
_channel.getByteBufferPool().release(_buffer);
return true;
}
_channel.write(_buffer,_eof,this);
return false;
}

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http.HttpGenerator;
@ -26,9 +25,6 @@ import org.eclipse.jetty.util.Callback;
public interface HttpTransport
{
@Deprecated
void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException;
void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback);
void send(ByteBuffer content, boolean lastContent, Callback callback);

View File

@ -522,6 +522,8 @@ public class ResourceHandler extends HandlerWrapper
@Override
public void failed(Throwable x)
{
LOG.warn(x.toString());
LOG.debug(x);
async.complete();
}
};

View File

@ -18,9 +18,6 @@
package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
@ -28,11 +25,11 @@ import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.URISyntaxException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.http.SimpleHttpParser;
import org.eclipse.jetty.toolchain.test.http.SimpleHttpResponse;
@ -41,6 +38,9 @@ import org.eclipse.jetty.util.log.StdErrLog;
import org.junit.After;
import org.junit.Before;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
public abstract class AbstractHttpTest
{
protected static Server server;
@ -57,7 +57,7 @@ public abstract class AbstractHttpTest
public void setUp() throws Exception
{
server = new Server();
connector = new ServerConnector(server,1,1);
connector = new ServerConnector(server,null,null,new ArrayByteBufferPool(64,2048,64*1024),1,1,new HttpConnectionFactory());
connector.setIdleTimeout(10000);
server.addConnector(connector);

View File

@ -84,14 +84,6 @@ public class ResponseTest
ByteBufferQueuedHttpInput input = new ByteBufferQueuedHttpInput();
_channel = new HttpChannel<ByteBuffer>(connector, new HttpConfiguration(), endp, new HttpTransport()
{
@Override
public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException
{
BlockingCallback cb = new BlockingCallback();
send(info,content,lastContent,cb);
cb.block();
}
@Override
public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
{

View File

@ -541,7 +541,9 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
private IStream createStream(SynStreamFrame frame, StreamFrameListener listener, boolean local, Promise<Stream> promise)
{
IStream associatedStream = streams.get(frame.getAssociatedStreamId());
IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream, promise);
IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream,
scheduler, promise);
stream.setIdleTimeout(endPoint.getIdleTimeout());
flowControlStrategy.onNewStream(this, stream);
stream.updateCloseState(frame.isClose(), local);

View File

@ -26,6 +26,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.io.IdleTimeout;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
@ -43,8 +44,9 @@ import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
public class StandardStream implements IStream
public class StandardStream extends IdleTimeout implements IStream
{
private static final Logger LOG = Log.getLogger(Stream.class);
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
@ -60,8 +62,9 @@ public class StandardStream implements IStream
private volatile CloseState closeState = CloseState.OPENED;
private volatile boolean reset = false;
public StandardStream(int id, byte priority, ISession session, IStream associatedStream, Promise<Stream> promise)
public StandardStream(int id, byte priority, ISession session, IStream associatedStream, Scheduler scheduler, Promise<Stream> promise)
{
super(scheduler);
this.id = id;
this.priority = priority;
this.session = session;
@ -105,6 +108,18 @@ public class StandardStream implements IStream
return priority;
}
@Override
protected void onIdleExpired(TimeoutException timeout)
{
listener.onFailure(timeout);
}
@Override
public boolean isOpen()
{
return !isClosed();
}
@Override
public int getWindowSize()
{
@ -194,6 +209,7 @@ public class StandardStream implements IStream
@Override
public void process(ControlFrame frame)
{
notIdle();
switch (frame.getType())
{
case SYN_STREAM:
@ -234,6 +250,7 @@ public class StandardStream implements IStream
@Override
public void process(DataInfo dataInfo)
{
notIdle();
// TODO: in v3 we need to send a rst instead of just ignoring
// ignore data frame if this stream is remotelyClosed already
if (isRemotelyClosed())
@ -349,6 +366,7 @@ public class StandardStream implements IStream
@Override
public void push(PushInfo pushInfo, Promise<Stream> promise)
{
notIdle();
if (isClosed() || isReset())
{
promise.failed(new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED,
@ -373,6 +391,7 @@ public class StandardStream implements IStream
@Override
public void reply(ReplyInfo replyInfo, Callback callback)
{
notIdle();
if (isUnidirectional())
throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams");
openState = OpenState.REPLY_SENT;
@ -395,6 +414,7 @@ public class StandardStream implements IStream
@Override
public void data(DataInfo dataInfo, Callback callback)
{
notIdle();
if (!canSend())
{
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
@ -425,6 +445,7 @@ public class StandardStream implements IStream
@Override
public void headers(HeadersInfo headersInfo, Callback callback)
{
notIdle();
if (!canSend())
{
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());

View File

@ -225,4 +225,16 @@ public interface Stream
*/
public Set<Stream> getPushedStreams();
/**
* Get the idle timeout set for this particular stream
* @return the idle timeout
*/
public long getIdleTimeout();
/**
* Set an idle timeout for this stream
* @param timeout
*/
public void setIdleTimeout(long timeout);
}

View File

@ -69,6 +69,12 @@ public interface StreamFrameListener extends EventListener
*/
public void onData(Stream stream, DataInfo dataInfo);
/**
* <p>Callback invoked on errors.</p>
* @param x
*/
public void onFailure(Throwable x);
/**
* <p>Empty implementation of {@link StreamFrameListener}</p>
*/
@ -94,5 +100,10 @@ public interface StreamFrameListener extends EventListener
public void onData(Stream stream, DataInfo dataInfo)
{
}
@Override
public void onFailure(Throwable x)
{
}
}
}

View File

@ -24,7 +24,9 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.SPDYException;
@ -47,6 +49,8 @@ import org.junit.runner.RunWith;
@RunWith(AdvancedRunner.class)
public class AsyncTimeoutTest
{
EndPoint endPoint = new ByteArrayEndPoint();
@Slow
@Test
public void testAsyncTimeoutInControlFrames() throws Exception
@ -60,7 +64,7 @@ public class AsyncTimeoutTest
scheduler.start(); // TODO need to use jetty lifecycles better here
Generator generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(),
null, null, 1, null, generator, new FlowControlStrategy.None())
endPoint, null, 1, null, generator, new FlowControlStrategy.None())
{
@Override
public void flush()
@ -103,7 +107,7 @@ public class AsyncTimeoutTest
scheduler.start();
Generator generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(),
null, null, 1, null, generator, new FlowControlStrategy.None())
endPoint, null, 1, null, generator, new FlowControlStrategy.None())
{
@Override
protected void write(ByteBuffer buffer, Callback callback)

View File

@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
@ -75,6 +76,7 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class StandardSessionTest
@ -84,6 +86,10 @@ public class StandardSessionTest
@Mock
private Controller controller;
@Mock
private EndPoint endPoint;
private ExecutorService threadPool;
private StandardSession session;
private Scheduler scheduler;
@ -97,8 +103,9 @@ public class StandardSessionTest
threadPool = Executors.newCachedThreadPool();
scheduler = new TimerScheduler();
scheduler.start();
session = new StandardSession(VERSION, bufferPool, threadPool, scheduler, controller, null, null, 1, null,
session = new StandardSession(VERSION, bufferPool, threadPool, scheduler, controller, endPoint, null, 1, null,
generator, new FlowControlStrategy.None());
when(endPoint.getIdleTimeout()).thenReturn(30000L);
headers = new Fields();
}
@ -428,7 +435,7 @@ public class StandardSessionTest
final CountDownLatch failedCalledLatch = new CountDownLatch(2);
SynStreamFrame synStreamFrame = new SynStreamFrame(VERSION, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null);
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null);
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null, null);
stream.updateWindowSize(8192);
Callback.Adapter callback = new Callback.Adapter()
{
@ -502,7 +509,7 @@ public class StandardSessionTest
private void testHeaderFramesAreSentInOrder(final byte priority0, final byte priority1, final byte priority2) throws InterruptedException, ExecutionException
{
final StandardSession testLocalSession = new StandardSession(VERSION, bufferPool, threadPool, scheduler,
new ControllerMock(), null, null, 1, null, generator, new FlowControlStrategy.None());
new ControllerMock(), endPoint, null, 1, null, generator, new FlowControlStrategy.None());
HashSet<Future> tasks = new HashSet<>();
int numberOfTasksToRun = 128;

View File

@ -18,16 +18,6 @@
package org.eclipse.jetty.spdy;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@ -43,23 +33,44 @@ import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.frames.SynStreamFrame;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class StandardStreamTest
{
private final ScheduledExecutorScheduler scheduler = new ScheduledExecutorScheduler();
@Mock
private ISession session;
@Mock
private SynStreamFrame synStreamFrame;
@Before
public void setUp() throws Exception
{
scheduler.start();
}
/**
* Test method for {@link Stream#push(org.eclipse.jetty.spdy.api.PushInfo)}.
*/
@ -67,7 +78,7 @@ public class StandardStreamTest
@Test
public void testSyn()
{
Stream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null);
Stream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null, null);
Set<Stream> streams = new HashSet<>();
streams.add(stream);
when(synStreamFrame.isClose()).thenReturn(false);
@ -100,7 +111,8 @@ public class StandardStreamTest
@Test
public void testSynOnClosedStream()
{
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null);
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session,
null, null , null);
stream.updateCloseState(true, true);
stream.updateCloseState(true, false);
assertThat("stream expected to be closed", stream.isClosed(), is(true));
@ -121,11 +133,57 @@ public class StandardStreamTest
public void testSendDataOnHalfClosedStream() throws InterruptedException, ExecutionException, TimeoutException
{
SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null);
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null, null);
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session,
null, scheduler, null);
stream.updateWindowSize(8192);
stream.updateCloseState(synStreamFrame.isClose(), true);
assertThat("stream is half closed", stream.isHalfClosed(), is(true));
stream.data(new StringDataInfo("data on half closed stream", true));
verify(session, never()).data(any(IStream.class), any(DataInfo.class), anyInt(), any(TimeUnit.class), any(Callback.class));
}
@Test
@Slow
public void testIdleTimeout() throws InterruptedException, ExecutionException, TimeoutException
{
final CountDownLatch onFailCalledLatch = new CountDownLatch(1);
IStream stream = new StandardStream(1, (byte)0, session, null, scheduler, null);
stream.setIdleTimeout(500);
stream.setStreamFrameListener(new StreamFrameListener.Adapter()
{
@Override
public void onFailure(Throwable x)
{
assertThat("exception is a TimeoutException", x, is(instanceOf(TimeoutException.class)));
onFailCalledLatch.countDown();
}
});
stream.process(new StringDataInfo("string", false));
Thread.sleep(1000);
assertThat("onFailure has been called", onFailCalledLatch.await(5, TimeUnit.SECONDS), is(true));
}
@Test
@Slow
public void testIdleTimeoutIsInterruptedWhenReceiving() throws InterruptedException, ExecutionException,
TimeoutException
{
final CountDownLatch onFailCalledLatch = new CountDownLatch(1);
IStream stream = new StandardStream(1, (byte)0, session, null, scheduler, null);
stream.setStreamFrameListener(new StreamFrameListener.Adapter()
{
@Override
public void onFailure(Throwable x)
{
assertThat("exception is a TimeoutException", x, is(instanceOf(TimeoutException.class)));
onFailCalledLatch.countDown();
}
});
stream.process(new StringDataInfo("string", false));
Thread.sleep(500);
stream.process(new StringDataInfo("string", false));
Thread.sleep(500);
assertThat("onFailure has been called", onFailCalledLatch.await(1, TimeUnit.SECONDS), is(false));
}
}

View File

@ -139,4 +139,13 @@ public class HttpReceiverOverSPDY extends HttpReceiver implements StreamFrameLis
responseFailure(x);
}
}
@Override
public void onFailure(Throwable x)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
exchange.getRequest().abort(x);
}
}

View File

@ -50,7 +50,7 @@ public class HttpSenderOverSPDY extends HttpSender
protected void sendHeaders(HttpExchange exchange, final HttpContent content, final Callback callback)
{
final Request request = exchange.getRequest();
final long idleTimeout = request.getIdleTimeout();
short spdyVersion = getHttpChannel().getSession().getVersion();
Fields fields = new Fields();
HttpField hostHeader = null;
@ -81,6 +81,7 @@ public class HttpSenderOverSPDY extends HttpSender
@Override
public void succeeded(Stream stream)
{
stream.setIdleTimeout(idleTimeout);
if (content.hasContent())
HttpSenderOverSPDY.this.stream = stream;
callback.succeeded();

View File

@ -42,7 +42,6 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.junit.Assert;
import org.junit.Test;
import org.junit.Ignore;
public class HttpClientTest extends AbstractHttpClientServerTest
{
@ -324,7 +323,6 @@ public class HttpClientTest extends AbstractHttpClientServerTest
@Slow
@Test
@Ignore
public void test_Request_IdleTimeout() throws Exception
{
final long idleTimeout = 1000;

View File

@ -39,7 +39,7 @@ import org.eclipse.jetty.util.log.Logger;
public class HTTPSPDYServerConnectionFactory extends SPDYServerConnectionFactory implements HttpConfiguration.ConnectionFactory
{
private static final String CHANNEL_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.HTTPChannelOverSPDY";
private static final Logger logger = Log.getLogger(HTTPSPDYServerConnectionFactory.class);
private static final Logger LOG = Log.getLogger(HTTPSPDYServerConnectionFactory.class);
private final PushStrategy pushStrategy;
private final HttpConfiguration httpConfiguration;
@ -94,7 +94,7 @@ public class HTTPSPDYServerConnectionFactory extends SPDYServerConnectionFactory
// can arrive on the same connection, so we need to create an
// HttpChannel for each SYN in order to run concurrently.
logger.debug("Received {} on {}", synInfo, stream);
LOG.debug("Received {} on {}", synInfo, stream);
Fields headers = synInfo.getHeaders();
// According to SPDY/3 spec section 3.2.1 user-agents MUST support gzip compression. Firefox omits the
@ -136,7 +136,7 @@ public class HTTPSPDYServerConnectionFactory extends SPDYServerConnectionFactory
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
logger.debug("Received {} on {}", headersInfo, stream);
LOG.debug("Received {} on {}", headersInfo, stream);
HttpChannelOverSPDY channel = (HttpChannelOverSPDY)stream.getAttribute(CHANNEL_ATTRIBUTE);
channel.requestHeaders(headersInfo.getHeaders(), headersInfo.isClose());
}
@ -150,9 +150,15 @@ public class HTTPSPDYServerConnectionFactory extends SPDYServerConnectionFactory
@Override
public void onData(Stream stream, final DataInfo dataInfo)
{
logger.debug("Received {} on {}", dataInfo, stream);
LOG.debug("Received {} on {}", dataInfo, stream);
HttpChannelOverSPDY channel = (HttpChannelOverSPDY)stream.getAttribute(CHANNEL_ATTRIBUTE);
channel.requestContent(dataInfo, dataInfo.isClose());
}
@Override
public void onFailure(Throwable x)
{
LOG.debug(x);
}
}
}

View File

@ -209,20 +209,6 @@ public class HttpTransportOverSPDY implements HttpTransport
reply(stream, reply, callback);
}
@Override
public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException
{
send(info, content, lastContent, streamBlocker);
try
{
streamBlocker.block();
}
catch (Exception e)
{
LOG.debug(e);
}
}
@Override
public void completed()
{

View File

@ -213,7 +213,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
private HTTPStream(int id, byte priority, ISession session, IStream associatedStream)
{
super(id, priority, session, associatedStream, null);
super(id, priority, session, associatedStream, getHttpChannel().getScheduler(), null);
}
@Override
@ -318,7 +318,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
{
private HTTPPushStream(int id, byte priority, ISession session, IStream associatedStream)
{
super(id, priority, session, associatedStream, null);
super(id, priority, session, associatedStream, getHttpChannel().getScheduler(), null);
}
@Override

View File

@ -170,6 +170,12 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
streamPromise.data(serverDataInfo);
}
@Override
public void onFailure(Throwable x)
{
LOG.debug(x);
}
private Session produceSession(String host, short version, InetSocketAddress address)
{
try
@ -267,6 +273,12 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
pushStreamPromise.data(clientDataInfo);
}
@Override
public void onFailure(Throwable x)
{
LOG.debug(x);
}
}
private class ProxyStreamFrameListener extends StreamFrameListener.Adapter

View File

@ -75,17 +75,17 @@ public abstract class AbstractHTTPSPDYTest
protected InetSocketAddress startHTTPServer(Handler handler) throws Exception
{
return startHTTPServer(SPDY.V2, handler);
return startHTTPServer(SPDY.V2, handler, 30000);
}
protected InetSocketAddress startHTTPServer(short version, Handler handler) throws Exception
protected InetSocketAddress startHTTPServer(short version, Handler handler, long idleTimeout) throws Exception
{
QueuedThreadPool threadPool = new QueuedThreadPool(256);
threadPool.setName("serverQTP");
server = new Server(threadPool);
connector = newHTTPSPDYServerConnector(version);
connector.setPort(0);
connector.setIdleTimeout(30000);
connector.setIdleTimeout(idleTimeout);
server.addConnector(connector);
server.setHandler(handler);
server.start();

View File

@ -79,7 +79,7 @@ public class ConcurrentStreamsTest extends AbstractHTTPSPDYTest
throw new ServletException(x);
}
}
}), null);
}, 30000), null);
// Perform slow request. This will wait on server side until the fast request wakes it up
Fields headers = createHeaders(slowPath);

View File

@ -262,7 +262,7 @@ public class HttpTransportOverSPDYTest
verify(stream, times(1)).reply(replyInfoCaptor.capture(), any(Callback.class));
assertThat("ReplyInfo close is false", replyInfoCaptor.getValue().isClose(), is(false));
httpTransportOverSPDY.send(HttpGenerator.RESPONSE_500_INFO, null,true);
httpTransportOverSPDY.send(HttpGenerator.RESPONSE_500_INFO, null,true, new Callback.Adapter());
verify(stream, times(1)).data(any(DataInfo.class), any(Callback.class));

View File

@ -82,7 +82,7 @@ public class PushStrategyBenchmarkTest extends AbstractHTTPSPDYTest
@Test
public void benchmarkPushStrategy() throws Exception
{
InetSocketAddress address = startHTTPServer(version, new PushStrategyBenchmarkHandler());
InetSocketAddress address = startHTTPServer(version, new PushStrategyBenchmarkHandler(), 30000);
// Plain HTTP
ConnectionFactory factory = new HttpConnectionFactory(new HttpConfiguration());

View File

@ -357,7 +357,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
outputStream.write(bytes);
baseRequest.setHandled(true);
}
});
}, 30000);
Session pushCacheBuildSession = startClient(version, bigResponseServerAddress, null);
Fields mainResourceHeaders = createHeadersWithoutReferrer(mainResource);
@ -443,7 +443,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
baseRequest.setHandled(true);
}
});
return startHTTPServer(version, gzipHandler);
return startHTTPServer(version, gzipHandler, 30000);
}
private Session sendMainRequestAndCSSRequest(SessionFrameListener sessionFrameListener, boolean awaitPush) throws Exception
@ -597,7 +597,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
output.print("body { background: #FFF; }");
baseRequest.setHandled(true);
}
});
}, 30000);
Session session1 = startClient(version, address, null);
final CountDownLatch mainResourceLatch = new CountDownLatch(1);
@ -688,7 +688,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
}
baseRequest.setHandled(true);
}
});
}, 30000);
Session session1 = startClient(version, address, null);
final CountDownLatch mainResourceLatch = new CountDownLatch(1);
@ -799,7 +799,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
output.print("\u0000");
baseRequest.setHandled(true);
}
});
}, 30000);
Session session1 = startClient(version, address, null);
final CountDownLatch mainResourceLatch = new CountDownLatch(1);
@ -919,7 +919,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
output.print("<html><head/><body>HELLO</body></html>");
baseRequest.setHandled(true);
}
});
}, 30000);
Session session1 = startClient(version, address, null);
final CountDownLatch mainResourceLatch = new CountDownLatch(1);
@ -1004,7 +1004,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
output.print("body { background: #FFF; }");
baseRequest.setHandled(true);
}
});
}, 30000);
Session session1 = startClient(version, address, null);
final CountDownLatch mainResourceLatch = new CountDownLatch(1);

View File

@ -26,7 +26,9 @@ import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
@ -50,6 +52,8 @@ import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StdErrLog;
import org.junit.Assert;
import org.junit.Test;
@ -57,6 +61,7 @@ import org.junit.Test;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
@ -65,6 +70,8 @@ import static org.junit.Assert.fail;
public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
{
private static final Logger LOG = Log.getLogger(ServerHTTPSPDYTest.class);
public ServerHTTPSPDYTest(short version)
{
super(version);
@ -90,7 +97,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
assertThat(httpRequest.getHeader("host"), is("localhost:" + connector.getLocalPort()));
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getLocalPort(), version, "GET", path);
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -131,7 +138,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
assertEquals(query, httpRequest.getQueryString());
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", uri);
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -174,7 +181,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
assertThat("requestUri is /foo", httpRequest.getRequestURI(), is(path));
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", uri);
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -216,7 +223,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
httpResponse.getWriter().write("body that shouldn't be sent on a HEAD request");
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "HEAD", path);
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -256,7 +263,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
request.setHandled(true);
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", path);
headers.put("content-type", "application/x-www-form-urlencoded");
@ -305,7 +312,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
assertNotNull(httpRequest.getServerName());
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", path);
headers.put("content-type", "application/x-www-form-urlencoded");
@ -347,7 +354,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
assertEquals("2", httpRequest.getParameter("b"));
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", path);
headers.put("content-type", "application/x-www-form-urlencoded");
@ -392,7 +399,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
assertEquals("2", httpRequest.getParameter("b"));
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", path);
headers.put("content-type", "application/x-www-form-urlencoded");
@ -434,7 +441,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
output.write(data.getBytes("UTF-8"));
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -480,7 +487,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
output.write(data);
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -531,7 +538,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
output.write(data2.getBytes("UTF-8"));
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -586,7 +593,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
output.write(data);
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -642,7 +649,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
}
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -695,7 +702,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
output.write(data);
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -748,7 +755,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
output.close();
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -806,7 +813,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
output.write(data2.getBytes("UTF-8"));
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -859,7 +866,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
httpResponse.sendRedirect(location);
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -896,7 +903,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
httpResponse.sendError(HttpServletResponse.SC_NOT_FOUND);
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -941,7 +948,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
{
throw new NullPointerException("thrown_explicitly_by_the_test");
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -995,7 +1002,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
output.write(pangram2.getBytes("UTF-8"));
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -1054,7 +1061,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
request.getResponse().getHttpOutput().sendContent(ByteBuffer.wrap(data));
handlerLatch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -1110,7 +1117,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
output.write(data);
output.write(data);
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -1174,7 +1181,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
}
}.start();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -1217,7 +1224,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
}
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -1262,7 +1269,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
}
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -1330,7 +1337,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
}
}.start();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", "/foo");
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -1398,7 +1405,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
output.write(data);
}
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", "/foo");
final CountDownLatch responseLatch = new CountDownLatch(2);
@ -1439,7 +1446,7 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
request.setHandled(true);
latch.countDown();
}
}), null);
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "POST", "/foo");
final CountDownLatch responseLatch = new CountDownLatch(1);
@ -1460,4 +1467,145 @@ public class ServerHTTPSPDYTest extends AbstractHTTPSPDYTest
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testIdleTimeout() throws Exception
{
final int idleTimeout = 500;
final CountDownLatch timeoutReceivedLatch = new CountDownLatch(1);
Session session = startClient(version, startHTTPServer(version, new AbstractHandler()
{
@Override
public void handle(String target, final Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse)
throws IOException, ServletException
{
try
{
Thread.sleep(2 * idleTimeout);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
request.setHandled(true);
}
}, 30000), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/");
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, true, (byte)0),
new StreamFrameListener.Adapter()
{
@Override
public void onFailure(Throwable x)
{
assertThat("we got a TimeoutException", x, instanceOf(TimeoutException.class));
timeoutReceivedLatch.countDown();
}
});
stream.setIdleTimeout(idleTimeout);
assertThat("idle timeout hit", timeoutReceivedLatch.await(5, TimeUnit.SECONDS), is(true));
}
@Test
public void testIdleTimeoutSetOnConnectionOnly() throws Exception
{
final int idleTimeout = 500;
final CountDownLatch timeoutReceivedLatch = new CountDownLatch(1);
Session session = startClient(version, startHTTPServer(version, new AbstractHandler()
{
@Override
public void handle(String target, final Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse)
throws IOException, ServletException
{
try
{
Thread.sleep(2 * idleTimeout);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
request.setHandled(true);
}
}, idleTimeout), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/");
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, true, (byte)0),
new StreamFrameListener.Adapter()
{
@Override
public void onFailure(Throwable x)
{
assertThat("we got a TimeoutException", x, instanceOf(TimeoutException.class));
timeoutReceivedLatch.countDown();
}
});
assertThat("idle timeout hit", timeoutReceivedLatch.await(5, TimeUnit.SECONDS), is(true));
}
@Test
public void testSingleStreamIdleTimeout() throws Exception
{
final int idleTimeout = 500;
final CountDownLatch timeoutReceivedLatch = new CountDownLatch(1);
final CountDownLatch replyReceivedLatch = new CountDownLatch(3);
Session session = startClient(version, startHTTPServer(version, new AbstractHandler()
{
@Override
public void handle(String target, final Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse)
throws IOException, ServletException
{
if ("true".equals(request.getHeader("slow")))
{
try
{
Thread.sleep(2 * idleTimeout);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
request.setHandled(true);
}
}, idleTimeout), null);
Fields headers = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/");
Fields slowHeaders = SPDYTestUtils.createHeaders("localhost", connector.getPort(), version, "GET", "/");
slowHeaders.add("slow", "true");
sendSingleRequestThatIsNotExpectedToTimeout(replyReceivedLatch, session, headers);
session.syn(new SynInfo(5, TimeUnit.SECONDS, slowHeaders, true, (byte)0),
new StreamFrameListener.Adapter()
{
@Override
public void onFailure(Throwable x)
{
assertThat("we got a TimeoutException", x, instanceOf(TimeoutException.class));
timeoutReceivedLatch.countDown();
}
});
Thread.sleep(idleTimeout / 2);
sendSingleRequestThatIsNotExpectedToTimeout(replyReceivedLatch, session, headers);
Thread.sleep(idleTimeout / 2);
sendSingleRequestThatIsNotExpectedToTimeout(replyReceivedLatch, session, headers);
assertThat("idle timeout hit", timeoutReceivedLatch.await(5, TimeUnit.SECONDS), is(true));
assertThat("received replies on 3 non idle requests", replyReceivedLatch.await(5, TimeUnit.SECONDS),
is(true));
}
private void sendSingleRequestThatIsNotExpectedToTimeout(final CountDownLatch replyReceivedLatch, Session session, Fields headers) throws ExecutionException, InterruptedException, TimeoutException
{
session.syn(new SynInfo(5, TimeUnit.SECONDS, headers, true, (byte)0),
new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
replyReceivedLatch.countDown();
}
});
}
}

View File

@ -18,7 +18,7 @@
package org.eclipse.jetty.util;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/* ------------------------------------------------------------ */
@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* if that task completes quickly and uses the calling thread to callback
* the success notification, this can result in a growing stack depth.
* </p>
* <p>To avoid this issue, this callback uses an AtomicBoolean to note
* <p>To avoid this issue, this callback uses an AtomicReference to note
* if the success callback has been called during the processing of a
* sub task, and if so then the processing iterates rather than recurses.
* </p>
@ -41,7 +41,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public abstract class IteratingCallback implements Callback
{
private final AtomicBoolean _iterating = new AtomicBoolean();
private enum State { WAITING, ITERATING, SUCCEEDED, FAILED };
private final AtomicReference<State> _state = new AtomicReference<>(State.WAITING);
public IteratingCallback()
{
@ -71,32 +72,93 @@ public abstract class IteratingCallback implements Callback
try
{
// Keep iterating as long as succeeded() is called during process()
while(_iterating.compareAndSet(false,true))
// If we are in WAITING state, either this is the first iteration or
// succeeded()/failed() were called already.
while(_state.compareAndSet(State.WAITING,State.ITERATING))
{
// process and test if we are complete
// Make some progress by calling process()
if (process())
{
completed();
// A true return indicates we are finished a no further callbacks
// are scheduled. So we must still be ITERATING.
if (_state.compareAndSet(State.ITERATING,State.SUCCEEDED))
completed();
else
throw new IllegalStateException("Already "+_state.get());
return;
}
// else a callback has been scheduled. If it has not happened yet,
// we will still be ITERATING
else if (_state.compareAndSet(State.ITERATING,State.WAITING))
// no callback yet, so break the loop and wait for it
break;
// The callback must have happened and we are either WAITING already or FAILED
// the loop test will work out which
}
}
catch(Exception e)
{
_iterating.set(false);
failed(e);
}
finally
{
_iterating.set(false);
}
}
/* ------------------------------------------------------------ */
@Override
public void succeeded()
{
if (!_iterating.compareAndSet(true,false))
iterate();
// Try a short cut for the fast method. If we are still iterating
if (_state.compareAndSet(State.ITERATING,State.WAITING))
// then next loop will continue processing, so nothing to do here
return;
// OK do it properly
loop: while(true)
{
switch(_state.get())
{
case ITERATING:
if (_state.compareAndSet(State.ITERATING,State.WAITING))
break loop;
continue;
case WAITING:
// we are really waiting, so use this callback thread to iterate some more
iterate();
break loop;
default:
throw new IllegalStateException("Already "+_state.get());
}
}
}
/* ------------------------------------------------------------ */
/**
* Derivations of this method should always call super.failed(x)
* to check the state before handling the failure.
* @see org.eclipse.jetty.util.Callback#failed(java.lang.Throwable)
*/
@Override
public void failed(Throwable x)
{
loop: while(true)
{
switch(_state.get())
{
case ITERATING:
if (_state.compareAndSet(State.ITERATING,State.FAILED))
break loop;
continue;
case WAITING:
if (_state.compareAndSet(State.WAITING,State.FAILED))
break loop;
continue;
default:
throw new IllegalStateException("Already "+_state.get(),x);
}
}
}
}

View File

@ -56,6 +56,7 @@ public abstract class IteratingNestedCallback extends IteratingCallback
@Override
public void failed(Throwable x)
{
super.failed(x);
_callback.failed(x);
}

View File

@ -78,7 +78,7 @@ public class SchedulerTest
public void testExecution() throws Exception
{
final AtomicLong executed = new AtomicLong();
long expected=System.currentTimeMillis()+3000;
long expected=System.currentTimeMillis()+1000;
Scheduler.Task task=_scheduler.schedule(new Runnable()
{
@Override
@ -86,9 +86,9 @@ public class SchedulerTest
{
executed.set(System.currentTimeMillis());
}
},3000,TimeUnit.MILLISECONDS);
},1000,TimeUnit.MILLISECONDS);
Thread.sleep(4000);
Thread.sleep(1500);
Assert.assertFalse(task.cancel());
Assert.assertThat(executed.get(),Matchers.greaterThanOrEqualTo(expected));
Assert.assertThat(expected-executed.get(),Matchers.lessThan(1000L));
@ -98,7 +98,7 @@ public class SchedulerTest
public void testTwoExecution() throws Exception
{
final AtomicLong executed = new AtomicLong();
long expected=System.currentTimeMillis()+3000;
long expected=System.currentTimeMillis()+1000;
Scheduler.Task task=_scheduler.schedule(new Runnable()
{
@Override
@ -106,15 +106,15 @@ public class SchedulerTest
{
executed.set(System.currentTimeMillis());
}
},3000,TimeUnit.MILLISECONDS);
},1000,TimeUnit.MILLISECONDS);
Thread.sleep(4000);
Thread.sleep(1500);
Assert.assertFalse(task.cancel());
Assert.assertThat(executed.get(),Matchers.greaterThanOrEqualTo(expected));
Assert.assertThat(expected-executed.get(),Matchers.lessThan(1000L));
final AtomicLong executed1 = new AtomicLong();
long expected1=System.currentTimeMillis()+3000;
long expected1=System.currentTimeMillis()+1000;
Scheduler.Task task1=_scheduler.schedule(new Runnable()
{
@Override
@ -122,9 +122,9 @@ public class SchedulerTest
{
executed1.set(System.currentTimeMillis());
}
},3000,TimeUnit.MILLISECONDS);
},1000,TimeUnit.MILLISECONDS);
Thread.sleep(4000);
Thread.sleep(1500);
Assert.assertFalse(task1.cancel());
Assert.assertThat(executed1.get(),Matchers.greaterThanOrEqualTo(expected1));
Assert.assertThat(expected1-executed1.get(),Matchers.lessThan(1000L));
@ -200,10 +200,17 @@ public class SchedulerTest
@Test
@Slow
@Ignore
public void testManySchedulesAndCancels() throws Exception
{
schedule(100,5000,3800,200);
}
@Test
public void testFewSchedulesAndCancels() throws Exception
{
schedule(10,500,380,20);
}
@Test
@Slow

View File

@ -33,7 +33,7 @@ public class JettyEchoSocket extends WebSocketAdapter
@Override
public void onWebSocketBinary(byte[] payload, int offset, int len)
{
getRemote().sendBytesByFuture(BufferUtil.toBuffer(payload,offset,len));
getRemote().sendBytes(BufferUtil.toBuffer(payload,offset,len),null);
}
@Override
@ -45,6 +45,6 @@ public class JettyEchoSocket extends WebSocketAdapter
@Override
public void onWebSocketText(String message)
{
getRemote().sendStringByFuture(message);
getRemote().sendString(message,null);
}
}

View File

@ -70,7 +70,7 @@ public class JettyEchoSocket
public void onMessage(String msg)
{
incomingMessages.add(msg);
remote.sendStringByFuture(msg);
remote.sendString(msg,null);
}
@OnWebSocketConnect

View File

@ -116,7 +116,7 @@ public class WebSocketClientTest
Assert.assertThat("client.connectionManager.sessions.size",client.getConnectionManager().getSessions().size(),is(1));
cliSock.getSession().getRemote().sendStringByFuture("Hello World!");
cliSock.getSession().getRemote().sendString("Hello World!",null);
srvSock.echoMessage(1,TimeUnit.MILLISECONDS,500);
// wait for response from server
cliSock.waitForMessage(500,TimeUnit.MILLISECONDS);

View File

@ -0,0 +1,37 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.websocket.api.WriteCallback;
public class BlockingWriteCallback extends BlockingCallback implements WriteCallback
{
@Override
public void writeFailed(Throwable x)
{
failed(x);
}
@Override
public void writeSuccess()
{
succeeded();
}
}

View File

@ -27,6 +27,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -52,6 +53,18 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
private static final int TEXT = 1;
private static final int BINARY = 2;
private static final int CONTROL = 3;
private static final WriteCallback NOOP_CALLBACK = new WriteCallback()
{
@Override
public void writeSuccess()
{
}
@Override
public void writeFailed(Throwable x)
{
}
};
private static final Logger LOG = Log.getLogger(WebSocketRemoteEndpoint.class);
public final LogicalConnection connection;
@ -72,19 +85,11 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
private void blockingWrite(WebSocketFrame frame) throws IOException
{
Future<Void> fut = sendAsyncFrame(frame);
try
{
fut.get(); // block till done
}
catch (ExecutionException e)
{
throw new IOException("Failed to write bytes",e.getCause());
}
catch (InterruptedException e)
{
throw new IOException("Failed to write bytes",e);
}
// TODO Blocking callbacks can be recycled, but they do not handle concurrent calls,
// so if some mutual exclusion can be applied, then this callback can be reused.
BlockingWriteCallback callback = new BlockingWriteCallback();
sendFrame(frame,callback);
callback.block();
}
public InetSocketAddress getInetSocketAddress()
@ -150,13 +155,12 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
@Override
public void sendBytes(ByteBuffer data, WriteCallback callback)
{
Objects.requireNonNull(callback,"WriteCallback cannot be null");
msgType.set(BINARY);
if (LOG.isDebugEnabled())
{
LOG.debug("sendBytes({}, {})",BufferUtil.toDetailString(data),callback);
}
sendFrame(new BinaryFrame().setPayload(data),callback);
sendFrame(new BinaryFrame().setPayload(data),callback==null?NOOP_CALLBACK:callback);
}
public void sendFrame(WebSocketFrame frame, WriteCallback callback)
@ -356,13 +360,12 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
@Override
public void sendString(String text, WriteCallback callback)
{
Objects.requireNonNull(callback,"WriteCallback cannot be null");
msgType.set(TEXT);
TextFrame frame = new TextFrame().setPayload(text);
if (LOG.isDebugEnabled())
{
LOG.debug("sendString({},{})",BufferUtil.toDetailString(frame.getPayload()),callback);
}
sendFrame(frame,callback);
sendFrame(frame,callback==null?NOOP_CALLBACK:callback);
}
}

View File

@ -209,39 +209,37 @@ public class WriteBytesProvider implements Callback
public void failAll(Throwable t)
{
boolean notified = false;
// Collect entries for callback
List<FrameEntry> callbacks = new ArrayList<>();
// fail active (if set)
if (active != null)
synchronized (this)
{
FrameEntry entry = active;
active = null;
entry.notifyFailure(t);
notified = true;
// fail active (if set)
if (active != null)
{
FrameEntry entry = active;
active = null;
callbacks.add(entry);
}
callbacks.addAll(past);
callbacks.addAll(queue);
past.clear();
queue.clear();
}
failure = t;
// fail past
while (!past.isEmpty())
// notify flush callback
if (!callbacks.isEmpty())
{
FrameEntry entry = past.pop();
entry.notifyFailure(t);
notified = true;
}
// fail others
while (!queue.isEmpty())
{
FrameEntry entry = queue.pop();
entry.notifyFailure(t);
notified = true;
}
if (notified)
{
// notify flush callback
// TODO: always notify instead?
flushCallback.failed(t);
// notify entry callbacks
for (FrameEntry entry : callbacks)
{
entry.notifyFailure(t);
}
}
}
@ -363,22 +361,31 @@ public class WriteBytesProvider implements Callback
@Override
public void succeeded()
{
if ((active != null) && (active.frame.remaining() <= 0))
{
// All done with active FrameEntry
FrameEntry entry = active;
active = null;
entry.notifySucceeded();
}
// Collect entries for callback
List<FrameEntry> callbacks = new ArrayList<>();
while (!past.isEmpty())
synchronized (this)
{
FrameEntry entry = past.pop();
entry.notifySucceeded();
if ((active != null) && (active.frame.remaining() <= 0))
{
// All done with active FrameEntry
FrameEntry entry = active;
active = null;
callbacks.add(entry);
}
callbacks.addAll(past);
past.clear();
}
// notify flush callback
flushCallback.succeeded();
// notify entry callbacks outside of synchronize
for (FrameEntry entry : callbacks)
{
entry.notifySucceeded();
}
}
@Override

View File

@ -35,7 +35,7 @@ public class AnnotatedEchoSocket
{
System.out.printf("Echoing back message [%s]%n",message);
// echo the message back
session.getRemote().sendStringByFuture(message);
session.getRemote().sendString(message,null);
}
}
}

View File

@ -59,7 +59,7 @@ public class ListenerEchoSocket implements WebSocketListener
{
System.out.printf("Echoing back message [%s]%n",message);
// echo the message back
outbound.getRemote().sendStringByFuture(message);
outbound.getRemote().sendString(message,null);
}
}
}

View File

@ -36,6 +36,6 @@ public class MyStatelessEchoSocket
@OnWebSocketMessage
public void onText(Session session, String text)
{
session.getRemote().sendStringByFuture(text);
session.getRemote().sendString(text,null);
}
}

View File

@ -50,15 +50,6 @@ public class HttpTransportOverMux implements HttpTransport
LOG.debug("completed");
}
/**
* Process ResponseInfo object into AddChannelResponse
*/
@Override
public void send(ResponseInfo info, ByteBuffer responseBodyContent, boolean lastContent) throws IOException
{
send(info,responseBodyContent,lastContent,streamBlocker);
streamBlocker.block();
}
@Override
public void send(ResponseInfo info, ByteBuffer responseBodyContent, boolean lastContent, Callback callback)

View File

@ -69,7 +69,7 @@ public class LoadTest
@OnWebSocketMessage
public void onWebSocketText(String message)
{
session.getRemote().sendStringByFuture(message);
session.getRemote().sendString(message,null);
long iter = count.incrementAndGet();
if ((iter % 100) == 0)
{

View File

@ -46,7 +46,7 @@ public class ABSocket
// echo the message back.
ByteBuffer data = ByteBuffer.wrap(buf,offset,len);
this.session.getRemote().sendBytesByFuture(data);
this.session.getRemote().sendBytes(data,null);
}
@OnWebSocketConnect
@ -73,7 +73,7 @@ public class ABSocket
try
{
// echo the message back.
this.session.getRemote().sendStringByFuture(message);
this.session.getRemote().sendString(message,null);
}
catch (WebSocketException e)
{

View File

@ -67,7 +67,7 @@ public class BrowserSocket
randomText[i] = letters[rand.nextInt(lettersLen)];
}
msg = String.format("ManyThreads [%s]",String.valueOf(randomText));
remote.sendStringByFuture(msg);
remote.sendString(msg,null);
}
}
}
@ -219,7 +219,7 @@ public class BrowserSocket
}
// Async write
remote.sendStringByFuture(message);
remote.sendString(message,null);
}
private void writeMessage(String format, Object... args)

View File

@ -42,7 +42,7 @@ public class BigEchoSocket
LOG.warn("Session is closed");
return;
}
session.getRemote().sendBytesByFuture(ByteBuffer.wrap(buf,offset,length));
session.getRemote().sendBytes(ByteBuffer.wrap(buf,offset,length),null);
}
@OnWebSocketMessage
@ -53,6 +53,6 @@ public class BigEchoSocket
LOG.warn("Session is closed");
return;
}
session.getRemote().sendStringByFuture(message);
session.getRemote().sendString(message,null);
}
}

View File

@ -40,7 +40,7 @@ public class EchoBroadcastSocket
ByteBuffer data = ByteBuffer.wrap(buf,offset,len);
for (EchoBroadcastSocket sock : BROADCAST)
{
sock.session.getRemote().sendBytesByFuture(data.slice());
sock.session.getRemote().sendBytes(data.slice(),null);
}
}
@ -62,7 +62,7 @@ public class EchoBroadcastSocket
{
for (EchoBroadcastSocket sock : BROADCAST)
{
sock.session.getRemote().sendStringByFuture(text);
sock.session.getRemote().sendString(text,null);
}
}
}

View File

@ -60,13 +60,13 @@ public class EchoFragmentSocket
switch (frame.getType())
{
case BINARY:
remote.sendBytesByFuture(buf1);
remote.sendBytesByFuture(buf2);
remote.sendBytes(buf1,null);
remote.sendBytes(buf2,null);
break;
case TEXT:
// NOTE: This impl is not smart enough to split on a UTF8 boundary
remote.sendStringByFuture(BufferUtil.toUTF8String(buf1));
remote.sendStringByFuture(BufferUtil.toUTF8String(buf2));
remote.sendString(BufferUtil.toUTF8String(buf1),null);
remote.sendString(BufferUtil.toUTF8String(buf2),null);
break;
default:
throw new IOException("Unexpected frame type: " + frame.getType());

View File

@ -44,7 +44,7 @@ public class EchoSocket
// echo the message back.
ByteBuffer data = ByteBuffer.wrap(buf,offset,len);
this.session.getRemote().sendBytesByFuture(data);
this.session.getRemote().sendBytes(data,null);
}
@OnWebSocketConnect
@ -59,6 +59,6 @@ public class EchoSocket
LOG.debug("onText({})",message);
// echo the message back.
this.session.getRemote().sendStringByFuture(message);
this.session.getRemote().sendString(message,null);
}
}

View File

@ -41,7 +41,7 @@ public class RFCSocket
// echo the message back.
ByteBuffer data = ByteBuffer.wrap(buf,offset,len);
this.session.getRemote().sendBytesByFuture(data);
this.session.getRemote().sendBytes(data,null);
}
@OnWebSocketConnect
@ -62,6 +62,6 @@ public class RFCSocket
}
// echo the message back.
this.session.getRemote().sendStringByFuture(message);
this.session.getRemote().sendString(message,null);
}
}

View File

@ -61,7 +61,7 @@ public class SessionSocket
if (values == null)
{
session.getRemote().sendStringByFuture("<null>");
session.getRemote().sendString("<null>",null);
return;
}
@ -78,21 +78,21 @@ public class SessionSocket
delim = true;
}
valueStr.append(']');
session.getRemote().sendStringByFuture(valueStr.toString());
session.getRemote().sendString(valueStr.toString(),null);
return;
}
if ("session.isSecure".equals(message))
{
String issecure = String.format("session.isSecure=%b",session.isSecure());
session.getRemote().sendStringByFuture(issecure);
session.getRemote().sendString(issecure,null);
return;
}
if ("session.upgradeRequest.requestURI".equals(message))
{
String response = String.format("session.upgradeRequest.requestURI=%s",session.getUpgradeRequest().getRequestURI().toASCIIString());
session.getRemote().sendStringByFuture(response);
session.getRemote().sendString(response,null);
return;
}
@ -103,7 +103,7 @@ public class SessionSocket
}
// echo the message back.
this.session.getRemote().sendStringByFuture(message);
this.session.getRemote().sendString(message,null);
}
catch (Throwable t)
{

View File

@ -34,6 +34,6 @@ public class MyBinaryEchoSocket
public void onWebSocketText(Session session, byte buf[], int offset, int len)
{
// Echo message back, asynchronously
session.getRemote().sendBytesByFuture(ByteBuffer.wrap(buf,offset,len));
session.getRemote().sendBytes(ByteBuffer.wrap(buf,offset,len),null);
}
}

View File

@ -32,6 +32,6 @@ public class MyEchoSocket
public void onWebSocketText(Session session, String message)
{
// Echo message back, asynchronously
session.getRemote().sendStringByFuture(message);
session.getRemote().sendString(message,null);
}
}

View File

@ -123,7 +123,7 @@ public class WebSocketChatServlet extends WebSocketServlet implements WebSocketC
}
// Async write the message back.
member.remote.sendStringByFuture(data);
member.remote.sendString(data,null);
}
}