Merge branch 'master' into jetty-8

This commit is contained in:
Jesse McConnell 2012-06-14 15:46:59 -05:00
commit 571b1da632
33 changed files with 2570 additions and 245 deletions

View File

@ -62,6 +62,8 @@ public class DefaultAuthenticatorFactory implements Authenticator.Factory
authenticator=new FormAuthenticator();
else if ( Constraint.__SPNEGO_AUTH.equalsIgnoreCase(auth) )
authenticator = new SpnegoAuthenticator();
else if ( Constraint.__NEGOTIATE_AUTH.equalsIgnoreCase(auth) ) // see Bug #377076
authenticator = new SpnegoAuthenticator(Constraint.__NEGOTIATE_AUTH);
if (Constraint.__CERT_AUTH.equalsIgnoreCase(auth)||Constraint.__CERT_AUTH2.equalsIgnoreCase(auth))
authenticator=new ClientCertAuthenticator();

View File

@ -36,9 +36,25 @@ public class SpnegoAuthenticator extends LoginAuthenticator
{
private static final Logger LOG = Log.getLogger(SpnegoAuthenticator.class);
private String _authMethod = Constraint.__SPNEGO_AUTH;
public SpnegoAuthenticator()
{
}
/**
* Allow for a custom authMethod value to be set for instances where SPENGO may not be appropriate
* @param authMethod
*/
public SpnegoAuthenticator( String authMethod )
{
_authMethod = authMethod;
}
public String getAuthMethod()
{
return Constraint.__SPNEGO_AUTH;
return _authMethod;
}
public Authentication validateRequest(ServletRequest request, ServletResponse response, boolean mandatory) throws ServerAuthException

View File

@ -492,24 +492,9 @@ public class ServletHolder extends Holder<Servlet> implements UserIdentity.Scope
}
// Handle configuring servlets that implement org.apache.jasper.servlet.JspServlet
if (isJspServlet(_servlet))
if (isJspServlet())
{
ContextHandler ch = ((ContextHandler.Context)getServletHandler().getServletContext()).getContextHandler();
/* Set the webapp's classpath for Jasper */
ch.setAttribute("org.apache.catalina.jsp_classpath", ch.getClassPath());
/* Set the system classpath for Jasper */
setInitParameter("com.sun.appserv.jsp.classpath", Loader.getClassPath(ch.getClassLoader()));
/* Set up other classpath attribute */
if ("?".equals(getInitParameter("classpath")))
{
String classpath = ch.getClassPath();
LOG.debug("classpath=" + classpath);
if (classpath != null)
setInitParameter("classpath", classpath);
}
initJspServlet();
}
_servlet.init(_config);
@ -544,6 +529,31 @@ public class ServletHolder extends Holder<Servlet> implements UserIdentity.Scope
}
/* ------------------------------------------------------------ */
/**
* @throws Exception
*/
protected void initJspServlet () throws Exception
{
ContextHandler ch = ((ContextHandler.Context)getServletHandler().getServletContext()).getContextHandler();
/* Set the webapp's classpath for Jasper */
ch.setAttribute("org.apache.catalina.jsp_classpath", ch.getClassPath());
/* Set the system classpath for Jasper */
setInitParameter("com.sun.appserv.jsp.classpath", Loader.getClassPath(ch.getClassLoader().getParent()));
/* Set up other classpath attribute */
if ("?".equals(getInitParameter("classpath")))
{
String classpath = ch.getClassPath();
LOG.debug("classpath=" + classpath);
if (classpath != null)
setInitParameter("classpath", classpath);
}
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.server.UserIdentity.Scope#getContextPath()
@ -642,12 +652,12 @@ public class ServletHolder extends Holder<Servlet> implements UserIdentity.Scope
/* ------------------------------------------------------------ */
private boolean isJspServlet (Servlet servlet)
private boolean isJspServlet ()
{
if (servlet == null)
if (_servlet == null)
return false;
Class c = servlet.getClass();
Class c = _servlet.getClass();
boolean result = false;
while (c != null && !result)

View File

@ -16,6 +16,7 @@
package org.eclipse.jetty.spdy;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@ -89,6 +90,8 @@ public class Promise<T> implements Handler<T>, Future<T>
private T result() throws ExecutionException
{
if (isCancelled())
throw new CancellationException();
Throwable failure = this.failure;
if (failure != null)
throw new ExecutionException(failure);

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -64,6 +65,7 @@ import org.eclipse.jetty.spdy.frames.SynStreamFrame;
import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.spdy.parser.Parser;
import org.eclipse.jetty.util.Atomics;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -79,6 +81,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
};
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
private final LinkedList<FrameBytes> queue = new LinkedList<>();
@ -208,7 +211,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
public void settings(SettingsInfo settingsInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{
SettingsFrame frame = new SettingsFrame(version,settingsInfo.getFlags(),settingsInfo.getSettings());
control(null,frame,timeout,unit,handler,null);
control(null, frame, timeout, unit, handler, null);
}
@Override
@ -244,7 +247,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
@Override
public void goAway(long timeout, TimeUnit unit, Handler<Void> handler)
{
goAway(SessionStatus.OK,timeout,unit,handler);
goAway(SessionStatus.OK, timeout, unit, handler);
}
private void goAway(SessionStatus sessionStatus, long timeout, TimeUnit unit, Handler<Void> handler)
@ -269,6 +272,30 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
return result;
}
@Override
public IStream getStream(int streamId)
{
return streams.get(streamId);
}
@Override
public Object getAttribute(String key)
{
return attributes.get(key);
}
@Override
public void setAttribute(String key, Object value)
{
attributes.put(key, value);
}
@Override
public Object removeAttribute(String key)
{
return attributes.remove(key);
}
@Override
public void onControlFrame(ControlFrame frame)
{
@ -399,7 +426,6 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
};
flowControlStrategy.onDataReceived(this, stream, dataInfo);
stream.process(dataInfo);
updateLastStreamId(stream);
if (stream.isClosed())
removeStream(stream);
}
@ -429,6 +455,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private void processSyn(SessionFrameListener listener, IStream stream, SynStreamFrame frame)
{
stream.process(frame);
// Update the last stream id before calling the application (which may send a GO_AWAY)
updateLastStreamId(stream);
SynInfo synInfo = new SynInfo(frame.getHeaders(),frame.isClose(),frame.getPriority());
StreamFrameListener streamListener = notifyOnSyn(listener,stream,synInfo);
stream.setStreamFrameListener(streamListener);
@ -474,7 +502,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private IStream newStream(SynStreamFrame frame)
{
IStream associatedStream = streams.get(frame.getAssociatedStreamId());
IStream stream = new StandardStream(frame, this, associatedStream);
IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream);
flowControlStrategy.onNewStream(this, stream);
return stream;
}
@ -800,9 +828,6 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{
try
{
if (stream != null)
updateLastStreamId(stream);
// Synchronization is necessary, since we may have concurrent replies
// and those needs to be generated and enqueued atomically in order
// to maintain a correct compression context
@ -830,17 +855,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private void updateLastStreamId(IStream stream)
{
int streamId = stream.getId();
if (stream.isClosed() && streamId % 2 != streamIds.get() % 2)
{
// Non-blocking atomic update
int oldValue = lastStreamId.get();
while (streamId > oldValue)
{
if (lastStreamId.compareAndSet(oldValue,streamId))
break;
oldValue = lastStreamId.get();
}
}
if (streamId % 2 != streamIds.get() % 2)
Atomics.updateMax(lastStreamId, streamId);
}
@Override

View File

@ -29,7 +29,6 @@ import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StreamStatus;
@ -37,7 +36,6 @@ import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.HeadersFrame;
import org.eclipse.jetty.spdy.frames.SynReplyFrame;
import org.eclipse.jetty.spdy.frames.SynStreamFrame;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -45,9 +43,10 @@ public class StandardStream implements IStream
{
private static final Logger logger = Log.getLogger(Stream.class);
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
private final IStream associatedStream;
private final SynStreamFrame frame;
private final int id;
private final byte priority;
private final ISession session;
private final IStream associatedStream;
private final AtomicInteger windowSize = new AtomicInteger();
private final Set<Stream> pushedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Stream, Boolean>());
private volatile StreamFrameListener listener;
@ -55,9 +54,10 @@ public class StandardStream implements IStream
private volatile CloseState closeState = CloseState.OPENED;
private volatile boolean reset = false;
public StandardStream(SynStreamFrame frame, ISession session, IStream associatedStream)
public StandardStream(int id, byte priority, ISession session, IStream associatedStream)
{
this.frame = frame;
this.id = id;
this.priority = priority;
this.session = session;
this.associatedStream = associatedStream;
}
@ -65,7 +65,7 @@ public class StandardStream implements IStream
@Override
public int getId()
{
return frame.getStreamId();
return id;
}
@Override
@ -95,7 +95,7 @@ public class StandardStream implements IStream
@Override
public byte getPriority()
{
return frame.getPriority();
return priority;
}
@Override
@ -112,7 +112,7 @@ public class StandardStream implements IStream
}
@Override
public Session getSession()
public ISession getSession()
{
return session;
}
@ -150,7 +150,7 @@ public class StandardStream implements IStream
{
case OPENED:
{
closeState = local?CloseState.LOCALLY_CLOSED:CloseState.REMOTELY_CLOSED;
closeState = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
break;
}
case LOCALLY_CLOSED:
@ -191,16 +191,16 @@ public class StandardStream implements IStream
{
openState = OpenState.REPLY_RECV;
SynReplyFrame synReply = (SynReplyFrame)frame;
updateCloseState(synReply.isClose(),false);
ReplyInfo replyInfo = new ReplyInfo(synReply.getHeaders(),synReply.isClose());
updateCloseState(synReply.isClose(), false);
ReplyInfo replyInfo = new ReplyInfo(synReply.getHeaders(), synReply.isClose());
notifyOnReply(replyInfo);
break;
}
case HEADERS:
{
HeadersFrame headers = (HeadersFrame)frame;
updateCloseState(headers.isClose(),false);
HeadersInfo headersInfo = new HeadersInfo(headers.getHeaders(),headers.isClose(),headers.isResetCompression());
updateCloseState(headers.isClose(), false);
HeadersInfo headersInfo = new HeadersInfo(headers.getHeaders(), headers.isClose(), headers.isResetCompression());
notifyOnHeaders(headersInfo);
break;
}
@ -269,7 +269,7 @@ public class StandardStream implements IStream
{
if (listener != null)
{
logger.debug("Invoking headers callback with {} on listener {}", frame, listener);
logger.debug("Invoking headers callback with {} on listener {}", headersInfo, listener);
listener.onHeaders(this, headersInfo);
}
}
@ -320,11 +320,11 @@ public class StandardStream implements IStream
{
if (isClosed() || isReset())
{
handler.failed(this, new StreamException(getId(),StreamStatus.STREAM_ALREADY_CLOSED));
handler.failed(this, new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED));
return;
}
PushSynInfo pushSynInfo = new PushSynInfo(getId(),synInfo);
session.syn(pushSynInfo,null,timeout,unit,handler);
PushSynInfo pushSynInfo = new PushSynInfo(getId(), synInfo);
session.syn(pushSynInfo, null, timeout, unit, handler);
}
@Override
@ -341,9 +341,9 @@ public class StandardStream implements IStream
if (isUnidirectional())
throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams");
openState = OpenState.REPLY_SENT;
updateCloseState(replyInfo.isClose(),true);
SynReplyFrame frame = new SynReplyFrame(session.getVersion(),replyInfo.getFlags(),getId(),replyInfo.getHeaders());
session.control(this,frame,timeout,unit,handler,null);
updateCloseState(replyInfo.isClose(), true);
SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
session.control(this, frame, timeout, unit, handler, null);
}
@Override
@ -359,18 +359,18 @@ public class StandardStream implements IStream
{
if (!canSend())
{
session.rst(new RstInfo(getId(),StreamStatus.PROTOCOL_ERROR));
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
throw new IllegalStateException("Protocol violation: cannot send a DATA frame before a SYN_REPLY frame");
}
if (isLocallyClosed())
{
session.rst(new RstInfo(getId(),StreamStatus.PROTOCOL_ERROR));
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a closed stream");
}
// Cannot update the close state here, because the data that we send may
// be flow controlled, so we need the stream to update the window size.
session.data(this,dataInfo,timeout,unit,handler,null);
session.data(this, dataInfo, timeout, unit, handler, null);
}
@Override
@ -386,18 +386,18 @@ public class StandardStream implements IStream
{
if (!canSend())
{
session.rst(new RstInfo(getId(),StreamStatus.PROTOCOL_ERROR));
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame before a SYN_REPLY frame");
}
if (isLocallyClosed())
{
session.rst(new RstInfo(getId(),StreamStatus.PROTOCOL_ERROR));
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame on a closed stream");
}
updateCloseState(headersInfo.isClose(),true);
HeadersFrame frame = new HeadersFrame(session.getVersion(),headersInfo.getFlags(),getId(),headersInfo.getHeaders());
session.control(this,frame,timeout,unit,handler,null);
updateCloseState(headersInfo.isClose(), true);
HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
session.control(this, frame, timeout, unit, handler, null);
}
@Override
@ -440,7 +440,7 @@ public class StandardStream implements IStream
@Override
public String toString()
{
return String.format("stream=%d v%d %s",getId(),session.getVersion(),closeState);
return String.format("stream=%d v%d %s", getId(), session.getVersion(), closeState);
}
private boolean canSend()

View File

@ -68,4 +68,10 @@ public class ByteBufferDataInfo extends DataInfo
}
return space;
}
@Override
protected ByteBuffer allocate(int size)
{
return buffer.isDirect() ? ByteBuffer.allocateDirect(size) : super.allocate(size);
}
}

