444721 - PushCacheFilter cleanup/improvements.

Ported the "maxAssociations" functionality from SPDY's ReferrerPushStrategy.
Added JMX support.
Removed __renew__ special path in favour of a JMX method.
Added clearPushCache() JMX method.
Made push reentrant by eliminating the check for "org.eclipse.jetty.pushed".
This commit is contained in:
Simone Bordet 2015-04-10 15:22:30 +02:00
parent d89aa3a866
commit ffadcd6757
5 changed files with 197 additions and 71 deletions

View File

@ -23,7 +23,6 @@ import java.nio.charset.StandardCharsets;
import java.util.EnumSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.DispatcherType;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
@ -325,4 +324,116 @@ public class PushCacheFilterTest extends AbstractTest
Assert.assertTrue(pushLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(primaryResponseLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testRecursivePush() throws Exception
{
final String primaryResource = "/primary.html";
final String secondaryResource = "/secondary.css";
final String tertiaryResource = "/tertiary.png";
start(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
String requestURI = request.getRequestURI();
final ServletOutputStream output = response.getOutputStream();
if (requestURI.endsWith(primaryResource))
output.print("<html><head></head><body>PRIMARY</body></html>");
else if (requestURI.endsWith(secondaryResource))
output.print("body { background-image: url(\"" + tertiaryResource + "\"); }");
if (requestURI.endsWith(tertiaryResource))
output.write("TERTIARY".getBytes(StandardCharsets.UTF_8));
}
});
final Session session = newClient(new Session.Listener.Adapter());
// Request for the primary, secondary and tertiary resource to build the cache.
final String primaryURI = "http://localhost:" + connector.getLocalPort() + servletPath + primaryResource;
HttpFields primaryFields = new HttpFields();
MetaData.Request primaryRequest = newRequest("GET", primaryResource, primaryFields);
final CountDownLatch warmupLatch = new CountDownLatch(1);
session.newStream(new HeadersFrame(0, primaryRequest, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
if (frame.isEndStream())
{
// Request for the secondary resource.
final String secondaryURI = "http://localhost:" + connector.getLocalPort() + servletPath + secondaryResource;
HttpFields secondaryFields = new HttpFields();
secondaryFields.put(HttpHeader.REFERER, primaryURI);
MetaData.Request secondaryRequest = newRequest("GET", secondaryResource, secondaryFields);
session.newStream(new HeadersFrame(0, secondaryRequest, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
if (frame.isEndStream())
{
// Request for the tertiary resource.
HttpFields tertiaryFields = new HttpFields();
tertiaryFields.put(HttpHeader.REFERER, secondaryURI);
MetaData.Request tertiaryRequest = newRequest("GET", tertiaryResource, tertiaryFields);
session.newStream(new HeadersFrame(0, tertiaryRequest, null, true), new Promise.Adapter<Stream>(), new Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
if (frame.isEndStream())
warmupLatch.countDown();
}
});
}
}
});
}
}
});
Assert.assertTrue(warmupLatch.await(5, TimeUnit.SECONDS));
Thread.sleep(1000);
// Request again the primary resource, we should get the secondary and tertiary resource pushed.
primaryRequest = newRequest("GET", primaryResource, primaryFields);
final CountDownLatch primaryResponseLatch = new CountDownLatch(1);
final CountDownLatch pushLatch = new CountDownLatch(2);
session.newStream(new HeadersFrame(0, primaryRequest, null, true), new Promise.Adapter<Stream>(), new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
if (frame.isEndStream())
primaryResponseLatch.countDown();
}
@Override
public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
{
return new Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
if (frame.isEndStream())
pushLatch.countDown();
}
@Override
public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
{
return this;
}
};
}
});
Assert.assertTrue(pushLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(primaryResponseLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -1,3 +1,4 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.http2.hpack.LEVEL=INFO
#org.eclipse.jetty.http2.LEVEL=DEBUG
#org.eclipse.jetty.servlets.LEVEL=DEBUG

View File

@ -52,6 +52,6 @@ public class PushPromiseFrame extends Frame
@Override
public String toString()
{
return String.format("%s#%d/%d", super.toString(), streamId, promisedStreamId);
return String.format("%s#%d/#%d", super.toString(), streamId, promisedStreamId);
}
}

View File

@ -20,6 +20,7 @@
package org.eclipse.jetty.servlets;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@ -29,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
@ -48,55 +48,64 @@ import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.util.URIUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* A filter that builds a cache of associated resources to push
* using the following heuristics:<ul>
* <li>If a request has a If-xxx header, this suggests it's cache is already hot,
* so no resources are pushed.
* <li>If a request has a referrer header that matches this site, then
* this indicates that it is an associated resource
* <li>If the time period between a request and an associated request is small,
* that indicates a possible push resource
* </ul>
* <p>A filter that builds a cache of secondary resources associated
* to primary resources.</p>
* <p>A typical request for a primary resource such as {@code index.html}
* is immediately followed by a number of requests for secondary resources.
* Secondary resource requests will have a {@code Referer} HTTP header
* that points to {@code index.html}, which is used to associate the secondary
* resource to the primary resource.</p>
* <p>Only secondary resources that are requested within a (small) time period
* from the request of the primary resource are associated with the primary
* resource.</p>
* <p>This allows to build a cache of secondary resources associated with
* primary resources. When a request for a primary resource arrives, associated
* secondary resources are pushed to the client, unless the request carries
* {@code If-xxx} header that hint that the client has the resources in its
* cache.</p>
*/
@ManagedObject("Push cache based on the HTTP 'Referer' header")
public class PushCacheFilter implements Filter
{
private static final Logger LOG = Log.getLogger(PushCacheFilter.class);
private final ConcurrentMap<String, PrimaryResource> _cache = new ConcurrentHashMap<>();
private long _associatePeriod = 4000L;
private volatile long _renew = System.nanoTime();
private String _renewPath = "/__renewPushCache__";
private final Set<Integer> _ports = new HashSet<>();
private final Set<String> _hosts = new HashSet<>();
private final ConcurrentMap<String, PrimaryResource> _cache = new ConcurrentHashMap<>();
private long _associatePeriod = 4000L;
private int _maxAssociations = 16;
private long _renew = System.nanoTime();
@Override
public void init(FilterConfig config) throws ServletException
{
String associatePeriod = config.getInitParameter("associatePeriod");
if (associatePeriod != null)
_associatePeriod = Long.valueOf(associatePeriod);
_associatePeriod = Long.parseLong(associatePeriod);
String renew=config.getInitParameter("renewPath");
if (renew!=null)
_renewPath=renew;
String maxAssociations = config.getInitParameter("maxAssociations");
if (maxAssociations != null)
_maxAssociations = Integer.parseInt(maxAssociations);
String hosts = config.getInitParameter("hosts");
if (hosts != null)
for (String h:hosts.split(","))
_hosts.add(h);
Collections.addAll(_hosts, hosts.split(","));
String ports = config.getInitParameter("ports");
if (ports != null)
for (String p : ports.split(","))
_ports.add(Integer.parseInt(p));
// Expose for JMX.
config.getServletContext().setAttribute(config.getFilterName(), this);
if (LOG.isDebugEnabled())
LOG.debug("p={} renew={} hosts={} ports={}",_associatePeriod,_renewPath,_hosts,_ports);
LOG.debug("period={} max={} hosts={} ports={}", _associatePeriod, _maxAssociations, _hosts, _ports);
}
@Override
@ -105,19 +114,12 @@ public class PushCacheFilter implements Filter
long now = System.nanoTime();
HttpServletRequest request = (HttpServletRequest)req;
if (Boolean.TRUE==req.getAttribute("org.eclipse.jetty.pushed"))
{
if (LOG.isDebugEnabled())
LOG.debug("PUSH {}", request.getRequestURI());
chain.doFilter(req,resp);
return;
}
// Iterating over fields is more efficient than multiple gets
HttpFields fields = Request.getBaseRequest(req).getHttpFields();
HttpFields fields = Request.getBaseRequest(request).getHttpFields();
boolean conditional = false;
String referrer = null;
loop: for (int i = 0; i < fields.size(); i++)
loop:
for (int i = 0; i < fields.size(); i++)
{
HttpField field = fields.getField(i);
HttpHeader header = field.getHeader();
@ -146,16 +148,6 @@ public class PushCacheFilter implements Filter
LOG.debug("{} {} referrer={} conditional={}", request.getMethod(), request.getRequestURI(), referrer, conditional);
String path = URIUtil.addPaths(request.getServletPath(), request.getPathInfo());
if (path.endsWith(_renewPath))
{
if (LOG.isDebugEnabled())
LOG.debug("Renew {}", now);
_renew=now;
resp.getOutputStream().print("PUSH CACHE RESET");
resp.flushBuffer();
return;
}
if (referrer != null)
{
HttpURI referrerURI = new HttpURI(referrer);
@ -164,8 +156,8 @@ public class PushCacheFilter implements Filter
if (port <= 0)
port = request.isSecure() ? 443 : 80;
boolean referred_from_here=(_hosts.size()>0 )?_hosts.contains(host):request.getServerName().equals(host);
referred_from_here&=(_ports.size()>0)?_ports.contains(port):port==request.getServerPort();
boolean referred_from_here = _hosts.size() > 0 ? _hosts.contains(host) : host.equals(request.getServerName());
referred_from_here &= _ports.size() > 0 ? _ports.contains(port) : port == request.getServerPort();
if (referred_from_here)
{
@ -182,16 +174,26 @@ public class PushCacheFilter implements Filter
RequestDispatcher dispatcher = request.getServletContext().getRequestDispatcher(path);
if (now - primaryTimestamp < TimeUnit.MILLISECONDS.toNanos(_associatePeriod))
{
if (primaryResource._associated.putIfAbsent(path, dispatcher) == null)
ConcurrentMap<String, RequestDispatcher> associated = primaryResource._associated;
// Not strictly concurrent-safe, just best effort to limit associations.
if (associated.size() <= _maxAssociations)
{
if (associated.putIfAbsent(path, dispatcher) == null)
{
if (LOG.isDebugEnabled())
LOG.debug("Associated {} -> {}", referrerPathNoContext, dispatcher);
LOG.debug("Associated {} to {}", path, referrerPathNoContext);
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Not associated {} -> {}, outside associate period of {}ms", referrerPathNoContext, dispatcher, _associatePeriod);
LOG.debug("Not associated {} to {}, exceeded max associations of {}", path, referrerPathNoContext, _maxAssociations);
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Not associated {} to {}, outside associate period of {}ms", path, referrerPathNoContext, _associatePeriod);
}
}
}
@ -213,7 +215,7 @@ public class PushCacheFilter implements Filter
primaryResource = primaryResource == null ? t : primaryResource;
primaryResource._timestamp.compareAndSet(0, now);
if (LOG.isDebugEnabled())
LOG.debug("Cached {}", path);
LOG.debug("Cached primary resource {}", path);
}
else
{
@ -222,7 +224,7 @@ public class PushCacheFilter implements Filter
{
primaryResource._associated.clear();
if (LOG.isDebugEnabled())
LOG.debug("Clear associated {}", path);
LOG.debug("Clear associated resources for {}", path);
}
}
@ -232,22 +234,22 @@ public class PushCacheFilter implements Filter
for (RequestDispatcher dispatcher : primaryResource._associated.values())
{
if (LOG.isDebugEnabled())
LOG.debug("Pushing {} <- {}", dispatcher, path);
LOG.debug("Pushing {} for {}", dispatcher, path);
((Dispatcher)dispatcher).push(request);
}
}
chain.doFilter(req, resp);
chain.doFilter(request, resp);
}
@Override
public void destroy()
{
_cache.clear();
clearPushCache();
}
@ManagedAttribute("The push cache contents")
public Map<String, String> getCache()
public Map<String, String> getPushCache()
{
Map<String, String> result = new HashMap<>();
for (Map.Entry<String, PrimaryResource> entry : _cache.entrySet())
@ -259,6 +261,18 @@ public class PushCacheFilter implements Filter
return result;
}
@ManagedOperation(value = "Renews the push cache contents", impact = "ACTION")
public void renewPushCache()
{
_renew = System.nanoTime();
}
@ManagedOperation(value = "Clears the push cache contents", impact = "ACTION")
public void clearPushCache()
{
_cache.clear();
}
private static class PrimaryResource
{
private final ConcurrentMap<String, RequestDispatcher> _associated = new ConcurrentHashMap<>();

View File

@ -10,7 +10,7 @@
<context-param>
<param-name>org.eclipse.jetty.server.context.ManagedAttributes</param-name>
<param-value>QoSFilter,TransparentProxy.ThreadPool,TransparentProxy.HttpClient</param-value>
<param-value>PushFilter,QoSFilter,TransparentProxy.ThreadPool,TransparentProxy.HttpClient</param-value>
</context-param>
<!-- Declare TestListener, which declares TestFilter -->