Issue #547 EWYK leaves low resource mode

Directly implement PEC in EPC low resource mode
This commit is contained in:
Greg Wilkins 2016-05-05 10:42:08 +10:00
parent e6e63d5549
commit 6591137ba0
5 changed files with 122 additions and 77 deletions

View File

@ -139,7 +139,6 @@ public class ThreadStarvationTest extends HttpServerTestFixture
assertThat(response,containsString("200 OK")); assertThat(response,containsString("200 OK"));
assertThat(response,containsString("Read Input 10")); assertThat(response,containsString("Read Input 10"));
} }
} }

View File

@ -80,6 +80,7 @@ public interface ExecutionStrategy
Runnable produce(); Runnable produce();
} }
/** /**
* <p>A factory for {@link ExecutionStrategy}.</p> * <p>A factory for {@link ExecutionStrategy}.</p>
*/ */

View File

@ -43,32 +43,30 @@ import org.eclipse.jetty.util.thread.ThreadPool;
* pressure on producers. * pressure on producers.
* </p> * </p>
*/ */
public class ExecuteProduceConsume implements ExecutionStrategy, Runnable public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements ExecutionStrategy, Runnable
{ {
private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class); private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class);
private final Locker _locker = new Locker(); private final Locker _locker = new Locker();
private final Runnable _runExecute = new RunExecute(); private final Runnable _runExecute = new RunExecute();
private final Producer _producer; private final Producer _producer;
private final Executor _executor;
private boolean _idle=true; private boolean _idle=true;
private boolean _execute; private boolean _execute;
private boolean _producing; private boolean _producing;
private boolean _pending; private boolean _pending;
private final ThreadPool _threadpool; private final ThreadPool _threadpool;
private final ExecutionStrategy _lowresources;
public ExecuteProduceConsume(Producer producer, Executor executor) public ExecuteProduceConsume(Producer producer, Executor executor)
{ {
this(producer,executor,(executor instanceof ThreadPool)?new ProduceExecuteConsume(producer,executor):null); super(executor);
this._producer = producer;
_threadpool = (executor instanceof ThreadPool)?((ThreadPool)executor):null;
} }
@Deprecated
public ExecuteProduceConsume(Producer producer, Executor executor, ExecutionStrategy lowResourceStrategy) public ExecuteProduceConsume(Producer producer, Executor executor, ExecutionStrategy lowResourceStrategy)
{ {
this._producer = producer; this(producer,executor);
this._executor = executor;
_threadpool = (executor instanceof ThreadPool)?((ThreadPool)executor):null;
_lowresources = _threadpool==null?null:lowResourceStrategy;
} }
@Override @Override
@ -100,7 +98,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
} }
if (produce) if (produce)
produceAndRun(); produceConsume();
} }
@Override @Override
@ -117,7 +115,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
_execute=true; _execute=true;
} }
if (dispatch) if (dispatch)
_executor.execute(_runExecute); execute(_runExecute);
} }
@Override @Override
@ -137,28 +135,50 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
if (produce) if (produce)
{ {
// If we are low on resources, then switch to PEC strategy which does not // If we are low on threads, this could be the last thread, so we must not consume.
// suffer as badly from thread starvation // So call produceExecuteConsume instead
while (_threadpool!=null && _threadpool.isLowOnThreads()) if (_threadpool!=null && _threadpool.isLowOnThreads() && !produceExecuteConsume())
{ return;
LOG.debug("EPC low resources {}",this);
try produceConsume();
{
_lowresources.execute();
}
catch(Throwable e)
{
// just warn if lowresources execute fails and keep producing
LOG.warn(e);
} }
} }
// no longer low resources so produceAndRun normally /**
produceAndRun(); * @return true if we are still producing
*/
private boolean produceExecuteConsume()
{
if (LOG.isDebugEnabled())
LOG.debug("{} Low Resources",this);
while (_threadpool.isLowOnThreads())
{
Runnable task = _producer.produce();
if (LOG.isDebugEnabled())
LOG.debug("{} produced {}", _producer, task);
if (task == null)
{
// No task, so we are now idle
try (Lock locked = _locker.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("{} Idle Low Resources",this);
_producing=false;
_idle=false;
} }
return false;
} }
private void produceAndRun() // Execute the task.
execute(task);
}
if (LOG.isDebugEnabled())
LOG.debug("{} No longer Low Resources",this);
return true;
}
private void produceConsume()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} produce enter",this); LOG.debug("{} produce enter",this);
@ -184,6 +204,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
if (task == null) if (task == null)
{ {
// There is no task. // There is no task.
// Could another one just have been queued with an execute?
if (_execute) if (_execute)
{ {
_idle=false; _idle=false;
@ -214,31 +235,9 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
// Spawn a new thread to continue production by running the produce loop. // Spawn a new thread to continue production by running the produce loop.
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} dispatch",this); LOG.debug("{} dispatch",this);
try if (!execute(this))
{
_executor.execute(this);
}
catch(RejectedExecutionException e)
{
// If we cannot execute, then discard/reject the task and keep producing
LOG.debug(e);
LOG.warn("RejectedExecution {}",task);
try
{
if (task instanceof Rejectable)
((Rejectable)task).reject();
}
catch (Exception x)
{
e.addSuppressed(x);
LOG.warn(e);
}
finally
{
task=null; task=null;
} }
}
}
// Run the task. // Run the task.
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -262,6 +261,7 @@ public class ExecuteProduceConsume implements ExecutionStrategy, Runnable
LOG.debug("{} produce exit",this); LOG.debug("{} produce exit",this);
} }
public Boolean isIdle() public Boolean isIdle()
{ {
try (Lock locked = _locker.lock()) try (Lock locked = _locker.lock())

View File

@ -0,0 +1,65 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util.thread.strategy;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
public abstract class ExecutingExecutionStrategy implements ExecutionStrategy
{
private static final Logger LOG = Log.getLogger(ExecutingExecutionStrategy.class);
private final Executor _executor;
protected ExecutingExecutionStrategy(Executor executor)
{
_executor=executor;
}
protected boolean execute(Runnable task)
{
try
{
_executor.execute(task);
return true;
}
catch(RejectedExecutionException e)
{
// If we cannot execute, then discard/reject the task and keep producing
LOG.debug(e);
LOG.warn("RejectedExecution {}",task);
try
{
if (task instanceof Rejectable)
((Rejectable)task).reject();
}
catch (Exception x)
{
e.addSuppressed(x);
LOG.warn(e);
}
}
return false;
}
}

View File

@ -29,18 +29,17 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy;
* <p>A strategy where the caller thread iterates over task production, submitting each * <p>A strategy where the caller thread iterates over task production, submitting each
* task to an {@link Executor} for execution.</p> * task to an {@link Executor} for execution.</p>
*/ */
public class ProduceExecuteConsume implements ExecutionStrategy public class ProduceExecuteConsume extends ExecutingExecutionStrategy implements ExecutionStrategy
{ {
private static final Logger LOG = Log.getLogger(ProduceExecuteConsume.class); private static final Logger LOG = Log.getLogger(ProduceExecuteConsume.class);
private final Producer _producer; private final Producer _producer;
private final Executor _executor;
private State _state = State.IDLE; private State _state = State.IDLE;
public ProduceExecuteConsume(Producer producer, Executor executor) public ProduceExecuteConsume(Producer producer, Executor executor)
{ {
super(executor);
this._producer = producer; this._producer = producer;
this._executor = executor;
} }
@Override @Override
@ -73,26 +72,7 @@ public class ProduceExecuteConsume implements ExecutionStrategy
} }
// Execute the task. // Execute the task.
try execute(task);
{
_executor.execute(task);
}
catch (RejectedExecutionException e)
{
// Discard/reject tasks that cannot be executed
if (task instanceof Rejectable)
{
try
{
((Rejectable)task).reject();
}
catch (Throwable x)
{
e.addSuppressed(x);
LOG.warn(e);
}
}
}
} }
} }