View File

@ -268,6 +268,21 @@ public class Headers implements Iterable<Headers.Header>
return values;
}
/**
* @return the values as a comma separated list
*/
public String valuesAsString()
{
StringBuilder result = new StringBuilder();
for (int i = 0; i < values.length; ++i)
{
if (i > 0)
result.append(", ");
result.append(values[i]);
}
return result.toString();
}
/**
* @return whether the header has multiple values
*/

View File

@ -75,7 +75,7 @@ public interface Session
* @see #syn(SynInfo, StreamFrameListener, long, TimeUnit, Handler)
*/
public Future<Stream> syn(SynInfo synInfo, StreamFrameListener listener);
/**
* <p>Sends asynchronously a SYN_FRAME to create a new {@link Stream SPDY stream}.</p>
* <p>Callers may pass a non-null completion handler to be notified of when the
@ -90,7 +90,7 @@ public interface Session
*/
public void syn(SynInfo synInfo, StreamFrameListener listener, long timeout, TimeUnit unit, Handler<Stream> handler);
/**
* <p>Sends asynchronously a RST_STREAM to abort a stream.</p>
* <p>Callers may use the returned future to wait for the reset to be sent.</p>
@ -180,10 +180,40 @@ public interface Session
public void goAway(long timeout, TimeUnit unit, Handler<Void> handler);
/**
* @return the streams currently active in this session
* @return a snapshot of the streams currently active in this session
* @see #getStream(int)
*/
public Set<Stream> getStreams();
/**
* @param streamId the id of the stream to retrieve
* @return the stream with the given stream id
* @see #getStreams()
*/
public Stream getStream(int streamId);
/**
* @param key the attribute key
* @return an arbitrary object associated with the given key to this session
* @see #setAttribute(String, Object)
*/
public Object getAttribute(String key);
/**
* @param key the attribute key
* @param value an arbitrary object to associate with the given key to this session
* @see #getAttribute(String)
* @see #removeAttribute(String)
*/
public void setAttribute(String key, Object value);
/**
* @param key the attribute key
* @return the arbitrary object associated with the given key to this session
* @see #setAttribute(String, Object)
*/
public Object removeAttribute(String key);
/**
* <p>Super interface for listeners with callbacks that are invoked on specific session events.</p>
*/

View File

