460210 - ExecutionStragegy producer for SelectManager calls onOpen from produce method

Additional refactoring to better handle HttpInput state.   Moved the unready and read possible states into the HttpChannelState
This commit is contained in:
Greg Wilkins 2015-02-23 23:05:09 +11:00
parent 9b118c9dd2
commit 5d6bb9f5d0
16 changed files with 453 additions and 309 deletions

View File

@ -54,11 +54,6 @@ public class HttpChannelOverHTTP2 extends HttpChannel
{
super(connector, configuration, endPoint, transport);
}
protected HttpInput newHttpInput()
{
return new HttpInputOverHTTP2(getState());
}
private IStream getStream()
{

View File

@ -1,40 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.server;
import org.eclipse.jetty.server.HttpChannelState;
import org.eclipse.jetty.server.HttpInput;
// TODO This class is the same as the default. Is it needed?
public class HttpInputOverHTTP2 extends HttpInput
{
private final HttpChannelState _httpChannelState;
public HttpInputOverHTTP2(HttpChannelState httpChannelState)
{
_httpChannelState=httpChannelState;
}
@Override
protected void onReadPossible()
{
_httpChannelState.onReadPossible();
}
}

View File

@ -80,12 +80,8 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
/** Bytes written after interception (eg after compression) */
private long _written;
public HttpChannel(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport)
{
this(connector,configuration,endPoint,transport,null);
}
public HttpChannel(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport, HttpInput input)
public HttpChannel(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport)
{
_connector = connector;
_configuration = configuration;
@ -93,28 +89,16 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
_transport = transport;
_state = new HttpChannelState(this);
_request = new Request(this, input!=null?input:newHttpInput());
_request = new Request(this, newHttpInput(_state));
_response = new Response(this, newHttpOutput());
_requestLog=_connector==null?null:_connector.getServer().getRequestLog();
if (LOG.isDebugEnabled())
LOG.debug("new {} -> {},{},{}",this,_endPoint,_endPoint.getConnection(),_state);
}
protected HttpInput newHttpInput()
protected HttpInput newHttpInput(HttpChannelState state)
{
// TODO - this is a convoluted way to construct the HttpInput. Can this be simplified?
// TODO - the issue is that the HttpInput needs access to the HttpState, which is not
// TODO - constructed until the HttpChannel is constructed... so there is chicken and egg.
return new HttpInput()
{
@Override
protected void onReadPossible()
{
// TODO All three implementations just do this? do we need to override onReadPossible at all?
getState().onReadPossible();
}
};
return new HttpInput(state);
}
protected HttpOutput newHttpOutput()
@ -249,6 +233,10 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
_written=0;
}
public void asyncReadFillInterested()
{
}
@Override
public void run()
{
@ -515,20 +503,19 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
_request.setMetaData(request);
}
public void onContent(HttpInput.Content content)
public boolean onContent(HttpInput.Content content)
{
if (LOG.isDebugEnabled())
LOG.debug("{} content {}", this, content);
HttpInput input = _request.getHttpInput();
input.addContent(content);
return _request.getHttpInput().addContent(content);
}
public void onRequestComplete()
public boolean onRequestComplete()
{
if (LOG.isDebugEnabled())
LOG.debug("{} onRequestComplete", this);
_request.getHttpInput().eof();
return _request.getHttpInput().eof();
}
public void onCompleted()
@ -538,9 +525,9 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
_transport.onCompleted();
}
public void onEarlyEOF()
public boolean onEarlyEOF()
{
_request.getHttpInput().earlyEOF();
return _request.getHttpInput().earlyEOF();
}
public void onBadMessage(int status, String reason)
@ -751,4 +738,5 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
}
}

View File

@ -50,25 +50,23 @@ class HttpChannelOverHttp extends HttpChannel implements HttpParser.RequestHandl
private final MetaData.Request _metadata = new MetaData.Request(_fields);
private final HttpConnection _httpConnection;
private HttpField _connection;
private boolean _delayedForContent;
private boolean _unknownExpectation = false;
private boolean _expect100Continue = false;
private boolean _expect102Processing = false;
public HttpChannelOverHttp(HttpConnection httpConnection, Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport)
{
this(httpConnection,connector,config,endPoint,transport,new HttpInputOverHTTP(httpConnection));
}
public HttpChannelOverHttp(HttpConnection httpConnection, Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport,HttpInput input)
{
super(connector,config,endPoint,transport,input);
super(connector,config,endPoint,transport);
_httpConnection = httpConnection;
_metadata.setURI(new HttpURI());
}
protected HttpInput newHttpInput()
@Override
protected HttpInput newHttpInput(HttpChannelState state)
{
throw new IllegalStateException();
return new HttpInputOverHTTP(state);
}
@Override
@ -219,10 +217,16 @@ class HttpChannelOverHttp extends HttpChannel implements HttpParser.RequestHandl
public boolean content(ByteBuffer content)
{
// TODO avoid creating the Content object with wrapper?
onContent(new HttpInput.Content(content));
return true;
boolean handle = onContent(new HttpInput.Content(content)) || _delayedForContent;
_delayedForContent=false;
return handle;
}
public void asyncReadFillInterested()
{
_httpConnection.asyncReadFillInterested();
}
@Override
public void badMessage(int status, String reason)
{
@ -336,10 +340,9 @@ class HttpChannelOverHttp extends HttpChannel implements HttpParser.RequestHandl
// Should we delay dispatch until we have some content?
// We should not delay if there is no content expect or client is expecting 100 or the response is already committed or the request buffer already has something in it to parse
if (getHttpConfiguration().isDelayDispatchUntilContent() && _httpConnection.getParser().getContentLength()>0 && !isExpecting100Continue() && !isCommitted() && _httpConnection.isRequestBufferEmpty())
return false;
return true;
_delayedForContent = (getHttpConfiguration().isDelayDispatchUntilContent() && _httpConnection.getParser().getContentLength()>0 && !isExpecting100Continue() && !isCommitted() && _httpConnection.isRequestBufferEmpty());
return !_delayedForContent;
}
@Override
@ -359,8 +362,7 @@ class HttpChannelOverHttp extends HttpChannel implements HttpParser.RequestHandl
@Override
public boolean messageComplete()
{
onRequestComplete();
return false;
return onRequestComplete();
}
@Override

