417223 - removed deprecated ThreadPool.dispatch

This commit is contained in:
Greg Wilkins 2013-11-04 17:21:50 +11:00
parent 17bf8ccee0
commit 36c807c2f3
9 changed files with 38 additions and 103 deletions

View File

@ -64,27 +64,6 @@ public class DelegatingThreadPool extends ContainerLifeCycle implements ThreadPo
_executor.execute(job); _executor.execute(job);
} }
/* ------------------------------------------------------------ */
@Override
public boolean dispatch(Runnable job)
{
final Executor executor=_executor;
if (executor instanceof ThreadPool)
return ((ThreadPool)executor).dispatch(job);
try
{
_executor.execute(job);
return true;
}
catch(RejectedExecutionException e)
{
LOG.warn(e);
return false;
}
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
public int getIdleThreads() public int getIdleThreads()

View File

@ -49,15 +49,13 @@ import org.junit.Test;
public class IOTest public class IOTest
{ {
@Test @Test
public void testIO() throws InterruptedException public void testIO() throws Exception
{ {
// Only a little test // Only a little test
ByteArrayInputStream in = new ByteArrayInputStream("The quick brown fox jumped over the lazy dog".getBytes()); ByteArrayInputStream in = new ByteArrayInputStream("The quick brown fox jumped over the lazy dog".getBytes());
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
IO.copyThread(in, out); IO.copy(in, out);
Thread.sleep(1500);
// System.err.println(out);
assertEquals("copyThread", out.toString(), "The quick brown fox jumped over the lazy dog"); assertEquals("copyThread", out.toString(), "The quick brown fox jumped over the lazy dog");
} }

View File

@ -62,12 +62,11 @@ public class SslConnectionTest
final AtomicInteger _dispatches = new AtomicInteger(); final AtomicInteger _dispatches = new AtomicInteger();
protected QueuedThreadPool _threadPool = new QueuedThreadPool() protected QueuedThreadPool _threadPool = new QueuedThreadPool()
{ {
@Override @Override
public boolean dispatch(Runnable job) public void execute(Runnable job)
{ {
_dispatches.incrementAndGet(); _dispatches.incrementAndGet();
return super.dispatch(job); super.execute(job);
} }
}; };

View File

@ -89,7 +89,7 @@ public class LowResourcesMonitorTest
for (int i=0;i<100;i++) for (int i=0;i<100;i++)
{ {
_threadPool.dispatch(new Runnable() _threadPool.execute(new Runnable()
{ {
@Override @Override
public void run() public void run()

View File

@ -30,6 +30,7 @@ import java.util.HashMap;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -304,7 +305,7 @@ public class CGI extends HttpServlet
LOG.debug("Environment: " + env.getExportString()); LOG.debug("Environment: " + env.getExportString());
LOG.debug("Command: " + execCmd); LOG.debug("Command: " + execCmd);
Process p; final Process p;
if (dir == null) if (dir == null)
p = Runtime.getRuntime().exec(execCmd, env.getEnvArray()); p = Runtime.getRuntime().exec(execCmd, env.getEnvArray());
else else
@ -316,13 +317,28 @@ public class CGI extends HttpServlet
else if (len > 0) else if (len > 0)
writeProcessInput(p, req.getInputStream(), len); writeProcessInput(p, req.getInputStream(), len);
IO.copyThread(p.getErrorStream(), System.err);
// hook processes output to browser's input (sync) // hook processes output to browser's input (sync)
// if browser closes stream, we should detect it and kill process... // if browser closes stream, we should detect it and kill process...
OutputStream os = null; OutputStream os = null;
AsyncContext async=req.startAsync();
try try
{ {
async.start(new Runnable()
{
@Override
public void run()
{
try
{
IO.copy(p.getErrorStream(), System.err);
}
catch (IOException e)
{
LOG.warn(e);
}
}
});
// read any headers off the top of our input stream // read any headers off the top of our input stream
// NOTE: Multiline header items not supported! // NOTE: Multiline header items not supported!
String line = null; String line = null;
@ -398,6 +414,7 @@ public class CGI extends HttpServlet
} }
p.destroy(); p.destroy();
// LOG.debug("CGI: terminated!"); // LOG.debug("CGI: terminated!");
async.complete();
} }
} }

View File

@ -54,17 +54,6 @@ public class IO
/* ------------------------------------------------------------------- */ /* ------------------------------------------------------------------- */
public static final int bufferSize = 64*1024; public static final int bufferSize = 64*1024;
/* ------------------------------------------------------------------- */
// TODO get rid of this singleton!
private static class Singleton {
static final QueuedThreadPool __pool=new QueuedThreadPool();
static
{
try{__pool.start();}
catch(Exception e){LOG.warn(e); System.exit(1);}
}
}
/* ------------------------------------------------------------------- */ /* ------------------------------------------------------------------- */
static class Job implements Runnable static class Job implements Runnable
@ -118,23 +107,6 @@ public class IO
} }
} }
/* ------------------------------------------------------------------- */
/** Copy Stream in to Stream out until EOF or exception.
* in own thread
*/
public static void copyThread(InputStream in, OutputStream out)
{
try{
Job job=new Job(in,out);
if (!Singleton.__pool.dispatch(job))
job.run();
}
catch(Exception e)
{
LOG.warn(e);
}
}
/* ------------------------------------------------------------------- */ /* ------------------------------------------------------------------- */
/** Copy Stream in to Stream out until EOF or exception. /** Copy Stream in to Stream out until EOF or exception.
*/ */
@ -144,24 +116,6 @@ public class IO
copy(in,out,-1); copy(in,out,-1);
} }
/* ------------------------------------------------------------------- */
/** Copy Stream in to Stream out until EOF or exception
* in own thread
*/
public static void copyThread(Reader in, Writer out)
{
try
{
Job job=new Job(in,out);
if (!Singleton.__pool.dispatch(job))
job.run();
}
catch(Exception e)
{
LOG.warn(e);
}
}
/* ------------------------------------------------------------------- */ /* ------------------------------------------------------------------- */
/** Copy Reader to Writer out until EOF or exception. /** Copy Reader to Writer out until EOF or exception.
*/ */

