Merge remote-tracking branch 'origin/jetty-9.4.x'

Greg Wilkins 2017-11-14 18:15:55 +01:00
commit bb0f06fecc
12 changed files with 361 additions and 257 deletions

View File: HttpChannelOverHTTP.java

@ -19,6 +19,7 @@
package org.eclipse.jetty.client.http;
import java.util.Locale;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpExchange;
@ -39,6 +40,8 @@ public class HttpChannelOverHTTP extends HttpChannel
private final HttpConnectionOverHTTP connection;
private final HttpSenderOverHTTP sender;
private final HttpReceiverOverHTTP receiver;
private final LongAdder inMessages = new LongAdder();
private final LongAdder outMessages = new LongAdder();
public HttpChannelOverHTTP(HttpConnectionOverHTTP connection)
{
@ -80,7 +83,10 @@ public class HttpChannelOverHTTP extends HttpChannel
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
sender.send(exchange);
{
outMessages.increment();
sender.send(exchange);
}
}
@Override
@ -127,6 +133,7 @@ public class HttpChannelOverHTTP extends HttpChannel
public void receive()
{
inMessages.increment();
receiver.receive();
}
@ -180,6 +187,16 @@ public class HttpChannelOverHTTP extends HttpChannel
}
}
protected long getMessagesIn()
{
return inMessages.longValue();
}
protected long getMessagesOut()
{
return outMessages.longValue();
}
@Override
public String toString()
{

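The channel-level counters above use java.util.concurrent.atomic.LongAdder: the out counter is bumped when an exchange is sent, the in counter when a response is received, and the totals are read with longValue(). A minimal standalone sketch of that pattern (class and method names here are illustrative, not Jetty's):

import java.util.concurrent.atomic.LongAdder;

// Sketch of the counting pattern: increment on each send/receive, read the
// totals with longValue(). LongAdder is used instead of AtomicLong because
// increments from many I/O threads contend less, at the cost of a slightly
// more expensive read.
class MessageCounters
{
    private final LongAdder inMessages = new LongAdder();
    private final LongAdder outMessages = new LongAdder();

    void onSend()     { outMessages.increment(); }
    void onReceive()  { inMessages.increment(); }

    long getMessagesIn()  { return inMessages.longValue(); }
    long getMessagesOut() { return outMessages.longValue(); }
}
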
View File: HttpConnectionOverHTTP.java

@ -23,6 +23,7 @@ import java.nio.channels.AsynchronousCloseException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
@ -49,6 +50,9 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
private final HttpChannelOverHTTP channel;
private long idleTimeout;
private final LongAdder bytesIn = new LongAdder();
private final LongAdder bytesOut = new LongAdder();
public HttpConnectionOverHTTP(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{
super(endPoint, destination.getHttpClient().getExecutor());
@ -72,6 +76,41 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
return (HttpDestinationOverHTTP)delegate.getHttpDestination();
}
@Override
public long getBytesIn()
{
return bytesIn.longValue();
}
protected void addBytesIn(long bytesIn)
{
this.bytesIn.add(bytesIn);
}
@Override
public long getBytesOut()
{
return bytesOut.longValue();
}
protected void addBytesOut(long bytesOut)
{
this.bytesOut.add(bytesOut);
}
@Override
public long getMessagesIn()
{
return getHttpChannel().getMessagesIn();
}
@Override
public long getMessagesOut()
{
return getHttpChannel().getMessagesOut();
}
@Override
public void send(Request request, Response.CompleteListener listener)
{

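The connection now exposes raw byte and message totals: getBytesIn/getBytesOut read the LongAdders above, getMessagesIn/getMessagesOut delegate to the channel. Turning those monotonic totals into a rate is left to the caller; one possible polling approach, shown purely as an illustration (ThroughputSampler is not part of this commit):

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Samples a monotonically increasing counter once per second and prints the
// delta, e.g. new ThroughputSampler(connection::getBytesIn)
//     .start(Executors.newSingleThreadScheduledExecutor());
class ThroughputSampler
{
    private final LongSupplier counter;
    private long last;

    ThroughputSampler(LongSupplier counter)
    {
        this.counter = counter;
    }

    void start(ScheduledExecutorService scheduler)
    {
        scheduler.scheduleAtFixedRate(() ->
        {
            long now = counter.getAsLong();
            System.out.printf("%d bytes/s%n", now - last);
            last = now; // only ever touched by the scheduler thread
        }, 1, 1, TimeUnit.SECONDS);
    }
}
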
View File: HttpReceiverOverHTTP.java

@ -128,6 +128,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (read > 0)
{
connection.addBytesIn(read);
if (parse())
return;
}

View File: HttpSenderOverHTTP.java

@ -59,7 +59,7 @@ public class HttpSenderOverHTTP extends HttpSender
{
try
{
new HeadersCallback(exchange, content, callback).iterate();
new HeadersCallback(exchange, content, callback, getHttpChannel().getHttpConnection()).iterate();
}
catch (Throwable x)
{
@ -191,17 +191,19 @@ public class HttpSenderOverHTTP extends HttpSender
private final HttpExchange exchange;
private final Callback callback;
private final MetaData.Request metaData;
private final HttpConnectionOverHTTP httpConnectionOverHTTP;
private ByteBuffer headerBuffer;
private ByteBuffer chunkBuffer;
private ByteBuffer contentBuffer;
private boolean lastContent;
private boolean generated;
public HeadersCallback(HttpExchange exchange, HttpContent content, Callback callback)
public HeadersCallback(HttpExchange exchange, HttpContent content, Callback callback, HttpConnectionOverHTTP httpConnectionOverHTTP)
{
super(false);
this.exchange = exchange;
this.callback = callback;
this.httpConnectionOverHTTP = httpConnectionOverHTTP;
HttpRequest request = exchange.getRequest();
ContentProvider requestContent = request.getContent();
@ -258,6 +260,11 @@ public class HttpSenderOverHTTP extends HttpSender
chunkBuffer = BufferUtil.EMPTY_BUFFER;
if (contentBuffer == null)
contentBuffer = BufferUtil.EMPTY_BUFFER;
httpConnectionOverHTTP.addBytesOut(BufferUtil.length(headerBuffer)
+ BufferUtil.length(chunkBuffer)
+ BufferUtil.length(contentBuffer));
endPoint.write(this, headerBuffer, chunkBuffer, contentBuffer);
generated = true;
return Action.SCHEDULED;

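The sender change above sums the header, chunk and content buffer lengths and records them on the connection just before the gathering write; the lengths must be captured before the write, since writing advances each buffer's position. A self-contained sketch of the same accounting over plain NIO (assumes a blocking GatheringByteChannel; names are illustrative):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.concurrent.atomic.LongAdder;

class CountingGatherWriter
{
    private final LongAdder bytesOut = new LongAdder();

    void write(GatheringByteChannel channel, ByteBuffer header, ByteBuffer chunk, ByteBuffer content)
        throws IOException
    {
        // Record the bytes about to be written before the write consumes them.
        long length = header.remaining() + chunk.remaining() + content.remaining();
        bytesOut.add(length);

        ByteBuffer[] buffers = { header, chunk, content };
        while (length > 0)
            length -= channel.write(buffers); // blocking channel assumed
    }

    long getBytesOut() { return bytesOut.longValue(); }
}
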
View File: ManagedSelector.java

@ -58,7 +58,6 @@ import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
public class ManagedSelector extends ContainerLifeCycle implements Dumpable
{
private static final Logger LOG = Log.getLogger(ManagedSelector.class);
private static final long MAX_ACTION_PERIOD_MS = Long.getLong("org.eclipse.jetty.io.ManagedSelector.MAX_ACTION_PERIOD_MS",100);
private final Locker _locker = new Locker();
private boolean _selecting = false;
@ -67,7 +66,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
private final int _id;
private final ExecutionStrategy _strategy;
private Selector _selector;
private long _actionTime = -1;
private int _actionCount;
public ManagedSelector(SelectorManager selectorManager, int id)
{
@ -75,7 +74,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
_id = id;
SelectorProducer producer = new SelectorProducer();
Executor executor = selectorManager.getExecutor();
_strategy = new EatWhatYouKill(producer,executor,_selectorManager.getBean(ReservedThreadExecutor.class));
addBean(_strategy,true);
setStopTimeout(5000);
}
@ -219,6 +218,14 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
submit(new DestroyEndPoint(endPoint));
}
private int getActionSize()
{
try (Locker.Lock lock = _locker.lock())
{
return _actions.size();
}
}
@Override
public String dump()
{
@ -249,11 +256,12 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
public String toString()
{
Selector selector = _selector;
return String.format("%s id=%s keys=%d selected=%d",
return String.format("%s id=%s keys=%d selected=%d actions=%d",
super.toString(),
_id,
selector != null && selector.isOpen() ? selector.keys().size() : -1,
selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1,
getActionSize());
}
/**
@ -304,7 +312,6 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
private Runnable nextAction()
{
long now = System.nanoTime();
Selector selector = null;
Runnable action = null;
try (Locker.Lock lock = _locker.lock())
@ -312,38 +319,48 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
// It is important to avoid live-lock (busy blocking) here. If too many actions
// are submitted, this can indefinitely defer selection happening. Similarly if
// we give too much priority to selection, it may prevent actions from being run.
// The solution implemented here is to put a maximum time limit on handling actions
// so that this method will fall through to selection if more than MAX_ACTION_PERIOD_MS
// is spent running actions. The time period is cleared whenever a selection occurs,
// so that a full period can be spent on actions after every select.
if (_actionTime == -1)
{
_actionTime = now;
// The solution implemented here is to only process the number of actions that were
// originally in the action queue before attempting a select
if (_actionCount==0)
{
// Calculate how many actions we are prepared to handle before selection
_actionCount = _actions.size();
if (_actionCount>0)
action = _actions.poll();
else
_selecting = true;
}
else if ((now - _actionTime) > TimeUnit.MILLISECONDS.toNanos(MAX_ACTION_PERIOD_MS) && _actions.size() > 0)
else if (_actionCount==1)
{
// Too much time spent handling actions, give selection a go,
// immediately waking up (as if remaining action were just added).
selector = _selector;
_selecting = false;
_actionTime = -1;
_actionCount = 0;
if (LOG.isDebugEnabled())
LOG.debug("Forcing selection, actions={}",_actions.size());
}
if (selector == null)
{
action = _actions.poll();
if (action == null)
if (_actions.size()==0)
{
// No more actions, so we time to do some selecting
// This was the last action, so select normally
_selecting = true;
_actionTime = -1;
}
else
{
// there are still more actions to handle, so
// immediately wake up (as if remaining action were just added).
selector = _selector;
_selecting = false;
}
}
else
{
_actionCount--;
action = _actions.poll();
}
}
if (LOG.isDebugEnabled())
LOG.debug("action={} wakeup={}",action,selector!=null);
if (selector != null)
selector.wakeup();

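The nextAction() rewrite above drops the MAX_ACTION_PERIOD_MS time budget and instead snapshots the number of queued actions at the start of a batch: only that many actions are run before the loop falls back to selection, so actions submitted during the batch cannot starve select(). A deliberately simplified, single-threaded sketch of the idea (the real code also coordinates with the selector under a lock and wakes it up; that is elided here):

import java.util.ArrayDeque;
import java.util.Queue;

class FairSelectorLoop
{
    private final Queue<Runnable> actions = new ArrayDeque<>();

    void submit(Runnable action)
    {
        actions.add(action);
    }

    // Run at most the actions that were already queued when the batch
    // started, then give selection a turn.
    void runBatchThenSelect()
    {
        int budget = actions.size();
        for (int i = 0; i < budget; i++)
        {
            Runnable action = actions.poll();
            if (action == null)
                break;
            action.run();
        }
        select();
    }

    private void select()
    {
        // placeholder for Selector.select(): block until I/O is ready
    }
}
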
View File: HttpConnection.java

@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.http.HttpCompliance;
import org.eclipse.jetty.http.HttpField;
@ -69,6 +70,8 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
private final AsyncReadCallback _asyncReadCallback = new AsyncReadCallback();
private final SendCallback _sendCallback = new SendCallback();
private final boolean _recordHttpComplianceViolations;
private final LongAdder bytesIn = new LongAdder();
private final LongAdder bytesOut = new LongAdder();
/**
* Get the current connection that this thread is dispatched to.
@ -226,6 +229,8 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{
// Fill the request buffer (if needed).
int filled = fillRequestBuffer();
if (filled>0)
bytesIn.add(filled);
// Parse the request buffer.
boolean handle = parseRequestBuffer();
@ -516,7 +521,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
if(_sendCallback.reset(info,head,content,lastContent,callback))
{
_sendCallback.iterate();
}
}
@ -562,6 +569,18 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_blockingReadCallback.failed(e);
}
@Override
public long getBytesIn()
{
return bytesIn.longValue();
}
@Override
public long getBytesOut()
{
return bytesOut.longValue();
}
@Override
public String toConnectionString()
{
@ -726,35 +745,52 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
BufferUtil.clear(chunk);
BufferUtil.clear(_content);
}
// If we have a header
byte gather_write = 0;
long bytes = 0;
if (BufferUtil.hasContent(_header))
{
if (BufferUtil.hasContent(_content))
{
if (BufferUtil.hasContent(chunk))
getEndPoint().write(this, _header, chunk, _content);
else
getEndPoint().write(this, _header, _content);
}
else
gather_write += 4;
bytes += _header.remaining();
}
if (BufferUtil.hasContent(chunk))
{
gather_write += 2;
bytes += chunk.remaining();
}
if (BufferUtil.hasContent(_content))
{
gather_write += 1;
bytes += _content.remaining();
}
HttpConnection.this.bytesOut.add(bytes);
switch(gather_write)
{
case 7:
getEndPoint().write(this, _header, chunk, _content);
break;
case 6:
getEndPoint().write(this, _header, chunk);
break;
case 5:
getEndPoint().write(this, _header, _content);
break;
case 4:
getEndPoint().write(this, _header);
}
else if (BufferUtil.hasContent(chunk))
{
if (BufferUtil.hasContent(_content))
break;
case 3:
getEndPoint().write(this, chunk, _content);
else
break;
case 2:
getEndPoint().write(this, chunk);
break;
case 1:
getEndPoint().write(this, _content);
break;
default:
succeeded();
}
else if (BufferUtil.hasContent(_content))
{
getEndPoint().write(this, _content);
}
else
{
succeeded(); // nothing to write
}
return Action.SCHEDULED;
}
case SHUTDOWN_OUT:

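The SendCallback change encodes which of the three buffers are non-empty as a bit mask (header=4, chunk=2, content=1), accumulates the total length for the bytesOut counter, and switches on the mask to pick the matching gathering write. The dispatch in isolation, over plain NIO rather than Jetty's EndPoint (partial writes are ignored in this sketch):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;

final class GatherWriteDispatch
{
    // header=4, chunk=2, content=1 -- the same weights used in the commit.
    static void write(GatheringByteChannel channel, ByteBuffer header, ByteBuffer chunk, ByteBuffer content)
        throws IOException
    {
        int mask = (header.hasRemaining()  ? 4 : 0)
                 | (chunk.hasRemaining()   ? 2 : 0)
                 | (content.hasRemaining() ? 1 : 0);
        switch (mask)
        {
            case 7: channel.write(new ByteBuffer[] { header, chunk, content }); break;
            case 6: channel.write(new ByteBuffer[] { header, chunk }); break;
            case 5: channel.write(new ByteBuffer[] { header, content }); break;
            case 4: channel.write(header); break;
            case 3: channel.write(new ByteBuffer[] { chunk, content }); break;
            case 2: channel.write(chunk); break;
            case 1: channel.write(content); break;
            default: break; // nothing to write
        }
    }
}
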
View File: ConcurrentStack.java (deleted)

@ -1,91 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import java.util.concurrent.atomic.AtomicReference;
/**
* ConcurrentStack
*
* Nonblocking stack using variation of Treiber's algorithm
* that allows for reduced garbage
*/
public class ConcurrentStack<I>
{
private final NodeStack<Holder> stack = new NodeStack<>();
public void push(I item)
{
stack.push(new Holder(item));
}
public I pop()
{
Holder<I> holder = stack.pop();
if (holder==null)
return null;
return holder.item;
}
private static class Holder<I> extends Node
{
final I item;
Holder(I item)
{
this.item = item;
}
}
public static class Node
{
Node next;
}
public static class NodeStack<N extends Node>
{
AtomicReference<Node> stack = new AtomicReference<Node>();
public void push(N node)
{
while(true)
{
Node top = stack.get();
node.next = top;
if (stack.compareAndSet(top,node))
break;
}
}
public N pop()
{
while (true)
{
Node top = stack.get();
if (top==null)
return null;
if (stack.compareAndSet(top,top.next))
{
top.next = null;
return (N)top;
}
}
}
}
}

View File: RolloverFileOutputStream.java

@ -20,7 +20,6 @@ package org.eclipse.jetty.util;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
@ -44,7 +43,7 @@ import java.util.TimerTask;
* Old files are retained for a number of days before being deleted.
* </p>
*/
public class RolloverFileOutputStream extends FilterOutputStream
public class RolloverFileOutputStream extends OutputStream
{
private static Timer __rollover;
@ -53,6 +52,7 @@ public class RolloverFileOutputStream extends FilterOutputStream
final static String ROLLOVER_FILE_BACKUP_FORMAT = "HHmmssSSS";
final static int ROLLOVER_FILE_RETAIN_DAYS = 31;
private OutputStream _out;
private RollTask _rollTask;
private SimpleDateFormat _fileBackupFormat;
private SimpleDateFormat _fileDateFormat;
@ -128,7 +128,8 @@ public class RolloverFileOutputStream extends FilterOutputStream
* @param append If true, existing files will be appended to.
* @param retainDays The number of days to retain files before deleting them. 0 to retain forever.
* @param zone the timezone for the output
* @param dateFormat The format for the date file substitution. The default is "yyyy_MM_dd".
* @param dateFormat The format for the date file substitution. The default is "yyyy_MM_dd". If set to the
* empty string, the file is rolled over to the same filename, with the current file being renamed to the backup filename.
* @param backupFormat The format for the file extension of backup files. The default is "HHmmssSSS".
* @throws IOException if unable to create output
*/
@ -143,7 +144,8 @@ public class RolloverFileOutputStream extends FilterOutputStream
this(filename,append,retainDays,zone,dateFormat,backupFormat,ZonedDateTime.now(zone.toZoneId()));
}
/* ------------------------------------------------------------ */
RolloverFileOutputStream(String filename,
boolean append,
int retainDays,
@ -153,8 +155,6 @@ public class RolloverFileOutputStream extends FilterOutputStream
ZonedDateTime now)
throws IOException
{
super(null);
if (dateFormat==null)
dateFormat=ROLLOVER_FILE_DATE_FORMAT;
_fileDateFormat = new SimpleDateFormat(dateFormat);
@ -179,16 +179,17 @@ public class RolloverFileOutputStream extends FilterOutputStream
_append=append;
_retainDays=retainDays;
// Calculate Today's Midnight, based on Configured TimeZone (will be in past, even if by a few milliseconds)
setFile(now);
synchronized(RolloverFileOutputStream.class)
{
if (__rollover==null)
__rollover=new Timer(RolloverFileOutputStream.class.getName(),true);
// Calculate Today's Midnight, based on Configured TimeZone (will be in past, even if by a few milliseconds)
setFile(now);
// This will schedule the rollover event to the next midnight
scheduleNextRollover(now);
}
// This will schedule the rollover event to the next midnight
scheduleNextRollover(now);
}
/* ------------------------------------------------------------ */
@ -212,7 +213,10 @@ public class RolloverFileOutputStream extends FilterOutputStream
// Schedule next rollover event to occur, based on local machine's Unix Epoch milliseconds
long delay = midnight.toInstant().toEpochMilli() - now.toInstant().toEpochMilli();
__rollover.schedule(_rollTask,delay);
synchronized(RolloverFileOutputStream.class)
{
__rollover.schedule(_rollTask,delay);
}
}
/* ------------------------------------------------------------ */
@ -236,46 +240,70 @@ public class RolloverFileOutputStream extends FilterOutputStream
}
/* ------------------------------------------------------------ */
synchronized void setFile(ZonedDateTime now)
void setFile(ZonedDateTime now)
throws IOException
{
// Check directory
File file = new File(_filename);
_filename=file.getCanonicalPath();
file=new File(_filename);
File dir= new File(file.getParent());
if (!dir.isDirectory() || !dir.canWrite())
throw new IOException("Cannot write log directory "+dir);
// Is this a rollover file?
String filename=file.getName();
int i=filename.toLowerCase(Locale.ENGLISH).indexOf(YYYY_MM_DD);
if (i>=0)
File oldFile = null;
File newFile = null;
File backupFile = null;
synchronized (this)
{
file=new File(dir,
filename.substring(0,i)+
_fileDateFormat.format(new Date(now.toInstant().toEpochMilli()))+
filename.substring(i+YYYY_MM_DD.length()));
}
if (file.exists()&&!file.canWrite())
throw new IOException("Cannot write log file "+file);
// Check directory
File file = new File(_filename);
_filename=file.getCanonicalPath();
file=new File(_filename);
File dir= new File(file.getParent());
if (!dir.isDirectory() || !dir.canWrite())
throw new IOException("Cannot write log directory "+dir);
// Do we need to change the output stream?
if (out==null || !file.equals(_file))
{
// Yep
_file=file;
if (!_append && file.exists())
file.renameTo(new File(file.toString()+"."+_fileBackupFormat.format(new Date(now.toInstant().toEpochMilli()))));
OutputStream oldOut=out;
out=new FileOutputStream(file.toString(),_append);
if (oldOut!=null)
oldOut.close();
//if(log.isDebugEnabled())log.debug("Opened "+_file);
// Is this a rollover file?
String filename=file.getName();
int datePattern=filename.toLowerCase(Locale.ENGLISH).indexOf(YYYY_MM_DD);
if (datePattern>=0)
{
file=new File(dir,
filename.substring(0,datePattern)+
_fileDateFormat.format(new Date(now.toInstant().toEpochMilli()))+
filename.substring(datePattern+YYYY_MM_DD.length()));
}
if (file.exists()&&!file.canWrite())
throw new IOException("Cannot write log file "+file);
// Do we need to change the output stream?
if (_out==null || datePattern>=0)
{
// Yep
oldFile = _file;
_file=file;
newFile = _file;
if (!_append && file.exists())
{
backupFile = new File(file.toString()+"."+_fileBackupFormat.format(new Date(now.toInstant().toEpochMilli())));
file.renameTo(backupFile);
}
OutputStream oldOut=_out;
_out=new FileOutputStream(file.toString(),_append);
if (oldOut!=null)
oldOut.close();
//if(log.isDebugEnabled())log.debug("Opened "+_file);
}
}
if (newFile!=null)
rollover(oldFile,backupFile,newFile);
}
/* ------------------------------------------------------------ */
/** This method is called whenever a log file is rolled over
* @param oldFile The original filename or null if this is the first creation
* @param backupFile The backup filename or null if the filename is dated.
* @param newFile The new filename that is now being used for logging
*/
protected void rollover(File oldFile, File backupFile, File newFile)
{
}
/* ------------------------------------------------------------ */
void removeOldFiles(ZonedDateTime now)
{
@ -309,36 +337,66 @@ public class RolloverFileOutputStream extends FilterOutputStream
}
}
/* ------------------------------------------------------------ */
public void write(int b) throws IOException
{
synchronized(this)
{
_out.write(b);
}
}
/* ------------------------------------------------------------ */
@Override
public void write (byte[] buf)
throws IOException
{
out.write (buf);
synchronized(this)
{
_out.write (buf);
}
}
/* ------------------------------------------------------------ */
@Override
public void write (byte[] buf, int off, int len)
throws IOException
{
out.write (buf, off, len);
synchronized(this)
{
_out.write (buf, off, len);
}
}
/* ------------------------------------------------------------ */
public void flush() throws IOException
{
synchronized(this)
{
_out.flush();
}
}
/* ------------------------------------------------------------ */
@Override
public void close()
throws IOException
{
synchronized(RolloverFileOutputStream.class)
synchronized(this)
{
try{super.close();}
try
{
_out.close();
}
finally
{
out=null;
_out=null;
_file=null;
}
}
synchronized(RolloverFileOutputStream.class)
{
if (_rollTask != null)
{
_rollTask.cancel();
@ -354,13 +412,10 @@ public class RolloverFileOutputStream extends FilterOutputStream
{
try
{
synchronized(RolloverFileOutputStream.class)
{
ZonedDateTime now = ZonedDateTime.now(_fileDateFormat.getTimeZone().toZoneId());
RolloverFileOutputStream.this.setFile(now);
RolloverFileOutputStream.this.scheduleNextRollover(now);
RolloverFileOutputStream.this.removeOldFiles(now);
}
ZonedDateTime now = ZonedDateTime.now(_fileDateFormat.getTimeZone().toZoneId());
RolloverFileOutputStream.this.setFile(now);
RolloverFileOutputStream.this.removeOldFiles(now);
RolloverFileOutputStream.this.scheduleNextRollover(now);
}
catch(Throwable t)
{

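The refactored stream now extends OutputStream directly, synchronizes writes on its own _out field, and notifies subclasses of each file switch through the new protected rollover(oldFile, backupFile, newFile) hook. A sketch of a subclass using that hook through the six-argument constructor documented above; as in the new testRolloverBackup test further down, the filename contains the yyyy_mm_dd token and the empty date format substitutes it with nothing, so the active file keeps one fixed name and, with append=false, the previous file is renamed to a timestamped backup:

import java.io.File;
import java.io.IOException;
import java.util.TimeZone;

import org.eclipse.jetty.util.RolloverFileOutputStream;

class NotifyingRolloverStream extends RolloverFileOutputStream
{
    NotifyingRolloverStream(String filename) throws IOException
    {
        // filename is expected to contain "yyyy_mm_dd", e.g. "logs/app-yyyy_mm_dd.log";
        // the "" date format collapses the token, append=false triggers the backup rename.
        super(filename, false, 31, TimeZone.getDefault(), "", null);
    }

    @Override
    protected void rollover(File oldFile, File backupFile, File newFile)
    {
        // oldFile is null on first creation; backupFile is null when the
        // filename itself carries the date.
        System.out.println("rolled " + oldFile + " -> " + newFile + " (backup: " + backupFile + ")");
    }
}
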
View File: ReservedThreadExecutor.java

@ -18,13 +18,13 @@
package org.eclipse.jetty.util.thread;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import org.eclipse.jetty.util.ConcurrentStack;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
@ -49,7 +49,8 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
{
@Override
public void run()
{}
{
}
@Override
public String toString()
@ -60,10 +61,9 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
private final Executor _executor;
private final int _capacity;
private final ConcurrentStack.NodeStack<ReservedThread> _stack;
private final ConcurrentLinkedDeque<ReservedThread> _stack;
private final AtomicInteger _size = new AtomicInteger();
private final AtomicInteger _pending = new AtomicInteger();
private final AtomicInteger _waiting = new AtomicInteger();
private ThreadPoolBudget.Lease _lease;
private Object _owner;
@ -96,7 +96,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
{
_executor = executor;
_capacity = reservedThreads(executor,capacity);
_stack = new ConcurrentStack.NodeStack<>();
_stack = new ConcurrentLinkedDeque<>();
_owner = owner;
LOG.debug("{}",this);
@ -145,12 +145,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
return _pending.get();
}
@ManagedAttribute(value = "waiting reserved threads", readonly = true)
public int getWaiting()
{
return _waiting.get();
}
@ManagedAttribute(value = "idletimeout in MS", readonly = true)
public long getIdleTimeoutMs()
{
@ -186,17 +180,13 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
_lease.close();
while(true)
{
ReservedThread thread = _stack.pop();
if (thread==null)
{
super.doStop();
return;
}
ReservedThread thread = _stack.pollFirst();
if (thread == null)
break;
_size.decrementAndGet();
thread.stop();
}
super.doStop();
}
@Override
@ -218,7 +208,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
if (task==null)
return false;
ReservedThread thread = _stack.pop();
ReservedThread thread = _stack.pollFirst();
if (thread==null && task!=STOP)
{
startReservedThread();
@ -262,16 +252,16 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
@Override
public String toString()
{
return String.format("%s@%s{s=%d/%d,p=%d,w=%d}",
return String.format("%s@%x{s=%d/%d,p=%d}@%s",
getClass().getSimpleName(),
_owner != null ? _owner : Integer.toHexString(hashCode()),
hashCode(),
_size.get(),
_capacity,
_pending.get(),
_waiting.get());
_owner);
}
private class ReservedThread extends ConcurrentStack.Node implements Runnable
private class ReservedThread implements Runnable
{
private final Locker _locker = new Locker();
private final Condition _wakeup = _locker.newCondition();
@ -301,7 +291,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
LOG.debug("{} waiting", this);
Runnable task = null;
while (isRunning() && task==null)
while (task==null)
{
boolean idle = false;
@ -311,7 +301,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
{
try
{
_waiting.incrementAndGet();
if (_idleTime == 0)
_wakeup.await();
else
@ -321,10 +310,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
{
LOG.ignore(e);
}
finally
{
_waiting.decrementAndGet();
}
}
task = _task;
_task = null;
@ -346,7 +331,7 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
if (LOG.isDebugEnabled())
LOG.debug("{} task={}", this, task);
return task==null?STOP:task;
return task;
}
@Override
@ -354,8 +339,6 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
{
while (isRunning())
{
Runnable task = null;
// test and increment size BEFORE decrementing pending,
// so that we don't have a race starting new pending.
while(true)
@ -383,10 +366,10 @@ public class ReservedThreadExecutor extends AbstractLifeCycle implements Executo
// Insert ourselves in the stack. Size is already incremented, but
// that only effects the decision to keep other threads reserved.
_stack.push(this);
_stack.offerFirst(this);
// Wait for a task
task = reservedWait();
Runnable task = reservedWait();
if (task==STOP)
// return on STOP poison pill

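ReservedThreadExecutor now parks its idle threads in a java.util.concurrent.ConcurrentLinkedDeque used as a LIFO stack (offerFirst to park, pollFirst to take), replacing the custom Treiber-style ConcurrentStack deleted earlier in this commit. The mapping in isolation (LifoPool is just an illustration):

import java.util.concurrent.ConcurrentLinkedDeque;

// Same contract as the removed ConcurrentStack: push puts the most recently
// parked item at the head, pop returns it (or null when empty), and both
// operations are lock-free.
class LifoPool<T>
{
    private final ConcurrentLinkedDeque<T> stack = new ConcurrentLinkedDeque<>();

    void push(T item) { stack.offerFirst(item); }

    T pop() { return stack.pollFirst(); }
}
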
View File: EatWhatYouKill.java

@ -37,25 +37,25 @@ import org.eclipse.jetty.util.thread.Locker.Lock;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
/**
* <p>A strategy where the thread that produces will run the resulting task if it
* is possible to do so without thread starvation.</p>
*
* <p>This strategy preemptively dispatches a thread as a pending producer, so that
* when a thread produces a task it can immediately run the task and let the pending
* producer thread take over producing. If necessary another thread will be dispatched
* to replace the pending producing thread. When operating in this pattern, the
* sub-strategy is called Execute Produce Consume (EPC)
* </p>
* <p>However, if the task produced uses the {@link Invocable} API to indicate that
* it will not block, then the strategy will run it directly, regardless of the
* presence of a pending producing thread and then resume producing after the
* task has completed. This sub-strategy is also used if the strategy has been
* configured with a maximum of 0 pending threads and the thread currently producing
* does not use the {@link Invocable} API to indicate that it will not block.
* When operating in this pattern, the sub-strategy is called
* ProduceConsume (PC).
* </p>
* <p>If there is no pending producer thread available and if the task has not
* indicated it is non-blocking, then this strategy will dispatch the execution of
* the task and immediately continue producing. When operating in this pattern, the
* sub-strategy is called ProduceExecuteConsume (PEC).
@ -67,7 +67,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
private static final Logger LOG = Log.getLogger(EatWhatYouKill.class);
private enum State { IDLE, PRODUCING, REPRODUCING }
private final Locker _locker = new Locker();
private final LongAdder _nonBlocking = new LongAdder();
private final LongAdder _blocking = new LongAdder();
@ -86,7 +86,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
{
this(producer,executor,new ReservedThreadExecutor(executor,maxReserved));
}
public EatWhatYouKill(Producer producer, Executor executor, ReservedThreadExecutor producers)
{
_producer = producer;
@ -108,11 +108,11 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
case IDLE:
execute = true;
break;
case PRODUCING:
_state = State.REPRODUCING;
break;
default:
break;
}
@ -153,13 +153,13 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
_state = State.PRODUCING;
producing = true;
break;
case PRODUCING:
// Keep other Thread producing
if (reproduce)
_state = State.REPRODUCING;
break;
default:
break;
}
@ -170,7 +170,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
public boolean doProduce()
{
boolean producing = true;
while (isRunning() && producing)
{
// If we got here, then we are the thread that is producing.
Runnable task = null;
@ -269,7 +269,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
}
}
}
return producing;
}
@ -323,7 +323,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
getState(builder);
return builder.toString();
}
private void getString(StringBuilder builder)
{
builder.append(getClass().getSimpleName());

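The javadoc above names three sub-strategies; which one applies can be summarized as a decision over two inputs: whether the produced task declared itself non-blocking through the Invocable API, and whether a reserved thread is available to take over producing. A deliberately simplified sketch of that decision (not the actual EatWhatYouKill state machine):

enum SubStrategy
{
    PRODUCE_CONSUME,          // PC:  run the task directly, then resume producing
    EXECUTE_PRODUCE_CONSUME,  // EPC: run the task, a pending producer takes over producing
    PRODUCE_EXECUTE_CONSUME   // PEC: dispatch the task elsewhere, keep producing
}

final class SubStrategyChooser
{
    static SubStrategy choose(boolean taskIsNonBlocking, boolean pendingProducerAvailable)
    {
        if (taskIsNonBlocking)
            return SubStrategy.PRODUCE_CONSUME;
        if (pendingProducerAvailable)
            return SubStrategy.EXECUTE_PRODUCE_CONSUME;
        return SubStrategy.PRODUCE_EXECUTE_CONSUME;
    }
}
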
View File: RolloverFileOutputStreamTest.java

@ -325,4 +325,44 @@ public class RolloverFileOutputStreamTest
}
}
}
@Test
public void testRolloverBackup() throws Exception
{
File testDir = MavenTestingUtils.getTargetTestingDir(RolloverFileOutputStreamTest.class.getName() + "_testRollover");
FS.ensureEmpty(testDir);
ZoneId zone = toZoneId("Australia/Sydney");
ZonedDateTime now = toDateTime("2016.04.10-11:59:55.0 PM AEDT", zone);
File template = new File(testDir,"test-rofosyyyy_mm_dd.log");
try (RolloverFileOutputStream rofos =
new RolloverFileOutputStream(template.getAbsolutePath(),false,0,TimeZone.getTimeZone(zone),"",null,now))
{
rofos.write("BEFORE".getBytes());
rofos.flush();
String[] ls = testDir.list();
assertThat(ls.length,is(1));
assertThat(ls[0],is("test-rofos.log"));
TimeUnit.SECONDS.sleep(10);
rofos.write("AFTER".getBytes());
ls = testDir.list();
assertThat(ls.length,is(2));
for (String n : ls)
{
String content = IO.toString(new FileReader(new File(testDir,n)));
if ("test-rofos.log".equals(n))
{
assertThat(content,is("AFTER"));
}
else
{
assertThat(content,is("BEFORE"));
}
}
}
}
}

View File: ConnectionStatisticsTest.java

@ -47,7 +47,7 @@ public class ConnectionStatisticsTest extends AbstractTest
@Test
public void testConnectionStatistics() throws Exception
{
Assume.assumeThat(transport, Matchers.isOneOf(Transport.H2C, Transport.H2));
Assume.assumeThat(transport, Matchers.isOneOf(Transport.HTTP, Transport.H2C, Transport.H2));
start(new AbstractHandler()
{