Merge remote-tracking branch 'origin/master' into jetty-9-modularstart

This commit is contained in:
Greg Wilkins 2013-04-29 14:03:16 +10:00
commit 89da7623dc
31 changed files with 900 additions and 240 deletions

View File

@ -652,6 +652,9 @@ public class HttpParser
case CACHE_CONTROL:
case USER_AGENT:
add_to_connection_trie=_connectionFields!=null && _field==null;
break;
default: break;
}
if (add_to_connection_trie && !_connectionFields.isFull() && _header!=null && _valueString!=null)
@ -1089,6 +1092,8 @@ public class HttpParser
BufferUtil.clear(buffer);
}
return false;
default: break;
}
// Request/response line
@ -1262,6 +1267,9 @@ public class HttpParser
BufferUtil.clear(buffer);
return false;
}
default:
break;
}
}
@ -1340,8 +1348,19 @@ public class HttpParser
case CLOSED:
case END:
break;
case EOF_CONTENT:
_handler.messageComplete();
break;
default:
LOG.warn("Closing {}",this);
if (_state.ordinal()>State.END.ordinal())
{
_handler.earlyEOF();
_handler.messageComplete();
}
else
LOG.warn("Closing {}",this);
}
setState(State.CLOSED);
_endOfContent=EndOfContent.UNKNOWN_CONTENT;
@ -1369,6 +1388,7 @@ public class HttpParser
/* ------------------------------------------------------------------------------- */
private void setState(State state)
{
// LOG.debug("{} --> {}",_state,state);
_state=state;
}

View File

