Merge remote-tracking branch 'origin/master' into jetty-8

This commit is contained in:
Jan Bartel 2012-03-30 12:05:46 +11:00
commit 17269583fb
15 changed files with 579 additions and 414 deletions

View File

@ -37,6 +37,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{
public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
private final boolean WORK_AROUND_JVM_BUG_6346658 = System.getProperty("os.name").toLowerCase().contains("win");
private final SelectorManager.SelectSet _selectSet;
private final SelectorManager _manager;
private SelectionKey _key;
@ -680,6 +681,13 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
*/
@Override
public void close() throws IOException
{
// On unix systems there is a JVM issue that if you cancel before closing, it can
// cause the selector to block waiting for a channel to close and that channel can
// block waiting for the remote end. But on windows, if you don't cancel before a
// close, then the selector can block anyway!
// https://bugs.eclipse.org/bugs/show_bug.cgi?id=357318
if (WORK_AROUND_JVM_BUG_6346658)
{
try
{
@ -691,6 +699,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{
LOG.ignore(e);
}
}
try
{

View File

@ -69,7 +69,7 @@
-->
<!--
<Call name="createRegistry" class="java.rmi.registry.LocateRegistry">
<Arg type="java.lang.Integer">1099</Arg>
<Arg type="java.lang.Integer"><SystemProperty name="jetty.jmxrmiport" default="1099"/></Arg>
<Call name="sleep" class="java.lang.Thread">
<Arg type="java.lang.Integer">1000</Arg>
</Call>
@ -91,11 +91,11 @@
<New class="javax.management.remote.JMXServiceURL">
<Arg type="java.lang.String">rmi</Arg>
<Arg type="java.lang.String" />
<Arg type="java.lang.Integer">0</Arg>
<Arg type="java.lang.String">/jndi/rmi://localhost:1099/jettyjmx</Arg>
<Arg type="java.lang.Integer"><SystemProperty name="jetty.jmxrmiport" default="1099"/></Arg>
<Arg type="java.lang.String">/jndi/rmi://<SystemProperty name="jetty.jmxrmihost" default="localhost"/>:<SystemProperty name="jetty.jmxrmiport" default="1099"/>/jmxrmi</Arg>
</New>
</Arg>
<Arg>org.eclipse.jetty:name=rmiconnectorserver</Arg>
<Arg>org.eclipse.jetty.jmx:name=rmiconnectorserver</Arg>
<Call name="start" />
</New>
-->

View File

@ -88,7 +88,7 @@ public class DefaultJettyAtJettyHomeHelper {
* that might use them as part of their properties.
* </p>
*/
public static void startJettyAtJettyHome(BundleContext bundleContext)
public static void startJettyAtJettyHome(BundleContext bundleContext) throws Exception
{
String jettyHomeSysProp = System.getProperty(SYS_PROP_JETTY_HOME);
String jettyHomeBundleSysProp = System.getProperty(SYS_PROP_JETTY_HOME_BUNDLE);
@ -137,8 +137,7 @@ public class DefaultJettyAtJettyHomeHelper {
LOG.warn("No default jetty started.");
return;
}
try
{
Server server = new Server();
Dictionary properties = new Hashtable();
properties.put(OSGiServerConstants.MANAGED_JETTY_SERVER_NAME, OSGiServerConstants.MANAGED_JETTY_SERVER_DEFAULT_NAME);
@ -155,12 +154,8 @@ public class DefaultJettyAtJettyHomeHelper {
setProperty(properties,SYS_PROP_JETTY_PORT_SSL,System.getProperty(SYS_PROP_JETTY_PORT_SSL));
bundleContext.registerService(Server.class.getName(), server, properties);
// hookNestedConnectorToBridgeServlet(server);
}
catch (Throwable t)
{
t.printStackTrace();
}
// hookNestedConnectorToBridgeServlet(server);
}
/**

View File

@ -18,31 +18,36 @@ import java.util.Properties;
import org.eclipse.jetty.osgi.boot.OSGiServerConstants;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.osgi.framework.Bundle;
import org.osgi.framework.ServiceEvent;
import org.osgi.framework.ServiceListener;
import org.osgi.framework.ServiceReference;
/**
* Deploy the jetty server instances when they are registered as an OSGi service.
* Deploy the jetty server instances when they are registered as an OSGi
* service.
*/
public class JettyServerServiceTracker implements ServiceListener, IManagedJettyServerRegistry
{
private static Logger LOG = Log.getLogger(JettyServerServiceTracker.class.getName());
/**
* Servers indexed by PIDs. PIDs are generated by the ConfigurationAdmin service.
* Servers indexed by PIDs. PIDs are generated by the ConfigurationAdmin
* service.
*/
private Map<String, ServerInstanceWrapper> _serversIndexedByName = new HashMap<String, ServerInstanceWrapper>();
/** The context-handler to deactivate indexed by ServerInstanceWrapper */
private Map<ServiceReference, ServerInstanceWrapper> _indexByServiceReference = new HashMap<ServiceReference, ServerInstanceWrapper>();
/**
* Stops each one of the registered servers.
*/
public void stop()
{
//not sure that this is really useful but here we go.
// not sure that this is really useful but here we go.
for (ServerInstanceWrapper wrapper : _serversIndexedByName.values())
{
try
@ -51,17 +56,15 @@ public class JettyServerServiceTracker implements ServiceListener, IManagedJetty
}
catch (Throwable t)
{
LOG.warn(t);
}
}
}
/**
* Receives notification that a service has had a lifecycle change.
*
* @param ev
* The <code>ServiceEvent</code> object.
* @param ev The <code>ServiceEvent</code> object.
*/
public void serviceChanged(ServiceEvent ev)
{
@ -80,8 +83,7 @@ public class JettyServerServiceTracker implements ServiceListener, IManagedJetty
}
catch (Exception e)
{
// TODO Auto-generated catch block
e.printStackTrace();
LOG.warn(e);
}
}
}
@ -95,9 +97,11 @@ public class JettyServerServiceTracker implements ServiceListener, IManagedJetty
// we can register it.
}
case ServiceEvent.REGISTERED:
{
try
{
Bundle contributor = sr.getBundle();
Server server = (Server)contributor.getBundleContext().getService(sr);
Server server = (Server) contributor.getBundleContext().getService(sr);
ServerInstanceWrapper wrapper = registerInIndex(server, sr);
Properties props = new Properties();
for (String key : sr.getPropertyKeys())
@ -106,6 +110,11 @@ public class JettyServerServiceTracker implements ServiceListener, IManagedJetty
props.put(key, value);
}
wrapper.start(server, props);
}
catch (Exception e)
{
LOG.warn(e);
}
break;
}
}
@ -113,15 +122,11 @@ public class JettyServerServiceTracker implements ServiceListener, IManagedJetty
private ServerInstanceWrapper registerInIndex(Server server, ServiceReference sr)
{
String name = (String)sr.getProperty(OSGiServerConstants.MANAGED_JETTY_SERVER_NAME);
if (name == null)
{
throw new IllegalArgumentException("The property " +
OSGiServerConstants.MANAGED_JETTY_SERVER_NAME + " is mandatory");
}
String name = (String) sr.getProperty(OSGiServerConstants.MANAGED_JETTY_SERVER_NAME);
if (name == null) { throw new IllegalArgumentException("The property " + OSGiServerConstants.MANAGED_JETTY_SERVER_NAME + " is mandatory"); }
ServerInstanceWrapper wrapper = new ServerInstanceWrapper(name);
_indexByServiceReference.put(sr,wrapper);
_serversIndexedByName.put(name,wrapper);
_indexByServiceReference.put(sr, wrapper);
_serversIndexedByName.put(name, wrapper);
return wrapper;
}
@ -149,13 +154,12 @@ public class JettyServerServiceTracker implements ServiceListener, IManagedJetty
/**
* @param managedServerName The server name
* @return the corresponding jetty server wrapped with its deployment properties.
* @return the corresponding jetty server wrapped with its deployment
* properties.
*/
public ServerInstanceWrapper getServerInstanceWrapper(String managedServerName)
{
return _serversIndexedByName.get(managedServerName == null
? OSGiServerConstants.MANAGED_JETTY_SERVER_DEFAULT_NAME : managedServerName);
return _serversIndexedByName.get(managedServerName == null ? OSGiServerConstants.MANAGED_JETTY_SERVER_DEFAULT_NAME : managedServerName);
}
}

