SpinLock cleanup of HttpExchange

This commit is contained in:
Greg Wilkins 2015-03-05 18:32:17 +11:00
parent 9b307bc0ed
commit e8b6902b16
3 changed files with 293 additions and 88 deletions

View File

@ -19,29 +19,29 @@
package org.eclipse.jetty.client;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.SpinLock;
public class HttpExchange
{
private static final Logger LOG = Log.getLogger(HttpExchange.class);
private final AtomicBoolean requestComplete = new AtomicBoolean();
private final AtomicBoolean responseComplete = new AtomicBoolean();
private final AtomicInteger complete = new AtomicInteger();
private final AtomicReference<HttpChannel> channel = new AtomicReference<>();
private final HttpDestination destination;
private final HttpRequest request;
private final List<Response.ResponseListener> listeners;
private final HttpResponse response;
private volatile Throwable requestFailure;
private volatile Throwable responseFailure;
private final SpinLock _lock = new SpinLock();
private Boolean requestTerminated;
private Boolean responseTerminated;
private Throwable requestFailure;
private Throwable responseFailure;
public HttpExchange(HttpDestination destination, HttpRequest request, List<Response.ResponseListener> listeners)
{
@ -66,7 +66,10 @@ public class HttpExchange
public Throwable getRequestFailure()
{
return requestFailure;
try(SpinLock.Lock lock = _lock.lock())
{
return requestFailure;
}
}
public List<Response.ResponseListener> getResponseListeners()
@ -81,7 +84,10 @@ public class HttpExchange
public Throwable getResponseFailure()
{
return responseFailure;
try(SpinLock.Lock lock = _lock.lock())
{
return responseFailure;
}
}
public void associate(HttpChannel channel)
@ -98,88 +104,51 @@ public class HttpExchange
public boolean requestComplete()
{
return requestComplete.compareAndSet(false, true);
try(SpinLock.Lock lock = _lock.lock())
{
if (requestTerminated!=null)
return false;
requestTerminated=Boolean.FALSE;
return true;
}
}
public boolean responseComplete()
{
return responseComplete.compareAndSet(false, true);
try(SpinLock.Lock lock = _lock.lock())
{
if (responseTerminated!=null)
return false;
responseTerminated=Boolean.FALSE;
return true;
}
}
public Result terminateRequest(Throwable failure)
{
int requestSuccess = 0b0011;
int requestFailure = 0b0001;
return terminate(failure == null ? requestSuccess : requestFailure, failure);
}
public Result terminateResponse(Throwable failure)
{
if (failure == null)
try(SpinLock.Lock lock = _lock.lock())
{
int responseSuccess = 0b1100;
return terminate(responseSuccess, null);
}
else
{
proceed(failure);
int responseFailure = 0b0100;
return terminate(responseFailure, failure);
}
}
/**
* This method needs to atomically compute whether this exchange is completed,
* that is both request and responses are completed (either with a success or
* a failure).
*
* Furthermore, this method needs to atomically compute whether the exchange
* has completed successfully (both request and response are successful) or not.
*
* To do this, we use 2 bits for the request (one to indicate completion, one
* to indicate success), and similarly for the response.
* By using {@link AtomicInteger} to atomically sum these codes we can know
* whether the exchange is completed and whether is successful.
*
* @return the {@link Result} - if any - associated with the status
*/
private Result terminate(int code, Throwable failure)
{
int current = update(code, failure);
int terminated = 0b0101;
if ((current & terminated) == terminated)
{
// Request and response terminated
if (LOG.isDebugEnabled())
LOG.debug("{} terminated", this);
return new Result(getRequest(), getRequestFailure(), getResponse(), getResponseFailure());
requestTerminated=Boolean.TRUE;
requestFailure=failure;
if (Boolean.TRUE.equals(responseTerminated))
return new Result(getRequest(), requestFailure, getResponse(), responseFailure);
}
return null;
}
private int update(int code, Throwable failure)
public Result terminateResponse(Throwable failure)
{
while (true)
try(SpinLock.Lock lock = _lock.lock())
{
int current = complete.get();
boolean updateable = (current & code) == 0;
if (updateable)
{
int candidate = current | code;
if (!complete.compareAndSet(current, candidate))
continue;
current = candidate;
if ((code & 0b01) == 0b01)
requestFailure = failure;
if ((code & 0b0100) == 0b0100)
responseFailure = failure;
if (LOG.isDebugEnabled())
LOG.debug("{} updated", this);
}
return current;
responseTerminated=Boolean.TRUE;
responseFailure=failure;
if (Boolean.TRUE.equals(requestTerminated))
return new Result(getRequest(), requestFailure, getResponse(), responseFailure);
}
return null;
}
public boolean abort(Throwable cause)
{
if (destination.remove(this))
@ -203,7 +172,24 @@ public class HttpExchange
private boolean fail(Throwable cause)
{
if (update(0b0101, cause) == 0b0101)
boolean notify=false;
try(SpinLock.Lock lock = _lock.lock())
{
if (!Boolean.TRUE.equals(requestTerminated))
{
requestTerminated=Boolean.TRUE;
notify=true;
requestFailure=cause;
}
if (!Boolean.TRUE.equals(responseTerminated))
{
responseTerminated=Boolean.TRUE;
notify=true;
responseFailure=cause;
}
}
if (notify)
{
if (LOG.isDebugEnabled())
LOG.debug("Failing {}: {}", this, cause);
@ -222,8 +208,11 @@ public class HttpExchange
public void resetResponse()
{
responseComplete.set(false);
complete.addAndGet(-0b1100);
try(SpinLock.Lock lock = _lock.lock())
{
responseTerminated=null;
responseFailure=null;
}
}
public void proceed(Throwable failure)
@ -233,20 +222,16 @@ public class HttpExchange
channel.proceed(this, failure);
}
private String toString(int code)
{
String padding = "0000";
String status = Integer.toBinaryString(code);
return String.format("%s@%x status=%s%s",
HttpExchange.class.getSimpleName(),
hashCode(),
padding.substring(status.length()),
status);
}
@Override
public String toString()
{
return toString(complete.get());
try(SpinLock.Lock lock = _lock.lock())
{
return String.format("%s@%x req=%s/%s res=%s/%s",
HttpExchange.class.getSimpleName(),
hashCode(),
requestTerminated,requestFailure,
responseTerminated,responseFailure);
}
}
}

View File

@ -0,0 +1,78 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/* ------------------------------------------------------------ */
/** Spin Lock
* <p>This is a lock designed to protect VERY short sections of
* critical code. Threads attempting to take the lock will spin
* forever until the lock is available, thus it is important that
* the code protected by this lock is extremely simple and non
* blocking. The reason for this lock is that it prevents a thread
* from giving up a CPU core when contending for the lock.</p>
* <pre>
* try(SpinLock.Lock lock = spinlock.lock())
* {
* // something very quick and non blocking
* }
* </pre>
*/
public class SpinLock
{
private final AtomicReference<Thread> _lock = new AtomicReference<>(null);
private final Lock _unlock = new Lock();
public Lock lock()
{
Thread thread = Thread.currentThread();
while(true)
{
if (!_lock.compareAndSet(null,thread))
{
if (_lock.get()==thread)
throw new IllegalStateException("SpinLock is not reentrant");
continue;
}
return _unlock;
}
}
public boolean isLocked()
{
return _lock.get()!=null;
}
public boolean isLockedThread()
{
return _lock.get()==Thread.currentThread();
}
public class Lock implements AutoCloseable
{
@Override
public void close()
{
_lock.set(null);
}
}
}

View File

@ -0,0 +1,142 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
public class SpinLockTest
{
@Test
public void testLocked()
{
SpinLock lock = new SpinLock();
assertFalse(lock.isLocked());
try(SpinLock.Lock l = lock.lock())
{
assertTrue(lock.isLocked());
}
finally
{
assertFalse(lock.isLocked());
}
assertFalse(lock.isLocked());
}
@Test
public void testLockedException()
{
SpinLock lock = new SpinLock();
assertFalse(lock.isLocked());
try(SpinLock.Lock l = lock.lock())
{
assertTrue(lock.isLocked());
throw new Exception();
}
catch(Exception e)
{
assertFalse(lock.isLocked());
}
finally
{
assertFalse(lock.isLocked());
}
assertFalse(lock.isLocked());
}
@Test
public void testContend() throws Exception
{
final SpinLock lock = new SpinLock();
final CountDownLatch held0 = new CountDownLatch(1);
final CountDownLatch hold0 = new CountDownLatch(1);
Thread thread0 = new Thread()
{
@Override
public void run()
{
try(SpinLock.Lock l = lock.lock())
{
held0.countDown();
hold0.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
};
thread0.start();
held0.await();
assertTrue(lock.isLocked());
final CountDownLatch held1 = new CountDownLatch(1);
final CountDownLatch hold1 = new CountDownLatch(1);
Thread thread1 = new Thread()
{
@Override
public void run()
{
try(SpinLock.Lock l = lock.lock())
{
held1.countDown();
hold1.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
};
thread1.start();
// thread1 will be spinning here
assertFalse(held1.await(100,TimeUnit.MILLISECONDS));
// Let thread0 complete
hold0.countDown();
thread0.join();
// thread1 can progress
held1.await();
// let thread1 complete
hold1.countDown();
thread1.join();
assertFalse(lock.isLocked());
}
}