@ -405,7 +405,7 @@ public class StandardSessionTest
final CountDownLatch failedCalledLatch = new CountDownLatch(2);
SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null);
IStream stream = new StandardStream(synStreamFrame, session, null);
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null);
stream.updateWindowSize(8192);
Handler.Adapter<Void> handler = new Handler.Adapter<Void>()
{

View File

@ -46,15 +46,13 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/* ------------------------------------------------------------ */
/**
*/
@RunWith(MockitoJUnitRunner.class)
public class StandardStreamTest
{
@Mock private ISession session;
@Mock private SynStreamFrame synStreamFrame;
@Mock
private ISession session;
@Mock
private SynStreamFrame synStreamFrame;
/**
* Test method for {@link org.eclipse.jetty.spdy.StandardStream#syn(org.eclipse.jetty.spdy.api.SynInfo)}.
@ -63,17 +61,18 @@ public class StandardStreamTest
@Test
public void testSyn()
{
Stream stream = new StandardStream(synStreamFrame,session,null);
Stream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null);
Set<Stream> streams = new HashSet<>();
streams.add(stream);
when(synStreamFrame.isClose()).thenReturn(false);
SynInfo synInfo = new SynInfo(false);
when(session.getStreams()).thenReturn(streams);
stream.syn(synInfo);
verify(session).syn(argThat(new PushSynInfoMatcher(stream.getId(),synInfo)),any(StreamFrameListener.class),anyLong(),any(TimeUnit.class),any(Handler.class));
verify(session).syn(argThat(new PushSynInfoMatcher(stream.getId(), synInfo)), any(StreamFrameListener.class), anyLong(), any(TimeUnit.class), any(Handler.class));
}
private class PushSynInfoMatcher extends ArgumentMatcher<PushSynInfo>{
private class PushSynInfoMatcher extends ArgumentMatcher<PushSynInfo>
{
int associatedStreamId;
SynInfo synInfo;
@ -82,15 +81,18 @@ public class StandardStreamTest
this.associatedStreamId = associatedStreamId;
this.synInfo = synInfo;
}
@Override
public boolean matches(Object argument)
{
PushSynInfo pushSynInfo = (PushSynInfo)argument;
if(pushSynInfo.getAssociatedStreamId() != associatedStreamId){
if (pushSynInfo.getAssociatedStreamId() != associatedStreamId)
{
System.out.println("streamIds do not match!");
return false;
}
if(pushSynInfo.isClose() != synInfo.isClose()){
if (pushSynInfo.isClose() != synInfo.isClose())
{
System.out.println("isClose doesn't match");
return false;
}
@ -99,13 +101,14 @@ public class StandardStreamTest
}
@Test
public void testSynOnClosedStream(){
IStream stream = new StandardStream(synStreamFrame,session,null);
stream.updateCloseState(true,true);
stream.updateCloseState(true,false);
assertThat("stream expected to be closed",stream.isClosed(),is(true));
public void testSynOnClosedStream()
{
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null);
stream.updateCloseState(true, true);
stream.updateCloseState(true, false);
assertThat("stream expected to be closed", stream.isClosed(), is(true));
final CountDownLatch failedLatch = new CountDownLatch(1);
stream.syn(new SynInfo(false),1,TimeUnit.SECONDS,new Handler.Adapter<Stream>()
stream.syn(new SynInfo(false), 1, TimeUnit.SECONDS, new Handler.Adapter<Stream>()
{
@Override
public void failed(Stream stream, Throwable x)
@ -121,7 +124,7 @@ public class StandardStreamTest
public void testSendDataOnHalfClosedStream() throws InterruptedException, ExecutionException, TimeoutException
{
SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null);
IStream stream = new StandardStream(synStreamFrame, session, null);
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null);
stream.updateWindowSize(8192);
stream.updateCloseState(synStreamFrame.isClose(), true);
assertThat("stream is half closed", stream.isHalfClosed(), is(true));

View File

@ -60,4 +60,45 @@
-->
</plugins>
</build>
<!--
<profiles>
<profile>
<id>proxy</id>
<build>
<plugins>
<plugin>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-maven-plugin</artifactId>
<version>${project.version}</version>
<configuration>
<stopPort>8888</stopPort>
<stopKey>quit</stopKey>
<jvmArgs>
-Dlog4j.configuration=file://${basedir}/src/main/resources/log4j.properties
-Xbootclasspath/p:${settings.localRepository}/org/mortbay/jetty/npn/npn-boot/${npn.version}/npn-boot-${npn.version}.jar
-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005
</jvmArgs>
<jettyXml>${basedir}/src/main/config/etc/jetty-spdy-proxy.xml</jettyXml>
<contextPath>/</contextPath>
</configuration>
<dependencies>
<dependency>
<groupId>org.eclipse.jetty.spdy</groupId>
<artifactId>spdy-jetty-http</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j-version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</profile>
</profiles>
-->
</project>

View File

@ -0,0 +1,88 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure.dtd">
<Configure id="Server" class="org.eclipse.jetty.server.Server">
<New id="sslContextFactory" class="org.eclipse.jetty.util.ssl.SslContextFactory">
<Set name="keyStorePath">src/main/resources/keystore.jks</Set>
<Set name="keyStorePassword">storepwd</Set>
<Set name="trustStore">src/main/resources/truststore.jks</Set>
<Set name="trustStorePassword">storepwd</Set>
<Set name="protocol">TLSv1</Set>
</New>
<!--
<Set class="org.eclipse.jetty.npn.NextProtoNego" name="debug" type="boolean">true</Set>
-->
<!--
This is the upstream server connector. It speaks non-SSL SPDY/2(HTTP) on port 9090.
-->
<Call name="addConnector">
<Arg>
<New class="org.eclipse.jetty.spdy.http.HTTPSPDYServerConnector">
<Set name="Port">9090</Set>
<Set name="defaultAsyncConnectionFactory">
<Call name="getAsyncConnectionFactory">
<Arg>spdy/2</Arg>
</Call>
</Set>
</New>
</Arg>
</Call>
<!--
The ProxyEngine receives SPDY/x(HTTP) requests from proxy connectors below
and is configured to process requests for host "localhost".
Such requests are converted from SPDY/x(HTTP) to SPDY/2(HTTP) and forwarded
to 127.0.0.1:9090, where they are served by the upstream server above.
-->
<New id="proxyEngine" class="org.eclipse.jetty.spdy.proxy.SPDYProxyEngine">
<Arg>
<New class="org.eclipse.jetty.spdy.SPDYClient$Factory">
<Call name="start" />
</New>
</Arg>
<Set name="proxyInfos">
<Map>
<Entry>
<Item>localhost</Item>
<Item>
<New class="org.eclipse.jetty.spdy.proxy.ProxyEngine$ProxyInfo">
<Arg type="short">2</Arg>
<Arg>127.0.0.1</Arg>
<Arg type="int">9090</Arg>
</New>
</Item>
</Entry>
</Map>
</Set>
</New>
<!--
These are the reverse proxy connectors accepting requests from clients.
They accept non-SSL (on port 8080) and SSL (on port 8443) HTTP,
SPDY/2(HTTP) and SPDY/3(HTTP).
Non-SPDY HTTP requests are converted to SPDY internally and passed to the
ProxyEngine above.
-->
<Call name="addConnector">
<Arg>
<New class="org.eclipse.jetty.spdy.proxy.HTTPSPDYProxyConnector">
<Arg><Ref id="proxyEngine" /></Arg>
<Set name="Port">8080</Set>
</New>
</Arg>
</Call>
<Call name="addConnector">
<Arg>
<New class="org.eclipse.jetty.spdy.proxy.HTTPSPDYProxyConnector">
<Arg><Ref id="proxyEngine" /></Arg>
<Arg><Ref id="sslContextFactory" /></Arg>
<Set name="Port">8443</Set>
</New>
</Arg>
</Call>
</Configure>

View File

@ -0,0 +1,64 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.http;
import java.io.IOException;
import org.eclipse.jetty.http.HttpSchemes;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.spdy.SPDYServerConnector;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public class AbstractHTTPSPDYServerConnector extends SPDYServerConnector
{
public AbstractHTTPSPDYServerConnector(ServerSessionFrameListener listener, SslContextFactory sslContextFactory)
{
super(listener, sslContextFactory);
}
@Override
public void customize(EndPoint endPoint, Request request) throws IOException
{
super.customize(endPoint, request);
if (getSslContextFactory() != null)
request.setScheme(HttpSchemes.HTTPS);
}
@Override
public boolean isConfidential(Request request)
{
if (getSslContextFactory() != null)
{
int confidentialPort = getConfidentialPort();
return confidentialPort == 0 || confidentialPort == request.getServerPort();
}
return super.isConfidential(request);
}
@Override
public boolean isIntegral(Request request)
{
if (getSslContextFactory() != null)
{
int integralPort = getIntegralPort();
return integralPort == 0 || integralPort == request.getServerPort();
}
return super.isIntegral(request);
}
}

View File

@ -16,16 +16,10 @@
package org.eclipse.jetty.spdy.http;
import java.io.IOException;
import org.eclipse.jetty.http.HttpSchemes;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.spdy.SPDYServerConnector;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public class HTTPSPDYServerConnector extends SPDYServerConnector
public class HTTPSPDYServerConnector extends AbstractHTTPSPDYServerConnector
{
public HTTPSPDYServerConnector()
{
@ -47,43 +41,14 @@ public class HTTPSPDYServerConnector extends SPDYServerConnector
// We pass a null ServerSessionFrameListener because for
// HTTP over SPDY we need one that references the endPoint
super(null, sslContextFactory);
// Override the "spdy/3" protocol by handling HTTP over SPDY
clearAsyncConnectionFactories();
// The "spdy/3" protocol handles HTTP over SPDY
putAsyncConnectionFactory("spdy/3", new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V3, getByteBufferPool(), getExecutor(), getScheduler(), this, pushStrategy));
// Override the "spdy/2" protocol by handling HTTP over SPDY
// The "spdy/2" protocol handles HTTP over SPDY
putAsyncConnectionFactory("spdy/2", new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), this, pushStrategy));
// Add the "http/1.1" protocol for browsers that support NPN but not SPDY
// The "http/1.1" protocol handles browsers that support NPN but not SPDY
putAsyncConnectionFactory("http/1.1", new ServerHTTPAsyncConnectionFactory(this));
// Override the default connection factory for non-SSL connections to speak plain HTTP
// The default connection factory handles plain HTTP on non-SSL or non-NPN connections
setDefaultAsyncConnectionFactory(getAsyncConnectionFactory("http/1.1"));
}
@Override
public void customize(EndPoint endPoint, Request request) throws IOException
{
super.customize(endPoint, request);
if (getSslContextFactory() != null)
request.setScheme(HttpSchemes.HTTPS);
}
@Override
public boolean isConfidential(Request request)
{
if (getSslContextFactory() != null)
{
int confidentialPort = getConfidentialPort();
return confidentialPort == 0 || confidentialPort == request.getServerPort();
}
return super.isConfidential(request);
}
@Override
public boolean isIntegral(Request request)
{
if (getSslContextFactory() != null)
{
int integralPort = getIntegralPort();
return integralPort == 0 || integralPort == request.getServerPort();
}
return super.isIntegral(request);
}
}

View File

@ -21,18 +21,23 @@ import java.nio.channels.SocketChannel;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.server.AsyncHttpConnection;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.spdy.AsyncConnectionFactory;
import org.eclipse.jetty.spdy.SPDYServerConnector;
public class ServerHTTPAsyncConnectionFactory implements AsyncConnectionFactory
{
private final Connector connector;
private final SPDYServerConnector connector;
public ServerHTTPAsyncConnectionFactory(Connector connector)
public ServerHTTPAsyncConnectionFactory(SPDYServerConnector connector)
{
this.connector = connector;
}
public SPDYServerConnector getConnector()
{
return connector;
}
@Override
public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
{

View File

@ -0,0 +1,41 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.proxy;
import org.eclipse.jetty.spdy.ServerSPDYAsyncConnectionFactory;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.http.AbstractHTTPSPDYServerConnector;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public class HTTPSPDYProxyConnector extends AbstractHTTPSPDYServerConnector
{
public HTTPSPDYProxyConnector(SPDYProxyEngine proxyEngine)
{
this(proxyEngine, null);
}
public HTTPSPDYProxyConnector(SPDYProxyEngine proxyEngine, SslContextFactory sslContextFactory)
{
super(proxyEngine, sslContextFactory);
clearAsyncConnectionFactories();
putAsyncConnectionFactory("spdy/3", new ServerSPDYAsyncConnectionFactory(SPDY.V3, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngine));
putAsyncConnectionFactory("spdy/2", new ServerSPDYAsyncConnectionFactory(SPDY.V2, getByteBufferPool(), getExecutor(), getScheduler(), proxyEngine));
putAsyncConnectionFactory("http/1.1", new ProxyHTTPAsyncConnectionFactory(this, SPDY.V3, proxyEngine));
setDefaultAsyncConnectionFactory(getAsyncConnectionFactory("http/1.1"));
}
}

View File

@ -0,0 +1,131 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.proxy;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>{@link ProxyEngine} is the base class for SPDY proxy functionalities, that is a proxy that
* accepts SPDY from its client side and converts to any protocol to its server side.</p>
* <p>This class listens for SPDY events sent by clients; subclasses are responsible for translating
* these SPDY client events into appropriate events to forward to the server, in the appropriate
* protocol that is understood by the server.</p>
* <p>This class also provides configuration for the proxy rules.</p>
*/
public abstract class ProxyEngine extends ServerSessionFrameListener.Adapter implements StreamFrameListener
{
protected final Logger logger = Log.getLogger(getClass());
private final ConcurrentMap<String, ProxyInfo> proxyInfos = new ConcurrentHashMap<>();
private final String name;
protected ProxyEngine()
{
this(name());
}
private static String name()
{
try
{
return InetAddress.getLocalHost().getHostName();
}
catch (UnknownHostException x)
{
return "localhost";
}
}
protected ProxyEngine(String name)
{
this.name = name;
}
public String getName()
{
return name;
}
protected void addRequestProxyHeaders(Headers headers)
{
String newValue = "";
Headers.Header header = headers.get("via");
if (header != null)
newValue = header.valuesAsString() + ", ";
newValue += "http/1.1 " + getName();
headers.put("via", newValue);
}
protected void addResponseProxyHeaders(Headers headers)
{
// TODO: add Via header
}
public Map<String, ProxyInfo> getProxyInfos()
{
return new HashMap<>(proxyInfos);
}
public void setProxyInfos(Map<String, ProxyInfo> proxyInfos)
{
this.proxyInfos.clear();
this.proxyInfos.putAll(proxyInfos);
}
public void putProxyInfo(String host, ProxyInfo proxyInfo)
{
proxyInfos.put(host, proxyInfo);
}
protected ProxyInfo getProxyInfo(String host)
{
return proxyInfos.get(host);
}
public static class ProxyInfo
{
private final short version;
private final InetSocketAddress address;
public ProxyInfo(short version, String host, int port)
{
this.version = version;
this.address = new InetSocketAddress(host, port);
}
public short getVersion()
{
return version;
}
public InetSocketAddress getAddress()
{
return address;
}
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.proxy;
import java.nio.channels.SocketChannel;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.spdy.SPDYServerConnector;
import org.eclipse.jetty.spdy.http.ServerHTTPAsyncConnectionFactory;
public class ProxyHTTPAsyncConnectionFactory extends ServerHTTPAsyncConnectionFactory
{
private final short version;
private final ProxyEngine proxyEngine;
public ProxyHTTPAsyncConnectionFactory(SPDYServerConnector connector, short version, ProxyEngine proxyEngine)
{
super(connector);
this.version = version;
this.proxyEngine = proxyEngine;
}
@Override
public AsyncConnection newAsyncConnection(SocketChannel channel, AsyncEndPoint endPoint, Object attachment)
{
return new ProxyHTTPSPDYAsyncConnection(getConnector(), endPoint, version, proxyEngine);
}
}

View File

@ -0,0 +1,336 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.proxy;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.DirectNIOBuffer;
import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
import org.eclipse.jetty.io.nio.NIOBuffer;
import org.eclipse.jetty.server.AsyncHttpConnection;
import org.eclipse.jetty.spdy.ISession;
import org.eclipse.jetty.spdy.IStream;
import org.eclipse.jetty.spdy.SPDYServerConnector;
import org.eclipse.jetty.spdy.StandardSession;
import org.eclipse.jetty.spdy.StandardStream;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SessionStatus;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
public class ProxyHTTPSPDYAsyncConnection extends AsyncHttpConnection
{
private final Headers headers = new Headers();
private final short version;
private final ProxyEngine proxyEngine;
private final HttpGenerator generator;
private final ISession session;
private Stream stream;
private Buffer content;
public ProxyHTTPSPDYAsyncConnection(SPDYServerConnector connector, EndPoint endpoint, short version, ProxyEngine proxyEngine)
{
super(connector, endpoint, connector.getServer());
this.version = version;
this.proxyEngine = proxyEngine;
this.generator = (HttpGenerator)_generator;
this.session = new HTTPSession(version, connector);
}
@Override
public AsyncEndPoint getEndPoint()
{
return (AsyncEndPoint)super.getEndPoint();
}
@Override
protected void startRequest(Buffer method, Buffer uri, Buffer httpVersion) throws IOException
{
SPDYServerConnector connector = (SPDYServerConnector)getConnector();
String scheme = connector.getSslContextFactory() != null ? "https" : "http";
headers.put(HTTPSPDYHeader.SCHEME.name(version), scheme);
headers.put(HTTPSPDYHeader.METHOD.name(version), method.toString("UTF-8"));
headers.put(HTTPSPDYHeader.URI.name(version), uri.toString("UTF-8"));
headers.put(HTTPSPDYHeader.VERSION.name(version), httpVersion.toString("UTF-8"));
}
@Override
protected void parsedHeader(Buffer name, Buffer value) throws IOException
{
String headerName = name.toString("UTF-8").toLowerCase();
String headerValue = value.toString("UTF-8");
switch (headerName)
{
case "host":
headers.put(HTTPSPDYHeader.HOST.name(version), headerValue);
break;
default:
headers.put(headerName, headerValue);
break;
}
}
@Override
protected void headerComplete() throws IOException
{
}
@Override
protected void content(Buffer buffer) throws IOException
{
if (content == null)
{
stream = syn(false);
content = buffer;
}
else
{
proxyEngine.onData(stream, toDataInfo(buffer, false));
}
}
@Override
public void messageComplete(long contentLength) throws IOException
{
if (stream == null)
{
assert content == null;
if (headers.isEmpty())
proxyEngine.onGoAway(session, new GoAwayInfo(0, SessionStatus.OK));
else
syn(true);
}
else
{
proxyEngine.onData(stream, toDataInfo(content, true));
}
headers.clear();
stream = null;
content = null;
}
private Stream syn(boolean close)
{
Stream stream = new HTTPStream(1, (byte)0, session, null);
proxyEngine.onSyn(stream, new SynInfo(headers, close));
return stream;
}
private DataInfo toDataInfo(Buffer buffer, boolean close)
{
if (buffer instanceof ByteArrayBuffer)
return new BytesDataInfo(buffer.array(), buffer.getIndex(), buffer.length(), close);
if (buffer instanceof NIOBuffer)
{
ByteBuffer byteBuffer = ((NIOBuffer)buffer).getByteBuffer();
byteBuffer.limit(buffer.putIndex());
byteBuffer.position(buffer.getIndex());
return new ByteBufferDataInfo(byteBuffer, close);
}
return new BytesDataInfo(buffer.asArray(), close);
}
private class HTTPSession extends StandardSession
{
private HTTPSession(short version, SPDYServerConnector connector)
{
super(version, connector.getByteBufferPool(), connector.getExecutor(), connector.getScheduler(), null, null, 1, proxyEngine, null, null);
}
@Override
public void rst(RstInfo rstInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{
// Not much we can do in HTTP land: just close the connection
goAway(timeout, unit, handler);
}
@Override
public void goAway(long timeout, TimeUnit unit, Handler<Void> handler)
{
try
{
getEndPoint().close();
handler.completed(null);
}
catch (IOException x)
{
handler.failed(null, x);
}
}
}
/**
* <p>This stream will convert the SPDY invocations performed by the proxy into HTTP to be sent to the client.</p>
*/
private class HTTPStream extends StandardStream
{
private final Pattern statusRegexp = Pattern.compile("(\\d{3})\\s*(.*)");
private HTTPStream(int id, byte priority, ISession session, IStream associatedStream)
{
super(id, priority, session, associatedStream);
}
@Override
public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Handler<Stream> handler)
{
// HTTP does not support pushed streams
handler.completed(new HTTPPushStream(2, getPriority(), getSession(), this));
}
@Override
public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{
// TODO
throw new UnsupportedOperationException("Not Yet Implemented");
}
@Override
public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{
try
{
Headers headers = new Headers(replyInfo.getHeaders(), false);
headers.remove(HTTPSPDYHeader.SCHEME.name(version));
String status = headers.remove(HTTPSPDYHeader.STATUS.name(version)).value();
Matcher matcher = statusRegexp.matcher(status);
matcher.matches();
int code = Integer.parseInt(matcher.group(1));
String reason = matcher.group(2);
generator.setResponse(code, reason);
String httpVersion = headers.remove(HTTPSPDYHeader.VERSION.name(version)).value();
generator.setVersion(Integer.parseInt(httpVersion.replaceAll("\\D", "")));
Headers.Header host = headers.remove(HTTPSPDYHeader.HOST.name(version));
if (host != null)
headers.put("host", host.value());
HttpFields fields = new HttpFields();
for (Headers.Header header : headers)
{
String name = camelize(header.name());
fields.put(name, header.value());
}
generator.completeHeader(fields, replyInfo.isClose());
if (replyInfo.isClose())
complete();
handler.completed(null);
}
catch (IOException x)
{
handler.failed(null, x);
}
}
private String camelize(String name)
{
char[] chars = name.toCharArray();
chars[0] = Character.toUpperCase(chars[0]);
for (int i = 0; i < chars.length; ++i)
{
char c = chars[i];
int j = i + 1;
if (c == '-' && j < chars.length)
chars[j] = Character.toUpperCase(chars[j]);
}
return new String(chars);
}
@Override
public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{
try
{
// Data buffer must be copied, as the ByteBuffer is pooled
ByteBuffer byteBuffer = dataInfo.asByteBuffer(false);
Buffer buffer = byteBuffer.isDirect() ?
new DirectNIOBuffer(byteBuffer, false) :
new IndirectNIOBuffer(byteBuffer, false);
generator.addContent(buffer, dataInfo.isClose());
generator.flush(unit.toMillis(timeout));
if (dataInfo.isClose())
complete();
handler.completed(null);
}
catch (IOException x)
{
handler.failed(null, x);
}
}
private void complete() throws IOException
{
generator.complete();
// We need to call asyncDispatch() as if the HTTP request
// has been suspended and now we complete the response
getEndPoint().asyncDispatch();
}
}
private class HTTPPushStream extends StandardStream
{
private HTTPPushStream(int id, byte priority, ISession session, IStream associatedStream)
{
super(id, priority, session, associatedStream);
}
@Override
public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{
// Ignore pushed headers
handler.completed(null);
}
@Override
public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{
// Ignore pushed data
handler.completed(null);
}
}
}

View File

@ -0,0 +1,541 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.proxy;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.SPDYClient;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StreamStatus;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
/**
* <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by
* clients into SPDY events for the servers.</p>
*/
public class SPDYProxyEngine extends ProxyEngine
{
private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.streamHandler";
private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientStream";
private static final String CLIENT_SESSIONS_ATTRIBUTE = "org.eclipse.jetty.spdy.http.proxy.clientSessions";
private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
private final SPDYClient.Factory factory;
private volatile long connectTimeout = 15000;
private volatile long timeout = 60000;
public SPDYProxyEngine(SPDYClient.Factory factory)
{
this.factory = factory;
}
public long getConnectTimeout()
{
return connectTimeout;
}
public void setConnectTimeout(long connectTimeout)
{
this.connectTimeout = connectTimeout;
}
public long getTimeout()
{
return timeout;
}
public void setTimeout(long timeout)
{
this.timeout = timeout;
}
@Override
public void onPing(Session clientSession, PingInfo pingInfo)
{
// We do not know to which upstream server
// to send the PING so we just ignore it
}
@Override
public void onGoAway(Session clientSession, GoAwayInfo goAwayInfo)
{
for (Session serverSession : serverSessions.values())
{
@SuppressWarnings("unchecked")
Set<Session> sessions = (Set<Session>)serverSession.getAttribute(CLIENT_SESSIONS_ATTRIBUTE);
if (sessions.remove(clientSession))
break;
}
}
@Override
public StreamFrameListener onSyn(final Stream clientStream, SynInfo clientSynInfo)
{
logger.debug("C -> P {} on {}", clientSynInfo, clientStream);
final Session clientSession = clientStream.getSession();
short clientVersion = clientSession.getVersion();
Headers headers = new Headers(clientSynInfo.getHeaders(), false);
Headers.Header hostHeader = headers.get(HTTPSPDYHeader.HOST.name(clientVersion));
if (hostHeader == null)
{
rst(clientStream);
return null;
}
String host = hostHeader.value();
int colon = host.indexOf(':');
if (colon >= 0)
host = host.substring(0, colon);
ProxyInfo proxyInfo = getProxyInfo(host);
if (proxyInfo == null)
{
rst(clientStream);
return null;
}
// TODO: give a chance to modify headers and rewrite URI
short serverVersion = proxyInfo.getVersion();
InetSocketAddress address = proxyInfo.getAddress();
Session serverSession = produceSession(host, serverVersion, address);
if (serverSession == null)
{
rst(clientStream);
return null;
}
@SuppressWarnings("unchecked")
Set<Session> sessions = (Set<Session>)serverSession.getAttribute(CLIENT_SESSIONS_ATTRIBUTE);
sessions.add(clientSession);
convert(clientVersion, serverVersion, headers);
addRequestProxyHeaders(headers);
SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
logger.debug("P -> S {}", serverSynInfo);
StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
StreamHandler handler = new StreamHandler(clientStream);
clientStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
serverSession.syn(serverSynInfo, listener, timeout, TimeUnit.MILLISECONDS, handler);
return this;
}
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Servers do not receive replies
}
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
// TODO
throw new UnsupportedOperationException("Not Yet Implemented");
}
@Override
public void onData(Stream clientStream, final DataInfo clientDataInfo)
{
logger.debug("C -> P {} on {}", clientDataInfo, clientStream);
ByteBufferDataInfo serverDataInfo = new ByteBufferDataInfo(clientDataInfo.asByteBuffer(false), clientDataInfo.isClose())
{
@Override
public void consume(int delta)
{
super.consume(delta);
clientDataInfo.consume(delta);
}
};
StreamHandler streamHandler = (StreamHandler)clientStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
streamHandler.data(serverDataInfo);
}
private Session produceSession(String host, short version, InetSocketAddress address)
{
try
{
Session session = serverSessions.get(host);
if (session == null)
{
SPDYClient client = factory.newSPDYClient(version);
session = client.connect(address, sessionListener).get(getConnectTimeout(), TimeUnit.MILLISECONDS);
session.setAttribute(CLIENT_SESSIONS_ATTRIBUTE, Collections.newSetFromMap(new ConcurrentHashMap<Session, Boolean>()));
logger.debug("Proxy session connected to {}", address);
Session existing = serverSessions.putIfAbsent(host, session);
if (existing != null)
{
session.goAway(getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
session = existing;
}
}
return session;
}
catch (Exception x)
{
logger.debug(x);
return null;
}
}
private void convert(short fromVersion, short toVersion, Headers headers)
{
if (fromVersion != toVersion)
{
for (HTTPSPDYHeader httpHeader : HTTPSPDYHeader.values())
{
Headers.Header header = headers.remove(httpHeader.name(fromVersion));
if (header != null)
{
String toName = httpHeader.name(toVersion);
for (String value : header.values())
headers.add(toName, value);
}
}
}
}
private void rst(Stream stream)
{
RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
stream.getSession().rst(rstInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
}
private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
{
private final Stream clientStream;
private volatile ReplyInfo replyInfo;
public ProxyStreamFrameListener(Stream clientStream)
{
this.clientStream = clientStream;
}
@Override
public void onReply(final Stream stream, ReplyInfo replyInfo)
{
short serverVersion = stream.getSession().getVersion();
Headers headers = new Headers(replyInfo.getHeaders(), false);
short clientVersion = this.clientStream.getSession().getVersion();
convert(serverVersion, clientVersion, headers);
addResponseProxyHeaders(headers);
this.replyInfo = new ReplyInfo(headers, replyInfo.isClose());
if (replyInfo.isClose())
reply();
}
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
// TODO
throw new UnsupportedOperationException("Not Yet Implemented");
}
@Override
public void onData(final Stream stream, final DataInfo dataInfo)
{
if (replyInfo != null)
{
if (dataInfo.isClose())
replyInfo.getHeaders().put("content-length", String.valueOf(dataInfo.available()));
reply();
}
data(dataInfo);
}
private void reply()
{
clientStream.reply(replyInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>()
{
@Override
public void failed(Void context, Throwable x)
{
logger.debug(x);
rst(clientStream);
}
});
replyInfo = null;
}
private void data(final DataInfo dataInfo)
{
clientStream.data(dataInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler<Void>()
{
@Override
public void completed(Void context)
{
dataInfo.consume(dataInfo.length());
}
@Override
public void failed(Void context, Throwable x)
{
logger.debug(x);
rst(clientStream);
}
});
}
}
/**
* <p>{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.</p>
* <p>Instances of this class buffer DATA frames sent by clients and send them to the server.
* The buffering happens between the send of the SYN_STREAM to the server (where DATA frames may arrive
* from the client before the SYN_STREAM has been fully sent), and between DATA frames, if the client
* is a fast producer and the server a slow consumer, or if the client is a SPDY v2 client (and hence
* without flow control) while the server is a SPDY v3 server (and hence with flow control).</p>
*/
private class StreamHandler implements Handler<Stream>
{
private final Queue<DataInfoHandler> queue = new LinkedList<>();
private final Stream clientStream;
private Stream serverStream;
private StreamHandler(Stream clientStream)
{
this.clientStream = clientStream;
}
@Override
public void completed(Stream serverStream)
{
serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream);
DataInfoHandler dataInfoHandler;
synchronized (queue)
{
this.serverStream = serverStream;
dataInfoHandler = queue.peek();
if (dataInfoHandler != null)
{
if (dataInfoHandler.flushing)
{
logger.debug("SYN completed, flushing {}, queue size {}", dataInfoHandler.dataInfo, queue.size());
dataInfoHandler = null;
}
else
{
dataInfoHandler.flushing = true;
logger.debug("SYN completed, queue size {}", queue.size());
}
}
else
{
logger.debug("SYN completed, queue empty");
}
}
if (dataInfoHandler != null)
flush(serverStream, dataInfoHandler);
}
@Override
public void failed(Stream serverStream, Throwable x)
{
logger.debug(x);
rst(clientStream);
}
public void data(DataInfo dataInfo)
{
Stream serverStream;
DataInfoHandler dataInfoHandler = null;
DataInfoHandler item = new DataInfoHandler(dataInfo);
synchronized (queue)
{
queue.offer(item);
serverStream = this.serverStream;
if (serverStream != null)
{
dataInfoHandler = queue.peek();
if (dataInfoHandler.flushing)
{
logger.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoHandler.dataInfo, queue.size());
serverStream = null;
}
else
{
dataInfoHandler.flushing = true;
logger.debug("Queued {}, queue size {}", dataInfo, queue.size());
}
}
else
{
logger.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
}
}
if (serverStream != null)
flush(serverStream, dataInfoHandler);
}
private void flush(Stream serverStream, DataInfoHandler dataInfoHandler)
{
logger.debug("P -> S {} on {}", dataInfoHandler.dataInfo, serverStream);
serverStream.data(dataInfoHandler.dataInfo, getTimeout(), TimeUnit.MILLISECONDS, dataInfoHandler);
}
private class DataInfoHandler implements Handler<Void>
{
private final DataInfo dataInfo;
private boolean flushing;
private DataInfoHandler(DataInfo dataInfo)
{
this.dataInfo = dataInfo;
}
@Override
public void completed(Void context)
{
Stream serverStream;
DataInfoHandler dataInfoHandler;
synchronized (queue)
{
serverStream = StreamHandler.this.serverStream;
assert serverStream != null;
dataInfoHandler = queue.poll();
assert dataInfoHandler == this;
dataInfoHandler = queue.peek();
if (dataInfoHandler != null)
{
assert !dataInfoHandler.flushing;
dataInfoHandler.flushing = true;
logger.debug("Completed {}, queue size {}", dataInfo, queue.size());
}
else
{
logger.debug("Completed {}, queue empty", dataInfo);
}
}
if (dataInfoHandler != null)
flush(serverStream, dataInfoHandler);
}
@Override
public void failed(Void context, Throwable x)
{
logger.debug(x);
rst(clientStream);
}
}
}
private class ProxySessionFrameListener extends SessionFrameListener.Adapter implements StreamFrameListener
{
@Override
public StreamFrameListener onSyn(Stream serverStream, SynInfo serverSynInfo)
{
logger.debug("S -> P pushed {} on {}", serverSynInfo, serverStream);
Headers headers = new Headers(serverSynInfo.getHeaders(), false);
Stream clientStream = (Stream)serverStream.getAssociatedStream().getAttribute(CLIENT_STREAM_ATTRIBUTE);
convert(serverStream.getSession().getVersion(), clientStream.getSession().getVersion(), headers);
addResponseProxyHeaders(headers);
StreamHandler handler = new StreamHandler(clientStream);
serverStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
clientStream.syn(new SynInfo(headers, serverSynInfo.isClose()), getTimeout(), TimeUnit.MILLISECONDS, handler);
return this;
}
@Override
public void onRst(Session serverSession, RstInfo serverRstInfo)
{
Stream serverStream = serverSession.getStream(serverRstInfo.getStreamId());
if (serverStream != null)
{
Stream clientStream = (Stream)serverStream.getAttribute(CLIENT_STREAM_ATTRIBUTE);
if (clientStream != null)
{
Session clientSession = clientStream.getSession();
RstInfo clientRstInfo = new RstInfo(clientStream.getId(), serverRstInfo.getStreamStatus());
clientSession.rst(clientRstInfo, getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
}
}
}
@Override
public void onGoAway(Session serverSession, GoAwayInfo goAwayInfo)
{
serverSessions.values().remove(serverSession);
@SuppressWarnings("unchecked")
Set<Session> sessions = (Set<Session>)serverSession.removeAttribute(CLIENT_SESSIONS_ATTRIBUTE);
for (Session session : sessions)
session.goAway(getTimeout(), TimeUnit.MILLISECONDS, new Handler.Adapter<Void>());
}
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Push streams never send a reply
}
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
throw new UnsupportedOperationException();
}
@Override
public void onData(Stream serverStream, final DataInfo serverDataInfo)
{
logger.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
{
@Override
public void consume(int delta)
{
super.consume(delta);
serverDataInfo.consume(delta);
}
};
StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
handler.data(clientDataInfo);
}
}
}