@ -24,6 +24,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -112,13 +113,130 @@ public abstract class AbstractConnection implements Connection
if (_state.compareAndSet(State.FILLING,State.FILLING_INTERESTED))
break loop;
break;
case FILLING_BLOCKED:
if (_state.compareAndSet(State.FILLING_BLOCKED,State.FILLING_BLOCKED_INTERESTED))
break loop;
break;
case BLOCKED:
if (_state.compareAndSet(State.BLOCKED,State.BLOCKED_INTERESTED))
break loop;
break;
case FILLING_BLOCKED_INTERESTED:
case FILLING_INTERESTED:
case BLOCKED_INTERESTED:
case INTERESTED:
break loop;
}
}
}
private void unblock()
{
LOG.debug("unblock {}",this);
loop:while(true)
{
switch(_state.get())
{
case FILLING_BLOCKED:
if (_state.compareAndSet(State.FILLING_BLOCKED,State.FILLING))
break loop;
break;
case FILLING_BLOCKED_INTERESTED:
if (_state.compareAndSet(State.FILLING_BLOCKED_INTERESTED,State.FILLING_INTERESTED))
break loop;
break;
case BLOCKED_INTERESTED:
if (_state.compareAndSet(State.BLOCKED_INTERESTED,State.INTERESTED))
{
getEndPoint().fillInterested(_readCallback);
break loop;
}
break;
case BLOCKED:
if (_state.compareAndSet(State.BLOCKED,State.IDLE))
break loop;
break;
case FILLING:
case IDLE:
case FILLING_INTERESTED:
case INTERESTED:
break loop;
}
}
}
/**
*/
protected void block(final BlockingCallback callback)
{
LOG.debug("block {}",this);
final Callback blocked=new Callback()
{
@Override
public void succeeded()
{
unblock();
callback.succeeded();
}
@Override
public void failed(Throwable x)
{
unblock();
callback.failed(x);
}
};
loop:while(true)
{
switch(_state.get())
{
case IDLE:
if (_state.compareAndSet(State.IDLE,State.BLOCKED))
{
getEndPoint().fillInterested(blocked);
break loop;
}
break;
case FILLING:
if (_state.compareAndSet(State.FILLING,State.FILLING_BLOCKED))
{
getEndPoint().fillInterested(blocked);
break loop;
}
break;
case FILLING_INTERESTED:
if (_state.compareAndSet(State.FILLING_INTERESTED,State.FILLING_BLOCKED_INTERESTED))
{
getEndPoint().fillInterested(blocked);
break loop;
}
break;
case BLOCKED:
case BLOCKED_INTERESTED:
case FILLING_BLOCKED:
case FILLING_BLOCKED_INTERESTED:
throw new IllegalStateException("Already Blocked");
case INTERESTED:
throw new IllegalStateException();
}
}
}
/**
* <p>Callback method invoked when the endpoint is ready to be read.</p>
@ -225,7 +343,7 @@ public abstract class AbstractConnection implements Connection
private enum State
{
IDLE, INTERESTED, FILLING, FILLING_INTERESTED
IDLE, INTERESTED, FILLING, FILLING_INTERESTED, FILLING_BLOCKED, BLOCKED, FILLING_BLOCKED_INTERESTED, BLOCKED_INTERESTED
}
private class ReadCallback implements Callback, Runnable
@ -247,12 +365,25 @@ public abstract class AbstractConnection implements Connection
{
case IDLE:
case INTERESTED:
throw new IllegalStateException();
case BLOCKED:
case BLOCKED_INTERESTED:
LOG.warn(new IllegalStateException());
return;
case FILLING:
if (_state.compareAndSet(State.FILLING,State.IDLE))
break loop;
break;
case FILLING_BLOCKED:
if (_state.compareAndSet(State.FILLING_BLOCKED,State.BLOCKED))
break loop;
break;
case FILLING_BLOCKED_INTERESTED:
if (_state.compareAndSet(State.FILLING_BLOCKED_INTERESTED,State.BLOCKED_INTERESTED))
break loop;
break;
case FILLING_INTERESTED:
if (_state.compareAndSet(State.FILLING_INTERESTED,State.INTERESTED))
@ -266,7 +397,7 @@ public abstract class AbstractConnection implements Connection
}
}
else
LOG.warn(new Throwable());
LOG.warn(new IllegalStateException());
}
@Override

View File

@ -284,7 +284,7 @@ public abstract class AbstractJettyMojo extends AbstractMojo
/**
* A wrapper for the Server object
*/
protected JettyServer server = JettyServer.getInstance();
protected JettyServer server = new JettyServer();
/**
@ -494,6 +494,8 @@ public abstract class AbstractJettyMojo extends AbstractMojo
String tmp = System.getProperty(PORT_SYSPROPERTY, MavenServerConnector.DEFAULT_PORT_STR);
httpConnector.setPort(Integer.parseInt(tmp.trim()));
}
if (httpConnector.getServer() == null)
httpConnector.setServer(this.server);
this.server.addConnector(httpConnector);
}
@ -504,12 +506,13 @@ public abstract class AbstractJettyMojo extends AbstractMojo
//if <httpConnector> not configured in the pom, create one
if (httpConnector == null)
{
httpConnector = new MavenServerConnector();
httpConnector = new MavenServerConnector();
//use any jetty.port settings provided
String tmp = System.getProperty(PORT_SYSPROPERTY, MavenServerConnector.DEFAULT_PORT_STR);
httpConnector.setPort(Integer.parseInt(tmp.trim()));
}
if (httpConnector.getServer() == null)
httpConnector.setServer(this.server);
this.server.setConnectors(new Connector[] {httpConnector});
}

View File

@ -38,16 +38,6 @@ public class JettyServer extends org.eclipse.jetty.server.Server
{
public static final JettyServer __instance = new JettyServer();
/**
* Singleton instance
* @return
*/
public static JettyServer getInstance()
{
return __instance;
}
private RequestLog requestLog;
private ContextHandlerCollection contexts;
@ -56,7 +46,7 @@ public class JettyServer extends org.eclipse.jetty.server.Server
/**
*
*/
private JettyServer()
public JettyServer()
{
super();
setStopAtShutdown(true);

View File

@ -19,21 +19,259 @@
package org.eclipse.jetty.maven.plugin;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.thread.Scheduler;
/**
* MavenServerConnector
*
*
* As the ServerConnector class does not have a no-arg constructor, and moreover requires
* the server instance passed in to all its constructors, it cannot
* be referenced in the pom.xml. This class wraps a ServerConnector, delaying setting the
* server instance. Only a few of the setters from the ServerConnector class are supported.
*/
public class MavenServerConnector extends ServerConnector
public class MavenServerConnector extends AbstractLifeCycle implements Connector
{
public static int DEFAULT_PORT = 8080;
public static String DEFAULT_PORT_STR = String.valueOf(DEFAULT_PORT);
public static int DEFAULT_MAX_IDLE_TIME = 30000;
private Server server;
private ServerConnector delegate;
private String host;
private String name;
private int port;
private long idleTimeout;
private int lingerTime;
public MavenServerConnector()
{
super(JettyServer.getInstance());
}
public void setServer(Server server)
{
this.server = server;
}
public void setHost(String host)
{
this.host = host;
}
public String getHost()
{
return this.host;
}
public void setPort(int port)
{
this.port = port;
}
public int getPort ()
{
return this.port;
}
public void setName (String name)
{
this.name = name;
}
public void setIdleTimeout(long idleTimeout)
{
this.idleTimeout = idleTimeout;
}
public void setSoLingerTime(int lingerTime)
{
this.lingerTime = lingerTime;
}
@Override
protected void doStart() throws Exception
{
if (this.server == null)
throw new IllegalStateException("Server not set for MavenServerConnector");
this.delegate = new ServerConnector(this.server);
this.delegate.setName(this.name);
this.delegate.setPort(this.port);
this.delegate.setHost(this.host);
this.delegate.setIdleTimeout(idleTimeout);
this.delegate.setSoLingerTime(lingerTime);
this.delegate.start();
super.doStart();
}
@Override
protected void doStop() throws Exception
{
this.delegate.stop();
super.doStop();
this.delegate = null;
}
/**
* @see org.eclipse.jetty.util.component.Graceful#shutdown()
*/
@Override
public Future<Void> shutdown()
{
checkDelegate();
return this.delegate.shutdown();
}
/**
* @see org.eclipse.jetty.server.Connector#getServer()
*/
@Override
public Server getServer()
{
return this.server;
}
/**
* @see org.eclipse.jetty.server.Connector#getExecutor()
*/
@Override
public Executor getExecutor()
{
checkDelegate();
return this.delegate.getExecutor();
}
/**
* @see org.eclipse.jetty.server.Connector#getScheduler()
*/
@Override
public Scheduler getScheduler()
{
checkDelegate();
return this.delegate.getScheduler();
}
/**
* @see org.eclipse.jetty.server.Connector#getByteBufferPool()
*/
@Override
public ByteBufferPool getByteBufferPool()
{
checkDelegate();
return this.delegate.getByteBufferPool();
}
/**
* @see org.eclipse.jetty.server.Connector#getConnectionFactory(java.lang.String)
*/
@Override
public ConnectionFactory getConnectionFactory(String nextProtocol)
{
checkDelegate();
return this.delegate.getConnectionFactory(nextProtocol);
}
/**
* @see org.eclipse.jetty.server.Connector#getConnectionFactory(java.lang.Class)
*/
@Override
public <T> T getConnectionFactory(Class<T> factoryType)
{
checkDelegate();
return this.delegate.getConnectionFactory(factoryType);
}
/**
* @see org.eclipse.jetty.server.Connector#getDefaultConnectionFactory()
*/
@Override
public ConnectionFactory getDefaultConnectionFactory()
{
checkDelegate();
return getDefaultConnectionFactory();
}
/**
* @see org.eclipse.jetty.server.Connector#getConnectionFactories()
*/
@Override
public Collection<ConnectionFactory> getConnectionFactories()
{
checkDelegate();
return this.delegate.getConnectionFactories();
}
/**
* @see org.eclipse.jetty.server.Connector#getProtocols()
*/
@Override
public List<String> getProtocols()
{
checkDelegate();
return this.delegate.getProtocols();
}
/**
* @see org.eclipse.jetty.server.Connector#getIdleTimeout()
*/
@Override
@ManagedAttribute("maximum time a connection can be idle before being closed (in ms)")
public long getIdleTimeout()
{
checkDelegate();
return this.delegate.getIdleTimeout();
}
/**
* @see org.eclipse.jetty.server.Connector#getTransport()
*/
@Override
public Object getTransport()
{
checkDelegate();
return this.delegate.getTransport();
}
/**
* @see org.eclipse.jetty.server.Connector#getConnectedEndPoints()
*/
@Override
public Collection<EndPoint> getConnectedEndPoints()
{
checkDelegate();
return this.delegate.getConnectedEndPoints();
}
/**
* @see org.eclipse.jetty.server.Connector#getName()
*/
@Override
public String getName()
{
return this.name;
}
private void checkDelegate() throws IllegalStateException
{
if (this.delegate == null)
throw new IllegalStateException ("MavenServerConnector delegate not ready");
}
}

View File

@ -55,7 +55,7 @@ public class Starter
private List<File> jettyXmls; // list of jetty.xml config files to apply - Mandatory
private File contextXml; //name of context xml file to configure the webapp - Mandatory
private JettyServer server;
private JettyServer server = new JettyServer();
private JettyWebAppContext webApp;
@ -120,8 +120,6 @@ public class Starter
{
LOG.debug("Starting Jetty Server ...");
this.server = JettyServer.getInstance();
//apply any configs from jetty.xml files first
applyJettyXml ();
@ -132,6 +130,7 @@ public class Starter
{
//if a SystemProperty -Djetty.port=<portnum> has been supplied, use that as the default port
MavenServerConnector httpConnector = new MavenServerConnector();
httpConnector.setServer(this.server);
String tmp = System.getProperty(PORT_SYSPROPERTY, MavenServerConnector.DEFAULT_PORT_STR);
httpConnector.setPort(Integer.parseInt(tmp.trim()));
connectors = new Connector[] {httpConnector};

View File

@ -27,9 +27,7 @@
</goals>
<configuration>
<includes>**</includes>
<excludes>**/MANIFEST.MF</excludes>
<excludes>**/ECLIPSEF.RSA</excludes>
<excludes>**/ECLIPSEF.SF</excludes>
<excludes>**/MANIFEST.MF,META-INF/*.RSA,META-INF/*.DSA,META-INF/*.SF</excludes>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>true</overWriteSnapshots>

View File

@ -207,8 +207,29 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
// Can the parser progress (even with an empty buffer)
boolean call_channel=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
// If there is a request buffer, we are re-entering here
if (!call_channel && BufferUtil.isEmpty(_requestBuffer))
// Parse the buffer
if (call_channel)
{
// Parse as much content as there is available before calling the channel
// this is both efficient (may queue many chunks), will correctly set available for 100 continues
// and will drive the parser to completion if all content is available.
while (_parser.inContentState())
{
if (!_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
break;
}
// The parser returned true, which indicates the channel is ready to handle a request.
// Call the channel and this will either handle the request/response to completion OR,
// if the request suspends, the request/response will be incomplete so the outer loop will exit.
_channel.run();
// Return if suspended or upgraded
if (_channel.getState().isSuspended() || getEndPoint().getConnection()!=this)
return;
}
else if (BufferUtil.isEmpty(_requestBuffer))
{
if (_requestBuffer == null)
_requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
@ -242,33 +263,15 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
releaseRequestBuffer();
return;
}
// Parse what we have read
call_channel=_parser.parseNext(_requestBuffer);
}
// Parse the buffer
if (call_channel)
else
{
// Parse as much content as there is available before calling the channel
// this is both efficient (may queue many chunks), will correctly set available for 100 continues
// and will drive the parser to completion if all content is available.
while (_parser.inContentState())
{
if (!_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
break;
}
// The parser returned true, which indicates the channel is ready to handle a request.
// Call the channel and this will either handle the request/response to completion OR,
// if the request suspends, the request/response will be incomplete so the outer loop will exit.
_channel.run();
// Return if suspended or upgraded
if (_channel.getState().isSuspended() || getEndPoint().getConnection()!=this)
return;
}
// TODO work out how we can get here and a better way to handle it
LOG.warn("Unexpected state: "+this+ " "+_channel+" "+_channel.getRequest());
if (!_channel.getState().isSuspended())
getEndPoint().close();
return;
}
}
}
catch (EofException e)
@ -547,7 +550,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
// need to call blockForContent again
while (event && BufferUtil.hasContent(_requestBuffer) && _parser.inContentState())
_parser.parseNext(_requestBuffer);
// If we have an event, return
if (event)
return;
@ -563,7 +566,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
// Wait until we can read
getEndPoint().fillInterested(_readBlocker);
block(_readBlocker);
LOG.debug("{} block readable on {}",this,_readBlocker);
_readBlocker.block();

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.servlet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.InputStream;
@ -345,6 +344,45 @@ public class AsyncServletTest
Assert.assertThat(response,Matchers.not(Matchers.containsString(content)));
}
@Test
public void testAsyncRead() throws Exception
{
String header="GET /ctx/path/info?suspend=2000&resume=1500 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"Content-Length: 10\r\n"+
"\r\n";
String body="12345678\r\n";
String close="GET /ctx/path/info HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"Connection: close\r\n"+
"\r\n";
try (Socket socket = new Socket("localhost",_port);)
{
socket.setSoTimeout(10000);
socket.getOutputStream().write(header.getBytes("ISO-8859-1"));
Thread.sleep(500);
socket.getOutputStream().write(body.getBytes("ISO-8859-1"),0,2);
Thread.sleep(500);
socket.getOutputStream().write(body.getBytes("ISO-8859-1"),2,8);
socket.getOutputStream().write(close.getBytes("ISO-8859-1"));
String response = IO.toString(socket.getInputStream());
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
assertContains(
"history: REQUEST\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: async-read=10\r\n"+
"history: resume\r\n"+
"history: ASYNC\r\n"+
"history: !initial\r\n"+
"history: onComplete\r\n",response);
}
}
public synchronized String process(String query,String content) throws Exception
{
String request = "GET /ctx/path/info";
@ -364,9 +402,8 @@ public class AsyncServletTest
int port=_port;
String response=null;
try
try (Socket socket = new Socket("localhost",port);)
{
Socket socket = new Socket("localhost",port);
socket.setSoTimeout(1000000);
socket.getOutputStream().write(request.getBytes("UTF-8"));
@ -379,11 +416,10 @@ public class AsyncServletTest
throw e;
}
// System.err.println(response);
return response;
}
private static class AsyncServlet extends HttpServlet
@ -429,7 +465,7 @@ public class AsyncServletTest
if (request.getDispatcherType()==DispatcherType.REQUEST)
{
((HttpServletResponse)response).addHeader("history","initial");
response.addHeader("history","initial");
if (read_before>0)
{
byte[] buf=new byte[read_before];
@ -442,6 +478,30 @@ public class AsyncServletTest
while(b!=-1)
b=in.read();
}
else if (request.getContentLength()>0)
{
new Thread()
{
@Override
public void run()
{
int c=0;
try
{
InputStream in=request.getInputStream();
int b=0;
while(b!=-1)
if((b=in.read())>=0)
c++;
response.addHeader("history","async-read="+c);
}
catch(Exception e)
{
e.printStackTrace();
}
}
}.start();
}
if (suspend_for>=0)
{
@ -449,7 +509,7 @@ public class AsyncServletTest
if (suspend_for>0)
async.setTimeout(suspend_for);
async.addListener(__listener);
((HttpServletResponse)response).addHeader("history","suspend");
response.addHeader("history","suspend");
if (complete_after>0)
{
@ -527,7 +587,7 @@ public class AsyncServletTest
}
else
{
((HttpServletResponse)response).addHeader("history","!initial");
response.addHeader("history","!initial");
if (suspend2_for>=0 && request.getAttribute("2nd")==null)
{
@ -540,7 +600,7 @@ public class AsyncServletTest
async.setTimeout(suspend2_for);
}
// continuation.addContinuationListener(__listener);
((HttpServletResponse)response).addHeader("history","suspend");
response.addHeader("history","suspend");
if (complete2_after>0)
{
@ -581,7 +641,7 @@ public class AsyncServletTest
@Override
public void run()
{
((HttpServletResponse)response).addHeader("history","resume");
response.addHeader("history","resume");
async.dispatch();
}
};
@ -592,7 +652,7 @@ public class AsyncServletTest
}
else if (resume2_after==0)
{
((HttpServletResponse)response).addHeader("history","dispatch");
response.addHeader("history","dispatch");
async.dispatch();
}
}
@ -633,15 +693,11 @@ public class AsyncServletTest
@Override
public void onStartAsync(AsyncEvent event) throws IOException
{
// TODO Auto-generated method stub
}
@Override
public void onError(AsyncEvent event) throws IOException
{
// TODO Auto-generated method stub
}
@Override

View File

@ -32,7 +32,7 @@ public class PushSynInfo extends SynInfo
private int associatedStreamId;
public PushSynInfo(int associatedStreamId, PushInfo pushInfo){
super(pushInfo.getHeaders(), pushInfo.isClose());
super(pushInfo.getTimeout(), pushInfo.getUnit(), pushInfo.getHeaders(), pushInfo.isClose(), (byte)0);
this.associatedStreamId = associatedStreamId;
}

View File

@ -284,8 +284,8 @@ public class HttpTransportOverSPDY implements HttpTransport
Fields pushHeaders = createPushHeaders(scheme, host, pushResource);
final Fields pushRequestHeaders = createRequestHeaders(scheme, host, uri, pushResource);
// TODO: handle the timeout better
stream.push(new PushInfo(0, TimeUnit.MILLISECONDS, pushHeaders, false), new Promise.Adapter<Stream>()
stream.push(new PushInfo(pushHeaders, false),
new Promise.Adapter<Stream>()
{
@Override
public void succeeded(Stream pushStream)

View File

@ -22,8 +22,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParseListener;
public class ClientUpgradeResponse extends UpgradeResponse
public class ClientUpgradeResponse extends UpgradeResponse implements HttpResponseHeaderParseListener
{
private ByteBuffer remainingBuffer;
@ -43,6 +44,7 @@ public class ClientUpgradeResponse extends UpgradeResponse
throw new UnsupportedOperationException("Not supported on client implementation");
}
@Override
public void setRemainingBuffer(ByteBuffer remainingBuffer)
{
this.remainingBuffer = remainingBuffer;

View File

@ -39,11 +39,12 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.ClientUpgradeResponse;
import org.eclipse.jetty.websocket.client.io.HttpResponseHeaderParser.ParseException;
import org.eclipse.jetty.websocket.common.AcceptHash;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser;
import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser.ParseException;
/**
* This is the initial connection handling that exists immediately after physical connection is established to destination server.
@ -92,7 +93,7 @@ public class UpgradeConnection extends AbstractConnection
this.request = connectPromise.getRequest();
// Setup the parser
this.parser = new HttpResponseHeaderParser();
this.parser = new HttpResponseHeaderParser(new ClientUpgradeResponse());
}
public void disconnect(boolean onlyOutput)
@ -173,7 +174,7 @@ public class UpgradeConnection extends AbstractConnection
{
LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
}
ClientUpgradeResponse resp = parser.parse(buffer);
ClientUpgradeResponse resp = (ClientUpgradeResponse)parser.parse(buffer);
if (resp != null)
{
// Got a response!

View File

@ -0,0 +1,32 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io.http;
import java.nio.ByteBuffer;
public interface HttpResponseHeaderParseListener
{
void addHeader(String name, String value);
void setRemainingBuffer(ByteBuffer copy);
void setStatusCode(int statusCode);
void setStatusReason(String statusReason);
}

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.client.io;
package org.eclipse.jetty.websocket.common.io.http;
import java.nio.ByteBuffer;
import java.util.regex.Matcher;
@ -25,7 +25,6 @@ import java.util.regex.Pattern;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8LineParser;
import org.eclipse.jetty.websocket.client.ClientUpgradeResponse;
/**
* Responsible for reading UTF8 Response Header lines and parsing them into a provided UpgradeResponse object.
@ -56,12 +55,13 @@ public class HttpResponseHeaderParser
private static final Pattern PAT_HEADER = Pattern.compile("([^:]+):\\s*(.*)");
private static final Pattern PAT_STATUS_LINE = Pattern.compile("^HTTP/1.[01]\\s+(\\d+)\\s+(.*)",Pattern.CASE_INSENSITIVE);
private ClientUpgradeResponse response;
private Utf8LineParser lineParser;
private final HttpResponseHeaderParseListener listener;
private final Utf8LineParser lineParser;
private State state;
public HttpResponseHeaderParser()
public HttpResponseHeaderParser(HttpResponseHeaderParseListener listener)
{
this.listener = listener;
this.lineParser = new Utf8LineParser();
this.state = State.STATUS_LINE;
}
@ -71,7 +71,7 @@ public class HttpResponseHeaderParser
return (state == State.END);
}
public ClientUpgradeResponse parse(ByteBuffer buf) throws ParseException
public HttpResponseHeaderParseListener parse(ByteBuffer buf) throws ParseException
{
while (!isDone() && (buf.remaining() > 0))
{
@ -84,8 +84,8 @@ public class HttpResponseHeaderParser
ByteBuffer copy = ByteBuffer.allocate(buf.remaining());
BufferUtil.put(buf,copy);
BufferUtil.flipToFlush(copy,0);
this.response.setRemainingBuffer(copy);
return this.response;
this.listener.setRemainingBuffer(copy);
return listener;
}
}
}
@ -98,22 +98,21 @@ public class HttpResponseHeaderParser
{
case STATUS_LINE:
{
this.response = new ClientUpgradeResponse();
Matcher mat = PAT_STATUS_LINE.matcher(line);
if (!mat.matches())
{
throw new ParseException("Unexpected HTTP upgrade response status line [" + line + "]");
throw new ParseException("Unexpected HTTP response status line [" + line + "]");
}
try
{
response.setStatusCode(Integer.parseInt(mat.group(1)));
listener.setStatusCode(Integer.parseInt(mat.group(1)));
}
catch (NumberFormatException e)
{
throw new ParseException("Unexpected HTTP upgrade response status code",e);
throw new ParseException("Unexpected HTTP response status code",e);
}
response.setStatusReason(mat.group(2));
listener.setStatusReason(mat.group(2));
state = State.HEADER;
break;
}
@ -130,8 +129,8 @@ public class HttpResponseHeaderParser
{
String headerName = header.group(1);
String headerValue = header.group(2);
// TODO: need to split header/value if comma delimited
response.addHeader(headerName,headerValue);
// do need to split header/value if comma delimited?
listener.addHeader(headerName,headerValue);
}
break;
}

View File

@ -16,9 +16,10 @@
// ========================================================================
//
package org.eclipse.jetty.websocket.client.internal.io;
package org.eclipse.jetty.websocket.common.io.http;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -27,8 +28,6 @@ import java.util.List;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.client.io.HttpResponseHeaderParser;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@ -43,6 +42,32 @@ public class HttpResponseHeaderParserTest
buf.put(ByteBuffer.wrap(StringUtil.getBytes(line,StringUtil.__UTF8)));
}
@Test
public void testParseNotFound()
{
StringBuilder resp = new StringBuilder();
resp.append("HTTP/1.1 404 Not Found\r\n");
resp.append("Date: Fri, 26 Apr 2013 21:43:08 GMT\r\n");
resp.append("Content-Type: text/html; charset=ISO-8859-1\r\n");
resp.append("Cache-Control: must-revalidate,no-cache,no-store\r\n");
resp.append("Content-Length: 38\r\n");
resp.append("Server: Jetty(9.0.0.v20130308)\r\n");
resp.append("\r\n");
// and some body content
resp.append("What you are looking for is not here\r\n");
ByteBuffer buf = BufferUtil.toBuffer(resp.toString(),StringUtil.__UTF8_CHARSET);
HttpResponseParseCapture capture = new HttpResponseParseCapture();
HttpResponseHeaderParser parser = new HttpResponseHeaderParser(capture);
assertThat("Parser.parse",parser.parse(buf),notNullValue());
assertThat("Response.statusCode",capture.getStatusCode(),is(404));
assertThat("Response.statusReason",capture.getStatusReason(),is("Not Found"));
assertThat("Response.headers[Content-Length]",capture.getHeader("Content-Length"),is("38"));
assertThat("Response.remainingBuffer",capture.getRemainingBuffer().remaining(),is(38));
}
@Test
public void testParseRealWorldResponse()
{
@ -73,14 +98,14 @@ public class HttpResponseHeaderParserTest
BufferUtil.flipToFlush(buf,0);
// Parse Buffer
HttpResponseHeaderParser parser = new HttpResponseHeaderParser();
UpgradeResponse response = parser.parse(buf);
Assert.assertThat("Response",response,notNullValue());
HttpResponseParseCapture capture = new HttpResponseParseCapture();
HttpResponseHeaderParser parser = new HttpResponseHeaderParser(capture);
assertThat("Parser.parse",parser.parse(buf),notNullValue());
Assert.assertThat("Response.statusCode",response.getStatusCode(),is(200));
Assert.assertThat("Response.statusReason",response.getStatusReason(),is("OK"));
Assert.assertThat("Response.statusCode",capture.getStatusCode(),is(200));
Assert.assertThat("Response.statusReason",capture.getStatusReason(),is("OK"));
Assert.assertThat("Response.header[age]",response.getHeader("age"),is("518097"));
Assert.assertThat("Response.header[age]",capture.getHeader("age"),is("518097"));
}
@Test
@ -122,24 +147,47 @@ public class HttpResponseHeaderParserTest
small3.position(70);
// Parse Buffer
HttpResponseHeaderParser parser = new HttpResponseHeaderParser();
UpgradeResponse response;
HttpResponseParseCapture capture = new HttpResponseParseCapture();
HttpResponseHeaderParser parser = new HttpResponseHeaderParser(capture);
assertThat("Parser.parse",parser.parse(buf),notNullValue());
// Parse small 1
response = parser.parse(small1);
Assert.assertThat("Small 1",response,nullValue());
Assert.assertThat("Small 1",parser.parse(small1),nullValue());
// Parse small 2
response = parser.parse(small2);
Assert.assertThat("Small 2",response,nullValue());
Assert.assertThat("Small 2",parser.parse(small2),nullValue());
// Parse small 3
response = parser.parse(small3);
Assert.assertThat("Small 3",response,notNullValue());
Assert.assertThat("Small 3",parser.parse(small3),notNullValue());
Assert.assertThat("Response.statusCode",response.getStatusCode(),is(200));
Assert.assertThat("Response.statusReason",response.getStatusReason(),is("OK"));
Assert.assertThat("Response.statusCode",capture.getStatusCode(),is(200));
Assert.assertThat("Response.statusReason",capture.getStatusReason(),is("OK"));
Assert.assertThat("Response.header[age]",response.getHeader("age"),is("518097"));
Assert.assertThat("Response.header[age]",capture.getHeader("age"),is("518097"));
}
@Test
public void testParseUpgrade()
{
// Example from RFC6455 - Section 1.2 (Protocol Overview)
StringBuilder resp = new StringBuilder();
resp.append("HTTP/1.1 101 Switching Protocols\r\n");
resp.append("Upgrade: websocket\r\n");
resp.append("Connection: Upgrade\r\n");
resp.append("Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=\r\n");
resp.append("Sec-WebSocket-Protocol: chat\r\n");
resp.append("\r\n");
ByteBuffer buf = BufferUtil.toBuffer(resp.toString(),StringUtil.__UTF8_CHARSET);
HttpResponseParseCapture capture = new HttpResponseParseCapture();
HttpResponseHeaderParser parser = new HttpResponseHeaderParser(capture);
assertThat("Parser.parse",parser.parse(buf),notNullValue());
assertThat("Response.statusCode",capture.getStatusCode(),is(101));
assertThat("Response.statusReason",capture.getStatusReason(),is("Switching Protocols"));
assertThat("Response.headers[Upgrade]",capture.getHeader("Upgrade"),is("websocket"));
assertThat("Response.headers[Connection]",capture.getHeader("Connection"),is("Upgrade"));
assertThat("Buffer.remaining",buf.remaining(),is(0));
}
}

View File

@ -0,0 +1,76 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io.http;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
public class HttpResponseParseCapture implements HttpResponseHeaderParseListener
{
private int statusCode;
private String statusReason;
private Map<String, String> headers = new HashMap<>();
private ByteBuffer remainingBuffer;
@Override
public void addHeader(String name, String value)
{
headers.put(name.toLowerCase(Locale.ENGLISH),value);
}
public String getHeader(String name)
{
return headers.get(name.toLowerCase(Locale.ENGLISH));
}
public ByteBuffer getRemainingBuffer()
{
return remainingBuffer;
}
public int getStatusCode()
{
return statusCode;
}
public String getStatusReason()
{
return statusReason;
}
@Override
public void setRemainingBuffer(ByteBuffer copy)
{
this.remainingBuffer = copy;
}
@Override
public void setStatusCode(int code)
{
this.statusCode = code;
}
@Override
public void setStatusReason(String reason)
{
this.statusReason = reason;
}
}

View File

@ -103,7 +103,7 @@ public class AnnotatedMaxMessageSizeTest
// Read frame (hopefully text frame)
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
WebSocketFrame tf = capture.getFrames().get(0);
WebSocketFrame tf = capture.getFrames().poll();
Assert.assertThat("Text Frame.status code",tf.getPayloadAsUTF8(),is(msg));
}
finally

View File

@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient;
import org.eclipse.jetty.websocket.server.blockhead.HttpResponse;
import org.eclipse.jetty.websocket.server.examples.MyEchoServlet;
import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture;
import org.junit.AfterClass;
@ -60,8 +61,8 @@ public class ChromeTest
client.setProtocols("chat");
client.connect();
client.sendStandardRequest();
String response = client.expectUpgradeResponse();
Assert.assertThat("Response",response,containsString("x-webkit-deflate-frame"));
HttpResponse response = client.expectUpgradeResponse();
Assert.assertThat("Response",response.getExtensionsHeader(),containsString("x-webkit-deflate-frame"));
// Generate text frame
String msg = "this is an echo ... cho ... ho ... o";
@ -69,7 +70,7 @@ public class ChromeTest
// Read frame (hopefully text frame)
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
WebSocketFrame tf = capture.getFrames().get(0);
WebSocketFrame tf = capture.getFrames().poll();
Assert.assertThat("Text Frame.status code",tf.getPayloadAsUTF8(),is(msg));
}
finally

View File

@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient;
import org.eclipse.jetty.websocket.server.blockhead.HttpResponse;
import org.eclipse.jetty.websocket.server.helper.EchoServlet;
import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture;
import org.junit.AfterClass;
@ -80,9 +81,9 @@ public class FragmentExtensionTest
client.setTimeout(TimeUnit.SECONDS,1);
client.connect();
client.sendStandardRequest();
String resp = client.expectUpgradeResponse();
HttpResponse resp = client.expectUpgradeResponse();
Assert.assertThat("Response",resp,containsString("fragment"));
Assert.assertThat("Response",resp.getExtensionsHeader(),containsString("fragment"));
String msg = "Sent as a long message that should be split";
client.write(WebSocketFrame.text(msg));
@ -91,7 +92,7 @@ public class FragmentExtensionTest
IncomingFramesCapture capture = client.readFrames(parts.length,TimeUnit.MILLISECONDS,1000);
for (int i = 0; i < parts.length; i++)
{
WebSocketFrame frame = capture.getFrames().get(i);
WebSocketFrame frame = capture.getFrames().poll();
Assert.assertThat("text[" + i + "].payload",frame.getPayloadAsUTF8(),is(parts[i]));
}
}

View File

@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient;
import org.eclipse.jetty.websocket.server.blockhead.HttpResponse;
import org.eclipse.jetty.websocket.server.helper.EchoServlet;
import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture;
import org.junit.AfterClass;
@ -64,9 +65,9 @@ public class FrameCompressionExtensionTest
client.setTimeout(TimeUnit.SECONDS,1);
client.connect();
client.sendStandardRequest();
String resp = client.expectUpgradeResponse();
HttpResponse resp = client.expectUpgradeResponse();
Assert.assertThat("Response",resp,containsString("x-webkit-deflate-frame"));
Assert.assertThat("Response",resp.getExtensionsHeader(),containsString("x-webkit-deflate-frame"));
String msg = "Hello";
@ -74,7 +75,7 @@ public class FrameCompressionExtensionTest
client.write(WebSocketFrame.text(msg));
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,1000);
WebSocketFrame frame = capture.getFrames().get(0);
WebSocketFrame frame = capture.getFrames().poll();
Assert.assertThat("TEXT.payload",frame.getPayloadAsUTF8(),is(msg.toString()));
// Client sends second message
@ -83,7 +84,7 @@ public class FrameCompressionExtensionTest
client.write(WebSocketFrame.text(msg));
capture = client.readFrames(1,TimeUnit.SECONDS,1);
frame = capture.getFrames().get(0);
frame = capture.getFrames().poll();
Assert.assertThat("TEXT.payload",frame.getPayloadAsUTF8(),is(msg.toString()));
}
finally

View File

@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient;
import org.eclipse.jetty.websocket.server.blockhead.HttpResponse;
import org.eclipse.jetty.websocket.server.helper.EchoServlet;
import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture;
import org.junit.AfterClass;
@ -65,14 +66,14 @@ public class IdentityExtensionTest
client.setTimeout(TimeUnit.SECONDS,1);
client.connect();
client.sendStandardRequest();
String resp = client.expectUpgradeResponse();
HttpResponse resp = client.expectUpgradeResponse();
Assert.assertThat("Response",resp,containsString("identity"));
Assert.assertThat("Response",resp.getExtensionsHeader(),containsString("identity"));
client.write(WebSocketFrame.text("Hello"));
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,1000);
WebSocketFrame frame = capture.getFrames().get(0);
WebSocketFrame frame = capture.getFrames().poll();
Assert.assertThat("TEXT.payload",frame.getPayloadAsUTF8(),is("Hello"));
}
finally

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -43,15 +45,11 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
/**
* Tests various close scenarios
*/
@Ignore
public class WebSocketCloseTest
{
@SuppressWarnings("serial")
@ -145,7 +143,7 @@ public class WebSocketCloseTest
client.expectUpgradeResponse();
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.SECONDS,1);
WebSocketFrame frame = capture.getFrames().get(0);
WebSocketFrame frame = capture.getFrames().poll();
Assert.assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame);
Assert.assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.NORMAL));

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.*;
import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient;
import org.eclipse.jetty.websocket.server.blockhead.HttpResponse;
import org.eclipse.jetty.websocket.server.examples.MyEchoServlet;
import org.junit.AfterClass;
import org.junit.Assert;
@ -56,9 +57,10 @@ public class WebSocketInvalidVersionTest
{
client.connect();
client.sendStandardRequest();
String respHeader = client.readResponseHeader();
Assert.assertThat("Response Code",respHeader,startsWith("HTTP/1.1 400 Unsupported websocket version specification"));
Assert.assertThat("Response Header Versions",respHeader,containsString("Sec-WebSocket-Version: 13\r\n"));
HttpResponse response = client.readResponseHeader();
Assert.assertThat("Response Status Code",response.getStatusCode(),is(400));
Assert.assertThat("Response Status Reason",response.getStatusReason(),containsString("Unsupported websocket version specification"));
Assert.assertThat("Response Versions",response.getHeader("Sec-WebSocket-Version"),is("13"));
}
finally
{

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.*;
import java.net.URI;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
@ -96,13 +97,14 @@ public class WebSocketServerSessionTest
// Read frame (hopefully text frame)
IncomingFramesCapture capture = client.readFrames(4,TimeUnit.MILLISECONDS,500);
WebSocketFrame tf = capture.getFrames().pop();
Queue<WebSocketFrame> frames = capture.getFrames();
WebSocketFrame tf = frames.poll();
Assert.assertThat("Parameter Map[snack]",tf.getPayloadAsUTF8(),is("[cashews]"));
tf = capture.getFrames().pop();
tf = frames.poll();
Assert.assertThat("Parameter Map[amount]",tf.getPayloadAsUTF8(),is("[handful]"));
tf = capture.getFrames().pop();
tf = frames.poll();
Assert.assertThat("Parameter Map[brand]",tf.getPayloadAsUTF8(),is("[off]"));
tf = capture.getFrames().pop();
tf = frames.poll();
Assert.assertThat("Parameter Map[cost]",tf.getPayloadAsUTF8(),is("<null>"));
}
finally