View File

@ -106,8 +106,15 @@ public class JettyServersManagedFactory implements ManagedServiceFactory, IManag
_serversIndexedByPID.put(pid, serverInstanceWrapper);
_serversNameIndexedByPID.put(pid, name);
_serversPIDIndexedByName.put(name, pid);
try
{
serverInstanceWrapper.start(new Server(), properties);
}
catch (Exception e)
{
throw new ConfigurationException(null, "Error starting jetty server instance", e);
}
}
public synchronized void deleted(String pid)
{

View File

@ -44,21 +44,21 @@ import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.xml.XmlConfiguration;
import org.xml.sax.SAXParseException;
/**
* Exposes a Jetty Server to be managed by an OSGi ManagedServiceFactory
* Configure and start it.
* Can also be used from the ManagedServiceFactory
* Configure and start it. Can also be used from the ManagedServiceFactory
*/
public class ServerInstanceWrapper {
public class ServerInstanceWrapper
{
/** The value of this property points to the parent director of
* the jetty.xml configuration file currently executed.
* Everything is passed as a URL to support the
* case where the bundle is zipped. */
/**
* The value of this property points to the parent director of the jetty.xml
* configuration file currently executed. Everything is passed as a URL to
* support the case where the bundle is zipped.
*/
public static final String PROPERTY_THIS_JETTY_XML_FOLDER_URL = "this.jetty.xml.parent.folder.url";
private static Logger __logger = Log.getLogger(ServerInstanceWrapper.class.getName());
private static Logger LOG = Log.getLogger(ServerInstanceWrapper.class.getName());
private final String _managedServerName;
@ -66,6 +66,7 @@ public class ServerInstanceWrapper {
* The managed jetty server
*/
private Server _server;
private ContextHandlerCollection _ctxtHandler;
/**
@ -74,12 +75,13 @@ public class ServerInstanceWrapper {
* let the TldScanner find the jars where the tld files are.
*/
private ClassLoader _commonParentClassLoaderForWebapps;
private DeploymentManager _deploymentManager;
private OSGiAppProvider _provider;
private WebBundleDeployerHelper _webBundleDeployerHelper;
public ServerInstanceWrapper(String managedServerName)
{
_managedServerName = managedServerName;
@ -91,8 +93,9 @@ public class ServerInstanceWrapper {
}
/**
* The classloader that should be the parent classloader for
* each webapp deployed on this server.
* The classloader that should be the parent classloader for each webapp
* deployed on this server.
*
* @return
*/
public ClassLoader getParentClassLoaderForWebapps()
@ -116,13 +119,11 @@ public class ServerInstanceWrapper {
return _provider;
}
public Server getServer()
{
return _server;
}
public WebBundleDeployerHelper getWebBundleDeployerHelp()
{
return _webBundleDeployerHelper;
@ -136,8 +137,7 @@ public class ServerInstanceWrapper {
return _ctxtHandler;
}
public void start(Server server, Dictionary props)
public void start(Server server, Dictionary props) throws Exception
{
_server = server;
ClassLoader contextCl = Thread.currentThread().getContextClassLoader();
@ -146,17 +146,10 @@ public class ServerInstanceWrapper {
// passing this bundle's classloader as the context classlaoder
// makes sure there is access to all the jetty's bundles
ClassLoader libExtClassLoader = null;
String sharedURLs = (String)props.get(OSGiServerConstants.MANAGED_JETTY_SHARED_LIB_FOLDER_URLS);
try
{
String sharedURLs = (String) props.get(OSGiServerConstants.MANAGED_JETTY_SHARED_LIB_FOLDER_URLS);
List<File> shared = sharedURLs != null ? extractFiles(sharedURLs) : null;
libExtClassLoader = LibExtClassLoaderHelper.createLibExtClassLoader(
shared, null, server, JettyBootstrapActivator.class.getClassLoader());
}
catch (MalformedURLException e)
{
e.printStackTrace();
}
libExtClassLoader = LibExtClassLoaderHelper.createLibExtClassLoader(shared, null, server, JettyBootstrapActivator.class.getClassLoader());
Thread.currentThread().setContextClassLoader(libExtClassLoader);
@ -164,44 +157,50 @@ public class ServerInstanceWrapper {
init();
//now that we have an app provider we can call the registration customizer.
try
{
URL[] jarsWithTlds = getJarsWithTlds();
_commonParentClassLoaderForWebapps = jarsWithTlds == null
? libExtClassLoader
:new TldLocatableURLClassloader(libExtClassLoader,jarsWithTlds);
}
catch (MalformedURLException e)
{
e.printStackTrace();
}
// now that we have an app provider we can call the registration
// customizer.
URL[] jarsWithTlds = getJarsWithTlds();
_commonParentClassLoaderForWebapps = jarsWithTlds == null ? libExtClassLoader : new TldLocatableURLClassloader(libExtClassLoader, jarsWithTlds);
server.start();
_webBundleDeployerHelper = new WebBundleDeployerHelper(this);
}
catch (Throwable t)
catch (Exception e)
{
t.printStackTrace();
if (server != null)
{
try
{
server.stop();
}
catch (Exception x)
{
LOG.ignore(x);
}
}
throw e;
}
finally
{
Thread.currentThread().setContextClassLoader(contextCl);
}
_webBundleDeployerHelper = new WebBundleDeployerHelper(this);
}
}
public void stop()
{
try {
try
{
if (_server.isRunning())
{
_server.stop();
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
catch (Exception e)
{
LOG.warn(e);
}
}
@ -229,14 +228,13 @@ public class ServerInstanceWrapper {
private URL[] getJarsWithTlds() throws Exception
{
ArrayList<URL> res = new ArrayList<URL>();
WebBundleDeployerHelper.staticInit();//that is not looking great.
WebBundleDeployerHelper.staticInit();// that is not looking great.
for (WebappRegistrationCustomizer regCustomizer : WebBundleDeployerHelper.JSP_REGISTRATION_HELPERS)
{
URL[] urls = regCustomizer.getJarsWithTlds(_provider, WebBundleDeployerHelper.BUNDLE_FILE_LOCATOR_HELPER);
for (URL url : urls)
{
if (!res.contains(url))
res.add(url);
if (!res.contains(url)) res.add(url);
}
}
if (!res.isEmpty())
@ -248,15 +246,11 @@ public class ServerInstanceWrapper {
private void configure(Server server, Dictionary props) throws Exception
{
String jettyConfigurationUrls = (String) props.get(OSGiServerConstants.MANAGED_JETTY_XML_CONFIG_URLS);
List<URL> jettyConfigurations = jettyConfigurationUrls != null
? extractResources(jettyConfigurationUrls) : null;
if (jettyConfigurations == null || jettyConfigurations.isEmpty())
{
return;
}
Map<String,Object> id_map = new HashMap<String,Object>();
id_map.put("Server",server);
Map<String,String> properties = new HashMap<String,String>();
List<URL> jettyConfigurations = jettyConfigurationUrls != null ? extractResources(jettyConfigurationUrls) : null;
if (jettyConfigurations == null || jettyConfigurations.isEmpty()) { return; }
Map<String, Object> id_map = new HashMap<String, Object>();
id_map.put("Server", server);
Map<String, String> properties = new HashMap<String, String>();
Enumeration<Object> en = props.keys();
while (en.hasMoreElements())
{
@ -276,14 +270,16 @@ public class ServerInstanceWrapper {
XmlConfiguration config = new XmlConfiguration(is);
config.getIdMap().putAll(id_map);
//#334062 compute the URL of the folder that contains the jetty.xml conf file
//and set it as a property so we can compute relative paths from it.
// #334062 compute the URL of the folder that contains the
// jetty.xml conf file
// and set it as a property so we can compute relative paths
// from it.
String urlPath = jettyConfiguration.toString();
int lastSlash = urlPath.lastIndexOf('/');
if (lastSlash > 4)
{
urlPath = urlPath.substring(0, lastSlash);
Map<String,String> properties2 = new HashMap<String,String>(properties);
Map<String, String> properties2 = new HashMap<String, String>(properties);
properties2.put(PROPERTY_THIS_JETTY_XML_FOLDER_URL, urlPath);
config.getProperties().putAll(properties2);
}
@ -292,11 +288,11 @@ public class ServerInstanceWrapper {
config.getProperties().putAll(properties);
}
config.configure();
id_map=config.getIdMap();
id_map = config.getIdMap();
}
catch (SAXParseException saxparse)
{
__logger.warn("Unable to configure the jetty/etc file " + jettyConfiguration,saxparse);
LOG.warn("Unable to configure the jetty/etc file " + jettyConfiguration, saxparse);
throw saxparse;
}
finally
@ -307,7 +303,6 @@ public class ServerInstanceWrapper {
}
/**
* Must be called after the server is configured.
*
@ -319,7 +314,7 @@ public class ServerInstanceWrapper {
private void init()
{
// Get the context handler
_ctxtHandler = (ContextHandlerCollection)_server.getChildHandlerByClass(ContextHandlerCollection.class);
_ctxtHandler = (ContextHandlerCollection) _server.getChildHandlerByClass(ContextHandlerCollection.class);
// get a deployerManager
List<DeploymentManager> deployers = _server.getBeans(DeploymentManager.class);
@ -331,38 +326,36 @@ public class ServerInstanceWrapper {
{
if (provider instanceof OSGiAppProvider)
{
_provider=(OSGiAppProvider)provider;
_provider = (OSGiAppProvider) provider;
break;
}
}
if (_provider == null)
{
//create it on the fly with reasonable default values.
// create it on the fly with reasonable default values.
try
{
_provider = new OSGiAppProvider();
_provider.setMonitoredDirResource(
Resource.newResource(getDefaultOSGiContextsHome(
new File(System.getProperty("jetty.home"))).toURI()));
} catch (IOException e) {
e.printStackTrace();
_provider.setMonitoredDirResource(Resource.newResource(getDefaultOSGiContextsHome(new File(System.getProperty("jetty.home"))).toURI()));
}
catch (IOException e)
{
LOG.warn(e);
}
_deploymentManager.addAppProvider(_provider);
}
}
if (_ctxtHandler == null || _provider==null)
throw new IllegalStateException("ERROR: No ContextHandlerCollection or OSGiAppProvider configured");
if (_ctxtHandler == null || _provider == null) throw new IllegalStateException("ERROR: No ContextHandlerCollection or OSGiAppProvider configured");
}
/**
* @return The default folder in which the context files of the osgi bundles
* are located and watched. Or null when the system property
* "jetty.osgi.contexts.home" is not defined.
* If the configuration file defines the OSGiAppProvider's context.
* This will not be taken into account.
* "jetty.osgi.contexts.home" is not defined. If the configuration
* file defines the OSGiAppProvider's context. This will not be
* taken into account.
*/
File getDefaultOSGiContextsHome(File jettyHome)
{
@ -370,10 +363,9 @@ public class ServerInstanceWrapper {
if (jettyContextsHome != null)
{
File contextsHome = new File(jettyContextsHome);
if (!contextsHome.exists() || !contextsHome.isDirectory())
{
throw new IllegalArgumentException("the ${jetty.osgi.contexts.home} '" + jettyContextsHome + " must exist and be a folder");
}
if (!contextsHome.exists() || !contextsHome.isDirectory()) { throw new IllegalArgumentException(
"the ${jetty.osgi.contexts.home} '" + jettyContextsHome
+ " must exist and be a folder"); }
return contextsHome;
}
return new File(jettyHome, "/contexts");
@ -396,19 +388,19 @@ public class ServerInstanceWrapper {
String tok = tokenizer.nextToken();
try
{
urls.add(((DefaultFileLocatorHelper) WebBundleDeployerHelper
.BUNDLE_FILE_LOCATOR_HELPER).getLocalURL(new URL(tok)));
urls.add(((DefaultFileLocatorHelper) WebBundleDeployerHelper.BUNDLE_FILE_LOCATOR_HELPER).getLocalURL(new URL(tok)));
}
catch (Throwable mfe)
{
LOG.warn(mfe);
}
}
return urls;
}
/**
* Get the folders that might contain jars for the legacy J2EE shared libraries
* Get the folders that might contain jars for the legacy J2EE shared
* libraries
*/
private List<File> extractFiles(String propertyValue)
{
@ -420,8 +412,7 @@ public class ServerInstanceWrapper {
try
{
URL url = new URL(tok);
url = ((DefaultFileLocatorHelper) WebBundleDeployerHelper
.BUNDLE_FILE_LOCATOR_HELPER).getFileURL(url);
url = ((DefaultFileLocatorHelper) WebBundleDeployerHelper.BUNDLE_FILE_LOCATOR_HELPER).getFileURL(url);
if (url.getProtocol().equals("file"))
{
Resource res = Resource.newResource(url);
@ -434,11 +425,10 @@ public class ServerInstanceWrapper {
}
catch (Throwable mfe)
{
LOG.warn(mfe);
}
}
return files;
}
}

View File

@ -267,7 +267,7 @@ public class Server extends HandlerWrapper implements Attributes
mex.add(e);
}
if (_connectors!=null)
if (_connectors!=null && mex.size()==0)
{
for (int i=0;i<_connectors.length;i++)
{

View File

@ -414,7 +414,7 @@ public class ProxyServlet implements Servlet
if (request.getQueryString() != null)
uri += "?" + request.getQueryString();
HttpURI url = proxyHttpURI(request.getScheme(),request.getServerName(),request.getServerPort(),uri);
HttpURI url = proxyHttpURI(request,uri);
if (debug != 0)
_log.debug(debug + " proxy " + uri + "-->" + url);
@ -677,6 +677,11 @@ public class ProxyServlet implements Servlet
}
/* ------------------------------------------------------------ */
protected HttpURI proxyHttpURI(HttpServletRequest request, String uri) throws MalformedURLException
{
return proxyHttpURI(request.getScheme(), request.getServerName(), request.getServerPort(), uri);
}
protected HttpURI proxyHttpURI(String scheme, String serverName, int serverPort, String uri) throws MalformedURLException
{
if (!validateDestination(serverName,uri))

View File

@ -35,6 +35,4 @@ public interface ISession extends Session
public <C> void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Handler<C> handler, C context);
public <C> void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Handler<C> handler, C context);
public int getWindowSize();
}

View File

@ -18,10 +18,11 @@ package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import java.nio.channels.InterruptedByTimeoutException;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@ -253,9 +254,9 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
@Override
public List<Stream> getStreams()
public Set<Stream> getStreams()
{
List<Stream> result = new ArrayList<>();
Set<Stream> result = new HashSet<>();
result.addAll(streams.values());
return result;
}
@ -540,7 +541,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
Settings.Setting windowSizeSetting = frame.getSettings().get(Settings.ID.INITIAL_WINDOW_SIZE);
if (windowSizeSetting != null)
{
int prevWindowSize = windowSize;
windowSize = windowSizeSetting.value();
for (IStream stream : streams.values())
stream.updateWindowSize(windowSize - prevWindowSize);
logger.debug("Updated window size to {}", windowSize);
}
@ -774,12 +778,6 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
threadPool.execute(task);
}
@Override
public int getWindowSize()
{
return windowSize;
}
@Override
public void flush()
{
@ -794,12 +792,21 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
if (frameBytes == null)
return;
buffer = frameBytes.getByteBuffer();
if (buffer == null)
FrameBytes stalled = null;
while (true)
{
enqueueFirst(frameBytes);
logger.debug("Flush skipped, {} frame(s) in queue", queue.size());
buffer = frameBytes.getByteBuffer();
if (buffer != null)
break;
// We are stalled: enqueue as last so other frames can be flushed
enqueueLast(frameBytes);
if (stalled == null)
stalled = frameBytes;
else if (stalled == frameBytes)
return;
logger.debug("Flush stalled for {}, {} frame(s) in queue", frameBytes, queue.size());
frameBytes = queue.poll();
}
flushing = true;

View File

@ -17,7 +17,7 @@
package org.eclipse.jetty.spdy.api;
import java.util.EventListener;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -181,7 +181,7 @@ public interface Session
/**
* @return the streams currently active in this session
*/
public List<Stream> getStreams();
public Set<Stream> getStreams();
/**
* <p>Super interface for listeners with callbacks that are invoked on specific session events.</p>

View File

@ -23,6 +23,7 @@ import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
@ -41,6 +42,70 @@ import org.junit.Test;
public class FlowControlTest extends AbstractTest
{
@Test
public void testFlowControlWithConcurrentSettings() throws Exception
{
// Initial window is 64 KiB. We allow the client to send 1024 B
// then we change the window to 512 B. At this point, the client
// must stop sending data (although the initial window allows it)
final int size = 512;
final AtomicReference<DataInfo> dataInfoRef = new AtomicReference<>();
final CountDownLatch dataLatch = new CountDownLatch(2);
final CountDownLatch settingsLatch = new CountDownLatch(1);
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
stream.reply(new ReplyInfo(true));
return new StreamFrameListener.Adapter()
{
private final AtomicInteger dataFrames = new AtomicInteger();
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
int dataFrameCount = dataFrames.incrementAndGet();
if (dataFrameCount == 1)
{
dataInfoRef.set(dataInfo);
Settings settings = new Settings();
settings.put(new Settings.Setting(Settings.ID.INITIAL_WINDOW_SIZE, size));
stream.getSession().settings(new SettingsInfo(settings));
}
else if (dataFrameCount > 1)
{
dataInfo.consume(dataInfo.length());
dataLatch.countDown();
}
}
};
}
}), new SessionFrameListener.Adapter()
{
@Override
public void onSettings(Session session, SettingsInfo settingsInfo)
{
settingsLatch.countDown();
}
});
Stream stream = session.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS);
stream.data(new BytesDataInfo(new byte[size * 2], false));
settingsLatch.await(5, TimeUnit.SECONDS);
// Send the second chunk of data, must not arrive since we're flow control stalled now
stream.data(new BytesDataInfo(new byte[size * 2], true));
Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS));
// Consume the data arrived to server, this will resume flow control
DataInfo dataInfo = dataInfoRef.get();
dataInfo.consume(dataInfo.length());
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testServerFlowControlOneBigWrite() throws Exception
{
@ -294,6 +359,98 @@ public class FlowControlTest extends AbstractTest
Assert.assertEquals(dataInfo.length(), dataInfo.consumed());
}
@Test
public void testStreamsStalledDoesNotStallOtherStreams() throws Exception
{
final int windowSize = 1024;
final CountDownLatch settingsLatch = new CountDownLatch(1);
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public void onSettings(Session session, SettingsInfo settingsInfo)
{
settingsLatch.countDown();
}
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
stream.reply(new ReplyInfo(false));
stream.data(new BytesDataInfo(new byte[windowSize * 2], true));
return null;
}
}), null);
Settings settings = new Settings();
settings.put(new Settings.Setting(Settings.ID.INITIAL_WINDOW_SIZE, windowSize));
session.settings(new SettingsInfo(settings));
Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
final CountDownLatch latch = new CountDownLatch(3);
final AtomicReference<DataInfo> dataInfoRef1 = new AtomicReference<>();
final AtomicReference<DataInfo> dataInfoRef2 = new AtomicReference<>();
session.syn(new SynInfo(true), new StreamFrameListener.Adapter()
{
private final AtomicInteger dataFrames = new AtomicInteger();
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
int frames = dataFrames.incrementAndGet();
if (frames == 1)
{
// Do not consume it to stall flow control
dataInfoRef1.set(dataInfo);
}
else
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
latch.countDown();
}
}
}).get(5, TimeUnit.SECONDS);
session.syn(new SynInfo(true), new StreamFrameListener.Adapter()
{
private final AtomicInteger dataFrames = new AtomicInteger();
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
int frames = dataFrames.incrementAndGet();
if (frames == 1)
{
// Do not consume it to stall flow control
dataInfoRef2.set(dataInfo);
}
else
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
latch.countDown();
}
}
}).get(5, TimeUnit.SECONDS);
session.syn(new SynInfo(true), new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
DataInfo dataInfo1 = dataInfoRef1.getAndSet(null);
if (dataInfo1 != null)
dataInfo1.consume(dataInfo1.length());
DataInfo dataInfo2 = dataInfoRef2.getAndSet(null);
if (dataInfo2 != null)
dataInfo2.consume(dataInfo2.length());
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
latch.countDown();
}
}).get(5, TimeUnit.SECONDS);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
private void expectException(Class<? extends Exception> exception, Callable command)
{
try

View File

@ -381,7 +381,7 @@ public class Main
}
else if (info.equals("@STARTINI"))
{
List<String> ini = loadStartIni(null);
List<String> ini = loadStartIni(new File(_jettyHome,"start.ini"));
if (ini != null && ini.size() > 0)
{
for (String a : ini)

View File

@ -402,8 +402,7 @@ public class WebInfConfiguration extends AbstractConfiguration
}
if (LOG.isDebugEnabled())
LOG.debug("Try webapp=" + web_app + ", exists=" + web_app.exists() + ", directory=" + web_app.isDirectory());
LOG.debug("Try webapp=" + web_app + ", exists=" + web_app.exists() + ", directory=" + web_app.isDirectory()+" file="+(web_app.getFile()));
// Is the WAR usable directly?
if (web_app.exists() && !web_app.isDirectory() && !web_app.toString().startsWith("jar:"))
{
@ -486,17 +485,11 @@ public class WebInfConfiguration extends AbstractConfiguration
}
// Do we need to extract WEB-INF/lib?
if (context.isCopyWebInf())
if (context.isCopyWebInf() && !context.isCopyWebDir())
{
Resource web_inf= web_app.addPath("WEB-INF/");
if (web_inf instanceof ResourceCollection ||
web_inf.exists() &&
web_inf.isDirectory() &&
(web_inf.getFile()==null || !web_inf.getFile().isDirectory()))
{
File extractedWebInfDir= new File(context.getTempDirectory(), "webinf");
if (extractedWebInfDir.exists())
IO.delete(extractedWebInfDir);
@ -537,7 +530,6 @@ public class WebInfConfiguration extends AbstractConfiguration
context.setBaseResource(rc);
}
}
}
public File findWorkDirectory (WebAppContext context) throws IOException

View File

@ -10,6 +10,7 @@
<name>Jetty :: Project</name>
<url>${jetty.url}</url>
<packaging>pom</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jetty.url>http://www.eclipse.org/jetty</jetty.url>