View File

@ -32,6 +32,7 @@ import org.eclipse.jetty.server.handler.ContextHandler.Context;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.SpinLock;
/**
* Implementation of AsyncContext interface that holds the state of request-response cycle.
@ -85,14 +86,16 @@ public class HttpChannelState
}
private final boolean DEBUG=LOG.isDebugEnabled();
private final SpinLock _lock=new SpinLock();
private final HttpChannel _channel;
private List<AsyncListener> _asyncListeners;
private State _state;
private Async _async;
private boolean _initial;
private boolean _asyncRead;
private boolean _asyncWrite;
private boolean _asyncReadPossible;
private boolean _asyncReadUnready;
private boolean _asyncWrite; // TODO refactor same as read
private long _timeoutMs=DEFAULT_TIMEOUT;
private AsyncContextEvent _event;
@ -106,7 +109,8 @@ public class HttpChannelState
public State getState()
{
synchronized(this)
// TODO use SpinLock instead of synchronized?
try(SpinLock.Lock lock=_lock.lock())
{
return _state;
}
@ -114,7 +118,7 @@ public class HttpChannelState
public void addListener(AsyncListener listener)
{
synchronized(this)
try(SpinLock.Lock lock=_lock.lock())
{
if (_asyncListeners==null)
_asyncListeners=new ArrayList<>();
@ -124,7 +128,7 @@ public class HttpChannelState
public void setTimeout(long ms)
{
synchronized(this)
try(SpinLock.Lock lock=_lock.lock())
{
_timeoutMs=ms;
}
@ -132,7 +136,7 @@ public class HttpChannelState
public long getTimeout()
{
synchronized(this)
try(SpinLock.Lock lock=_lock.lock())
{
return _timeoutMs;
}
@ -140,7 +144,7 @@ public class HttpChannelState
public AsyncContextEvent getAsyncContextEvent()
{
synchronized(this)
try(SpinLock.Lock lock=_lock.lock())
{
return _event;
}
@ -149,15 +153,17 @@ public class HttpChannelState
@Override
public String toString()
{
synchronized (this)
try(SpinLock.Lock lock=_lock.lock())
{
return String.format("%s@%x{s=%s i=%b a=%s}",getClass().getSimpleName(),hashCode(),_state,_initial,_async);
return String.format("%s@%x{s=%s a=%s i=%b r=%s w=%b}",getClass().getSimpleName(),hashCode(),_state,_async,_initial,
_asyncReadPossible?(_asyncReadUnready?"PU":"P!U"):(_asyncReadUnready?"!PU":"!P!U"),
_asyncWrite);
}
}
public String getStatusString()
{
synchronized (this)
try(SpinLock.Lock lock=_lock.lock())
{
return String.format("s=%s i=%b a=%s",_state,_initial,_async);
}
@ -168,10 +174,10 @@ public class HttpChannelState
*/
protected Action handling()
{
synchronized (this)
if(DEBUG)
LOG.debug("{} handling {}",this,_state);
try(SpinLock.Lock lock=_lock.lock())
{
if(DEBUG)
LOG.debug("{} handling {}",this,_state);
switch(_state)
{
case IDLE:
@ -186,12 +192,14 @@ public class HttpChannelState
return Action.WAIT;
case ASYNC_WOKEN:
if (_asyncRead)
if (_asyncReadPossible)
{
_state=State.ASYNC_IO;
_asyncRead=false;
_asyncReadUnready=false;
return Action.READ_CALLBACK;
}
// TODO refactor the same as read
if (_asyncWrite)
{
_state=State.ASYNC_IO;
@ -218,10 +226,6 @@ public class HttpChannelState
_async=null;
return Action.ASYNC_EXPIRED;
case STARTED:
// TODO
if (DEBUG)
LOG.debug("TODO Fix this double dispatch",new IllegalStateException(this
.getStatusString()));
return Action.WAIT;
}
}
@ -238,7 +242,7 @@ public class HttpChannelState
{
final List<AsyncListener> lastAsyncListeners;
synchronized (this)
try(SpinLock.Lock lock=_lock.lock())
{
if (_state!=State.DISPATCHED || _async!=null)
throw new IllegalStateException(this.getStatusString());
@ -267,7 +271,7 @@ public class HttpChannelState
protected void error(Throwable th)
{
synchronized (this)
try(SpinLock.Lock lock=_lock.lock())
{
if (_event!=null)
_event.setThrowable(th);
@ -283,11 +287,15 @@ public class HttpChannelState
*/
protected Action unhandle()
{
synchronized (this)
Action action;
boolean schedule_timeout=false;
boolean read_interested=false;
if(DEBUG)
LOG.debug("{} unhandle {}",this,_state);
try(SpinLock.Lock lock=_lock.lock())
{
if(DEBUG)
LOG.debug("{} unhandle {}",this,_state);
switch(_state)
{
case DISPATCHED:
@ -297,7 +305,6 @@ public class HttpChannelState
throw new IllegalStateException(this.getStatusString());
}
if (_async!=null)
{
_initial=false;
@ -306,49 +313,73 @@ public class HttpChannelState
case COMPLETE:
_state=State.COMPLETING;
_async=null;
return Action.COMPLETE;
action = Action.COMPLETE;
break;
case DISPATCH:
_state=State.DISPATCHED;
_async=null;
return Action.ASYNC_DISPATCH;
action = Action.ASYNC_DISPATCH;
break;
case EXPIRED:
_state=State.DISPATCHED;
_async=null;
return Action.ASYNC_EXPIRED;
action = Action.ASYNC_EXPIRED;
break;
case STARTED:
if (_asyncRead)
if (_asyncReadUnready && _asyncReadPossible)
{
_state=State.ASYNC_IO;
_asyncRead=false;
return Action.READ_CALLBACK;
_asyncReadUnready=false;
action = Action.READ_CALLBACK;
}
if (_asyncWrite)
else if (_asyncWrite) // TODO refactor same as read
{
_asyncWrite=false;
_state=State.ASYNC_IO;
return Action.WRITE_CALLBACK;
action = Action.WRITE_CALLBACK;
}
scheduleTimeout();
_state=State.ASYNC_WAIT;
return Action.WAIT;
else
{
schedule_timeout=true;
read_interested=_asyncReadUnready;
_state=State.ASYNC_WAIT;
action = Action.WAIT;
}
break;
case EXPIRING:
scheduleTimeout();
schedule_timeout=true;
_state=State.ASYNC_WAIT;
return Action.WAIT;
action = Action.WAIT;
break;
default:
_state=State.COMPLETING;
action = Action.COMPLETE;
break;
}
}
_state=State.COMPLETING;
return Action.COMPLETE;
else
{
_state=State.COMPLETING;
action = Action.COMPLETE;
}
}
if (schedule_timeout)
scheduleTimeout();
if (read_interested)
_channel.asyncReadFillInterested();
return action;
}
public void dispatch(ServletContext context, String path)
{
boolean dispatch;
synchronized (this)
try(SpinLock.Lock lock=_lock.lock())
{
if (_async!=Async.STARTED && _async!=Async.EXPIRING)
throw new IllegalStateException("AsyncContext#dispath "+this.getStatusString());
@ -388,7 +419,7 @@ public class HttpChannelState
{
final List<AsyncListener> aListeners;
AsyncContextEvent event;
synchronized (this)
try(SpinLock.Lock lock=_lock.lock())
{
if (_async!=Async.STARTED)
return;
@ -416,7 +447,7 @@ public class HttpChannelState
}
boolean dispatch=false;
synchronized (this)
try(SpinLock.Lock lock=_lock.lock())
{
if (_async==Async.EXPIRING)
{
@ -437,7 +468,7 @@ public class HttpChannelState
{
// just like resume, except don't set _dispatched=true;
boolean handle=false;
synchronized (this)
try(SpinLock.Lock lock=_lock.lock())
{
if (_async!=Async.STARTED && _async!=Async.EXPIRING)
throw new IllegalStateException(this.getStatusString());
@ -462,7 +493,7 @@ public class HttpChannelState
public void errorComplete()
{
synchronized (this)
try(SpinLock.Lock lock=_lock.lock())
{
_async=Async.COMPLETE;
_event.setDispatchContext(null);
@ -476,7 +507,7 @@ public class HttpChannelState
{
final List<AsyncListener> aListeners;
final AsyncContextEvent event;
synchronized (this)
try(SpinLock.Lock lock=_lock.lock())
{
switch(_state)
{
@ -523,7 +554,8 @@ public class HttpChannelState
protected void recycle()
{
synchronized (this)
cancelTimeout();
try(SpinLock.Lock lock=_lock.lock())
{
switch(_state)
{
@ -539,17 +571,17 @@ public class HttpChannelState
_state=State.IDLE;
_async=null;
_initial=true;
_asyncRead=false;
_asyncReadPossible=_asyncReadUnready=false;
_asyncWrite=false;
_timeoutMs=DEFAULT_TIMEOUT;
cancelTimeout();
_event=null;
}
}
public void upgrade()
{
synchronized (this)
cancelTimeout();
try(SpinLock.Lock lock=_lock.lock())
{
switch(_state)
{
@ -563,10 +595,9 @@ public class HttpChannelState
_state=State.UPGRADED;
_async=null;
_initial=true;
_asyncRead=false;
_asyncReadPossible=_asyncReadUnready=false;
_asyncWrite=false;
_timeoutMs=DEFAULT_TIMEOUT;
cancelTimeout();
_event=null;
}
}
@ -587,7 +618,7 @@ public class HttpChannelState
protected void cancelTimeout()
{
final AsyncContextEvent event;
synchronized (this)
try(SpinLock.Lock lock=_lock.lock())
{
event=_event;
}
@ -597,7 +628,7 @@ public class HttpChannelState
public boolean isIdle()
{
synchronized (this)
try(SpinLock.Lock lock=_lock.lock())
{
return _state==State.IDLE;
}
@ -605,7 +636,7 @@ public class HttpChannelState
public boolean isExpired()
{
synchronized (this)
try(SpinLock.Lock lock=_lock.lock())
{
return _async==Async.EXPIRED;
}
@ -613,7 +644,7 @@ public class HttpChannelState
public boolean isInitial()
{
synchronized(this)
try(SpinLock.Lock lock=_lock.lock())
{
return _initial;
}
@ -621,7 +652,7 @@ public class HttpChannelState
public boolean isSuspended()
{
synchronized(this)
try(SpinLock.Lock lock=_lock.lock())
{
return _state==State.ASYNC_WAIT || _state==State.DISPATCHED && _async==Async.STARTED;
}
@ -629,7 +660,7 @@ public class HttpChannelState
boolean isCompleting()
{
synchronized (this)
try(SpinLock.Lock lock=_lock.lock())
{
return _state==State.COMPLETING;
}
@ -637,7 +668,7 @@ public class HttpChannelState
boolean isCompleted()
{
synchronized (this)
try(SpinLock.Lock lock=_lock.lock())
{
return _state == State.COMPLETED;
}
@ -645,7 +676,7 @@ public class HttpChannelState
public boolean isAsyncStarted()
{
synchronized (this)
try(SpinLock.Lock lock=_lock.lock())
{
if (_state==State.DISPATCHED)
return _async!=null;
@ -656,7 +687,7 @@ public class HttpChannelState
public boolean isAsync()
{
synchronized (this)
try(SpinLock.Lock lock=_lock.lock())
{
return !_initial || _async!=null;
}
@ -675,7 +706,7 @@ public class HttpChannelState
public ContextHandler getContextHandler()
{
final AsyncContextEvent event;
synchronized (this)
try(SpinLock.Lock lock=_lock.lock())
{
event=_event;
}
@ -692,7 +723,7 @@ public class HttpChannelState
public ServletResponse getServletResponse()
{
final AsyncContextEvent event;
synchronized (this)
try(SpinLock.Lock lock=_lock.lock())
{
event=_event;
}
@ -716,30 +747,53 @@ public class HttpChannelState
_channel.getRequest().setAttribute(name,attribute);
}
public void onReadPossible()
public void onReadUnready()
{
boolean handle=false;
synchronized (this)
boolean interested=false;
try(SpinLock.Lock lock=_lock.lock())
{
_asyncRead=true;
if (_state==State.ASYNC_WAIT)
// We were already unready, this is not a state change, so do nothing
if (!_asyncReadUnready)
{
_state=State.ASYNC_WOKEN;
handle=true;
_asyncReadUnready=true;
_asyncReadPossible=false; // Assumes this has been checked in isReady() with lock held
if (_state==State.ASYNC_WAIT)
interested=true;
}
}
// TODO, do we need to execute? or should it be done elsewhere?
if (handle)
_channel.execute(_channel);
if (interested)
_channel.asyncReadFillInterested();
}
public boolean onReadPossible()
{
boolean woken=false;
try(SpinLock.Lock lock=_lock.lock())
{
_asyncReadPossible=true;
if (_state==State.ASYNC_WAIT && _asyncReadUnready)
{
woken=true;
_state=State.ASYNC_WOKEN;
}
}
return woken;
}
public boolean isReadPossible()
{
try(SpinLock.Lock lock=_lock.lock())
{
return _asyncReadPossible;
}
}
public void onWritePossible()
{
boolean handle=false;
synchronized (this)
try(SpinLock.Lock lock=_lock.lock())
{
_asyncWrite=true;
if (_state==State.ASYNC_WAIT)
@ -752,5 +806,5 @@ public class HttpChannelState
if (handle)
_channel.execute(_channel);
}
}

View File

@ -23,10 +23,14 @@ import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
@ -43,11 +47,12 @@ import org.eclipse.jetty.util.log.Logger;
*/
public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, Connection.UpgradeFrom
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);
public static final HttpField CONNECTION_CLOSE = new PreEncodedHttpField(HttpHeader.CONNECTION,HttpHeaderValue.CLOSE.asString());
public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
private static final boolean REQUEST_BUFFER_DIRECT=false;
private static final boolean HEADER_BUFFER_DIRECT=false;
private static final boolean CHUNK_BUFFER_DIRECT=false;
private static final Logger LOG = Log.getLogger(HttpConnection.class);
private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
private final HttpConfiguration _config;
@ -90,8 +95,8 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_connector = connector;
_bufferPool = _connector.getByteBufferPool();
_generator = newHttpGenerator();
_input = newHttpInput();
_channel = newHttpChannel(_input);
_channel = newHttpChannel();
_input = _channel.getRequest().getHttpInput();
_parser = newHttpParser();
if (LOG.isDebugEnabled())
LOG.debug("New HTTP Connection {}", this);
@ -107,14 +112,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
return new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());
}
protected HttpInput newHttpInput()
protected HttpChannelOverHttp newHttpChannel()
{
return new HttpInputOverHTTP(this);
}
protected HttpChannelOverHttp newHttpChannel(HttpInput httpInput)
{
return new HttpChannelOverHttp(this, _connector, _config, getEndPoint(), this, httpInput);
return new HttpChannelOverHttp(this, _connector, _config, getEndPoint(), this);
}
protected HttpParser newHttpParser()
@ -262,22 +262,21 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
/** Fill and parse data looking for content
* @throws IOException
*/
protected boolean parseContent()
protected boolean fillAndParseForContent()
{
if (LOG.isDebugEnabled())
LOG.debug("{} parseContent",this);
boolean handled=false;
while (_parser.inContentState())
{
if (LOG.isDebugEnabled())
LOG.debug("{} parseContent",this);
int filled = fillRequestBuffer();
boolean handle = parseRequestBuffer();
handled|=handle;
if (handle || filled<=0)
if (handle || filled<=0 || _channel.getRequest().getHttpInput().hasContent())
break;
}
return handled;
}
/* ------------------------------------------------------------ */
private int fillRequestBuffer()
@ -545,22 +544,17 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
@Override
public void succeeded()
{
if (parseContent())
_channel.handle(); // TODO this call to handle can be duplicated by HttpInput.addContent calling onReadPossible
if (fillAndParseForContent())
_channel.handle();
else if (!_input.isFinished())
// TODO This may not always be correct. The main use-case is when the asyncReadCallback has succeeded because of
// some data that is not sufficient to produce anything to read (Eg one byte of a chunk header). We can't add
// zero length content because HttpInput.read() cannot return 0 as no bytes read! So instead we just say we are
// fill interested and look for more content. BUT maybe there is a case when this is not needed..... hmmm I think
// this is probably OK as the AsyncReadCallback is only ever used when there is not another thread reading etc.
asyncReadFillInterested();
}
@Override
public void failed(Throwable x)
{
_input.failed(x);
_channel.handle();
if (_input.failed(x))
_channel.handle();
}
}
@ -768,7 +762,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
public void asyncReadFillInterested()
{
getEndPoint().fillInterested(_asyncReadCallback);
}
public void blockingReadFillInterested()

View File

@ -28,6 +28,7 @@ import javax.servlet.ServletInputStream;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -41,29 +42,29 @@ import org.eclipse.jetty.util.log.Logger;
* maintains two states: the content state that tells whether there is content to consume and the EOF
* state that tells whether an EOF has arrived.
* Only once the content has been consumed the content state is moved to the EOF state.
*
*/
public abstract class HttpInput extends ServletInputStream implements Runnable
public class HttpInput extends ServletInputStream implements Runnable
{
private final static Logger LOG = Log.getLogger(HttpInput.class);
private final static Content EOF_CONTENT = new PoisonPillContent("EOF");
private final static Content EARLY_EOF_CONTENT = new PoisonPillContent("EARLY_EOF");
private final static Content EOF_CONTENT = new EofContent("EOF");
private final static Content EARLY_EOF_CONTENT = new EofContent("EARLY_EOF");
private final byte[] _oneByteBuffer = new byte[1];
private final ArrayQueue<Content> _inputQ = new ArrayQueue<>();
private final HttpChannelState _channelState;
private ReadListener _listener;
private boolean _unready;
private State _state = STREAM;
private long _contentConsumed;
public HttpInput()
public HttpInput(HttpChannelState state)
{
_channelState=state;
}
protected abstract void onReadPossible();
public Object lock()
protected HttpChannelState getHttpChannelState()
{
return _inputQ;
return _channelState;
}
public void recycle()
@ -77,7 +78,6 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
item = _inputQ.pollUnsafe();
}
_listener = null;
_unready = false;
_state = STREAM;
_contentConsumed = 0;
}
@ -86,6 +86,8 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
@Override
public int available()
{
int available=0;
boolean woken=false;
synchronized (_inputQ)
{
Content content = _inputQ.peekUnsafe();
@ -97,17 +99,27 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
}
catch(IOException e)
{
failed(e);
woken=failed(e);
}
content = _inputQ.peekUnsafe();
}
if (content!=null)
return remaining(content);
return 0;
available= remaining(content);
}
if (woken)
wake();
return available;
}
private void wake()
{
// TODO review if this is correct
_channelState.getHttpChannel().getConnector().getExecutor().execute(_channelState.getHttpChannel());
}
@Override
public int read() throws IOException
{
@ -127,7 +139,12 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
Content item = nextContent();
if (item!=null)
{
if (LOG.isDebugEnabled())
LOG.debug("{} read {} from {}",this,len,item);
int l = get(item, b, off, len);
consumeNonContent();
return l;
}
@ -157,8 +174,6 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
*/
protected Content nextContent() throws IOException
{
if (!Thread.holdsLock(_inputQ))
throw new IllegalStateException();
Content content = pollContent();
if (content==null && !isFinished())
{
@ -175,9 +190,6 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
*/
protected Content pollContent()
{
if (!Thread.holdsLock(_inputQ))
throw new IllegalStateException();
// Items are removed only when they are fully consumed.
Content content = _inputQ.peekUnsafe();
// Skip consumed items at the head of the queue.
@ -195,7 +207,10 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
else
{
_state=AEOF;
onReadPossible();
_channelState.onReadUnready();
boolean woken = _channelState.onReadPossible(); // force callback
if (woken)
wake();
}
}
else if (content==EARLY_EOF_CONTENT)
@ -207,6 +222,28 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
return content;
}
/**
*/
protected void consumeNonContent()
{
// Items are removed only when they are fully consumed.
Content content = _inputQ.peekUnsafe();
// Skip consumed items at the head of the queue.
while (content != null && remaining(content) == 0)
{
// Defer EOF until read
if (content instanceof EofContent)
break;
// Consume all other empty content
_inputQ.pollUnsafe();
content.succeeded();
if (LOG.isDebugEnabled())
LOG.debug("{} consumed {}", this, content);
content = _inputQ.peekUnsafe();
}
}
/**
* Get the next readable from the inputQ, calling {@link #produceContent()}
* if need be. EOF is NOT processed and state is not changed.
@ -216,8 +253,6 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
*/
protected Content nextReadable() throws IOException
{
if (!Thread.holdsLock(_inputQ))
throw new IllegalStateException();
Content content = pollReadable();
if (content==null && !isFinished())
{
@ -234,9 +269,6 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
*/
protected Content pollReadable()
{
if (!Thread.holdsLock(_inputQ))
throw new IllegalStateException();
// Items are removed only when they are fully consumed.
Content content = _inputQ.peekUnsafe();
@ -279,8 +311,6 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
int l = Math.min(content.remaining(), length);
content.getContent().get(buffer, offset, l);
_contentConsumed+=l;
if (l>0 && !content.hasContent())
pollContent(); // hungry succeed
return l;
}
@ -309,8 +339,6 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
*/
protected void blockForContent() throws IOException
{
if (!Thread.holdsLock(_inputQ))
throw new IllegalStateException();
try
{
if (LOG.isDebugEnabled())
@ -328,30 +356,36 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
*
* @param content the content to add
*/
public void addContent(Content item)
public boolean addContent(Content item)
{
boolean call_on_read_possible=false;
boolean woken=false;
synchronized (_inputQ)
{
boolean wasEmpty = _inputQ.isEmpty();
_inputQ.add(item);
if (LOG.isDebugEnabled())
LOG.debug("{} queued {}", this, item);
LOG.debug("{} addContent {}", this, item);
if (wasEmpty)
if (wasEmpty) // TODO do we need this guard?
{
if (_listener==null)
_inputQ.notify();
else
call_on_read_possible = _unready;
woken=_channelState.onReadPossible();
}
}
// TODO currently this is an active method that can dispatch a call to HttpChannel.handle. This can be duplicated by the AsyncReadCallback!
if (call_on_read_possible)
onReadPossible();
return woken;
}
public boolean hasContent()
{
synchronized (_inputQ)
{
return !_inputQ.isEmpty();
}
}
public void unblock()
{
synchronized (_inputQ)
@ -375,18 +409,18 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
* Typically this will result in an EOFException being thrown
* from a subsequent read rather than a -1 return.
*/
public void earlyEOF()
public boolean earlyEOF()
{
addContent(EARLY_EOF_CONTENT);
return addContent(EARLY_EOF_CONTENT);
}
/**
* This method should be called to signal that all the expected
* content arrived.
*/
public void eof()
public boolean eof()
{
addContent(EOF_CONTENT);
return addContent(EOF_CONTENT);
}
public boolean consumeAll()
@ -448,16 +482,13 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
{
if (_listener == null )
return true;
if (_unready)
return false;
if (_state instanceof EOFState)
return true;
if (nextReadable()!=null)
return true;
_unready = true;
_channelState.onReadUnready();
}
unready();
return false;
}
catch(IOException e)
@ -467,10 +498,6 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
}
}
protected void unready()
{
}
@Override
public void setReadListener(ReadListener readListener)
{
@ -484,7 +511,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
throw new IllegalStateException("state=" + _state);
_state = ASYNC;
_listener = readListener;
_unready = true;
_channelState.onReadUnready();
content=nextContent()!=null;
}
}
@ -493,15 +520,16 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
throw new RuntimeIOException(e);
}
if (content)
onReadPossible();
else
unready();
boolean woken = content && _channelState.onReadPossible();
// TODO something with woken?
if (woken)
throw new IllegalStateException("How do we wake?");
}
public void failed(Throwable x)
public boolean failed(Throwable x)
{
boolean call_on_read_possible=false;
boolean woken=false;
synchronized (_inputQ)
{
if (_state instanceof ErrorState)
@ -512,14 +540,20 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
if (_listener==null)
_inputQ.notify();
else
call_on_read_possible=true;
woken=_channelState.onReadPossible();
}
if (call_on_read_possible)
onReadPossible();
return woken;
}
/* ------------------------------------------------------------ */
/*
* <p>
* While this class is-a Runnable, it should never be dispatched in it's own thread. It is a
* runnable only so that the calling thread can use {@link ContextHandler#handle(Runnable)}
* to setup classloaders etc.
* </p>
*/
@Override
public void run()
{
@ -537,22 +571,22 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
_state=EOF;
aeof=true;
}
else if (!_unready)
return;
listener = _listener;
error = _state instanceof ErrorState?((ErrorState)_state).getError():null;
_unready=false;
}
try
{
if (aeof)
if (error!=null)
{
_channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); // TODO ???
listener.onError(error);
}
else if (aeof)
listener.onAllDataRead();
else if (error == null)
listener.onDataAvailable();
else
listener.onError(error);
}
catch (Throwable e)
{
@ -561,7 +595,10 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
try
{
if (aeof || error==null)
{
_channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); // TODO ???
listener.onError(e);
}
}
catch (Throwable e2)
{
@ -588,6 +625,14 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
}
}
public static class EofContent extends PoisonPillContent
{
EofContent(String name)
{
super(name);
}
}
public static class Content extends Callback.Adapter
{
private final ByteBuffer _content;
@ -611,6 +656,12 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
{
return _content.remaining();
}
@Override
public String toString()
{
return String.format("Content@%x{%s}",hashCode(),BufferUtil.toDetailString(_content));
}
}
@ -722,4 +773,5 @@ public abstract class HttpInput extends ServletInputStream implements Runnable
return "AEOF";
}
};
}

View File

@ -22,35 +22,21 @@ import java.io.IOException;
public class HttpInputOverHTTP extends HttpInput
{
private final HttpConnection _httpConnection;
public HttpInputOverHTTP(HttpConnection httpConnection)
public HttpInputOverHTTP(HttpChannelState state)
{
_httpConnection = httpConnection;
super(state);
}
@Override
protected void produceContent() throws IOException
{
_httpConnection.parseContent();
((HttpConnection)getHttpChannelState().getHttpChannel().getEndPoint().getConnection()).fillAndParseForContent();
}
@Override
protected void blockForContent() throws IOException
{
_httpConnection.blockingReadFillInterested();
((HttpConnection)getHttpChannelState().getHttpChannel().getEndPoint().getConnection()).blockingReadFillInterested();
super.blockForContent();
}
@Override
protected void unready()
{
_httpConnection.asyncReadFillInterested();
}
@Override
protected void onReadPossible()
{
_httpConnection.getHttpChannel().getState().onReadPossible();
}
}

View File

@ -98,9 +98,9 @@ public class ExtendedServerTest extends HttpServerTestBase
}
@Override
protected HttpChannelOverHttp newHttpChannel(HttpInput httpInput)
protected HttpChannelOverHttp newHttpChannel()
{
return new HttpChannelOverHttp(this, getConnector(), getHttpConfiguration(), getEndPoint(), this, httpInput)
return new HttpChannelOverHttp(this, getConnector(), getHttpConfiguration(), getEndPoint(), this)
{
@Override
public boolean startRequest(String method, String uri, HttpVersion version)

View File

@ -104,14 +104,30 @@ public class HttpInputTest
@Before
public void before()
{
_in=new HttpInput()
_in=new HttpInput(new HttpChannelState(new HttpChannel(null,new HttpConfiguration(),null,null)
{
@Override
protected void onReadPossible()
{
_history.add("onReadPossible");
public void asyncReadFillInterested()
{
_history.add("asyncReadFillInterested");
}
})
{
@Override
public void onReadUnready()
{
_history.add("unready");
super.onReadUnready();
}
@Override
public boolean onReadPossible()
{
_history.add("onReadPossible");
return super.onReadPossible();
}
})
{
@Override
protected void produceContent() throws IOException
{
@ -132,12 +148,6 @@ public class HttpInputTest
_history.add("blockForContent");
super.blockForContent();
}
@Override
protected void unready()
{
_history.add("unready");
}
};
}
@ -174,6 +184,7 @@ public class HttpInputTest
assertThat(_in.read(),equalTo((int)'A'));
assertThat(_in.getContentConsumed(),equalTo(1L));
assertThat(_in.read(),equalTo((int)'B'));
assertThat(_in.getContentConsumed(),equalTo(2L));
assertThat(_history.poll(),equalTo("Content succeeded AB"));
assertThat(_history.poll(),nullValue());
@ -252,9 +263,9 @@ public class HttpInputTest
assertThat(_in.read(),equalTo((int)'C'));
assertThat(_in.isFinished(),equalTo(false));
assertThat(_in.read(),equalTo((int)'D'));
assertThat(_in.isFinished(),equalTo(true));
assertThat(_history.poll(),equalTo("Content succeeded CD"));
assertThat(_history.poll(),nullValue());
assertThat(_in.isFinished(),equalTo(false));
assertThat(_in.read(),equalTo(-1));
assertThat(_in.isFinished(),equalTo(true));
@ -327,8 +338,8 @@ public class HttpInputTest
public void testAsyncEmpty() throws Exception
{
_in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),nullValue());
_in.run();
@ -341,6 +352,8 @@ public class HttpInputTest
assertThat(_history.poll(),nullValue());
assertThat(_in.isReady(),equalTo(false));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),nullValue());
}
@ -349,8 +362,8 @@ public class HttpInputTest
public void testAsyncRead() throws Exception
{
_in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),nullValue());
_in.run();
@ -382,6 +395,7 @@ public class HttpInputTest
assertThat(_in.isReady(),equalTo(true));
assertThat(_history.poll(),equalTo("produceContent 1"));
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue());
assertThat(_in.read(),equalTo((int)'C'));
@ -402,8 +416,8 @@ public class HttpInputTest
public void testAsyncEOF() throws Exception
{
_in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),nullValue());
_in.run();
@ -413,10 +427,12 @@ public class HttpInputTest
_in.eof();
assertThat(_in.isReady(),equalTo(true));
assertThat(_in.isFinished(),equalTo(false));
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue());
assertThat(_in.read(),equalTo(-1));
assertThat(_in.isFinished(),equalTo(true));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue());
}
@ -425,8 +441,8 @@ public class HttpInputTest
public void testAsyncReadEOF() throws Exception
{
_in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),nullValue());
_in.run();
@ -460,11 +476,13 @@ public class HttpInputTest
assertThat(_in.isFinished(),equalTo(false));
assertThat(_in.isReady(),equalTo(true));
assertThat(_history.poll(),equalTo("produceContent 1"));
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue());
assertThat(_in.isFinished(),equalTo(false));
assertThat(_in.read(),equalTo(-1));
assertThat(_in.isFinished(),equalTo(true));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),equalTo("onReadPossible"));
assertThat(_history.poll(),nullValue());
@ -478,8 +496,8 @@ public class HttpInputTest
public void testAsyncError() throws Exception
{
_in.setReadListener(_listener);
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),equalTo("unready"));
assertThat(_history.poll(),equalTo("produceContent 0"));
assertThat(_history.poll(),nullValue());
_in.run();
assertThat(_history.poll(),equalTo("onDataAvailable"));

View File

@ -1390,6 +1390,7 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
{
int point = points[j];
// System.err.println("write: "+new String(bytes, last, point - last));
os.write(bytes, last, point - last);
last = point;
os.flush();
@ -1400,6 +1401,7 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
}
// Write the last fragment
// System.err.println("Write: "+new String(bytes, last, bytes.length - last));
os.write(bytes, last, bytes.length - last);
os.flush();
Thread.sleep(PAUSE);

View File

@ -1262,7 +1262,7 @@ public class RequestTest
{
long start=System.currentTimeMillis();
String response = _connector.getResponses(request);
assertTrue(response.contains("Form too many keys"));
assertThat(response,Matchers.containsString("Form too many keys"));
long now=System.currentTimeMillis();
assertTrue((now-start)<5000);
}

View File

@ -132,6 +132,7 @@ public class AsyncIOServletTest
Assert.assertThat("onError message",t.getMessage(),is(throwable.getMessage()));
latch.countDown();
response.setStatus(500);
asyncContext.complete();
}
});
@ -147,6 +148,7 @@ public class AsyncIOServletTest
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
client.setSoTimeout(5000);
OutputStream output = client.getOutputStream();
output.write(request.getBytes("UTF-8"));
output.flush();
@ -217,6 +219,7 @@ public class AsyncIOServletTest
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
client.setSoTimeout(5000);
OutputStream output = client.getOutputStream();
output.write(request.getBytes("UTF-8"));
output.flush();
@ -508,6 +511,91 @@ public class AsyncIOServletTest
}
@Test
public void testOnAllDataRead() throws Exception
{
String text = "X";
final byte[] data = text.getBytes(StandardCharsets.ISO_8859_1);
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
{
response.flushBuffer();
final AsyncContext async = request.startAsync();
async.setTimeout(5000);
final ServletInputStream in = request.getInputStream();
final ServletOutputStream out = response.getOutputStream();
in.setReadListener(new ReadListener()
{
@Override
public void onError(Throwable t)
{
t.printStackTrace();
async.complete();
}
@Override
public void onDataAvailable() throws IOException
{
try
{
Thread.sleep(1000);
if (!in.isReady())
throw new IllegalStateException();
if (in.read()!='X')
throw new IllegalStateException();
if (!in.isReady())
throw new IllegalStateException();
if (in.read()!=-1)
throw new IllegalStateException();
}
catch(Exception e)
{
e.printStackTrace();
}
}
@Override
public void onAllDataRead() throws IOException
{
out.write("OK\n".getBytes(StandardCharsets.ISO_8859_1));
async.complete();
}
});
}
});
String request = "GET " + path + " HTTP/1.1\r\n" +
"Host: localhost:" + connector.getLocalPort() + "\r\n" +
"Content-Type: text/plain\r\n"+
"Content-Length: "+data.length+"\r\n" +
"Connection: close\r\n" +
"\r\n";
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
client.setSoTimeout(5000);
OutputStream output = client.getOutputStream();
output.write(request.getBytes("UTF-8"));
output.flush();
Thread.sleep(100);
output.write(data);
output.flush();
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
String line=in.readLine();
assertThat(line, containsString("200 OK"));
while (line.length()>0)
line=in.readLine();
line=in.readLine();
assertThat(line, containsString("OK"));
}
}
@Test
public void testOtherThreadOnAllDataRead() throws Exception
{
@ -522,6 +610,7 @@ public class AsyncIOServletTest
response.flushBuffer();
final AsyncContext async = request.startAsync();
async.setTimeout(5000);
final ServletInputStream in = request.getInputStream();
final ServletOutputStream out = response.getOutputStream();
@ -580,6 +669,7 @@ public class AsyncIOServletTest
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
client.setSoTimeout(5000);
OutputStream output = client.getOutputStream();
output.write(request.getBytes("UTF-8"));
output.flush();

View File

@ -404,19 +404,18 @@ public class AsyncServletIOTest
if (!onDataAvailable.compareAndSet(false,true))
throw new IllegalStateException();
// System.err.println("ODA");
//System.err.println("ODA");
while (in.isReady() && !in.isFinished())
{
_oda.incrementAndGet();
int len=in.read(_buf);
// System.err.println("read "+len);
//System.err.println("read "+len);
if (len>0)
_read.addAndGet(len);
}
if (!onDataAvailable.compareAndSet(true,false))
throw new IllegalStateException();
}
@Override