View File

@ -115,7 +115,7 @@ public class WebSocketServletRFCTest
// Read frame echo'd back (hopefully a single binary frame)
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,1000);
Frame binmsg = capture.getFrames().get(0);
Frame binmsg = capture.getFrames().poll();
int expectedSize = buf1.length + buf2.length + buf3.length;
Assert.assertThat("BinaryFrame.payloadLength",binmsg.getPayloadLength(),is(expectedSize));
@ -181,7 +181,7 @@ public class WebSocketServletRFCTest
// Read frame (hopefully text frame)
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
WebSocketFrame tf = capture.getFrames().get(0);
WebSocketFrame tf = capture.getFrames().poll();
Assert.assertThat("Text Frame.status code",tf.getPayloadAsUTF8(),is(msg));
}
finally
@ -209,7 +209,7 @@ public class WebSocketServletRFCTest
// Read frame (hopefully close frame)
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
Frame cf = capture.getFrames().get(0);
Frame cf = capture.getFrames().poll();
CloseInfo close = new CloseInfo(cf);
Assert.assertThat("Close Frame.status code",close.getStatusCode(),is(StatusCode.SERVER_ERROR));
}
@ -252,7 +252,7 @@ public class WebSocketServletRFCTest
// Read frame (hopefully text frame)
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
WebSocketFrame tf = capture.getFrames().get(0);
WebSocketFrame tf = capture.getFrames().poll();
Assert.assertThat("Text Frame.status code",tf.getPayloadAsUTF8(),is(msg));
}
finally
@ -292,7 +292,7 @@ public class WebSocketServletRFCTest
}
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.SECONDS,1);
WebSocketFrame frame = capture.getFrames().get(0);
WebSocketFrame frame = capture.getFrames().poll();
Assert.assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame);
Assert.assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.MESSAGE_TOO_LARGE));
@ -334,7 +334,7 @@ public class WebSocketServletRFCTest
}
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.SECONDS,1);
WebSocketFrame frame = capture.getFrames().get(0);
WebSocketFrame frame = capture.getFrames().poll();
Assert.assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame);
Assert.assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.MESSAGE_TOO_LARGE));
@ -367,7 +367,7 @@ public class WebSocketServletRFCTest
client.writeRaw(bb);
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.SECONDS,1);
WebSocketFrame frame = capture.getFrames().get(0);
WebSocketFrame frame = capture.getFrames().poll();
Assert.assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame);
Assert.assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.BAD_PAYLOAD));
@ -413,7 +413,7 @@ public class WebSocketServletRFCTest
// Read frame (hopefully text frame)
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
WebSocketFrame tf = capture.getFrames().get(0);
WebSocketFrame tf = capture.getFrames().poll();
Assert.assertThat("Text Frame.status code",tf.getPayloadAsUTF8(),is(msg));
}
finally

