diff --git a/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/DataRateLimitedServlet.java b/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/DataRateLimitedServlet.java
new file mode 100644
index 00000000000..0fab80d4d5b
--- /dev/null
+++ b/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/DataRateLimitedServlet.java
@@ -0,0 +1,297 @@
+package org.eclipse.jetty.servlets;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.WriteListener;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.server.HttpOutput;
+
+/**
+ * A servlet that uses the Servlet 3.1 asynchronous IO API to server
+ * static content at a limited data rate.
+ *
+ * Two implementations are supported:
+ * - the {@link StandardDataStream} impl uses only standard
+ * APIs, but produces more garbage due to the byte[] nature of the API.
+ *
- the {@link JettyDataStream} impl uses a Jetty API to write a ByteBuffer
+ * and thus allow the efficient use of file mapped buffers without any
+ * temporary buffer copies (I did tell the JSR that this was a good idea to
+ * have in the standard!).
+ *
+ *
+ * The data rate is controlled by setting init parameters:
+ *
+ * - buffersize
- The amount of data in bytes written per write
+ * - pause
- The period in ms to wait after a write before attempting another
+ * - pool
- The size of the thread pool used to service the writes (defaults to available processors)
+ *
+ * Thus if buffersize = 1024 and pause = 100, the data rate will be limited to 10KB per second.
+ */
+public class DataRateLimitedServlet extends HttpServlet
+{
+ private static final long serialVersionUID = -4771757707068097025L;
+ private int buffersize=8192;
+ private int pause=100;
+ ScheduledThreadPoolExecutor scheduler;
+ private final ConcurrentHashMap cache=new ConcurrentHashMap<>();
+
+ @Override
+ public void init() throws ServletException
+ {
+ // read the init params
+ String tmp = getInitParameter("buffersize");
+ if (tmp!=null)
+ buffersize=Integer.parseInt(tmp);
+ tmp = getInitParameter("pause");
+ if (tmp!=null)
+ pause=Integer.parseInt(tmp);
+ tmp = getInitParameter("pool");
+ int pool=tmp==null?Runtime.getRuntime().availableProcessors():Integer.parseInt(tmp);
+
+ // Create and start a shared scheduler.
+ scheduler=new ScheduledThreadPoolExecutor(pool);
+ }
+
+ @Override
+ public void destroy()
+ {
+ scheduler.shutdown();
+ }
+
+ @Override
+ protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
+ {
+ // Get the path of the static resource to serve.
+ String info=request.getPathInfo();
+
+ // We don't handle directories
+ if (info.endsWith("/"))
+ {
+ response.sendError(503,"directories not supported");
+ return;
+ }
+
+ // Set the mime type of the response
+ String content_type=getServletContext().getMimeType(info);
+ response.setContentType(content_type==null?"application/x-data":content_type);
+
+ // Look for a matching file path
+ String path = request.getPathTranslated();
+
+ // If we have a file path and this is a jetty response, we can use the JettyStream impl
+ ServletOutputStream out = response.getOutputStream();
+ if (path != null && out instanceof HttpOutput)
+ {
+ // If the file exists
+ File file = new File(path);
+ if (file.exists() && file.canRead())
+ {
+ // Set the content length
+ response.setContentLengthLong(file.length());
+
+ // Look for a file mapped buffer in the cache
+ ByteBuffer mapped=cache.get(path);
+
+ // Handle cache miss
+ if (mapped==null)
+ {
+ // TODO implement LRU cache flush
+ try (RandomAccessFile raf = new RandomAccessFile(file, "r"))
+ {
+ ByteBuffer buf = raf.getChannel().map(MapMode.READ_ONLY,0,raf.length());
+ mapped=cache.putIfAbsent(path,buf);
+ if (mapped==null)
+ mapped=buf;
+ }
+ }
+
+ // start async request handling
+ AsyncContext async=request.startAsync();
+
+ // Set a JettyStream as the write listener to write the content asynchronously.
+ out.setWriteListener(new JettyDataStream(mapped,async,out));
+ return;
+ }
+ }
+
+ // Jetty API was not used, so lets try the standards approach
+
+ // Can we find the content as an input stream
+ InputStream content = getServletContext().getResourceAsStream(info);
+ if (content==null)
+ {
+ response.sendError(404);
+ return;
+ }
+
+ // Set a StandardStream as he write listener to write the content asynchronously
+ out.setWriteListener(new StandardDataStream(content,request.startAsync(),out));
+ }
+
+ /**
+ * A standard API Stream writer
+ */
+ private final class StandardDataStream implements WriteListener, Runnable
+ {
+ private final InputStream content;
+ private final AsyncContext async;
+ private final ServletOutputStream out;
+
+ private StandardDataStream(InputStream content, AsyncContext async, ServletOutputStream out)
+ {
+ this.content = content;
+ this.async = async;
+ this.out = out;
+ }
+
+ @Override
+ public void onWritePossible() throws IOException
+ {
+ // If we are able to write
+ if(out.isReady())
+ {
+ // Allocated a copy buffer for each write, so as to not hold while paused
+ // TODO put these buffers into a pool
+ byte[] buffer = new byte[buffersize];
+
+ // read some content into the copy buffer
+ int len=content.read(buffer);
+
+ // If we are at EOF
+ if (len<0)
+ {
+ // complete the async lifecycle
+ async.complete();
+ return;
+ }
+
+ // write out the copy buffer. This will be an asynchronous write
+ // and will always return immediately without blocking. If a subsequent
+ // call to out.isReady() returns false, then this onWritePossible method
+ // will be called back when a write is possible.
+ out.write(buffer,0,len);
+
+ // Schedule a timer callback to pause writing. Because isReady() is not called,
+ // a onWritePossible callback is no scheduled.
+ scheduler.schedule(this,pause,TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ // When the pause timer wakes up, call onWritePossible. Either isReady() will return
+ // true and another chunk of content will be written, or it will return false and the
+ // onWritePossible() callback will be scheduled when a write is next possible.
+ onWritePossible();
+ }
+ catch(Exception e)
+ {
+ onError(e);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t)
+ {
+ getServletContext().log("Async Error",t);
+ async.complete();
+ }
+ }
+
+
+ /**
+ * A Jetty API DataStream
+ *
+ */
+ private final class JettyDataStream implements WriteListener, Runnable
+ {
+ private final ByteBuffer content;
+ private final int limit;
+ private final AsyncContext async;
+ private final HttpOutput out;
+
+ private JettyDataStream(ByteBuffer content, AsyncContext async, ServletOutputStream out)
+ {
+ // Make a readonly copy of the passed buffer. This uses the same underlying content
+ // without a copy, but gives this instance its own position and limit.
+ this.content = content.asReadOnlyBuffer();
+ // remember the ultimate limit.
+ this.limit=this.content.limit();
+ this.async = async;
+ this.out = (HttpOutput)out;
+ }
+
+ @Override
+ public void onWritePossible() throws IOException
+ {
+ // If we are able to write
+ if(out.isReady())
+ {
+ // Position our buffers limit to allow only buffersize bytes to be written
+ int l=content.position()+buffersize;
+ // respect the ultimate limit
+ if (l>limit)
+ l=limit;
+ content.limit(l);
+
+ // if all content has been written
+ if (!content.hasRemaining())
+ {
+ // complete the async lifecycle
+ async.complete();
+ return;
+ }
+
+ // write our limited buffer. This will be an asynchronous write
+ // and will always return immediately without blocking. If a subsequent
+ // call to out.isReady() returns false, then this onWritePossible method
+ // will be called back when a write is possible.
+ out.write(content);
+
+ // Schedule a timer callback to pause writing. Because isReady() is not called,
+ // a onWritePossible callback is no scheduled.
+ scheduler.schedule(this,pause,TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ // When the pause timer wakes up, call onWritePossible. Either isReady() will return
+ // true and another chunk of content will be written, or it will return false and the
+ // onWritePossible() callback will be scheduled when a write is next possible.
+ onWritePossible();
+ }
+ catch(Exception e)
+ {
+ onError(e);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t)
+ {
+ getServletContext().log("Async Error",t);
+ async.complete();
+ }
+ }
+}
diff --git a/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/DataRateLimitedServletTest.java b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/DataRateLimitedServletTest.java
new file mode 100644
index 00000000000..c001f3f1d4d
--- /dev/null
+++ b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/DataRateLimitedServletTest.java
@@ -0,0 +1,111 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.servlets;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertThat;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.LocalConnector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.toolchain.test.TestingDir;
+import org.eclipse.jetty.util.resource.Resource;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class DataRateLimitedServletTest
+{
+ public static final int BUFFER=8192;
+ public static final int PAUSE=10;
+
+ @Rule
+ public TestingDir testdir = new TestingDir();
+
+ private Server server;
+ private LocalConnector connector;
+ private ServletContextHandler context;
+
+ @Before
+ public void init() throws Exception
+ {
+ server = new Server();
+
+ connector = new LocalConnector(server);
+ connector.getConnectionFactory(HttpConfiguration.ConnectionFactory.class).getHttpConfiguration().setSendServerVersion(false);
+
+ context = new ServletContextHandler();
+
+ context.setContextPath("/context");
+ context.setWelcomeFiles(new String[]{"index.html", "index.jsp", "index.htm"});
+ context.setBaseResource(Resource.newResource(testdir.getEmptyDir()));
+
+ ServletHolder holder =context.addServlet(DataRateLimitedServlet.class,"/stream/*");
+ holder.setInitParameter("buffersize",""+BUFFER);
+ holder.setInitParameter("pause",""+PAUSE);
+ server.setHandler(context);
+ server.addConnector(connector);
+
+ server.start();
+ }
+
+ @After
+ public void destroy() throws Exception
+ {
+ server.stop();
+ server.join();
+ }
+
+ @Test
+ public void testStream() throws Exception
+ {
+ File content = testdir.getFile("content.txt");
+ try(OutputStream out = new FileOutputStream(content);)
+ {
+ byte[] b= new byte[1024];
+
+ for (int i=1024;i-->0;)
+ {
+ Arrays.fill(b,(byte)('0'+(i%10)));
+ out.write(b);
+ out.write('\n');
+ }
+ }
+
+ long start=System.currentTimeMillis();
+ String response = connector.getResponses("GET /context/stream/content.txt HTTP/1.0\r\n\r\n");
+ long duration=System.currentTimeMillis()-start;
+
+ assertThat(response.length(),greaterThan(1024*1024));
+ assertThat(response,containsString("200 OK"));
+ assertThat(duration,greaterThan(PAUSE*1024L*1024/BUFFER));
+
+ }
+}