View File

@ -952,10 +952,7 @@ public class BufferUtil
StringBuilder buf = new StringBuilder();
buf.append(buffer.getClass().getSimpleName());
buf.append("@");
if (buffer.hasArray())
buf.append(Integer.toHexString(Arrays.hashCode(buffer.array())));
else
buf.append(Integer.toHexString(buf.hashCode()));
buf.append(Integer.toHexString(System.identityHashCode(buffer)));
buf.append("[p=");
buf.append(buffer.position());
buf.append(",l=");

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.util.thread;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/* ------------------------------------------------------------ */
@ -38,15 +39,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class SpinLock
{
private final AtomicBoolean _lock = new AtomicBoolean(false);
private final AtomicReference<Thread> _lock = new AtomicReference<>(null);
private final Lock _unlock = new Lock();
public Lock lock()
{
Thread thread = Thread.currentThread();
while(true)
{
if (!_lock.compareAndSet(false,true))
if (!_lock.compareAndSet(null,thread))
{
if (_lock.get()==thread)
throw new IllegalStateException("SpinLock is not reentrant");
continue;
}
return _unlock;
@ -55,7 +59,12 @@ public class SpinLock
public boolean isLocked()
{
return _lock.get();
return _lock.get()!=null;
}
public boolean isLockedThread()
{
return _lock.get()==Thread.currentThread();
}
public class Lock implements AutoCloseable
@ -63,8 +72,7 @@ public class SpinLock
@Override
public void close()
{
if (!_lock.compareAndSet(true,false))
throw new IllegalStateException();
_lock.set(null);
}
}
}