From 5bfcfc908a96afc1e451d690f4d33960f11e51a9 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 18 Jul 2013 15:39:37 +1000 Subject: [PATCH] 398467 Servlet 3.1 Non Blocking IO Added the DataRateLimitedServlet as an async example --- .../servlets/DataRateLimitedServlet.java | 297 ++++++++++++++++++ .../servlets/DataRateLimitedServletTest.java | 111 +++++++ 2 files changed, 408 insertions(+) create mode 100644 jetty-servlets/src/main/java/org/eclipse/jetty/servlets/DataRateLimitedServlet.java create mode 100644 jetty-servlets/src/test/java/org/eclipse/jetty/servlets/DataRateLimitedServletTest.java diff --git a/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/DataRateLimitedServlet.java b/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/DataRateLimitedServlet.java new file mode 100644 index 00000000000..0fab80d4d5b --- /dev/null +++ b/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/DataRateLimitedServlet.java @@ -0,0 +1,297 @@ +package org.eclipse.jetty.servlets; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel.MapMode; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import javax.servlet.AsyncContext; +import javax.servlet.ServletException; +import javax.servlet.ServletOutputStream; +import javax.servlet.WriteListener; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.server.HttpOutput; + +/** + * A servlet that uses the Servlet 3.1 asynchronous IO API to server + * static content at a limited data rate. + *

+ * Two implementations are supported:

+ *

+ * The data rate is controlled by setting init parameters: + *

+ *
buffersize
The amount of data in bytes written per write
+ *
pause
The period in ms to wait after a write before attempting another
+ *
pool
The size of the thread pool used to service the writes (defaults to available processors)
+ *
+ * Thus if buffersize = 1024 and pause = 100, the data rate will be limited to 10KB per second. + */ +public class DataRateLimitedServlet extends HttpServlet +{ + private static final long serialVersionUID = -4771757707068097025L; + private int buffersize=8192; + private int pause=100; + ScheduledThreadPoolExecutor scheduler; + private final ConcurrentHashMap cache=new ConcurrentHashMap<>(); + + @Override + public void init() throws ServletException + { + // read the init params + String tmp = getInitParameter("buffersize"); + if (tmp!=null) + buffersize=Integer.parseInt(tmp); + tmp = getInitParameter("pause"); + if (tmp!=null) + pause=Integer.parseInt(tmp); + tmp = getInitParameter("pool"); + int pool=tmp==null?Runtime.getRuntime().availableProcessors():Integer.parseInt(tmp); + + // Create and start a shared scheduler. + scheduler=new ScheduledThreadPoolExecutor(pool); + } + + @Override + public void destroy() + { + scheduler.shutdown(); + } + + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + // Get the path of the static resource to serve. + String info=request.getPathInfo(); + + // We don't handle directories + if (info.endsWith("/")) + { + response.sendError(503,"directories not supported"); + return; + } + + // Set the mime type of the response + String content_type=getServletContext().getMimeType(info); + response.setContentType(content_type==null?"application/x-data":content_type); + + // Look for a matching file path + String path = request.getPathTranslated(); + + // If we have a file path and this is a jetty response, we can use the JettyStream impl + ServletOutputStream out = response.getOutputStream(); + if (path != null && out instanceof HttpOutput) + { + // If the file exists + File file = new File(path); + if (file.exists() && file.canRead()) + { + // Set the content length + response.setContentLengthLong(file.length()); + + // Look for a file mapped buffer in the cache + ByteBuffer mapped=cache.get(path); + + // Handle cache miss + if (mapped==null) + { + // TODO implement LRU cache flush + try (RandomAccessFile raf = new RandomAccessFile(file, "r")) + { + ByteBuffer buf = raf.getChannel().map(MapMode.READ_ONLY,0,raf.length()); + mapped=cache.putIfAbsent(path,buf); + if (mapped==null) + mapped=buf; + } + } + + // start async request handling + AsyncContext async=request.startAsync(); + + // Set a JettyStream as the write listener to write the content asynchronously. + out.setWriteListener(new JettyDataStream(mapped,async,out)); + return; + } + } + + // Jetty API was not used, so lets try the standards approach + + // Can we find the content as an input stream + InputStream content = getServletContext().getResourceAsStream(info); + if (content==null) + { + response.sendError(404); + return; + } + + // Set a StandardStream as he write listener to write the content asynchronously + out.setWriteListener(new StandardDataStream(content,request.startAsync(),out)); + } + + /** + * A standard API Stream writer + */ + private final class StandardDataStream implements WriteListener, Runnable + { + private final InputStream content; + private final AsyncContext async; + private final ServletOutputStream out; + + private StandardDataStream(InputStream content, AsyncContext async, ServletOutputStream out) + { + this.content = content; + this.async = async; + this.out = out; + } + + @Override + public void onWritePossible() throws IOException + { + // If we are able to write + if(out.isReady()) + { + // Allocated a copy buffer for each write, so as to not hold while paused + // TODO put these buffers into a pool + byte[] buffer = new byte[buffersize]; + + // read some content into the copy buffer + int len=content.read(buffer); + + // If we are at EOF + if (len<0) + { + // complete the async lifecycle + async.complete(); + return; + } + + // write out the copy buffer. This will be an asynchronous write + // and will always return immediately without blocking. If a subsequent + // call to out.isReady() returns false, then this onWritePossible method + // will be called back when a write is possible. + out.write(buffer,0,len); + + // Schedule a timer callback to pause writing. Because isReady() is not called, + // a onWritePossible callback is no scheduled. + scheduler.schedule(this,pause,TimeUnit.MILLISECONDS); + } + } + + @Override + public void run() + { + try + { + // When the pause timer wakes up, call onWritePossible. Either isReady() will return + // true and another chunk of content will be written, or it will return false and the + // onWritePossible() callback will be scheduled when a write is next possible. + onWritePossible(); + } + catch(Exception e) + { + onError(e); + } + } + + @Override + public void onError(Throwable t) + { + getServletContext().log("Async Error",t); + async.complete(); + } + } + + + /** + * A Jetty API DataStream + * + */ + private final class JettyDataStream implements WriteListener, Runnable + { + private final ByteBuffer content; + private final int limit; + private final AsyncContext async; + private final HttpOutput out; + + private JettyDataStream(ByteBuffer content, AsyncContext async, ServletOutputStream out) + { + // Make a readonly copy of the passed buffer. This uses the same underlying content + // without a copy, but gives this instance its own position and limit. + this.content = content.asReadOnlyBuffer(); + // remember the ultimate limit. + this.limit=this.content.limit(); + this.async = async; + this.out = (HttpOutput)out; + } + + @Override + public void onWritePossible() throws IOException + { + // If we are able to write + if(out.isReady()) + { + // Position our buffers limit to allow only buffersize bytes to be written + int l=content.position()+buffersize; + // respect the ultimate limit + if (l>limit) + l=limit; + content.limit(l); + + // if all content has been written + if (!content.hasRemaining()) + { + // complete the async lifecycle + async.complete(); + return; + } + + // write our limited buffer. This will be an asynchronous write + // and will always return immediately without blocking. If a subsequent + // call to out.isReady() returns false, then this onWritePossible method + // will be called back when a write is possible. + out.write(content); + + // Schedule a timer callback to pause writing. Because isReady() is not called, + // a onWritePossible callback is no scheduled. + scheduler.schedule(this,pause,TimeUnit.MILLISECONDS); + } + } + + @Override + public void run() + { + try + { + // When the pause timer wakes up, call onWritePossible. Either isReady() will return + // true and another chunk of content will be written, or it will return false and the + // onWritePossible() callback will be scheduled when a write is next possible. + onWritePossible(); + } + catch(Exception e) + { + onError(e); + } + } + + @Override + public void onError(Throwable t) + { + getServletContext().log("Async Error",t); + async.complete(); + } + } +} diff --git a/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/DataRateLimitedServletTest.java b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/DataRateLimitedServletTest.java new file mode 100644 index 00000000000..c001f3f1d4d --- /dev/null +++ b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/DataRateLimitedServletTest.java @@ -0,0 +1,111 @@ +// +// ======================================================================== +// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.servlets; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertThat; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.util.Arrays; + +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.LocalConnector; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.toolchain.test.TestingDir; +import org.eclipse.jetty.util.resource.Resource; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class DataRateLimitedServletTest +{ + public static final int BUFFER=8192; + public static final int PAUSE=10; + + @Rule + public TestingDir testdir = new TestingDir(); + + private Server server; + private LocalConnector connector; + private ServletContextHandler context; + + @Before + public void init() throws Exception + { + server = new Server(); + + connector = new LocalConnector(server); + connector.getConnectionFactory(HttpConfiguration.ConnectionFactory.class).getHttpConfiguration().setSendServerVersion(false); + + context = new ServletContextHandler(); + + context.setContextPath("/context"); + context.setWelcomeFiles(new String[]{"index.html", "index.jsp", "index.htm"}); + context.setBaseResource(Resource.newResource(testdir.getEmptyDir())); + + ServletHolder holder =context.addServlet(DataRateLimitedServlet.class,"/stream/*"); + holder.setInitParameter("buffersize",""+BUFFER); + holder.setInitParameter("pause",""+PAUSE); + server.setHandler(context); + server.addConnector(connector); + + server.start(); + } + + @After + public void destroy() throws Exception + { + server.stop(); + server.join(); + } + + @Test + public void testStream() throws Exception + { + File content = testdir.getFile("content.txt"); + try(OutputStream out = new FileOutputStream(content);) + { + byte[] b= new byte[1024]; + + for (int i=1024;i-->0;) + { + Arrays.fill(b,(byte)('0'+(i%10))); + out.write(b); + out.write('\n'); + } + } + + long start=System.currentTimeMillis(); + String response = connector.getResponses("GET /context/stream/content.txt HTTP/1.0\r\n\r\n"); + long duration=System.currentTimeMillis()-start; + + assertThat(response.length(),greaterThan(1024*1024)); + assertThat(response,containsString("200 OK")); + assertThat(duration,greaterThan(PAUSE*1024L*1024/BUFFER)); + + } +}