View File

@ -0,0 +1,855 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.proxy;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.spdy.SPDYClient;
import org.eclipse.jetty.spdy.SPDYServerConnector;
import org.eclipse.jetty.spdy.ServerSPDYAsyncConnectionFactory;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StreamStatus;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatchman;
import org.junit.runners.model.FrameworkMethod;
public class ProxyHTTPSPDYv2Test
{
@Rule
public final TestWatchman testName = new TestWatchman()
{
@Override
public void starting(FrameworkMethod method)
{
super.starting(method);
System.err.printf("Running %s.%s()%n",
method.getMethod().getDeclaringClass().getName(),
method.getName());
}
};
private SPDYClient.Factory factory;
private Server server;
private Server proxy;
private SPDYServerConnector proxyConnector;
protected short version()
{
return SPDY.V2;
}
protected InetSocketAddress startServer(ServerSessionFrameListener listener) throws Exception
{
server = new Server();
SPDYServerConnector serverConnector = new SPDYServerConnector(listener);
serverConnector.setDefaultAsyncConnectionFactory(new ServerSPDYAsyncConnectionFactory(version(), serverConnector.getByteBufferPool(), serverConnector.getExecutor(), serverConnector.getScheduler(), listener));
serverConnector.setPort(0);
server.addConnector(serverConnector);
server.start();
return new InetSocketAddress("localhost", serverConnector.getLocalPort());
}
protected InetSocketAddress startProxy(InetSocketAddress address) throws Exception
{
proxy = new Server();
SPDYProxyEngine proxyEngine = new SPDYProxyEngine(factory);
proxyEngine.putProxyInfo("localhost", new ProxyEngine.ProxyInfo(version(), address.getHostName(), address.getPort()));
proxyConnector = new HTTPSPDYProxyConnector(proxyEngine);
proxyConnector.setPort(0);
proxy.addConnector(proxyConnector);
proxy.start();
return new InetSocketAddress("localhost", proxyConnector.getLocalPort());
}
@Before
public void init() throws Exception
{
factory = new SPDYClient.Factory();
factory.start();
}
@After
public void destroy() throws Exception
{
if (server != null)
{
server.stop();
server.join();
}
if (proxy != null)
{
proxy.stop();
proxy.join();
}
factory.stop();
}
@Test
public void testClosingClientDoesNotCloseServer() throws Exception
{
final CountDownLatch closeLatch = new CountDownLatch(1);
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Headers responseHeaders = new Headers();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
stream.reply(new ReplyInfo(responseHeaders, true));
return null;
}
@Override
public void onGoAway(Session session, GoAwayInfo goAwayInfo)
{
closeLatch.countDown();
}
}));
Socket client = new Socket();
client.connect(proxyAddress);
OutputStream output = client.getOutputStream();
String request = "" +
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + proxyAddress.getPort() + "\r\n" +
"\r\n";
output.write(request.getBytes("UTF-8"));
output.flush();
InputStream input = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
String line = reader.readLine();
Assert.assertTrue(line.contains(" 200"));
while (line.length() > 0)
line = reader.readLine();
Assert.assertFalse(reader.ready());
client.close();
// Must not close, other clients may still be connected
Assert.assertFalse(closeLatch.await(1, TimeUnit.SECONDS));
}
@Test
public void testClosingServerClosesHTTPClient() throws Exception
{
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Headers responseHeaders = new Headers();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
stream.reply(new ReplyInfo(responseHeaders, true));
stream.getSession().goAway();
return null;
}
}));
Socket client = new Socket();
client.connect(proxyAddress);
OutputStream output = client.getOutputStream();
String request = "" +
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + proxyAddress.getPort() + "\r\n" +
"\r\n";
output.write(request.getBytes("UTF-8"));
output.flush();
client.setSoTimeout(1000);
InputStream input = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
String line = reader.readLine();
Assert.assertTrue(line.contains(" 200"));
while (line.length() > 0)
line = reader.readLine();
Assert.assertFalse(reader.ready());
Assert.assertNull(reader.readLine());
client.close();
}
@Test
public void testClosingServerClosesSPDYClient() throws Exception
{
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Headers responseHeaders = new Headers();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
stream.reply(new ReplyInfo(responseHeaders, true));
stream.getSession().goAway();
return null;
}
}));
proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
final CountDownLatch goAwayLatch = new CountDownLatch(1);
Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayInfo goAwayInfo)
{
goAwayLatch.countDown();
}
}).get(5, TimeUnit.SECONDS);
final CountDownLatch replyLatch = new CountDownLatch(1);
Headers headers = new Headers();
headers.put(HTTPSPDYHeader.SCHEME.name(version()), "http");
headers.put(HTTPSPDYHeader.METHOD.name(version()), "GET");
headers.put(HTTPSPDYHeader.URI.name(version()), "/");
headers.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
replyLatch.countDown();
}
});
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(goAwayLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testGETThenNoContentFromTwoClients() throws Exception
{
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Assert.assertTrue(synInfo.isClose());
Headers requestHeaders = synInfo.getHeaders();
Assert.assertNotNull(requestHeaders.get("via"));
Headers responseHeaders = new Headers();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
ReplyInfo replyInfo = new ReplyInfo(responseHeaders, true);
stream.reply(replyInfo);
return null;
}
}));
Socket client1 = new Socket();
client1.connect(proxyAddress);
OutputStream output1 = client1.getOutputStream();
String request = "" +
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + proxyAddress.getPort() + "\r\n" +
"\r\n";
output1.write(request.getBytes("UTF-8"));
output1.flush();
InputStream input1 = client1.getInputStream();
BufferedReader reader1 = new BufferedReader(new InputStreamReader(input1, "UTF-8"));
String line = reader1.readLine();
Assert.assertTrue(line.contains(" 200"));
while (line.length() > 0)
line = reader1.readLine();
Assert.assertFalse(reader1.ready());
// Perform another request with another client
Socket client2 = new Socket();
client2.connect(proxyAddress);
OutputStream output2 = client2.getOutputStream();
output2.write(request.getBytes("UTF-8"));
output2.flush();
InputStream input2 = client2.getInputStream();
BufferedReader reader2 = new BufferedReader(new InputStreamReader(input2, "UTF-8"));
line = reader2.readLine();
Assert.assertTrue(line.contains(" 200"));
while (line.length() > 0)
line = reader2.readLine();
Assert.assertFalse(reader2.ready());
client1.close();
client2.close();
}
@Test
public void testGETThenSmallResponseContent() throws Exception
{
final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Assert.assertTrue(synInfo.isClose());
Headers requestHeaders = synInfo.getHeaders();
Assert.assertNotNull(requestHeaders.get("via"));
Headers responseHeaders = new Headers();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
stream.reply(replyInfo);
stream.data(new BytesDataInfo(data, true));
return null;
}
}));
Socket client = new Socket();
client.connect(proxyAddress);
OutputStream output = client.getOutputStream();
String request = "" +
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + proxyAddress.getPort() + "\r\n" +
"\r\n";
output.write(request.getBytes("UTF-8"));
output.flush();
InputStream input = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
String line = reader.readLine();
Assert.assertTrue(line.contains(" 200"));
while (line.length() > 0)
line = reader.readLine();
for (byte datum : data)
Assert.assertEquals(datum, reader.read());
Assert.assertFalse(reader.ready());
// Perform another request so that we are sure we reset the states of parsers and generators
output.write(request.getBytes("UTF-8"));
output.flush();
line = reader.readLine();
Assert.assertTrue(line.contains(" 200"));
while (line.length() > 0)
line = reader.readLine();
for (byte datum : data)
Assert.assertEquals(datum, reader.read());
Assert.assertFalse(reader.ready());
client.close();
}
@Test
public void testPOSTWithSmallRequestContentThenRedirect() throws Exception
{
final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
return new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
{
Headers headers = new Headers();
headers.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
headers.put(HTTPSPDYHeader.STATUS.name(version()), "303 See Other");
stream.reply(new ReplyInfo(headers, true));
}
}
};
}
}));
Socket client = new Socket();
client.connect(proxyAddress);
OutputStream output = client.getOutputStream();
String request = "" +
"POST / HTTP/1.1\r\n" +
"Host: localhost:" + proxyAddress.getPort() + "\r\n" +
"Content-Length: " + data.length + "\r\n" +
"Content-Type: application/octet-stream\r\n" +
"\r\n";
output.write(request.getBytes("UTF-8"));
output.write(data);
output.flush();
InputStream input = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
String line = reader.readLine();
Assert.assertTrue(line.contains(" 303"));
while (line.length() > 0)
line = reader.readLine();
Assert.assertFalse(reader.ready());
// Perform another request so that we are sure we reset the states of parsers and generators
output.write(request.getBytes("UTF-8"));
output.write(data);
output.flush();
line = reader.readLine();
Assert.assertTrue(line.contains(" 303"));
while (line.length() > 0)
line = reader.readLine();
Assert.assertFalse(reader.ready());
client.close();
}
@Test
public void testPOSTWithSmallRequestContentThenSmallResponseContent() throws Exception
{
final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
return new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
{
Headers responseHeaders = new Headers();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
stream.reply(replyInfo);
stream.data(new BytesDataInfo(data, true));
}
}
};
}
}));
Socket client = new Socket();
client.connect(proxyAddress);
OutputStream output = client.getOutputStream();
String request = "" +
"POST / HTTP/1.1\r\n" +
"Host: localhost:" + proxyAddress.getPort() + "\r\n" +
"Content-Length: " + data.length + "\r\n" +
"Content-Type: application/octet-stream\r\n" +
"\r\n";
output.write(request.getBytes("UTF-8"));
output.write(data);
output.flush();
InputStream input = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
String line = reader.readLine();
Assert.assertTrue(line.contains(" 200"));
while (line.length() > 0)
line = reader.readLine();
for (byte datum : data)
Assert.assertEquals(datum, reader.read());
Assert.assertFalse(reader.ready());
// Perform another request so that we are sure we reset the states of parsers and generators
output.write(request.getBytes("UTF-8"));
output.write(data);
output.flush();
line = reader.readLine();
Assert.assertTrue(line.contains(" 200"));
while (line.length() > 0)
line = reader.readLine();
for (byte datum : data)
Assert.assertEquals(datum, reader.read());
Assert.assertFalse(reader.ready());
client.close();
}
@Test
public void testSYNThenREPLY() throws Exception
{
final String header = "foo";
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Headers requestHeaders = synInfo.getHeaders();
Assert.assertNotNull(requestHeaders.get("via"));
Assert.assertNotNull(requestHeaders.get(header));
Headers responseHeaders = new Headers();
responseHeaders.put(header, "baz");
stream.reply(new ReplyInfo(responseHeaders, true));
return null;
}
}));
proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
Session client = factory.newSPDYClient(version()).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
final CountDownLatch replyLatch = new CountDownLatch(1);
Headers headers = new Headers();
headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
headers.put(header, "bar");
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
Headers headers = replyInfo.getHeaders();
Assert.assertNotNull(headers.get(header));
replyLatch.countDown();
}
});
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
client.goAway().get(5, TimeUnit.SECONDS);
}
@Test
public void testSYNThenREPLYAndDATA() throws Exception
{
final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
final String header = "foo";
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Headers requestHeaders = synInfo.getHeaders();
Assert.assertNotNull(requestHeaders.get("via"));
Assert.assertNotNull(requestHeaders.get(header));
Headers responseHeaders = new Headers();
responseHeaders.put(header, "baz");
stream.reply(new ReplyInfo(responseHeaders, false));
stream.data(new BytesDataInfo(data, true));
return null;
}
}));
proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
Session client = factory.newSPDYClient(version()).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
final CountDownLatch replyLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
Headers headers = new Headers();
headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
headers.put(header, "bar");
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
{
private final ByteArrayOutputStream result = new ByteArrayOutputStream();
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
Headers headers = replyInfo.getHeaders();
Assert.assertNotNull(headers.get(header));
replyLatch.countDown();
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
result.write(dataInfo.asBytes(true), 0, dataInfo.length());
if (dataInfo.isClose())
{
Assert.assertArrayEquals(data, result.toByteArray());
dataLatch.countDown();
}
}
});
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
client.goAway().get(5, TimeUnit.SECONDS);
}
@Test
public void testGETThenSPDYPushIsIgnored() throws Exception
{
final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Headers responseHeaders = new Headers();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
Headers pushHeaders = new Headers();
pushHeaders.put(HTTPSPDYHeader.URI.name(version()), "/push");
stream.syn(new SynInfo(pushHeaders, false), 5, TimeUnit.SECONDS, new Handler.Adapter<Stream>()
{
@Override
public void completed(Stream pushStream)
{
pushStream.data(new BytesDataInfo(data, true));
}
});
stream.reply(new ReplyInfo(responseHeaders, true));
return null;
}
}));
Socket client = new Socket();
client.connect(proxyAddress);
OutputStream output = client.getOutputStream();
String request = "" +
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + proxyAddress.getPort() + "\r\n" +
"\r\n";
output.write(request.getBytes("UTF-8"));
output.flush();
client.setSoTimeout(1000);
InputStream input = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
String line = reader.readLine();
Assert.assertTrue(line.contains(" 200"));
while (line.length() > 0)
line = reader.readLine();
Assert.assertFalse(reader.ready());
client.close();
}
@Test
public void testSYNThenSPDYPushIsReceived() throws Exception
{
final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Headers responseHeaders = new Headers();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version()), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version()), "200 OK");
stream.reply(new ReplyInfo(responseHeaders, false));
Headers pushHeaders = new Headers();
pushHeaders.put(HTTPSPDYHeader.URI.name(version()), "/push");
stream.syn(new SynInfo(pushHeaders, false), 5, TimeUnit.SECONDS, new Handler.Adapter<Stream>()
{
@Override
public void completed(Stream pushStream)
{
pushStream.data(new BytesDataInfo(data, true));
}
});
stream.data(new BytesDataInfo(data, true));
return null;
}
}));
proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
final CountDownLatch pushSynLatch = new CountDownLatch(1);
final CountDownLatch pushDataLatch = new CountDownLatch(1);
Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
pushSynLatch.countDown();
return new StreamFrameListener.Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
pushDataLatch.countDown();
}
};
}
}).get(5, TimeUnit.SECONDS);
Headers headers = new Headers();
headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
final CountDownLatch replyLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
replyLatch.countDown();
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
dataLatch.countDown();
}
});
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(pushSynLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(pushDataLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
client.goAway().get(5, TimeUnit.SECONDS);
}
@Test
public void testPING() throws Exception
{
// PING is per hop, and it does not carry the information to which server to ping to
// We just verify that it works
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()));
proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
final CountDownLatch pingLatch = new CountDownLatch(1);
Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter()
{
@Override
public void onPing(Session session, PingInfo pingInfo)
{
pingLatch.countDown();
}
}).get(5, TimeUnit.SECONDS);
client.ping().get(5, TimeUnit.SECONDS);
Assert.assertTrue(pingLatch.await(5, TimeUnit.SECONDS));
client.goAway().get(5, TimeUnit.SECONDS);
}
@Test
public void testGETThenReset() throws Exception
{
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Assert.assertTrue(synInfo.isClose());
Headers requestHeaders = synInfo.getHeaders();
Assert.assertNotNull(requestHeaders.get("via"));
stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM));
return null;
}
}));
Socket client = new Socket();
client.connect(proxyAddress);
OutputStream output = client.getOutputStream();
String request = "" +
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + proxyAddress.getPort() + "\r\n" +
"\r\n";
output.write(request.getBytes("UTF-8"));
output.flush();
InputStream input = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
Assert.assertNull(reader.readLine());
client.close();
}
@Test
public void testSYNThenReset() throws Exception
{
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Assert.assertTrue(synInfo.isClose());
Headers requestHeaders = synInfo.getHeaders();
Assert.assertNotNull(requestHeaders.get("via"));
stream.getSession().rst(new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM));
return null;
}
}));
proxyConnector.setDefaultAsyncConnectionFactory(proxyConnector.getAsyncConnectionFactory("spdy/" + version()));
final CountDownLatch resetLatch = new CountDownLatch(1);
Session client = factory.newSPDYClient(version()).connect(proxyAddress, new SessionFrameListener.Adapter()
{
@Override
public void onRst(Session session, RstInfo rstInfo)
{
resetLatch.countDown();
}
}).get(5, TimeUnit.SECONDS);
Headers headers = new Headers();
headers.put(HTTPSPDYHeader.HOST.name(version()), "localhost:" + proxyAddress.getPort());
client.syn(new SynInfo(headers, true), null);
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
client.goAway().get(5, TimeUnit.SECONDS);
}
}

View File

@ -101,7 +101,7 @@ public class SPDYClient
channel.socket().setTcpNoDelay(true);
channel.configureBlocking(false);
SessionPromise result = new SessionPromise(this, listener);
SessionPromise result = new SessionPromise(channel, this, listener);
channel.connect(address);
factory.selector.register(channel, result);
@ -419,14 +419,31 @@ public class SPDYClient
private static class SessionPromise extends Promise<Session>
{
private final SocketChannel channel;
private final SPDYClient client;
private final SessionFrameListener listener;
private SessionPromise(SPDYClient client, SessionFrameListener listener)
private SessionPromise(SocketChannel channel, SPDYClient client, SessionFrameListener listener)
{
this.channel = channel;
this.client = client;
this.listener = listener;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
try
{
super.cancel(mayInterruptIfRunning);
channel.close();
return true;
}
catch (IOException x)
{
return true;
}
}
}
private static class ClientSPDYAsyncConnectionFactory implements AsyncConnectionFactory

View File

@ -156,6 +156,14 @@ public class SPDYServerConnector extends SelectChannelConnector
}
}
public void clearAsyncConnectionFactories()
{
synchronized (factories)
{
factories.clear();
}
}
protected List<String> provideProtocols()
{
synchronized (factories)

View File

@ -219,10 +219,10 @@ public class GoAwayTest extends AbstractTest
Assert.assertThat(x.getCause(), CoreMatchers.instanceOf(ClosedChannelException.class));
}
// Be sure the last good stream is the first
// The last good stream is the second, because it was received by the server
Assert.assertTrue(goAwayLatch.await(5, TimeUnit.SECONDS));
GoAwayInfo goAway = goAwayRef.get();
Assert.assertNotNull(goAway);
Assert.assertEquals(stream1.getId(), goAway.getLastStreamId());
Assert.assertEquals(stream2.getId(), goAway.getLastStreamId());
}
}

View File

@ -0,0 +1,68 @@
// ========================================================================
// Copyright (c) 2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.util;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class Atomics
{
private Atomics()
{
}
public static void updateMin(AtomicLong currentMin, long newValue)
{
long oldValue = currentMin.get();
while (newValue < oldValue)
{
if (currentMin.compareAndSet(oldValue, newValue))
break;
oldValue = currentMin.get();
}
}
public static void updateMax(AtomicLong currentMax, long newValue)
{
long oldValue = currentMax.get();
while (newValue > oldValue)
{
if (currentMax.compareAndSet(oldValue, newValue))
break;
oldValue = currentMax.get();
}
}
public static void updateMin(AtomicInteger currentMin, int newValue)
{
int oldValue = currentMin.get();
while (newValue < oldValue)
{
if (currentMin.compareAndSet(oldValue, newValue))
break;
oldValue = currentMin.get();
}
}
public static void updateMax(AtomicInteger currentMax, int newValue)
{
int oldValue = currentMax.get();
while (newValue > oldValue)
{
if (currentMax.compareAndSet(oldValue, newValue))
break;
oldValue = currentMax.get();
}
}
}

View File

@ -163,9 +163,6 @@ public class Loader
*/
public static String getClassPath(ClassLoader loader) throws Exception
{
if (loader.getParent() != null)
loader = loader.getParent();
StringBuilder classpath=new StringBuilder();
while (loader != null && (loader instanceof URLClassLoader))
{

View File

@ -37,6 +37,8 @@ public class Constraint implements Cloneable, Serializable
public final static String __SPNEGO_AUTH = "SPNEGO";
public final static String __NEGOTIATE_AUTH = "NEGOTIATE";
public static boolean validateMethod (String method)
{
if (method == null)
@ -47,7 +49,8 @@ public class Constraint implements Cloneable, Serializable
|| method.equals (__DIGEST_AUTH)
|| method.equals (__CERT_AUTH)
|| method.equals(__CERT_AUTH2)
|| method.equals(__SPNEGO_AUTH));
|| method.equals(__SPNEGO_AUTH)
|| method.equals(__NEGOTIATE_AUTH));
}
/* ------------------------------------------------------------ */

View File

@ -4,17 +4,19 @@
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.util.statistic;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.Atomics;
/* ------------------------------------------------------------ */
/** Statistics on a counter value.
@ -22,9 +24,9 @@ import java.util.concurrent.atomic.AtomicLong;
* Keep total, current and maximum values of a counter that
* can be incremented and decremented. The total refers only
* to increments.
*
*
*/
public class CounterStatistic
public class CounterStatistic
{
protected final AtomicLong _max = new AtomicLong();
protected final AtomicLong _curr = new AtomicLong();
@ -39,11 +41,11 @@ public class CounterStatistic
/* ------------------------------------------------------------ */
public void reset(final long value)
{
_max.set(value);
_max.set(value);
_curr.set(value);
_total.set(0); // total always set to 0 to properly calculate cumulative total
}
/* ------------------------------------------------------------ */
/**
* @param delta the amount to add to the count
@ -53,15 +55,9 @@ public class CounterStatistic
long value=_curr.addAndGet(delta);
if (delta > 0)
_total.addAndGet(delta);
long oldValue = _max.get();
while (value > oldValue)
{
if (_max.compareAndSet(oldValue, value))
break;
oldValue = _max.get();
}
Atomics.updateMax(_max,value);
}
/* ------------------------------------------------------------ */
/**
* @param delta the amount to subtract the count by.
@ -70,7 +66,7 @@ public class CounterStatistic
{
add(-delta);
}
/* ------------------------------------------------------------ */
/**
*/
@ -78,7 +74,7 @@ public class CounterStatistic
{
add(1);
}
/* ------------------------------------------------------------ */
/**
*/
@ -95,7 +91,7 @@ public class CounterStatistic
{
return _max.get();
}
/* ------------------------------------------------------------ */
/**
* @return current value
@ -104,7 +100,7 @@ public class CounterStatistic
{
return _curr.get();
}
/* ------------------------------------------------------------ */
/**
* @return total value
@ -113,9 +109,4 @@ public class CounterStatistic
{
return _total.get();
}
/* ------------------------------------------------------------ */
protected void upxdateMax(long value)
{
}
}

View File

@ -4,23 +4,25 @@
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.util.statistic;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.Atomics;
/* ------------------------------------------------------------ */
/**
* SampledStatistics
* <p>
* Provides max, total, mean, count, variance, and standard
* Provides max, total, mean, count, variance, and standard
* deviation of continuous sequence of samples.
* <p>
* Calculates estimates of mean, variance, and standard deviation
@ -53,25 +55,17 @@ public class SampleStatistic
{
long total = _total.addAndGet(sample);
long count = _count.incrementAndGet();
if (count>1)
{
long mean10 = total*10/count;
long delta10 = sample*10 - mean10;
_totalVariance100.addAndGet(delta10*delta10);
}
long oldMax = _max.get();
while (sample > oldMax)
{
if (_max.compareAndSet(oldMax, sample))
break;
oldMax = _max.get();
}
Atomics.updateMax(_max, sample);
}
/* ------------------------------------------------------------ */
/**
* @return the max value
*/
@ -80,37 +74,31 @@ public class SampleStatistic
return _max.get();
}
/* ------------------------------------------------------------ */
public long getTotal()
{
return _total.get();
}
/* ------------------------------------------------------------ */
public long getCount()
{
return _count.get();
}
/* ------------------------------------------------------------ */
public double getMean()
{
return (double)_total.get()/_count.get();
}
/* ------------------------------------------------------------ */
public double getVariance()
{
final long variance100 = _totalVariance100.get();
final long count = _count.get();
return count>1?((double)variance100)/100.0/(count-1):0.0;
}
/* ------------------------------------------------------------ */
public double getStdDev()
{
return Math.sqrt(getVariance());
}
}

View File

@ -71,11 +71,10 @@ public class XmlConfiguration
private static final Class<?>[] __primitiveHolders =
{ Boolean.class, Character.class, Byte.class, Short.class, Integer.class, Long.class, Float.class, Double.class, Void.class };
private static final Integer ZERO = new Integer(0);
private static final Class<?>[] __supportedCollections =
{ ArrayList.class,ArrayQueue.class,HashSet.class,Queue.class,List.class,Set.class,Collection.class,};
private static final Iterable<?> __factoryLoader;
private static final XmlParser __parser = initParser();
@ -141,9 +140,11 @@ public class XmlConfiguration
/* ------------------------------------------------------------ */
/**
* Constructor. Reads the XML configuration file.
* Reads and parses the XML configuration file.
*
* @param configuration
* @param configuration the URL of the XML configuration
* @throws IOException if the configuration could not be read
* @throws SAXException if the configuration could not be parsed
*/
public XmlConfiguration(URL configuration) throws SAXException, IOException
{
@ -157,12 +158,12 @@ public class XmlConfiguration
/* ------------------------------------------------------------ */
/**
* Constructor.
* Reads and parses the XML configuration string.
*
* @param configuration
* String of XML configuration commands excluding the normal XML preamble. The String should start with a " <Configure ...." element.
* @exception SAXException
* @exception IOException
* @param configuration String of XML configuration commands excluding the normal XML preamble.
* The String should start with a "&lt;Configure ....&gt;" element.
* @throws IOException if the configuration could not be read
* @throws SAXException if the configuration could not be parsed
*/
public XmlConfiguration(String configuration) throws SAXException, IOException
{
@ -178,12 +179,11 @@ public class XmlConfiguration
/* ------------------------------------------------------------ */
/**
* Constructor.
* Reads and parses the XML configuration stream.
*
* @param configuration
* An input stream containing a complete e.g. configuration file
* @exception SAXException
* @exception IOException
* @param configuration An input stream containing a complete configuration file
* @throws IOException if the configuration could not be read
* @throws SAXException if the configuration could not be parsed
*/
public XmlConfiguration(InputStream configuration) throws SAXException, IOException
{
@ -240,6 +240,7 @@ public class XmlConfiguration
/* ------------------------------------------------------------ */
/**
* @param map the ID map
* @deprecated use {@link #getIdMap()}.put(...)
*/
@Deprecated
@ -251,6 +252,7 @@ public class XmlConfiguration
/* ------------------------------------------------------------ */
/**
* @param map the properties map
* @deprecated use {@link #getProperties()}.putAll(...)
*/
@Deprecated
@ -268,13 +270,12 @@ public class XmlConfiguration
/* ------------------------------------------------------------ */
/**
* Configure an object.
* Applies the XML configuration script to the given object.
*
* <p>Apply the XML configuration script to the passed object.</p>
*
* @param obj
* The object to be configured, which must be of a type or super type of the class attribute of the Configure element.
* @exception Exception
* @param obj The object to be configured, which must be of a type or super type
* of the class attribute of the &lt;Configure&gt; element.
* @throws Exception if the configuration fails
* @return the configured object
*/
public Object configure(Object obj) throws Exception
{
@ -283,10 +284,13 @@ public class XmlConfiguration
/* ------------------------------------------------------------ */
/**
* Configure an object. If the configuration has an ID, an object is looked up by ID and it's type check. Otherwise a new object is created.
* Applies the XML configuration script.
* If the root element of the configuration has an ID, an object is looked up by ID and its type checked
* against the root element's type.
* Otherwise a new object of the type specified by the root element is created.
*
* @return The newly created configured object.
* @exception Exception
* @throws Exception if the configuration fails
*/
public Object configure() throws Exception
{
@ -353,12 +357,13 @@ public class XmlConfiguration
/* ------------------------------------------------------------ */
/**
* Recursive configuration step. This method applies the remaining Set, Put and Call elements to the current object.
* Recursive configuration routine.
* This method applies the nested Set, Put, Call, etc. elements to the given object.
*
* @param obj
* @param cfg
* @param i
* @exception Exception
* @param obj the object to configure
* @param cfg the XML nodes of the configuration
* @param i the index of the XML nodes
* @throws Exception if the configuration fails
*/
public void configure(Object obj, XmlParser.Node cfg, int i) throws Exception
{
@ -576,7 +581,9 @@ public class XmlConfiguration
}
/**
* @return a collection if compareValueToClass is a Set or List. null if that's not the case or value can't be converted to a Collection
* @param array the array to convert
* @param collectionType the desired collection type
* @return a collection of the desired type if the array can be converted
*/
private static Collection<?> convertArrayToCollection(Object array, Class<?> collectionType)
{
@ -862,7 +869,7 @@ public class XmlConfiguration
XmlParser.Node item = (Node)nodeObject;
String nid = item.getAttribute("id");
Object v = value(obj,item);
al = LazyList.add(al,(v == null && aClass.isPrimitive())?ZERO:v);
al = LazyList.add(al,(v == null && aClass.isPrimitive())?0:v);
if (nid != null)
_idMap.put(nid,v);
}
@ -896,7 +903,7 @@ public class XmlConfiguration
XmlParser.Node key = null;
XmlParser.Node value = null;
for (Object object : node)
for (Object object : entry)
{
if (object instanceof String)
continue;
@ -932,26 +939,26 @@ public class XmlConfiguration
* Get a Property.
*
* @param node
* @return
* @return
* @exception Exception
*/
private Object propertyObj(XmlParser.Node node) throws Exception
{
String id = node.getAttribute("id");
String name = node.getAttribute("name");
String defval = node.getAttribute("default");
Object prop = null;
String defaultValue = node.getAttribute("default");
Object prop;
if (_propertyMap != null && _propertyMap.containsKey(name))
prop = _propertyMap.get(name);
else
prop = defval;
prop = defaultValue;
if (id != null)
_idMap.put(id,prop);
if (prop != null)
configure(prop,node,0);
return prop;
}
/* ------------------------------------------------------------ */
/*
@ -960,7 +967,7 @@ public class XmlConfiguration
*/
private Object value(Object obj, XmlParser.Node node) throws Exception
{
Object value = null;
Object value;
// Get the type
String type = node.getAttribute("type");
@ -989,7 +996,7 @@ public class XmlConfiguration
if (type == null || !"String".equals(type))
{
// Skip leading white
Object item = null;
Object item;
while (first <= last)
{
item = node.get(first);
@ -1084,7 +1091,7 @@ public class XmlConfiguration
throw new InvocationTargetException(e);
}
}
for (Class<?> collectionClass : __supportedCollections)
{
if (isTypeMatchingClass(type,collectionClass))
@ -1093,12 +1100,11 @@ public class XmlConfiguration
throw new IllegalStateException("Unknown type " + type);
}
/* ------------------------------------------------------------ */
private static boolean isTypeMatchingClass(String type, Class<?> classToMatch)
{
boolean match = classToMatch.getSimpleName().equalsIgnoreCase(type) || classToMatch.getName().equals(type);
return match;
return classToMatch.getSimpleName().equalsIgnoreCase(type) || classToMatch.getName().equals(type);
}
/* ------------------------------------------------------------ */
@ -1134,7 +1140,7 @@ public class XmlConfiguration
String defaultValue = node.getAttribute("default");
return System.getProperty(name,defaultValue);
}
if ("Env".equals(tag))
{
String name = node.getAttribute("name");
@ -1167,6 +1173,7 @@ public class XmlConfiguration
*
* @param args
* array of property and xml configuration filenames or {@link Resource}s.
* @throws Exception if the XML configurations cannot be run
*/
public static void main(final String[] args) throws Exception
{

View File

@ -18,6 +18,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.Ignore;
@ -42,6 +43,7 @@ public class TestConfiguration extends HashMap<String,Object>
@SuppressWarnings("rawtypes")
private Set set;
private ConstructorArgTestClass constructorArgTestClass;
public Map map;
public void setTest(Object value)
{
@ -52,7 +54,7 @@ public class TestConfiguration extends HashMap<String,Object>
{
testInt=value;
}
public void setPropertyTest(int value)
{
propValue=value;
@ -141,4 +143,9 @@ public class TestConfiguration extends HashMap<String,Object>
{
this.constructorArgTestClass = constructorArgTestClass;
}
public void setMap(Map map)
{
this.map = map;
}
}

View File

@ -13,20 +13,24 @@
package org.eclipse.jetty.xml;
import static junit.framework.Assert.assertEquals;
import static org.junit.Assert.*;
import static org.hamcrest.CoreMatchers.*;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import static junit.framework.Assert.assertEquals;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class XmlConfigurationTest
{
protected String _configure="org/eclipse/jetty/xml/configure.xml";
private static final String STRING_ARRAY_XML = "<Array type=\"String\"><Item type=\"String\">String1</Item><Item type=\"String\">String2</Item></Array>";
private static final String INT_ARRAY_XML = "<Array type=\"int\"><Item type=\"int\">1</Item><Item type=\"int\">2</Item></Array>";
@ -37,7 +41,7 @@ public class XmlConfigurationTest
XmlConfiguration configuration = new XmlConfiguration(url);
configuration.configure();
}
@Test
public void testPassedObject() throws Exception
{
@ -53,7 +57,7 @@ public class XmlConfigurationTest
assertEquals("Set String","SetValue",tc.testObject);
assertEquals("Set Type",2,tc.testInt);
assertEquals(18080, tc.propValue);
assertEquals("Put","PutValue",tc.get("Test"));
@ -76,7 +80,7 @@ public class XmlConfigurationTest
assertEquals( "SystemProperty", System.getProperty("user.dir")+"/stuff",tc.get("SystemProperty"));
assertEquals( "Env", System.getenv("HOME"),tc.get("Env"));
assertEquals( "Property", "xxx", tc.get("Property"));
@ -104,12 +108,12 @@ public class XmlConfigurationTest
assertEquals("nested config","Call1",tc2.testObject);
assertEquals("nested config",4,tc2.testInt);
assertEquals( "nested call", "http://www.eclipse.com/",tc2.url.toString());
assertEquals("static to field",tc.testField1,77);
assertEquals("field to field",tc.testField2,2);
assertEquals("literal to static",TestConfiguration.VALUE,42);
}
@Test
public void testNewObject() throws Exception
{
@ -124,7 +128,7 @@ public class XmlConfigurationTest
assertEquals("Set String","SetValue",tc.testObject);
assertEquals("Set Type",2,tc.testInt);
assertEquals(18080, tc.propValue);
assertEquals("Put","PutValue",tc.get("Test"));
@ -173,13 +177,13 @@ public class XmlConfigurationTest
assertEquals("nested config","Call1",tc2.testObject);
assertEquals("nested config",4,tc2.testInt);
assertEquals( "nested call", "http://www.eclipse.com/",tc2.url.toString());
assertEquals("static to field",71,tc.testField1);
assertEquals("field to field",2,tc.testField2);
assertEquals("literal to static",42,TestConfiguration.VALUE);
}
@Test
public void testStringConfiguration() throws Exception
{
@ -314,4 +318,28 @@ public class XmlConfigurationTest
xmlConfiguration.configure(tc);
assertThat("tc.getSet() has two entries as specified in the xml",tc.getSet().size(),is(2));
}
@Test
public void testMap() throws Exception
{
XmlConfiguration xmlConfiguration = new XmlConfiguration("" +
"<Configure class=\"org.eclipse.jetty.xml.TestConfiguration\">" +
" <Set name=\"map\">" +
" <Map>" +
" <Entry>" +
" <Item>key1</Item>" +
" <Item>value1</Item>" +
" </Entry>" +
" <Entry>" +
" <Item>key2</Item>" +
" <Item>value2</Item>" +
" </Entry>" +
" </Map>" +
" </Set>" +
"</Configure>");
TestConfiguration tc = new TestConfiguration();
Assert.assertNull("tc.map is null as it's not configured yet", tc.map);
xmlConfiguration.configure(tc);
Assert.assertEquals("tc.map is has two entries as specified in the XML", 2, tc.map.size());
}
}