View File

@ -151,7 +151,7 @@ public class Fuzzer
for (int i = 0; i < expectedCount; i++)
{
WebSocketFrame expected = expect.get(i);
WebSocketFrame actual = capture.getFrames().pop();
WebSocketFrame actual = capture.getFrames().poll();
prefix = "Frame[" + i + "]";
@ -188,14 +188,16 @@ public class Fuzzer
// we expect that the close handshake to have occurred and the server should have closed the connection
try
{
@SuppressWarnings("unused")
int val = client.read();
ByteBuffer buf = ByteBuffer.wrap(new byte[]
{ 0x00 });
BufferUtil.flipToFill(buf);
int len = client.read(buf);
Assert.fail("Server has not closed socket");
Assert.assertThat("Server has not closed socket",len,lessThanOrEqualTo(0));
}
catch (SocketException e)
catch (IOException e)
{
// valid path
}
IOState ios = client.getIOState();

View File

@ -19,13 +19,10 @@
package org.eclipse.jetty.websocket.server.blockhead;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.io.BufferedReader;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetAddress;
@ -41,8 +38,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.net.ssl.HttpsURLConnection;
@ -51,7 +46,6 @@ import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketException;
@ -71,6 +65,7 @@ import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
import org.eclipse.jetty.websocket.common.io.IOState;
import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser;
import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture;
import org.junit.Assert;
@ -117,6 +112,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
private ExtensionStack extensionStack;
private IOState ioState;
private CountDownLatch disconnectedLatch = new CountDownLatch(1);
private ByteBuffer remainingBuffer;
public BlockheadClient(URI destWebsocketURI) throws URISyntaxException
{
@ -234,32 +230,31 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
}
}
public String expectUpgradeResponse() throws IOException
public HttpResponse expectUpgradeResponse() throws IOException
{
String respHeader = readResponseHeader();
HttpResponse response = readResponseHeader();
if (LOG.isDebugEnabled())
{
LOG.debug("Response Header: {}{}",'\n',respHeader);
LOG.debug("Response Header: {}{}",'\n',response);
}
Assert.assertThat("Response Code",respHeader,startsWith("HTTP/1.1 101 Switching Protocols"));
Assert.assertThat("Response Header Upgrade",respHeader,containsString("Upgrade: WebSocket\r\n"));
Assert.assertThat("Response Header Connection",respHeader,containsString("Connection: Upgrade\r\n"));
Assert.assertThat("Response Status Code",response.getStatusCode(),is(101));
Assert.assertThat("Response Status Reason",response.getStatusReason(),is("Switching Protocols"));
Assert.assertThat("Response Header[Upgrade]",response.getHeader("Upgrade"),is("WebSocket"));
Assert.assertThat("Response Header[Connection]",response.getHeader("Connection"),is("Upgrade"));
// Validate the Sec-WebSocket-Accept
Pattern patAcceptHeader = Pattern.compile("Sec-WebSocket-Accept: (.*=)",Pattern.CASE_INSENSITIVE);
Matcher matAcceptHeader = patAcceptHeader.matcher(respHeader);
Assert.assertThat("Response Header Sec-WebSocket-Accept Exists?",matAcceptHeader.find(),is(true));
String acceptKey = response.getHeader("Sec-WebSocket-Accept");
Assert.assertThat("Response Header[Sec-WebSocket-Accept Exists]",acceptKey,notNullValue());
String reqKey = REQUEST_HASH_KEY;
String expectedHash = AcceptHash.hashKey(reqKey);
String acceptKey = matAcceptHeader.group(1);
Assert.assertThat("Valid Sec-WebSocket-Accept Hash?",acceptKey,is(expectedHash));
// collect extensions configured in response header
List<ExtensionConfig> configs = getExtensionConfigs(respHeader);
List<ExtensionConfig> configs = getExtensionConfigs(response);
extensionStack = new ExtensionStack(this.extensionFactory);
extensionStack.negotiate(configs);
@ -288,7 +283,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
LOG.debug("outgoing = {}",outgoing);
LOG.debug("incoming = {}",extensionStack);
return respHeader;
return response;
}
public void flush() throws IOException
@ -296,22 +291,16 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
out.flush();
}
private List<ExtensionConfig> getExtensionConfigs(String respHeader)
private List<ExtensionConfig> getExtensionConfigs(HttpResponse response)
{
List<ExtensionConfig> configs = new ArrayList<>();
Pattern expat = Pattern.compile("Sec-WebSocket-Extensions: (.*)\r",Pattern.CASE_INSENSITIVE);
Matcher mat = expat.matcher(respHeader);
int offset = 0;
while (mat.find(offset))
String econf = response.getHeader("Sec-WebSocket-Extensions");
if (econf != null)
{
String econf = mat.group(1);
LOG.debug("Found Extension Response: {}",econf);
ExtensionConfig config = ExtensionConfig.parse(econf);
configs.add(config);
offset = mat.end(1);
}
return configs;
}
@ -423,35 +412,6 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
return (socket != null) && (socket.isConnected());
}
public void lookFor(String string) throws IOException
{
String orig = string;
Utf8StringBuilder scanned = new Utf8StringBuilder();
try
{
while (true)
{
int b = in.read();
if (b < 0)
{
throw new EOFException();
}
scanned.append((byte)b);
assertEquals("looking for\"" + orig + "\" in '" + scanned + "'",string.charAt(0),b);
if (string.length() == 1)
{
break;
}
string = string.substring(1);
}
}
catch (IOException e)
{
System.err.println("IOE while looking for \"" + orig + "\" in '" + scanned + "'");
throw e;
}
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
{
@ -487,17 +447,18 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
}
}
public int read() throws IOException
{
return in.read();
}
public int read(ByteBuffer buf) throws IOException
{
if (eof)
{
throw new EOFException("Hit EOF");
}
if ((remainingBuffer != null) && (remainingBuffer.remaining() > 0))
{
return BufferUtil.put(remainingBuffer,buf);
}
int len = 0;
int b;
while ((in.available() > 0) && (buf.remaining() > 0))
@ -572,25 +533,24 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
return incomingFrames;
}
public String readResponseHeader() throws IOException
public HttpResponse readResponseHeader() throws IOException
{
InputStreamReader isr = new InputStreamReader(in);
BufferedReader reader = new BufferedReader(isr);
StringBuilder header = new StringBuilder();
// Read the response header
String line = reader.readLine();
Assert.assertNotNull(line);
Assert.assertThat(line,startsWith("HTTP/1.1 "));
header.append(line).append("\r\n");
while ((line = reader.readLine()) != null)
HttpResponse response = new HttpResponse();
HttpResponseHeaderParser parser = new HttpResponseHeaderParser(response);
ByteBuffer buf = BufferUtil.allocate(512);
do
{
if (line.trim().length() == 0)
{
break;
}
header.append(line).append("\r\n");
BufferUtil.flipToFill(buf);
read(buf);
BufferUtil.flipToFlush(buf,0);
}
return header.toString();
while (parser.parse(buf) == null);
remainingBuffer = response.getRemainingBuffer();
return response;
}
public void sendStandardRequest() throws IOException

View File

@ -0,0 +1,95 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.server.blockhead;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParseListener;
public class HttpResponse implements HttpResponseHeaderParseListener
{
private int statusCode;
private String statusReason;
private Map<String, String> headers = new HashMap<>();
private ByteBuffer remainingBuffer;
@Override
public void addHeader(String name, String value)
{
headers.put(name.toLowerCase(Locale.ENGLISH),value);
}
public String getExtensionsHeader()
{
return getHeader("Sec-WebSocket-Extensions");
}
public String getHeader(String name)
{
return headers.get(name.toLowerCase(Locale.ENGLISH));
}
public ByteBuffer getRemainingBuffer()
{
return remainingBuffer;
}
public int getStatusCode()
{
return statusCode;
}
public String getStatusReason()
{
return statusReason;
}
@Override
public void setRemainingBuffer(ByteBuffer copy)
{
this.remainingBuffer = copy;
}
@Override
public void setStatusCode(int code)
{
this.statusCode = code;
}
@Override
public void setStatusReason(String reason)
{
this.statusReason = reason;
}
@Override
public String toString()
{
StringBuilder str = new StringBuilder();
str.append("HTTP/1.1 ").append(statusCode).append(' ').append(statusReason);
for (Map.Entry<String, String> entry : headers.entrySet())
{
str.append('\n').append(entry.getKey()).append(": ").append(entry.getValue());
}
return str.toString();
}
}

View File

@ -20,8 +20,9 @@ package org.eclipse.jetty.websocket.server.helper;
import static org.hamcrest.Matchers.*;
import java.util.LinkedList;
import java.util.Queue;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -35,8 +36,8 @@ import org.junit.Assert;
public class IncomingFramesCapture implements IncomingFrames
{
private static final Logger LOG = Log.getLogger(IncomingFramesCapture.class);
private LinkedList<WebSocketFrame> frames = new LinkedList<>();
private LinkedList<WebSocketException> errors = new LinkedList<>();
private EventQueue<WebSocketFrame> frames = new EventQueue<>();
private EventQueue<WebSocketException> errors = new EventQueue<>();
public void assertErrorCount(int expectedCount)
{
@ -81,10 +82,10 @@ public class IncomingFramesCapture implements IncomingFrames
public void dump()
{
System.err.printf("Captured %d incoming frames%n",frames.size());
for (int i = 0; i < frames.size(); i++)
int i = 0;
for (Frame frame : frames)
{
Frame frame = frames.get(i);
System.err.printf("[%3d] %s%n",i,frame);
System.err.printf("[%3d] %s%n",i++,frame);
System.err.printf(" %s%n",BufferUtil.toDetailString(frame.getPayload()));
}
}
@ -102,7 +103,7 @@ public class IncomingFramesCapture implements IncomingFrames
return count;
}
public LinkedList<WebSocketException> getErrors()
public Queue<WebSocketException> getErrors()
{
return errors;
}
@ -120,7 +121,7 @@ public class IncomingFramesCapture implements IncomingFrames
return count;
}
public LinkedList<WebSocketFrame> getFrames()
public Queue<WebSocketFrame> getFrames()
{
return frames;
}

View File

@ -542,7 +542,7 @@
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-test-helper</artifactId>
<version>2.0</version>
<version>2.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>