View File

@ -340,18 +340,11 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
{ {
_detailedDump = detailedDump; _detailedDump = detailedDump;
} }
@Override
public boolean dispatch(Runnable job)
{
LOG.debug("{} dispatched {}", this, job);
return isRunning() && _jobs.offer(job);
}
@Override @Override
public void execute(Runnable job) public void execute(Runnable job)
{ {
if (!dispatch(job)) if (!isRunning() || !_jobs.offer(job))
{ {
LOG.warn("{} rejected {}", this, job); LOG.warn("{} rejected {}", this, job);
throw new RejectedExecutionException(job.toString()); throw new RejectedExecutionException(job.toString());

View File

@ -27,18 +27,13 @@ import org.eclipse.jetty.util.component.LifeCycle;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** ThreadPool. /** ThreadPool.
* *
* A specialization of Executor interface that provides reporting methods (eg {@link #getThreads()})
* and the option of configuration methods (e.g. @link {@link SizedThreadPool#setMaxThreads(int)}).
* *
*/ */
@ManagedObject("Pool of Threads") @ManagedObject("Pool of Threads")
public interface ThreadPool extends Executor public interface ThreadPool extends Executor
{ {
/* ------------------------------------------------------------ */
/**
* @deprecated use {@link Executor#execute(Runnable)}
*/
@Deprecated
public abstract boolean dispatch(Runnable job);
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* Blocks until the thread pool is {@link LifeCycle#stop stopped}. * Blocks until the thread pool is {@link LifeCycle#stop stopped}.

View File

@ -88,7 +88,7 @@ public class QueuedThreadPoolTest
waitForIdle(tp,5); waitForIdle(tp,5);
RunningJob job=new RunningJob(); RunningJob job=new RunningJob();
tp.dispatch(job); tp.execute(job);
waitForIdle(tp,4); waitForIdle(tp,4);
waitForThreads(tp,5); waitForThreads(tp,5);
@ -104,14 +104,14 @@ public class QueuedThreadPoolTest
for (int i=0;i<jobs.length;i++) for (int i=0;i<jobs.length;i++)
{ {
jobs[i]=new RunningJob(); jobs[i]=new RunningJob();
tp.dispatch(jobs[i]); tp.execute(jobs[i]);
} }
waitForIdle(tp,1); waitForIdle(tp,1);
waitForThreads(tp,6); waitForThreads(tp,6);
job=new RunningJob(); job=new RunningJob();
tp.dispatch(job); tp.execute(job);
waitForIdle(tp,1); waitForIdle(tp,1);
waitForThreads(tp,7); waitForThreads(tp,7);
@ -135,7 +135,7 @@ public class QueuedThreadPoolTest
for (int i=0;i<jobs.length;i++) for (int i=0;i<jobs.length;i++)
{ {
jobs[i]=new RunningJob(); jobs[i]=new RunningJob();
tp.dispatch(jobs[i]); tp.execute(jobs[i]);
} }
waitForIdle(tp,0); waitForIdle(tp,0);
@ -182,10 +182,10 @@ public class QueuedThreadPoolTest
waitForThreads(tp,2); waitForThreads(tp,2);
sleep.set(200); sleep.set(200);
tp.dispatch(job); tp.execute(job);
tp.dispatch(job); tp.execute(job);
for (int i=0;i<20;i++) for (int i=0;i<20;i++)
tp.dispatch(job); tp.execute(job);
waitForThreads(tp,10); waitForThreads(tp,10);
waitForIdle(tp,0); waitForIdle(tp,0);
@ -193,7 +193,7 @@ public class QueuedThreadPoolTest
sleep.set(5); sleep.set(5);
for (int i=0;i<500;i++) for (int i=0;i<500;i++)
{ {
tp.dispatch(job); tp.execute(job);
Thread.sleep(10); Thread.sleep(10);
} }
waitForThreads(tp,2); waitForThreads(tp,2);
@ -206,7 +206,7 @@ public class QueuedThreadPoolTest
QueuedThreadPool tp= new QueuedThreadPool(); QueuedThreadPool tp= new QueuedThreadPool();
tp.setStopTimeout(500); tp.setStopTimeout(500);
tp.start(); tp.start();
tp.dispatch(new Runnable(){ tp.execute(new Runnable(){
public void run () { public void run () {
while (true) { while (true) {
try { try {