398467 Servlet 3.1 Non Blocking IO
Added the DataRateLimitedServlet as an async example
This commit is contained in:
parent
48bedc3fd4
commit
5bfcfc908a
|
@ -0,0 +1,297 @@
|
|||
package org.eclipse.jetty.servlets;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel.MapMode;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.servlet.AsyncContext;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.ServletOutputStream;
|
||||
import javax.servlet.WriteListener;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.server.HttpOutput;
|
||||
|
||||
/**
|
||||
* A servlet that uses the Servlet 3.1 asynchronous IO API to server
|
||||
* static content at a limited data rate.
|
||||
* <p>
|
||||
* Two implementations are supported: <ul>
|
||||
* <li>the {@link StandardDataStream} impl uses only standard
|
||||
* APIs, but produces more garbage due to the byte[] nature of the API.
|
||||
* <li>the {@link JettyDataStream} impl uses a Jetty API to write a ByteBuffer
|
||||
* and thus allow the efficient use of file mapped buffers without any
|
||||
* temporary buffer copies (I did tell the JSR that this was a good idea to
|
||||
* have in the standard!).
|
||||
* </ul>
|
||||
* <p>
|
||||
* The data rate is controlled by setting init parameters:
|
||||
* <dl>
|
||||
* <dt>buffersize</dt><dd>The amount of data in bytes written per write</dd>
|
||||
* <dt>pause</dt><dd>The period in ms to wait after a write before attempting another</dd>
|
||||
* <dt>pool</dt><dd>The size of the thread pool used to service the writes (defaults to available processors)</dd>
|
||||
* </dl>
|
||||
* Thus if buffersize = 1024 and pause = 100, the data rate will be limited to 10KB per second.
|
||||
*/
|
||||
public class DataRateLimitedServlet extends HttpServlet
|
||||
{
|
||||
private static final long serialVersionUID = -4771757707068097025L;
|
||||
private int buffersize=8192;
|
||||
private int pause=100;
|
||||
ScheduledThreadPoolExecutor scheduler;
|
||||
private final ConcurrentHashMap<String, ByteBuffer> cache=new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public void init() throws ServletException
|
||||
{
|
||||
// read the init params
|
||||
String tmp = getInitParameter("buffersize");
|
||||
if (tmp!=null)
|
||||
buffersize=Integer.parseInt(tmp);
|
||||
tmp = getInitParameter("pause");
|
||||
if (tmp!=null)
|
||||
pause=Integer.parseInt(tmp);
|
||||
tmp = getInitParameter("pool");
|
||||
int pool=tmp==null?Runtime.getRuntime().availableProcessors():Integer.parseInt(tmp);
|
||||
|
||||
// Create and start a shared scheduler.
|
||||
scheduler=new ScheduledThreadPoolExecutor(pool);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy()
|
||||
{
|
||||
scheduler.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||
{
|
||||
// Get the path of the static resource to serve.
|
||||
String info=request.getPathInfo();
|
||||
|
||||
// We don't handle directories
|
||||
if (info.endsWith("/"))
|
||||
{
|
||||
response.sendError(503,"directories not supported");
|
||||
return;
|
||||
}
|
||||
|
||||
// Set the mime type of the response
|
||||
String content_type=getServletContext().getMimeType(info);
|
||||
response.setContentType(content_type==null?"application/x-data":content_type);
|
||||
|
||||
// Look for a matching file path
|
||||
String path = request.getPathTranslated();
|
||||
|
||||
// If we have a file path and this is a jetty response, we can use the JettyStream impl
|
||||
ServletOutputStream out = response.getOutputStream();
|
||||
if (path != null && out instanceof HttpOutput)
|
||||
{
|
||||
// If the file exists
|
||||
File file = new File(path);
|
||||
if (file.exists() && file.canRead())
|
||||
{
|
||||
// Set the content length
|
||||
response.setContentLengthLong(file.length());
|
||||
|
||||
// Look for a file mapped buffer in the cache
|
||||
ByteBuffer mapped=cache.get(path);
|
||||
|
||||
// Handle cache miss
|
||||
if (mapped==null)
|
||||
{
|
||||
// TODO implement LRU cache flush
|
||||
try (RandomAccessFile raf = new RandomAccessFile(file, "r"))
|
||||
{
|
||||
ByteBuffer buf = raf.getChannel().map(MapMode.READ_ONLY,0,raf.length());
|
||||
mapped=cache.putIfAbsent(path,buf);
|
||||
if (mapped==null)
|
||||
mapped=buf;
|
||||
}
|
||||
}
|
||||
|
||||
// start async request handling
|
||||
AsyncContext async=request.startAsync();
|
||||
|
||||
// Set a JettyStream as the write listener to write the content asynchronously.
|
||||
out.setWriteListener(new JettyDataStream(mapped,async,out));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Jetty API was not used, so lets try the standards approach
|
||||
|
||||
// Can we find the content as an input stream
|
||||
InputStream content = getServletContext().getResourceAsStream(info);
|
||||
if (content==null)
|
||||
{
|
||||
response.sendError(404);
|
||||
return;
|
||||
}
|
||||
|
||||
// Set a StandardStream as he write listener to write the content asynchronously
|
||||
out.setWriteListener(new StandardDataStream(content,request.startAsync(),out));
|
||||
}
|
||||
|
||||
/**
|
||||
* A standard API Stream writer
|
||||
*/
|
||||
private final class StandardDataStream implements WriteListener, Runnable
|
||||
{
|
||||
private final InputStream content;
|
||||
private final AsyncContext async;
|
||||
private final ServletOutputStream out;
|
||||
|
||||
private StandardDataStream(InputStream content, AsyncContext async, ServletOutputStream out)
|
||||
{
|
||||
this.content = content;
|
||||
this.async = async;
|
||||
this.out = out;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWritePossible() throws IOException
|
||||
{
|
||||
// If we are able to write
|
||||
if(out.isReady())
|
||||
{
|
||||
// Allocated a copy buffer for each write, so as to not hold while paused
|
||||
// TODO put these buffers into a pool
|
||||
byte[] buffer = new byte[buffersize];
|
||||
|
||||
// read some content into the copy buffer
|
||||
int len=content.read(buffer);
|
||||
|
||||
// If we are at EOF
|
||||
if (len<0)
|
||||
{
|
||||
// complete the async lifecycle
|
||||
async.complete();
|
||||
return;
|
||||
}
|
||||
|
||||
// write out the copy buffer. This will be an asynchronous write
|
||||
// and will always return immediately without blocking. If a subsequent
|
||||
// call to out.isReady() returns false, then this onWritePossible method
|
||||
// will be called back when a write is possible.
|
||||
out.write(buffer,0,len);
|
||||
|
||||
// Schedule a timer callback to pause writing. Because isReady() is not called,
|
||||
// a onWritePossible callback is no scheduled.
|
||||
scheduler.schedule(this,pause,TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try
|
||||
{
|
||||
// When the pause timer wakes up, call onWritePossible. Either isReady() will return
|
||||
// true and another chunk of content will be written, or it will return false and the
|
||||
// onWritePossible() callback will be scheduled when a write is next possible.
|
||||
onWritePossible();
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t)
|
||||
{
|
||||
getServletContext().log("Async Error",t);
|
||||
async.complete();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A Jetty API DataStream
|
||||
*
|
||||
*/
|
||||
private final class JettyDataStream implements WriteListener, Runnable
|
||||
{
|
||||
private final ByteBuffer content;
|
||||
private final int limit;
|
||||
private final AsyncContext async;
|
||||
private final HttpOutput out;
|
||||
|
||||
private JettyDataStream(ByteBuffer content, AsyncContext async, ServletOutputStream out)
|
||||
{
|
||||
// Make a readonly copy of the passed buffer. This uses the same underlying content
|
||||
// without a copy, but gives this instance its own position and limit.
|
||||
this.content = content.asReadOnlyBuffer();
|
||||
// remember the ultimate limit.
|
||||
this.limit=this.content.limit();
|
||||
this.async = async;
|
||||
this.out = (HttpOutput)out;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWritePossible() throws IOException
|
||||
{
|
||||
// If we are able to write
|
||||
if(out.isReady())
|
||||
{
|
||||
// Position our buffers limit to allow only buffersize bytes to be written
|
||||
int l=content.position()+buffersize;
|
||||
// respect the ultimate limit
|
||||
if (l>limit)
|
||||
l=limit;
|
||||
content.limit(l);
|
||||
|
||||
// if all content has been written
|
||||
if (!content.hasRemaining())
|
||||
{
|
||||
// complete the async lifecycle
|
||||
async.complete();
|
||||
return;
|
||||
}
|
||||
|
||||
// write our limited buffer. This will be an asynchronous write
|
||||
// and will always return immediately without blocking. If a subsequent
|
||||
// call to out.isReady() returns false, then this onWritePossible method
|
||||
// will be called back when a write is possible.
|
||||
out.write(content);
|
||||
|
||||
// Schedule a timer callback to pause writing. Because isReady() is not called,
|
||||
// a onWritePossible callback is no scheduled.
|
||||
scheduler.schedule(this,pause,TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try
|
||||
{
|
||||
// When the pause timer wakes up, call onWritePossible. Either isReady() will return
|
||||
// true and another chunk of content will be written, or it will return false and the
|
||||
// onWritePossible() callback will be scheduled when a write is next possible.
|
||||
onWritePossible();
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t)
|
||||
{
|
||||
getServletContext().log("Async Error",t);
|
||||
async.complete();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,111 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.servlets;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
import org.eclipse.jetty.server.LocalConnector;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.eclipse.jetty.toolchain.test.TestingDir;
|
||||
import org.eclipse.jetty.util.resource.Resource;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
public class DataRateLimitedServletTest
|
||||
{
|
||||
public static final int BUFFER=8192;
|
||||
public static final int PAUSE=10;
|
||||
|
||||
@Rule
|
||||
public TestingDir testdir = new TestingDir();
|
||||
|
||||
private Server server;
|
||||
private LocalConnector connector;
|
||||
private ServletContextHandler context;
|
||||
|
||||
@Before
|
||||
public void init() throws Exception
|
||||
{
|
||||
server = new Server();
|
||||
|
||||
connector = new LocalConnector(server);
|
||||
connector.getConnectionFactory(HttpConfiguration.ConnectionFactory.class).getHttpConfiguration().setSendServerVersion(false);
|
||||
|
||||
context = new ServletContextHandler();
|
||||
|
||||
context.setContextPath("/context");
|
||||
context.setWelcomeFiles(new String[]{"index.html", "index.jsp", "index.htm"});
|
||||
context.setBaseResource(Resource.newResource(testdir.getEmptyDir()));
|
||||
|
||||
ServletHolder holder =context.addServlet(DataRateLimitedServlet.class,"/stream/*");
|
||||
holder.setInitParameter("buffersize",""+BUFFER);
|
||||
holder.setInitParameter("pause",""+PAUSE);
|
||||
server.setHandler(context);
|
||||
server.addConnector(connector);
|
||||
|
||||
server.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void destroy() throws Exception
|
||||
{
|
||||
server.stop();
|
||||
server.join();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStream() throws Exception
|
||||
{
|
||||
File content = testdir.getFile("content.txt");
|
||||
try(OutputStream out = new FileOutputStream(content);)
|
||||
{
|
||||
byte[] b= new byte[1024];
|
||||
|
||||
for (int i=1024;i-->0;)
|
||||
{
|
||||
Arrays.fill(b,(byte)('0'+(i%10)));
|
||||
out.write(b);
|
||||
out.write('\n');
|
||||
}
|
||||
}
|
||||
|
||||
long start=System.currentTimeMillis();
|
||||
String response = connector.getResponses("GET /context/stream/content.txt HTTP/1.0\r\n\r\n");
|
||||
long duration=System.currentTimeMillis()-start;
|
||||
|
||||
assertThat(response.length(),greaterThan(1024*1024));
|
||||
assertThat(response,containsString("200 OK"));
|
||||
assertThat(duration,greaterThan(PAUSE*1024L*1024/BUFFER));
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue