jetty-9: jetty-spdy refactored to match jetty-9.

This commit is contained in:
Thomas Becker 2012-07-27 16:03:43 +02:00
parent b36a6cc9cc
commit 369bc035fc
65 changed files with 533 additions and 896 deletions

View File

@ -6,7 +6,7 @@ jetty-7.6.6-SNAPSHOT
+ 376717 Balancer Servlet with round robin support, contribution, added
missing license
+ 379250 Server is added to shutdown hook twice
+ 380866 maxIdleTime set to 0 after session migration
+ 380866 idleTimeout set to 0 after session migration
+ 381399 Unable to stop a jetty instance that has not finished starting
+ 381401 Print log warning when stop attempt made with incorrect STOP.KEY
+ 381402 Make ContextHandler take set of protected directories
@ -178,7 +178,7 @@ jetty-8.1.0.RC4 - 13 January 2012
+ 367548 jetty-osgi-boot must not import the nested package twice
+ 367591 corrected configuration.xml version to 7.6
+ 367635 Added support for start.d directory
+ 367716 simplified maxIdleTime logic
+ 367716 simplified idleTimeout logic
+ 368035 WebSocketClientFactory does not invoke super.doStop().
+ 368060 do not encode sendRedirect URLs
+ 368112 NPE on <jsp-config><taglib> element parsing web.xml
@ -257,7 +257,7 @@ jetty-7.6.0.RC4 - 13 January 2012
a Https-resource through a web-proxy.
+ 366774 removed XSS vulnerbility
+ 367099 Upgrade jetty-websocket for RFC 6455 - Addendum.
+ 367716 simplified maxIdleTime logic
+ 367716 simplified idleTimeout logic
+ 368035 WebSocketClientFactory does not invoke super.doStop().
+ 368060 do not encode sendRedirect URLs
+ 368114 Protect against non-Strings in System properties for Log
@ -625,7 +625,7 @@ jetty-7.4.3.v20110701 - 01 July 2011
String) does not yield an empty map
+ 349738 set buffer sizes for http client in proxy servlet
+ 349870 proxy servlet protect continuation against fast failing exchanges
+ 349896 SCEP supports zero maxIdleTime
+ 349896 SCEP supports zero idleTimeout
+ 349897 draft -09 websockets
+ 349997 MBeanContainer uses weak references
+ 350533 Add "Origin" to the list of allowed headers in CrossOriginFilter
@ -1070,7 +1070,7 @@ jetty-7.1.4.v20100610
+ 315748 Removed --fromDaemon from start.jar (replaced with --daemon)
+ 315925 Improved context xml configuration handling
+ 315995 Incorrect package name in system classes list
+ 316119 Fixed maxIdleTime for SocketEndPoint
+ 316119 Fixed idleTimeout for SocketEndPoint
+ 316254 Implement @DeclareRoles
+ 316334 Breaking change on org.eclipse.jetty.client.HttpExchange
+ 316399 Debug output in MultiPartFilter
@ -2492,7 +2492,7 @@ jetty-6.1.5rc0 - 15 July 0200
jetty-6.1.4 - 15 June 2007
+ fixed early open() call in NIO connectors
+ JETTY-370 ensure maxIdleTime<=0 means connections never expire
+ JETTY-370 ensure idleTimeout<=0 means connections never expire
+ JETTY-371 Fixed chunked HEAD response
+ JETTY-372 make test for cookie caching more rigorous

View File

@ -10,7 +10,7 @@ confidentialPort: Port to use for confidential redirections
confidentialScheme: Scheme to use for confidential redirections
host: Host name to accept connections on
port: TCP/IP port to accept connections on
maxIdleTime: Maximum time in ms that a connection can be idle before being closed
idleTimeout: Maximum time in ms that a connection can be idle before being closed
statsOn: True if statistics collection is turned on.
statsOnMs: Time in milliseconds stats have been collected for.
statsReset(): Reset statistics.
@ -26,4 +26,4 @@ connectionsRequestsStdDev: Standard deviation of number of requests per connecti
connectionsRequestsMax: Maximum number of requests per connection since statsReset() called. Undefined if setStatsOn(false).
requests: Number of requests since statsReset() called. Undefined if setStatsOn(false).
open(): Open the listening port
close(): Close the listening port (but allow existing connections to continue for graceful shutdown)
close(): Close the listening port (but allow existing connections to continue for graceful shutdown)

View File

@ -3,7 +3,7 @@
<parent>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-project</artifactId>
<version>8.1.6-SNAPSHOT</version>
<version>9.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@ -19,8 +19,10 @@
<modules>
<module>spdy-core</module>
<module>spdy-jetty</module>
<!--
<module>spdy-jetty-http</module>
<module>spdy-jetty-http-webapp</module>
-->
</modules>
<build>

View File

@ -3,12 +3,12 @@
<parent>
<groupId>org.eclipse.jetty.spdy</groupId>
<artifactId>spdy-parent</artifactId>
<version>8.1.6-SNAPSHOT</version>
<version>9.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spdy-core</artifactId>
<name>Jetty :: SPDY :: Core</name>
<modelVersion>4.0.0</modelVersion>
<artifactId>spdy-core</artifactId>
<name>Jetty :: SPDY :: Core</name>
<dependencies>
<dependency>
@ -16,21 +16,26 @@
<artifactId>jetty-util</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-io</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>

View File

@ -1,46 +0,0 @@
//========================================================================
//Copyright 2012 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
//All rights reserved. This program and the accompanying materials
//are made available under the terms of the Eclipse Public License v1.0
//and Apache License v2.0 which accompanies this distribution.
//The Eclipse Public License is available at
//http://www.eclipse.org/legal/epl-v10.html
//The Apache License v2.0 is available at
//http://www.opensource.org/licenses/apache2.0.php
//You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
/**
* <p>A {@link ByteBuffer} pool.</p>
* <p>Acquired buffers may be {@link #release(ByteBuffer) released} but they do not need to;
* if they are released, they may be recycled and reused, otherwise they will be garbage
* collected as usual.</p>
*/
public interface ByteBufferPool
{
/**
* <p>Requests a {@link ByteBuffer} of the given size.</p>
* <p>The returned buffer may have a bigger capacity than the size being
* requested but it will have the limit set to the given size.</p>
*
* @param size the size of the buffer
* @param direct whether the buffer must be direct or not
* @return the requested buffer
* @see #release(ByteBuffer)
*/
public ByteBuffer acquire(int size, boolean direct);
/**
* <p>Returns a {@link ByteBuffer}, usually obtained with {@link #acquire(int, boolean)}
* (but not necessarily), making it available for recycling and reuse.</p>
*
* @param buffer the buffer to return
* @see #acquire(int, boolean)
*/
public void release(ByteBuffer buffer);
}

View File

@ -1,25 +1,28 @@
//========================================================================
//Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
//All rights reserved. This program and the accompanying materials
//are made available under the terms of the Eclipse Public License v1.0
//and Apache License v2.0 which accompanies this distribution.
//The Eclipse Public License is available at
//http://www.eclipse.org/legal/epl-v10.html
//The Apache License v2.0 is available at
//http://www.opensource.org/licenses/apache2.0.php
//You may elect to redistribute this code under either of these licenses.
//========================================================================
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.util.Callback;
public interface Controller<T>
{
public int write(ByteBuffer buffer, Handler<T> handler, T context);
public int write(ByteBuffer buffer, Callback<T> callback, T context);
public void close(boolean onlyOutput);
}

View File

@ -1,24 +1,27 @@
//========================================================================
//Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
//All rights reserved. This program and the accompanying materials
//are made available under the terms of the Eclipse Public License v1.0
//and Apache License v2.0 which accompanies this distribution.
//The Eclipse Public License is available at
//http://www.eclipse.org/legal/epl-v10.html
//The Apache License v2.0 is available at
//http://www.opensource.org/licenses/apache2.0.php
//You may elect to redistribute this code under either of these licenses.
//========================================================================
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.util.Callback;
public interface ISession extends Session
{
@ -29,7 +32,7 @@ public interface ISession extends Session
*/
public void flush();
public <C> void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Handler<C> handler, C context);
public <C> void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback<C> callback, C context);
public <C> void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Handler<C> handler, C context);
public <C> void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Callback<C> callback, C context);
}

View File

@ -20,7 +20,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.util.Callback;
/**
* <p>A {@link Promise} is a {@link Future} that allows a result or a failure to be set,
@ -28,7 +28,7 @@ import org.eclipse.jetty.spdy.api.Handler;
*
* @param <T> the type of the result object
*/
public class Promise<T> implements Handler<T>, Future<T>
public class Promise<T> implements Callback<T>, Future<T>
{
private final CountDownLatch latch = new CountDownLatch(1);
private boolean cancelled;

View File

@ -1,97 +0,0 @@
//========================================================================
//Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
//All rights reserved. This program and the accompanying materials
//are made available under the terms of the Eclipse Public License v1.0
//and Apache License v2.0 which accompanies this distribution.
//The Eclipse Public License is available at
//http://www.eclipse.org/legal/epl-v10.html
//The Apache License v2.0 is available at
//http://www.opensource.org/licenses/apache2.0.php
//You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
public class StandardByteBufferPool implements ByteBufferPool
{
private final ConcurrentMap<Integer, Queue<ByteBuffer>> directBuffers = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Queue<ByteBuffer>> heapBuffers = new ConcurrentHashMap<>();
private final int factor;
public StandardByteBufferPool()
{
this(1024);
}
public StandardByteBufferPool(int factor)
{
this.factor = factor;
}
public ByteBuffer acquire(int size, boolean direct)
{
int bucket = bucketFor(size);
ConcurrentMap<Integer, Queue<ByteBuffer>> buffers = buffersFor(direct);
ByteBuffer result = null;
Queue<ByteBuffer> byteBuffers = buffers.get(bucket);
if (byteBuffers != null)
result = byteBuffers.poll();
if (result == null)
{
int capacity = bucket * factor;
result = direct ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity);
}
result.clear();
result.limit(size);
return result;
}
public void release(ByteBuffer buffer)
{
int bucket = bucketFor(buffer.capacity());
ConcurrentMap<Integer, Queue<ByteBuffer>> buffers = buffersFor(buffer.isDirect());
// Avoid to create a new queue every time, just to be discarded immediately
Queue<ByteBuffer> byteBuffers = buffers.get(bucket);
if (byteBuffers == null)
{
byteBuffers = new ConcurrentLinkedQueue<>();
Queue<ByteBuffer> existing = buffers.putIfAbsent(bucket, byteBuffers);
if (existing != null)
byteBuffers = existing;
}
buffer.clear();
byteBuffers.offer(buffer);
}
public void clear()
{
directBuffers.clear();
heapBuffers.clear();
}
private int bucketFor(int size)
{
int bucket = size / factor;
if (size % factor > 0)
++bucket;
return bucket;
}
private ConcurrentMap<Integer, Queue<ByteBuffer>> buffersFor(boolean direct)
{
return direct ? directBuffers : heapBuffers;
}
}

View File

@ -34,10 +34,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDYException;
@ -65,12 +65,13 @@ import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.spdy.parser.Parser;
import org.eclipse.jetty.util.Atomics;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class StandardSession implements ISession, Parser.Listener, Handler<StandardSession.FrameBytes>, Dumpable
public class StandardSession implements ISession, Parser.Listener, Callback<StandardSession.FrameBytes>, Dumpable
{
private static final Logger logger = Log.getLogger(Session.class);
private static final ThreadLocal<Integer> handlerInvocations = new ThreadLocal<Integer>()
@ -147,7 +148,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
@Override
public void syn(SynInfo synInfo, StreamFrameListener listener, long timeout, TimeUnit unit, Handler<Stream> handler)
public void syn(SynInfo synInfo, StreamFrameListener listener, long timeout, TimeUnit unit, Callback<Stream> callback)
{
// Synchronization is necessary.
// SPEC v3, 2.3.1 requires that the stream creation be monotonically crescent
@ -165,7 +166,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
// TODO: for SPDYv3 we need to support the "slot" argument
SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, associatedStreamId, synInfo.getPriority(), (short)0, synInfo.getHeaders());
IStream stream = createStream(synStream, listener, true);
generateAndEnqueueControlFrame(stream, synStream, timeout, unit, handler, stream);
generateAndEnqueueControlFrame(stream, synStream, timeout, unit, callback, stream);
}
flush();
}
@ -179,19 +180,19 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
@Override
public void rst(RstInfo rstInfo, long timeout, TimeUnit unit, Handler<Void> handler)
public void rst(RstInfo rstInfo, long timeout, TimeUnit unit, Callback<Void> callback)
{
// SPEC v3, 2.2.2
if (goAwaySent.get())
{
complete(handler,null);
complete(callback,null);
}
else
{
int streamId = rstInfo.getStreamId();
IStream stream = streams.get(streamId);
RstStreamFrame frame = new RstStreamFrame(version,streamId,rstInfo.getStreamStatus().getCode(version));
control(stream,frame,timeout,unit,handler,null);
control(stream,frame,timeout,unit,callback,null);
if (stream != null)
{
stream.process(frame);
@ -209,10 +210,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
@Override
public void settings(SettingsInfo settingsInfo, long timeout, TimeUnit unit, Handler<Void> handler)
public void settings(SettingsInfo settingsInfo, long timeout, TimeUnit unit, Callback<Void> callback)
{
SettingsFrame frame = new SettingsFrame(version,settingsInfo.getFlags(),settingsInfo.getSettings());
control(null, frame, timeout, unit, handler, null);
control(null, frame, timeout, unit, callback, null);
}
@Override
@ -224,12 +225,12 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
@Override
public void ping(long timeout, TimeUnit unit, Handler<PingInfo> handler)
public void ping(long timeout, TimeUnit unit, Callback<PingInfo> callback)
{
int pingId = pingIds.getAndAdd(2);
PingInfo pingInfo = new PingInfo(pingId);
PingFrame frame = new PingFrame(version,pingId);
control(null,frame,timeout,unit,handler,pingInfo);
control(null,frame,timeout,unit,callback,pingInfo);
}
@Override
@ -246,23 +247,24 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
@Override
public void goAway(long timeout, TimeUnit unit, Handler<Void> handler)
public void goAway(long timeout, TimeUnit unit, Callback<Void> callback)
{
goAway(SessionStatus.OK, timeout, unit, handler);
goAway(SessionStatus.OK, timeout, unit, callback);
}
private void goAway(SessionStatus sessionStatus, long timeout, TimeUnit unit, Handler<Void> handler)
private void goAway(SessionStatus sessionStatus, long timeout, TimeUnit unit, Callback<Void> callback)
{
new Exception().printStackTrace();
if (goAwaySent.compareAndSet(false,true))
{
if (!goAwayReceived.get())
{
GoAwayFrame frame = new GoAwayFrame(version,lastStreamId.get(),sessionStatus.getCode());
control(null,frame,timeout,unit,handler,null);
control(null,frame,timeout,unit,callback,null);
return;
}
}
complete(handler, null);
complete(callback, null);
}
@Override
@ -818,14 +820,15 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
}
@Override
public <C> void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Handler<C> handler, C context)
public <C> void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback<C> callback, C context)
{
generateAndEnqueueControlFrame(stream,frame,timeout,unit,handler,context);
generateAndEnqueueControlFrame(stream,frame,timeout,unit,callback,context);
flush();
}
private <C> void generateAndEnqueueControlFrame(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Handler<C> handler, C context)
private <C> void generateAndEnqueueControlFrame(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback<C> callback, C context)
{
try
{
@ -836,7 +839,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
{
ByteBuffer buffer = generator.control(frame);
logger.debug("Queuing {} on {}", frame, stream);
ControlFrameBytes<C> frameBytes = new ControlFrameBytes<>(stream, handler, context, frame, buffer);
ControlFrameBytes<C> frameBytes = new ControlFrameBytes<>(stream, callback, context, frame, buffer);
if (timeout > 0)
frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
@ -849,7 +852,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
catch (Exception x)
{
notifyHandlerFailed(handler, context, x);
notifyCallbackFailed(callback, context, x);
}
}
@ -861,10 +864,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
@Override
public <C> void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Handler<C> handler, C context)
public <C> void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Callback<C> callback, C context)
{
logger.debug("Queuing {} on {}",dataInfo,stream);
DataFrameBytes<C> frameBytes = new DataFrameBytes<>(stream,handler,context,dataInfo);
DataFrameBytes<C> frameBytes = new DataFrameBytes<>(stream,callback,context,dataInfo);
if (timeout > 0)
frameBytes.task = scheduler.schedule(frameBytes,timeout,unit);
append(frameBytes);
@ -1003,19 +1006,19 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
fb.fail(x);
}
protected void write(ByteBuffer buffer, Handler<FrameBytes> handler, FrameBytes frameBytes)
protected void write(ByteBuffer buffer, Callback<FrameBytes> callback, FrameBytes frameBytes)
{
if (controller != null)
{
logger.debug("Writing {} frame bytes of {}",buffer.remaining(),frameBytes);
controller.write(buffer,handler,frameBytes);
controller.write(buffer,callback,frameBytes);
}
}
private <C> void complete(final Handler<C> handler, final C context)
private <C> void complete(final Callback<C> callback, final C context)
{
// Applications may send and queue up a lot of frames and
// if we call Handler.completed() only synchronously we risk
// if we call Callback.completed() only synchronously we risk
// starvation (for the last frames sent) and stack overflow.
// Therefore every some invocation, we dispatch to a new thread
Integer invocations = handlerInvocations.get();
@ -1026,8 +1029,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
@Override
public void run()
{
if (handler != null)
notifyHandlerCompleted(handler,context);
if (callback != null)
notifyCallbackCompleted(callback, context);
flush();
}
});
@ -1037,8 +1040,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
handlerInvocations.set(invocations + 1);
try
{
if (handler != null)
notifyHandlerCompleted(handler,context);
if (callback != null)
notifyCallbackCompleted(callback, context);
flush();
}
finally
@ -1048,37 +1051,37 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
}
private <C> void notifyHandlerCompleted(Handler<C> handler, C context)
private <C> void notifyCallbackCompleted(Callback<C> callback, C context)
{
try
{
handler.completed(context);
callback.completed(context);
}
catch (Exception x)
{
logger.info("Exception while notifying handler " + handler, x);
logger.info("Exception while notifying callback " + callback, x);
}
catch (Error x)
{
logger.info("Exception while notifying handler " + handler, x);
logger.info("Exception while notifying callback " + callback, x);
throw x;
}
}
private <C> void notifyHandlerFailed(Handler<C> handler, C context, Throwable x)
private <C> void notifyCallbackFailed(Callback<C> callback, C context, Throwable x)
{
try
{
if (handler != null)
handler.failed(context, x);
if (callback != null)
callback.failed(context, x);
}
catch (Exception xx)
{
logger.info("Exception while notifying handler " + handler, xx);
logger.info("Exception while notifying callback " + callback, xx);
}
catch (Error xx)
{
logger.info("Exception while notifying handler " + handler, xx);
logger.info("Exception while notifying callback " + callback, xx);
throw xx;
}
}
@ -1128,14 +1131,14 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private abstract class AbstractFrameBytes<C> implements FrameBytes, Runnable
{
private final IStream stream;
private final Handler<C> handler;
private final Callback<C> callback;
private final C context;
protected volatile ScheduledFuture<?> task;
protected AbstractFrameBytes(IStream stream, Handler<C> handler, C context)
protected AbstractFrameBytes(IStream stream, Callback<C> callback, C context)
{
this.stream = stream;
this.handler = handler;
this.callback = callback;
this.context = context;
}
@ -1164,14 +1167,14 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
public void complete()
{
cancelTask();
StandardSession.this.complete(handler,context);
StandardSession.this.complete(callback,context);
}
@Override
public void fail(Throwable x)
{
cancelTask();
notifyHandlerFailed(handler,context,x);
notifyCallbackFailed(callback, context, x);
StandardSession.this.flush();
}
@ -1195,9 +1198,9 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private final ControlFrame frame;
private final ByteBuffer buffer;
private ControlFrameBytes(IStream stream, Handler<C> handler, C context, ControlFrame frame, ByteBuffer buffer)
private ControlFrameBytes(IStream stream, Callback<C> callback, C context, ControlFrame frame, ByteBuffer buffer)
{
super(stream,handler,context);
super(stream,callback,context);
this.frame = frame;
this.buffer = buffer;
}
@ -1239,7 +1242,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private int size;
private volatile ByteBuffer buffer;
private DataFrameBytes(IStream stream, Handler<C> handler, C context, DataInfo dataInfo)
private DataFrameBytes(IStream stream, Callback<C> handler, C context, DataInfo dataInfo)
{
super(stream,handler,context);
this.dataInfo = dataInfo;

View File

@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
@ -33,6 +32,7 @@ import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.HeadersFrame;
import org.eclipse.jetty.spdy.frames.SynReplyFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -318,15 +318,15 @@ public class StandardStream implements IStream
}
@Override
public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Handler<Stream> handler)
public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Callback<Stream> callback)
{
if (isClosed() || isReset())
{
handler.failed(this, new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED));
callback.failed(this, new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED));
return;
}
PushSynInfo pushSynInfo = new PushSynInfo(getId(), synInfo);
session.syn(pushSynInfo, null, timeout, unit, handler);
session.syn(pushSynInfo, null, timeout, unit, callback);
}
@Override
@ -338,14 +338,14 @@ public class StandardStream implements IStream
}
@Override
public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler<Void> handler)
public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Callback<Void> callback)
{
if (isUnidirectional())
throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams");
openState = OpenState.REPLY_SENT;
updateCloseState(replyInfo.isClose(), true);
SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
session.control(this, frame, timeout, unit, handler, null);
session.control(this, frame, timeout, unit, callback, null);
}
@Override
@ -357,7 +357,7 @@ public class StandardStream implements IStream
}
@Override
public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler<Void> handler)
public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Callback<Void> callback)
{
if (!canSend())
{
@ -372,7 +372,7 @@ public class StandardStream implements IStream
// Cannot update the close state here, because the data that we send may
// be flow controlled, so we need the stream to update the window size.
session.data(this, dataInfo, timeout, unit, handler, null);
session.data(this, dataInfo, timeout, unit, callback, null);
}
@Override
@ -384,7 +384,7 @@ public class StandardStream implements IStream
}
@Override
public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler)
public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Callback<Void> callback)
{
if (!canSend())
{
@ -399,7 +399,7 @@ public class StandardStream implements IStream
updateCloseState(headersInfo.isClose(), true);
HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
session.control(this, frame, timeout, unit, handler, null);
session.control(this, frame, timeout, unit, callback, null);
}
@Override

View File

@ -1,56 +0,0 @@
//========================================================================
//Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
//All rights reserved. This program and the accompanying materials
//are made available under the terms of the Eclipse Public License v1.0
//and Apache License v2.0 which accompanies this distribution.
//The Eclipse Public License is available at
//http://www.eclipse.org/legal/epl-v10.html
//The Apache License v2.0 is available at
//http://www.opensource.org/licenses/apache2.0.php
//You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.spdy.api;
/**
* <p>A callback abstraction that handles completed/failed events of asynchronous operations.</p>
* <p>Instances of this class capture a context that is made available on the completion callback.</p>
*
* @param <C> the type of the context object
*/
public interface Handler<C>
{
/**
* <p>Callback invoked when the operation completes.</p>
*
* @param context the context
* @see #failed(Object, Throwable)
*/
public abstract void completed(C context);
/**
* <p>Callback invoked when the operation fails.</p>
* @param context the context
* @param x the reason for the operation failure
*/
public void failed(C context, Throwable x);
/**
* <p>Empty implementation of {@link Handler}</p>
*
* @param <C> the type of the context object
*/
public static class Adapter<C> implements Handler<C>
{
@Override
public void completed(C context)
{
}
@Override
public void failed(C context, Throwable x)
{
}
}
}

View File

@ -18,6 +18,8 @@ import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.Callback;
/**
* <p>A {@link Session} represents the client-side endpoint of a SPDY connection to a single origin server.</p>
* <p>Once a {@link Session} has been obtained, it can be used to open SPDY streams:</p>
@ -69,23 +71,23 @@ public interface Session
* @param synInfo the metadata to send on stream creation
* @param listener the listener to invoke when events happen on the stream just created
* @return a future for the stream that will be created
* @see #syn(SynInfo, StreamFrameListener, long, TimeUnit, Handler)
* @see #syn(SynInfo, StreamFrameListener, long, TimeUnit, Callback)
*/
public Future<Stream> syn(SynInfo synInfo, StreamFrameListener listener);
/**
* <p>Sends asynchronously a SYN_FRAME to create a new {@link Stream SPDY stream}.</p>
* <p>Callers may pass a non-null completion handler to be notified of when the
* <p>Callers may pass a non-null completion callback to be notified of when the
* stream has been created and use the stream, for example, to send data frames.</p>
*
* @param synInfo the metadata to send on stream creation
* @param listener the listener to invoke when events happen on the stream just created
* @param timeout the operation's timeout
* @param unit the timeout's unit
* @param handler the completion handler that gets notified of stream creation
* @param callback the completion callback that gets notified of stream creation
* @see #syn(SynInfo, StreamFrameListener)
*/
public void syn(SynInfo synInfo, StreamFrameListener listener, long timeout, TimeUnit unit, Handler<Stream> handler);
public void syn(SynInfo synInfo, StreamFrameListener listener, long timeout, TimeUnit unit, Callback<Stream> callback);
/**
@ -94,22 +96,22 @@ public interface Session
*
* @param rstInfo the metadata to reset the stream
* @return a future to wait for the reset to be sent
* @see #rst(RstInfo, long, TimeUnit, Handler)
* @see #rst(RstInfo, long, TimeUnit, Callback)
*/
public Future<Void> rst(RstInfo rstInfo);
/**
* <p>Sends asynchronously a RST_STREAM to abort a stream.</p>
* <p>Callers may pass a non-null completion handler to be notified of when the
* <p>Callers may pass a non-null completion callback to be notified of when the
* reset has been actually sent.</p>
*
* @param rstInfo the metadata to reset the stream
* @param timeout the operation's timeout
* @param unit the timeout's unit
* @param handler the completion handler that gets notified of reset's send
* @param callback the completion callback that gets notified of reset's send
* @see #rst(RstInfo)
*/
public void rst(RstInfo rstInfo, long timeout, TimeUnit unit, Handler<Void> handler);
public void rst(RstInfo rstInfo, long timeout, TimeUnit unit, Callback<Void> callback);
/**
* <p>Sends asynchronously a SETTINGS to configure the SPDY connection.</p>
@ -117,64 +119,64 @@ public interface Session
*
* @param settingsInfo the metadata to send
* @return a future to wait for the settings to be sent
* @see #settings(SettingsInfo, long, TimeUnit, Handler)
* @see #settings(SettingsInfo, long, TimeUnit, Callback)
*/
public Future<Void> settings(SettingsInfo settingsInfo);
/**
* <p>Sends asynchronously a SETTINGS to configure the SPDY connection.</p>
* <p>Callers may pass a non-null completion handler to be notified of when the
* <p>Callers may pass a non-null completion callback to be notified of when the
* settings has been actually sent.</p>
*
* @param settingsInfo the metadata to send
* @param timeout the operation's timeout
* @param unit the timeout's unit
* @param handler the completion handler that gets notified of settings' send
* @param callback the completion callback that gets notified of settings' send
* @see #settings(SettingsInfo)
*/
public void settings(SettingsInfo settingsInfo, long timeout, TimeUnit unit, Handler<Void> handler);
public void settings(SettingsInfo settingsInfo, long timeout, TimeUnit unit, Callback<Void> callback);
/**
* <p>Sends asynchronously a PING, normally to measure round-trip time.</p>
* <p>Callers may use the returned future to wait for the ping to be sent.</p>
*
* @return a future for the metadata sent
* @see #ping(long, TimeUnit, Handler)
* @see #ping(long, TimeUnit, Callback)
*/
public Future<PingInfo> ping();
/**
* <p>Sends asynchronously a PING, normally to measure round-trip time.</p>
* <p>Callers may pass a non-null completion handler to be notified of when the
* <p>Callers may pass a non-null completion callback to be notified of when the
* ping has been actually sent.</p>
*
* @param timeout the operation's timeout
* @param unit the timeout's unit
* @param handler the completion handler that gets notified of ping's send
* @param callback the completion callback that gets notified of ping's send
* @see #ping()
*/
public void ping(long timeout, TimeUnit unit, Handler<PingInfo> handler);
public void ping(long timeout, TimeUnit unit, Callback<PingInfo> callback);
/**
* <p>Closes gracefully this session, sending a GO_AWAY frame and then closing the TCP connection.</p>
* <p>Callers may use the returned future to wait for the go away to be sent.</p>
*
* @return a future to wait for the go away to be sent
* @see #goAway(long, TimeUnit, Handler)
* @see #goAway(long, TimeUnit, Callback)
*/
public Future<Void> goAway();
/**
* <p>Closes gracefully this session, sending a GO_AWAY frame and then closing the TCP connection.</p>
* <p>Callers may pass a non-null completion handler to be notified of when the
* <p>Callers may pass a non-null completion callback to be notified of when the
* go away has been actually sent.</p>
*
* @param timeout the operation's timeout
* @param unit the timeout's unit
* @param handler the completion handler that gets notified of go away's send
* @param callback the completion callback that gets notified of go away's send
* @see #goAway()
*/
public void goAway(long timeout, TimeUnit unit, Handler<Void> handler);
public void goAway(long timeout, TimeUnit unit, Callback<Void> callback);
/**
* @return a snapshot of the streams currently active in this session

View File

@ -18,6 +18,8 @@ import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.Callback;
/**
* <p>A {@link Stream} represents a bidirectional exchange of data on top of a {@link Session}.</p>
* <p>Differently from socket streams, where the input and output streams are permanently associated
@ -42,7 +44,7 @@ import java.util.concurrent.TimeUnit;
* stream.data(StringDataInfo("chunk1", false), 5, TimeUnit.SECONDS, new Handler&lt;Void&gt;() { ... });
* stream.data(StringDataInfo("chunk2", true), 1, TimeUnit.SECONDS, new Handler&lt;Void&gt;() { ... });
* </pre>
* <p>where the second call to {@link #data(DataInfo, long, TimeUnit, Handler)} has a timeout smaller
* <p>where the second call to {@link #data(DataInfo, long, TimeUnit, Callback)} has a timeout smaller
* than the previous call.</p>
* <p>The behavior of such style of invocations is unspecified (it may even throw an exception - similar
* to {@link WritePendingException}).</p>
@ -89,22 +91,22 @@ public interface Stream
*
* @param synInfo the metadata to send on stream creation
* @return a future containing the stream once it got established
* @see #syn(SynInfo, long, TimeUnit, Handler)
* @see #syn(SynInfo, long, TimeUnit, Callback)
*/
public Future<Stream> syn(SynInfo synInfo);
/**
* <p>Initiate a unidirectional spdy pushstream associated to this stream asynchronously<p>
* <p>Callers may pass a non-null completion handler to be notified of when the
* <p>Callers may pass a non-null completion callback to be notified of when the
* pushstream has been established.</p>
*
* @param synInfo the metadata to send on stream creation
* @param timeout the operation's timeout
* @param unit the timeout's unit
* @param handler the completion handler that gets notified once the pushstream is established
* @param callback the completion callback that gets notified once the pushstream is established
* @see #syn(SynInfo)
*/
public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Handler<Stream> handler);
public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Callback<Stream> callback);
/**
* <p>Sends asynchronously a SYN_REPLY frame in response to a SYN_STREAM frame.</p>
@ -112,23 +114,23 @@ public interface Stream
*
* @param replyInfo the metadata to send
* @return a future to wait for the reply to be sent
* @see #reply(ReplyInfo, long, TimeUnit, Handler)
* @see #reply(ReplyInfo, long, TimeUnit, Callback)
* @see SessionFrameListener#onSyn(Stream, SynInfo)
*/
public Future<Void> reply(ReplyInfo replyInfo);
/**
* <p>Sends asynchronously a SYN_REPLY frame in response to a SYN_STREAM frame.</p>
* <p>Callers may pass a non-null completion handler to be notified of when the
* <p>Callers may pass a non-null completion callback to be notified of when the
* reply has been actually sent.</p>
*
* @param replyInfo the metadata to send
* @param timeout the operation's timeout
* @param unit the timeout's unit
* @param handler the completion handler that gets notified of reply sent
* @param callback the completion callback that gets notified of reply sent
* @see #reply(ReplyInfo)
*/
public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler<Void> handler);
public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Callback<Void> callback);
/**
* <p>Sends asynchronously a DATA frame on this stream.</p>
@ -137,7 +139,7 @@ public interface Stream
*
* @param dataInfo the metadata to send
* @return a future to wait for the data to be sent
* @see #data(DataInfo, long, TimeUnit, Handler)
* @see #data(DataInfo, long, TimeUnit, Callback)
* @see #reply(ReplyInfo)
*/
public Future<Void> data(DataInfo dataInfo);
@ -145,16 +147,16 @@ public interface Stream
/**
* <p>Sends asynchronously a DATA frame on this stream.</p>
* <p>DATA frames should always be sent after a SYN_REPLY frame.</p>
* <p>Callers may pass a non-null completion handler to be notified of when the
* <p>Callers may pass a non-null completion callback to be notified of when the
* data has been actually sent.</p>
*
* @param dataInfo the metadata to send
* @param timeout the operation's timeout
* @param unit the timeout's unit
* @param handler the completion handler that gets notified of data sent
* @param callback the completion callback that gets notified of data sent
* @see #data(DataInfo)
*/
public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler<Void> handler);
public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Callback<Void> callback);
/**
* <p>Sends asynchronously a HEADER frame on this stream.</p>
@ -163,7 +165,7 @@ public interface Stream
*
* @param headersInfo the metadata to send
* @return a future to wait for the headers to be sent
* @see #headers(HeadersInfo, long, TimeUnit, Handler)
* @see #headers(HeadersInfo, long, TimeUnit, Callback
* @see #reply(ReplyInfo)
*/
public Future<Void> headers(HeadersInfo headersInfo);
@ -171,16 +173,16 @@ public interface Stream
/**
* <p>Sends asynchronously a HEADER frame on this stream.</p>
* <p>HEADERS frames should always be sent after a SYN_REPLY frame.</p>
* <p>Callers may pass a non-null completion handler to be notified of when the
* <p>Callers may pass a non-null completion callback to be notified of when the
* headers have been actually sent.</p>
*
* @param headersInfo the metadata to send
* @param timeout the operation's timeout
* @param unit the timeout's unit
* @param handler the completion handler that gets notified of headers sent
* @param callback the completion callback that gets notified of headers sent
* @see #headers(HeadersInfo)
*/
public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler);
public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Callback<Void> callback);
/**
* @return whether this stream is unidirectional or not

View File

@ -15,7 +15,7 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.frames.ControlFrame;
public abstract class ControlFrameGenerator

View File

@ -19,11 +19,12 @@ import java.security.cert.CertificateEncodingException;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.SessionException;
import org.eclipse.jetty.spdy.api.SessionStatus;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.CredentialFrame;
import org.eclipse.jetty.util.BufferUtil;
public class CredentialGenerator extends ControlFrameGenerator
{
@ -48,6 +49,7 @@ public class CredentialGenerator extends ControlFrameGenerator
int totalLength = ControlFrame.HEADER_LENGTH + frameBodyLength;
ByteBuffer buffer = getByteBufferPool().acquire(totalLength, true);
BufferUtil.clearToFill(buffer);
generateControlFrameHeader(credential, frameBodyLength, buffer);
buffer.putShort(credential.getSlot());

View File

@ -14,9 +14,10 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.frames.DataFrame;
import org.eclipse.jetty.util.BufferUtil;
public class DataFrameGenerator
{
@ -30,6 +31,8 @@ public class DataFrameGenerator
public ByteBuffer generate(int streamId, int length, DataInfo dataInfo)
{
ByteBuffer buffer = bufferPool.acquire(DataFrame.HEADER_LENGTH + length, true);
BufferUtil.clearToFill(buffer);
buffer.limit(length + DataFrame.HEADER_LENGTH); //TODO: thomas show Simone :)
buffer.position(DataFrame.HEADER_LENGTH);
// Guaranteed to always be >= 0
int read = dataInfo.readInto(buffer);

View File

@ -16,7 +16,7 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import java.util.EnumMap;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.CompressionFactory;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.frames.ControlFrame;

View File

@ -15,10 +15,11 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.GoAwayFrame;
import org.eclipse.jetty.util.BufferUtil;
public class GoAwayGenerator extends ControlFrameGenerator
{
@ -35,6 +36,7 @@ public class GoAwayGenerator extends ControlFrameGenerator
int frameBodyLength = 8;
int totalLength = ControlFrame.HEADER_LENGTH + frameBodyLength;
ByteBuffer buffer = getByteBufferPool().acquire(totalLength, true);
BufferUtil.clearToFill(buffer);
generateControlFrameHeader(goAway, frameBodyLength, buffer);
buffer.putInt(goAway.getLastStreamId() & 0x7F_FF_FF_FF);

View File

@ -15,12 +15,13 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.SessionException;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.SessionStatus;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.HeadersFrame;
import org.eclipse.jetty.util.BufferUtil;
public class HeadersGenerator extends ControlFrameGenerator
{
@ -55,6 +56,7 @@ public class HeadersGenerator extends ControlFrameGenerator
int totalLength = ControlFrame.HEADER_LENGTH + frameLength;
ByteBuffer buffer = getByteBufferPool().acquire(totalLength, true);
BufferUtil.clearToFill(buffer);
generateControlFrameHeader(headers, frameLength, buffer);
buffer.putInt(headers.getStreamId() & 0x7F_FF_FF_FF);

View File

@ -15,9 +15,10 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.NoOpFrame;
import org.eclipse.jetty.util.BufferUtil;
public class NoOpGenerator extends ControlFrameGenerator
{
@ -34,6 +35,7 @@ public class NoOpGenerator extends ControlFrameGenerator
int frameBodyLength = 0;
int totalLength = ControlFrame.HEADER_LENGTH + frameBodyLength;
ByteBuffer buffer = getByteBufferPool().acquire(totalLength, true);
BufferUtil.clearToFill(buffer);
generateControlFrameHeader(noOp, frameBodyLength, buffer);
buffer.flip();

View File

@ -15,9 +15,10 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.PingFrame;
import org.eclipse.jetty.util.BufferUtil;
public class PingGenerator extends ControlFrameGenerator
{
@ -34,6 +35,7 @@ public class PingGenerator extends ControlFrameGenerator
int frameBodyLength = 4;
int totalLength = ControlFrame.HEADER_LENGTH + frameBodyLength;
ByteBuffer buffer = getByteBufferPool().acquire(totalLength, true);
BufferUtil.clearToFill(buffer);
generateControlFrameHeader(ping, frameBodyLength, buffer);
buffer.putInt(ping.getPingId());

View File

@ -15,9 +15,10 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.RstStreamFrame;
import org.eclipse.jetty.util.BufferUtil;
public class RstStreamGenerator extends ControlFrameGenerator
{
@ -34,6 +35,7 @@ public class RstStreamGenerator extends ControlFrameGenerator
int frameBodyLength = 8;
int totalLength = ControlFrame.HEADER_LENGTH + frameBodyLength;
ByteBuffer buffer = getByteBufferPool().acquire(totalLength, true);
BufferUtil.clearToFill(buffer);
generateControlFrameHeader(rstStream, frameBodyLength, buffer);
buffer.putInt(rstStream.getStreamId() & 0x7F_FF_FF_FF);

View File

@ -15,11 +15,12 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Settings;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.SettingsFrame;
import org.eclipse.jetty.util.BufferUtil;
public class SettingsGenerator extends ControlFrameGenerator
{
@ -38,6 +39,7 @@ public class SettingsGenerator extends ControlFrameGenerator
int frameBodyLength = 4 + 8 * size;
int totalLength = ControlFrame.HEADER_LENGTH + frameBodyLength;
ByteBuffer buffer = getByteBufferPool().acquire(totalLength, true);
BufferUtil.clearToFill(buffer);
generateControlFrameHeader(settingsFrame, frameBodyLength, buffer);
buffer.putInt(size);

View File

@ -15,12 +15,13 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.SessionException;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.SessionStatus;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.SynReplyFrame;
import org.eclipse.jetty.util.BufferUtil;
public class SynReplyGenerator extends ControlFrameGenerator
{
@ -53,6 +54,7 @@ public class SynReplyGenerator extends ControlFrameGenerator
int totalLength = ControlFrame.HEADER_LENGTH + frameLength;
ByteBuffer buffer = getByteBufferPool().acquire(totalLength, true);
BufferUtil.clearToFill(buffer);
generateControlFrameHeader(synReply, frameLength, buffer);
buffer.putInt(synReply.getStreamId() & 0x7F_FF_FF_FF);

View File

@ -15,7 +15,7 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.SessionException;
import org.eclipse.jetty.spdy.StreamException;
import org.eclipse.jetty.spdy.api.SPDY;
@ -23,6 +23,7 @@ import org.eclipse.jetty.spdy.api.SessionStatus;
import org.eclipse.jetty.spdy.api.StreamStatus;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.SynStreamFrame;
import org.eclipse.jetty.util.BufferUtil;
public class SynStreamGenerator extends ControlFrameGenerator
{
@ -55,6 +56,7 @@ public class SynStreamGenerator extends ControlFrameGenerator
int totalLength = ControlFrame.HEADER_LENGTH + frameLength;
ByteBuffer buffer = getByteBufferPool().acquire(totalLength, true);
BufferUtil.clearToFill(buffer);
generateControlFrameHeader(synStream, frameLength, buffer);
int streamId = synStream.getStreamId();

View File

@ -15,9 +15,10 @@ package org.eclipse.jetty.spdy.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.ByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
import org.eclipse.jetty.util.BufferUtil;
public class WindowUpdateGenerator extends ControlFrameGenerator
{
@ -34,6 +35,7 @@ public class WindowUpdateGenerator extends ControlFrameGenerator
int frameBodyLength = 8;
int totalLength = ControlFrame.HEADER_LENGTH + frameBodyLength;
ByteBuffer buffer = getByteBufferPool().acquire(totalLength, true);
BufferUtil.clearToFill(buffer);
generateControlFrameHeader(windowUpdate, frameBodyLength, buffer);
buffer.putInt(windowUpdate.getStreamId() & 0x7F_FF_FF_FF);

View File

@ -20,7 +20,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.SPDYException;
import org.eclipse.jetty.spdy.api.Session;
@ -28,6 +29,7 @@ import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.util.Callback;
import org.junit.Assert;
import org.junit.Test;
@ -61,7 +63,7 @@ public class AsyncTimeoutTest
};
final CountDownLatch failedLatch = new CountDownLatch(1);
session.syn(new SynInfo(true), null, timeout, unit, new Handler<Stream>()
session.syn(new SynInfo(true), null, timeout, unit, new Callback<Stream>()
{
@Override
public void completed(Stream stream)
@ -91,14 +93,14 @@ public class AsyncTimeoutTest
Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(), null, 1, null, generator, new FlowControlStrategy.None())
{
@Override
protected void write(ByteBuffer buffer, Handler<FrameBytes> handler, FrameBytes frameBytes)
protected void write(ByteBuffer buffer, Callback<FrameBytes> callback, FrameBytes frameBytes)
{
try
{
// Wait if we're writing the data frame (control frame's first byte is 0x80)
if (buffer.get(0) == 0)
unit.sleep(2 * timeout);
super.write(buffer, handler, frameBytes);
super.write(buffer, callback, frameBytes);
}
catch (InterruptedException x)
{
@ -109,13 +111,8 @@ public class AsyncTimeoutTest
Stream stream = session.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS);
final CountDownLatch failedLatch = new CountDownLatch(1);
stream.data(new StringDataInfo("data", true), timeout, unit, new Handler<Void>()
stream.data(new StringDataInfo("data", true), timeout, unit, new Callback.Empty<Void>()
{
@Override
public void completed(Void context)
{
}
@Override
public void failed(Void context, Throwable x)
{
@ -129,9 +126,9 @@ public class AsyncTimeoutTest
private static class TestController implements Controller<StandardSession.FrameBytes>
{
@Override
public int write(ByteBuffer buffer, Handler<StandardSession.FrameBytes> handler, StandardSession.FrameBytes context)
public int write(ByteBuffer buffer, Callback<StandardSession.FrameBytes> callback, StandardSession.FrameBytes context)
{
handler.completed(context);
callback.completed(context);
return buffer.remaining();
}

View File

@ -23,10 +23,11 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardSession.FrameBytes;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
@ -41,6 +42,7 @@ import org.eclipse.jetty.spdy.frames.DataFrame;
import org.eclipse.jetty.spdy.frames.SynReplyFrame;
import org.eclipse.jetty.spdy.frames.SynStreamFrame;
import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.util.Callback;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@ -86,19 +88,19 @@ public class StandardSessionTest
@SuppressWarnings("unchecked")
private void setControllerWriteExpectationToFail(final boolean fail)
{
when(controller.write(any(ByteBuffer.class),any(Handler.class),any(StandardSession.FrameBytes.class))).thenAnswer(new Answer<Integer>()
when(controller.write(any(ByteBuffer.class),any(Callback.class),any(StandardSession.FrameBytes.class))).thenAnswer(new Answer<Integer>()
{
public Integer answer(InvocationOnMock invocation)
{
Object[] args = invocation.getArguments();
Handler<StandardSession.FrameBytes> handler = (Handler<FrameBytes>)args[1];
Callback<StandardSession.FrameBytes> callback = (Callback<FrameBytes>)args[1];
FrameBytes context = (FrameBytes)args[2];
if (fail)
handler.failed(context,new ClosedChannelException());
callback.failed(context,new ClosedChannelException());
else
handler.completed(context);
callback.completed(context);
return 0;
}
});
@ -205,7 +207,7 @@ public class StandardSessionTest
{
final CountDownLatch failedLatch = new CountDownLatch(1);
SynInfo synInfo = new SynInfo(headers,false,stream.getPriority());
stream.syn(synInfo,5,TimeUnit.SECONDS,new Handler.Adapter<Stream>()
stream.syn(synInfo,5,TimeUnit.SECONDS,new Callback.Empty<Stream>()
{
@Override
public void failed(Stream stream, Throwable x)
@ -404,22 +406,22 @@ public class StandardSessionTest
SynStreamFrame synStreamFrame = new SynStreamFrame(SPDY.V2, SynInfo.FLAG_CLOSE, 1, 0, (byte)0, (short)0, null);
IStream stream = new StandardStream(synStreamFrame.getStreamId(), synStreamFrame.getPriority(), session, null);
stream.updateWindowSize(8192);
Handler.Adapter<Void> handler = new Handler.Adapter<Void>()
Callback.Empty<Void> callback = new Callback.Empty()
{
@Override
public void failed(Void context, Throwable x)
public void failed(Object context, Throwable x)
{
failedCalledLatch.countDown();
}
};
// first data frame should fail on controller.write()
stream.data(new StringDataInfo("data", false), 5, TimeUnit.SECONDS, handler);
stream.data(new StringDataInfo("data", false), 5, TimeUnit.SECONDS, callback);
// second data frame should fail without controller.writer() as the connection is expected to be broken after first controller.write() call failed.
stream.data(new StringDataInfo("data", false), 5, TimeUnit.SECONDS, handler);
stream.data(new StringDataInfo("data", false), 5, TimeUnit.SECONDS, callback);
verify(controller, times(1)).write(any(ByteBuffer.class), any(Handler.class), any(FrameBytes.class));
assertThat("Handler.failed has been called twice", failedCalledLatch.await(5, TimeUnit.SECONDS), is(true));
verify(controller, times(1)).write(any(ByteBuffer.class), any(Callback.class), any(FrameBytes.class));
assertThat("Callback.failed has been called twice", failedCalledLatch.await(5, TimeUnit.SECONDS), is(true));
}
private IStream createStream() throws InterruptedException, ExecutionException, TimeoutException

View File

@ -21,13 +21,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.frames.SynStreamFrame;
import org.eclipse.jetty.util.Callback;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatcher;
@ -67,7 +67,7 @@ public class StandardStreamTest
SynInfo synInfo = new SynInfo(false);
when(session.getStreams()).thenReturn(streams);
stream.syn(synInfo);
verify(session).syn(argThat(new PushSynInfoMatcher(stream.getId(), synInfo)), any(StreamFrameListener.class), anyLong(), any(TimeUnit.class), any(Handler.class));
verify(session).syn(argThat(new PushSynInfoMatcher(stream.getId(), synInfo)), any(StreamFrameListener.class), anyLong(), any(TimeUnit.class), any(Callback.class));
}
private class PushSynInfoMatcher extends ArgumentMatcher<PushSynInfo>
@ -107,7 +107,7 @@ public class StandardStreamTest
stream.updateCloseState(true, false);
assertThat("stream expected to be closed", stream.isClosed(), is(true));
final CountDownLatch failedLatch = new CountDownLatch(1);
stream.syn(new SynInfo(false), 1, TimeUnit.SECONDS, new Handler.Adapter<Stream>()
stream.syn(new SynInfo(false), 1, TimeUnit.SECONDS, new Callback.Empty<Stream>()
{
@Override
public void failed(Stream stream, Throwable x)
@ -128,6 +128,6 @@ public class StandardStreamTest
stream.updateCloseState(synStreamFrame.isClose(), true);
assertThat("stream is half closed", stream.isHalfClosed(), is(true));
stream.data(new StringDataInfo("data on half closed stream", true));
verify(session, never()).data(any(IStream.class), any(DataInfo.class), anyInt(), any(TimeUnit.class), any(Handler.class), any(void.class));
verify(session, never()).data(any(IStream.class), any(DataInfo.class), anyInt(), any(TimeUnit.class), any(Callback.class), any(void.class));
}
}

View File

@ -17,6 +17,7 @@ import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.StandardSession;
import org.eclipse.jetty.util.Callback;
import org.junit.Ignore;
import org.junit.Test;
@ -80,7 +81,7 @@ public class ClientUsageTest
// Then issue another similar request
stream.getSession().syn(new SynInfo(true), this);
}
}, 0, TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>()
}, 0, TimeUnit.MILLISECONDS, new Callback.Empty<Stream>()
{
@Override
public void completed(Stream stream)
@ -135,7 +136,7 @@ public class ClientUsageTest
}
}
}, 0, TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>()
}, 0, TimeUnit.MILLISECONDS, new Callback.Empty<Stream>()
{
@Override
public void completed(Stream stream)

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.spdy.api;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.util.Callback;
import org.junit.Ignore;
import org.junit.Test;
@ -66,7 +67,7 @@ public class ServerUsageTest
//
// However, the API may allow to initiate the stream
session.syn(new SynInfo(false), null, 0, TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>()
session.syn(new SynInfo(false), null, 0, TimeUnit.MILLISECONDS, new Callback.Empty<Stream>()
{
@Override
public void completed(Stream stream)
@ -96,7 +97,7 @@ public class ServerUsageTest
Session session = stream.getSession();
// Since it's unidirectional, no need to pass the listener
session.syn(new SynInfo(new Headers(), false, (byte)0), null, 0, TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>()
session.syn(new SynInfo(new Headers(), false, (byte)0), null, 0, TimeUnit.MILLISECONDS, new Callback.Empty<Stream>()
{
@Override
public void completed(Stream pushStream)

View File

@ -18,7 +18,7 @@ import java.nio.ByteBuffer;
import java.security.KeyStore;
import java.security.cert.Certificate;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.generator.Generator;

View File

@ -15,7 +15,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.StringDataInfo;

View File

@ -15,7 +15,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.generator.Generator;

View File

@ -15,7 +15,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.HeadersInfo;

View File

@ -15,7 +15,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.spdy.parser.Parser;

View File

@ -15,7 +15,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.generator.Generator;

View File

@ -13,15 +13,9 @@
package org.eclipse.jetty.spdy.frames;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.StreamStatus;
@ -30,6 +24,12 @@ import org.eclipse.jetty.spdy.parser.Parser;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
public class RstStreamGenerateParseTest
{
@Test

View File

@ -15,7 +15,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Settings;

View File

@ -15,7 +15,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.ReplyInfo;

View File

@ -15,7 +15,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.SPDY;

View File

@ -15,7 +15,7 @@ package org.eclipse.jetty.spdy.frames;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.generator.Generator;

View File

@ -17,8 +17,8 @@ import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.SessionException;
import org.eclipse.jetty.spdy.StandardByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.StreamException;
import org.eclipse.jetty.spdy.api.Headers;

View File

@ -3,7 +3,7 @@
<parent>
<groupId>org.eclipse.jetty.spdy</groupId>
<artifactId>spdy-parent</artifactId>
<version>8.1.6-SNAPSHOT</version>
<version>9.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spdy-jetty-http-webapp</artifactId>

View File

@ -3,7 +3,7 @@
<parent>
<groupId>org.eclipse.jetty.spdy</groupId>
<artifactId>spdy-parent</artifactId>
<version>8.1.6-SNAPSHOT</version>
<version>9.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spdy-jetty-http</artifactId>

View File

@ -3,8 +3,9 @@
<parent>
<groupId>org.eclipse.jetty.spdy</groupId>
<artifactId>spdy-parent</artifactId>
<version>8.1.6-SNAPSHOT</version>
<version>9.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spdy-jetty</artifactId>
<name>Jetty :: SPDY :: Jetty Binding</name>

View File

@ -16,8 +16,8 @@ package org.eclipse.jetty.spdy;
import java.nio.channels.SocketChannel;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection;
public interface AsyncConnectionFactory
{

View File

@ -1,63 +1,43 @@
//========================================================================
//Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
//All rights reserved. This program and the accompanying materials
//are made available under the terms of the Eclipse Public License v1.0
//and Apache License v2.0 which accompanies this distribution.
//The Eclipse Public License is available at
//http://www.eclipse.org/legal/epl-v10.html
//The Apache License v2.0 is available at
//http://www.opensource.org/licenses/apache2.0.php
//You may elect to redistribute this code under either of these licenses.
//========================================================================
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy;
import java.io.IOException;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.nio.AsyncConnection;
public class EmptyAsyncConnection extends AbstractConnection implements AsyncConnection
public class EmptyAsyncConnection extends AbstractAsyncConnection
{
public EmptyAsyncConnection(AsyncEndPoint endPoint)
{
super(endPoint);
}
public Connection handle() throws IOException
{
return this;
super(endPoint, new Executor()
{
@Override
public void execute(Runnable command)
{
command.run();
}
});
}
@Override
public AsyncEndPoint getEndPoint()
{
return (AsyncEndPoint)super.getEndPoint();
}
@Override
public boolean isIdle()
{
return false;
}
@Override
public boolean isSuspended()
{
return false;
}
@Override
public void onClose()
public void onFillable()
{
}
@Override
public void onInputShutdown() throws IOException
{
}
}

View File

@ -1,103 +1,59 @@
//========================================================================
//Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
//All rights reserved. This program and the accompanying materials
//are made available under the terms of the Eclipse Public License v1.0
//and Apache License v2.0 which accompanies this distribution.
//The Eclipse Public License is available at
//http://www.eclipse.org/legal/epl-v10.html
//The Apache License v2.0 is available at
//http://www.opensource.org/licenses/apache2.0.php
//You may elect to redistribute this code under either of these licenses.
//========================================================================
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ReadPendingException;
import java.nio.channels.WritePendingException;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.util.thread.Timeout;
import org.eclipse.jetty.util.Callback;
public class EmptyAsyncEndPoint implements AsyncEndPoint
{
private boolean checkForIdle;
private Connection connection;
private AsyncConnection connection;
private boolean oshut;
private boolean ishut;
private boolean closed;
private int maxIdleTime;
private long maxIdleTime;
@Override
public void dispatch()
{
}
@Override
public void asyncDispatch()
public long getCreatedTimeStamp()
{
return 0;
}
@Override
public void scheduleWrite()
{
}
@Override
public void onIdleExpired(long idleForMs)
{
}
@Override
public void setCheckForIdle(boolean check)
{
this.checkForIdle = check;
}
@Override
public boolean isCheckForIdle()
{
return checkForIdle;
}
@Override
public boolean isWritable()
{
return false;
}
@Override
public boolean hasProgressed()
{
return false;
}
@Override
public void scheduleTimeout(Timeout.Task task, long timeoutMs)
{
}
@Override
public void cancelTimeout(Timeout.Task task)
{
}
@Override
public Connection getConnection()
public AsyncConnection getAsyncConnection()
{
return connection;
}
@Override
public void setConnection(Connection connection)
public void setAsyncConnection(AsyncConnection connection)
{
this.connection = connection;
}
@Override
public void shutdownOutput() throws IOException
public void shutdownOutput()
{
oshut = true;
}
@ -108,96 +64,42 @@ public class EmptyAsyncEndPoint implements AsyncEndPoint
return oshut;
}
@Override
public void shutdownInput() throws IOException
{
ishut = true;
}
@Override
public boolean isInputShutdown()
{
return ishut;
return false;
}
@Override
public void close() throws IOException
public void close()
{
closed = true;
}
@Override
public int fill(Buffer buffer) throws IOException
public int fill(ByteBuffer buffer) throws IOException
{
return 0;
}
@Override
public int flush(Buffer buffer) throws IOException
public int flush(ByteBuffer... buffer) throws IOException
{
return 0;
}
@Override
public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
{
return 0;
}
@Override
public String getLocalAddr()
public InetSocketAddress getLocalAddress()
{
return null;
}
@Override
public String getLocalHost()
public InetSocketAddress getRemoteAddress()
{
return null;
}
@Override
public int getLocalPort()
{
return -1;
}
@Override
public String getRemoteAddr()
{
return null;
}
@Override
public String getRemoteHost()
{
return null;
}
@Override
public int getRemotePort()
{
return -1;
}
@Override
public boolean isBlocking()
{
return false;
}
@Override
public boolean blockReadable(long millisecs) throws IOException
{
return false;
}
@Override
public boolean blockWritable(long millisecs) throws IOException
{
return false;
}
@Override
public boolean isOpen()
{
@ -211,19 +113,34 @@ public class EmptyAsyncEndPoint implements AsyncEndPoint
}
@Override
public void flush() throws IOException
{
}
@Override
public int getMaxIdleTime()
public long getIdleTimeout()
{
return maxIdleTime;
}
@Override
public void setMaxIdleTime(int timeMs) throws IOException
public void setIdleTimeout(long timeMs)
{
this.maxIdleTime = timeMs;
}
@Override
public void onOpen()
{
}
@Override
public void onClose()
{
}
@Override
public <C> void fillInterested(C context, Callback<C> callback) throws ReadPendingException
{
}
@Override
public <C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws WritePendingException
{
}
}

View File

@ -1,228 +1,131 @@
//========================================================================
//Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
//All rights reserved. This program and the accompanying materials
//are made available under the terms of the Eclipse Public License v1.0
//and Apache License v2.0 which accompanies this distribution.
//The Eclipse Public License is available at
//http://www.eclipse.org/legal/epl-v10.html
//The Apache License v2.0 is available at
//http://www.opensource.org/licenses/apache2.0.php
//You may elect to redistribute this code under either of these licenses.
//========================================================================
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.DirectNIOBuffer;
import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
import org.eclipse.jetty.io.nio.NIOBuffer;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.parser.Parser;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class SPDYAsyncConnection extends AbstractConnection implements AsyncConnection, Controller<StandardSession.FrameBytes>, IdleListener
public class SPDYAsyncConnection extends AbstractAsyncConnection implements Controller<StandardSession.FrameBytes>, IdleListener
{
private static final Logger logger = Log.getLogger(SPDYAsyncConnection.class);
private final ByteBufferPool bufferPool;
private final Parser parser;
private volatile Session session;
private ByteBuffer writeBuffer;
private Handler<StandardSession.FrameBytes> writeHandler;
private StandardSession.FrameBytes writeContext;
private volatile boolean writePending;
private volatile boolean idle = false;
public SPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser)
public SPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Executor executor)
{
super(endPoint);
super(endPoint, executor);
this.bufferPool = bufferPool;
this.parser = parser;
onIdle(true);
}
@Override
public Connection handle() throws IOException
public void onFillable()
{
ByteBuffer buffer = bufferPool.acquire(8192, true); //TODO: 8k window?
boolean readMore = read(buffer) == 0;
bufferPool.release(buffer);
if (readMore)
fillInterested();
}
protected int read(ByteBuffer buffer)
{
AsyncEndPoint endPoint = getEndPoint();
boolean progress = true;
while (endPoint.isOpen() && progress)
while (true)
{
int filled = fill();
progress = filled > 0;
int flushed = flush();
progress |= flushed > 0;
endPoint.flush();
progress |= endPoint.hasProgressed();
if (!progress && filled < 0)
int filled = fill(endPoint, buffer);
if (filled == 0)
{
return 0;
}
else if (filled < 0)
{
onInputShutdown();
close(false);
return -1;
}
else
{
parser.parse(buffer);
}
}
return this;
}
public int fill() throws IOException
private int fill(AsyncEndPoint endPoint, ByteBuffer buffer)
{
ByteBuffer buffer = bufferPool.acquire(8192, true);
NIOBuffer jettyBuffer = new DirectNIOBuffer(buffer, false);
jettyBuffer.setPutIndex(jettyBuffer.getIndex());
AsyncEndPoint endPoint = getEndPoint();
int filled = endPoint.fill(jettyBuffer);
logger.debug("Filled {} from {}", filled, endPoint);
if (filled <= 0)
return filled;
buffer.limit(jettyBuffer.putIndex());
buffer.position(jettyBuffer.getIndex());
parser.parse(buffer);
bufferPool.release(buffer);
return filled;
}
public int flush()
{
int result = 0;
// Volatile read to ensure visibility of buffer and handler
if (writePending)
result = write(writeBuffer, writeHandler, writeContext);
logger.debug("Flushed {} to {}", result, getEndPoint());
return result;
try
{
return endPoint.fill(buffer);
}
catch (IOException x)
{
endPoint.close();
throw new RuntimeIOException(x);
}
}
@Override
public int write(ByteBuffer buffer, Handler<StandardSession.FrameBytes> handler, StandardSession.FrameBytes context)
public int write(ByteBuffer buffer, final Callback<StandardSession.FrameBytes> callback, StandardSession.FrameBytes context)
{
int remaining = buffer.remaining();
Buffer jettyBuffer = buffer.isDirect() ? new DirectNIOBuffer(buffer, false) : new IndirectNIOBuffer(buffer, false);
AsyncEndPoint endPoint = getEndPoint();
try
{
int written = endPoint.flush(jettyBuffer);
logger.debug("Written {} bytes, {} remaining", written, jettyBuffer.length());
}
catch (Exception x)
{
close(false);
handler.failed(context, x);
return -1;
}
finally
{
buffer.limit(jettyBuffer.putIndex());
buffer.position(jettyBuffer.getIndex());
}
if (buffer.hasRemaining())
{
// Save buffer and handler in order to finish the write later in flush()
this.writeBuffer = buffer;
this.writeHandler = handler;
this.writeContext = context;
// Volatile write to ensure visibility of write fields
writePending = true;
endPoint.scheduleWrite();
}
else
{
if (writePending)
{
this.writeBuffer = null;
this.writeHandler = null;
this.writeContext = null;
// Volatile write to ensure visibility of write fields
writePending = false;
}
handler.completed(context);
}
return remaining - buffer.remaining();
endPoint.write(context, callback, buffer);
return -1; //TODO: void or have endPoint.write return int
}
@Override
public void close(boolean onlyOutput)
{
try
AsyncEndPoint endPoint = getEndPoint();
// We need to gently close first, to allow
// SSL close alerts to be sent by Jetty
logger.debug("Shutting down output {}", endPoint);
endPoint.shutdownOutput();
if (!onlyOutput)
{
AsyncEndPoint endPoint = getEndPoint();
try
{
// We need to gently close first, to allow
// SSL close alerts to be sent by Jetty
logger.debug("Shutting down output {}", endPoint);
endPoint.shutdownOutput();
if (!onlyOutput)
{
logger.debug("Closing {}", endPoint);
endPoint.close();
}
}
catch (IOException x)
{
endPoint.close();
}
}
catch (IOException x)
{
logger.ignore(x);
logger.debug("Closing {}", endPoint);
endPoint.close();
}
}
@Override
public void onIdle(boolean idle)
{
getEndPoint().setCheckForIdle(idle);
this.idle = idle;
}
@Override
public AsyncEndPoint getEndPoint()
protected boolean onReadTimeout()
{
return (AsyncEndPoint)super.getEndPoint();
}
@Override
public boolean isIdle()
{
return false;
}
@Override
public boolean isSuspended()
{
return false;
}
@Override
public void onClose()
{
}
@Override
public void onInputShutdown() throws IOException
{
}
@Override
public void onIdleExpired(long idleForMs)
{
logger.debug("Idle timeout expired for {}", getEndPoint());
session.goAway();
if(idle)
session.goAway();
return idle;
}
protected Session getSession()
@ -234,9 +137,4 @@ public class SPDYAsyncConnection extends AbstractConnection implements AsyncConn
{
this.session = session;
}
public String toString()
{
return String.format("%s@%x{endp=%s@%x}",getClass().getSimpleName(),hashCode(),getEndPoint().getClass().getSimpleName(),getEndPoint().hashCode());
}
}

View File

@ -29,18 +29,17 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager;
import org.eclipse.jetty.io.nio.SslConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.npn.NextProtoNego;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
@ -56,7 +55,7 @@ public class SPDYClient
private final short version;
private final Factory factory;
private SocketAddress bindAddress;
private long maxIdleTime = -1;
private long idleTimeout = -1;
private volatile int initialWindowSize = 65536;
protected SPDYClient(short version, Factory factory)
@ -97,19 +96,19 @@ public class SPDYClient
SessionPromise result = new SessionPromise(channel, this, listener);
channel.connect(address);
factory.selector.register(channel, result);
factory.selector.connect(channel, result);
return result;
}
public long getMaxIdleTime()
public long getIdleTimeout()
{
return maxIdleTime;
return idleTimeout;
}
public void setMaxIdleTime(long maxIdleTime)
public void setIdleTimeout(long idleTimeout)
{
this.maxIdleTime = maxIdleTime;
this.idleTimeout = idleTimeout;
}
public int getInitialWindowSize()
@ -190,24 +189,43 @@ public class SPDYClient
private final Executor threadPool;
private final SslContextFactory sslContextFactory;
private final SelectorManager selector;
private final long defaultTimeout = 30000;
private final long idleTimeout;
//TODO: Replace with Builder?!
public Factory()
{
this(null, null);
this(null, null, 30000);
}
public Factory(SslContextFactory sslContextFactory)
{
this(null, sslContextFactory);
this(null, sslContextFactory, 30000);
}
public Factory(SslContextFactory sslContextFactory, long idleTimeout)
{
this(null, sslContextFactory, idleTimeout);
}
public Factory(Executor threadPool)
{
this(threadPool, null);
this(threadPool, null, 30000);
}
public Factory(Executor threadPool, long idleTimeout)
{
this(threadPool, null, idleTimeout);
}
public Factory(Executor threadPool, SslContextFactory sslContextFactory)
{
this(threadPool, sslContextFactory, 30000);
}
public Factory(Executor threadPool, SslContextFactory sslContextFactory, long idleTimeout)
{
this.idleTimeout = idleTimeout;
if (threadPool == null)
threadPool = new QueuedThreadPool();
this.threadPool = threadPool;
@ -275,50 +293,24 @@ public class SPDYClient
private class ClientSelectorManager extends SelectorManager
{
@Override
public boolean dispatch(Runnable task)
{
try
{
threadPool.execute(task);
return true;
}
catch (RejectedExecutionException x)
{
return false;
}
}
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
protected AsyncEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
SessionPromise attachment = (SessionPromise)key.attachment();
long maxIdleTime = attachment.client.getMaxIdleTime();
if (maxIdleTime < 0)
maxIdleTime = getMaxIdleTime();
SelectChannelEndPoint result = new SelectChannelEndPoint(channel, selectSet, key, (int)maxIdleTime);
AsyncConnection connection = newConnection(channel, result, attachment);
result.setConnection(connection);
long clientIdleTimeout = attachment.client.getIdleTimeout();
if (clientIdleTimeout < 0)
clientIdleTimeout = idleTimeout;
AsyncEndPoint result = new SelectChannelEndPoint(channel, selectSet, key, scheduler, clientIdleTimeout);
return result;
}
@Override
protected void endPointOpened(SelectChannelEndPoint endpoint)
protected void execute(Runnable task)
{
}
@Override
protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
{
}
@Override
protected void endPointClosed(SelectChannelEndPoint endpoint)
{
endpoint.getConnection().onClose();
threadPool.execute(task);
}
@Override
@ -332,7 +324,7 @@ public class SPDYClient
if (sslContextFactory != null)
{
final SSLEngine engine = client.newSSLEngine(sslContextFactory, channel);
SslConnection sslConnection = new SslConnection(engine, endPoint)
SslConnection sslConnection = new SslConnection(bufferPool, threadPool, endPoint, engine)
{
@Override
public void onClose()
@ -341,7 +333,7 @@ public class SPDYClient
super.onClose();
}
};
endPoint.setConnection(sslConnection);
endPoint.setAsyncConnection(sslConnection);
final AsyncEndPoint sslEndPoint = sslConnection.getSslEndPoint();
NextProtoNego.put(engine, new NextProtoNego.ClientProvider()
{
@ -357,7 +349,7 @@ public class SPDYClient
// Server does not support NPN, but this is a SPDY client, so hardcode SPDY
ClientSPDYAsyncConnectionFactory connectionFactory = new ClientSPDYAsyncConnectionFactory();
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, attachment);
sslEndPoint.setConnection(connection);
sslEndPoint.setAsyncConnection(connection);
}
@Override
@ -369,13 +361,13 @@ public class SPDYClient
AsyncConnectionFactory connectionFactory = client.getAsyncConnectionFactory(protocol);
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, attachment);
sslEndPoint.setConnection(connection);
sslEndPoint.setAsyncConnection(connection);
return protocol;
}
});
AsyncConnection connection = new EmptyAsyncConnection(sslEndPoint);
sslEndPoint.setConnection(connection);
sslEndPoint.setAsyncConnection(connection);
startHandshake(engine);
@ -385,7 +377,7 @@ public class SPDYClient
{
AsyncConnectionFactory connectionFactory = new ClientSPDYAsyncConnectionFactory();
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, endPoint, attachment);
endPoint.setConnection(connection);
endPoint.setAsyncConnection(connection);
return connection;
}
}
@ -453,7 +445,7 @@ public class SPDYClient
Generator generator = new Generator(factory.bufferPool, compressionFactory.newCompressor());
SPDYAsyncConnection connection = new ClientSPDYAsyncConnection(endPoint, factory.bufferPool, parser, factory);
endPoint.setConnection(connection);
endPoint.setAsyncConnection(connection);
FlowControlStrategy flowControlStrategy = client.newFlowControlStrategy();
@ -474,7 +466,7 @@ public class SPDYClient
public ClientSPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser, Factory factory)
{
super(endPoint, bufferPool, parser);
super(endPoint, bufferPool, parser, factory.threadPool);
this.factory = factory;
}

View File

@ -32,9 +32,11 @@ import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.SslConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.npn.NextProtoNego;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.spdy.api.SPDY;
@ -44,7 +46,6 @@ import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.ThreadPool;
public class SPDYServerConnector extends SelectChannelConnector
{
@ -188,7 +189,7 @@ public class SPDYServerConnector extends SelectChannelConnector
if (sslContextFactory != null)
{
final SSLEngine engine = newSSLEngine(sslContextFactory, channel);
SslConnection sslConnection = new SslConnection(engine, endPoint)
SslConnection sslConnection = new SslConnection(bufferPool, findExecutor(), endPoint, engine)
{
@Override
public void onClose()
@ -197,7 +198,7 @@ public class SPDYServerConnector extends SelectChannelConnector
super.onClose();
}
};
endPoint.setConnection(sslConnection);
endPoint.setAsyncConnection(sslConnection);
final AsyncEndPoint sslEndPoint = sslConnection.getSslEndPoint();
NextProtoNego.put(engine, new NextProtoNego.ServerProvider()
{
@ -206,7 +207,7 @@ public class SPDYServerConnector extends SelectChannelConnector
{
AsyncConnectionFactory connectionFactory = getDefaultAsyncConnectionFactory();
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, SPDYServerConnector.this);
sslEndPoint.setConnection(connection);
sslEndPoint.setAsyncConnection(connection);
}
@Override
@ -220,12 +221,12 @@ public class SPDYServerConnector extends SelectChannelConnector
{
AsyncConnectionFactory connectionFactory = getAsyncConnectionFactory(protocol);
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, SPDYServerConnector.this);
sslEndPoint.setConnection(connection);
sslEndPoint.setAsyncConnection(connection);
}
});
AsyncConnection connection = new EmptyAsyncConnection(sslEndPoint);
sslEndPoint.setConnection(connection);
sslEndPoint.setAsyncConnection(connection);
startHandshake(engine);
@ -235,7 +236,7 @@ public class SPDYServerConnector extends SelectChannelConnector
{
AsyncConnectionFactory connectionFactory = getDefaultAsyncConnectionFactory();
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, endPoint, this);
endPoint.setConnection(connection);
endPoint.setAsyncConnection(connection);
return connection;
}
}
@ -306,10 +307,10 @@ public class SPDYServerConnector extends SelectChannelConnector
@Override
public void execute(Runnable command)
{
ThreadPool threadPool = getThreadPool();
Executor threadPool = findExecutor();
if (threadPool == null)
throw new RejectedExecutionException();
threadPool.dispatch(command);
threadPool.execute(command);
}
}

View File

@ -14,14 +14,13 @@
package org.eclipse.jetty.spdy;
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.spdy.parser.Parser;
@ -64,12 +63,12 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
ServerSessionFrameListener listener = provideServerSessionFrameListener(endPoint, attachment);
SPDYAsyncConnection connection = new ServerSPDYAsyncConnection(endPoint, bufferPool, parser, listener, connector);
endPoint.setConnection(connection);
endPoint.setAsyncConnection(connection);
FlowControlStrategy flowControlStrategy = connector.newFlowControlStrategy(version);
StandardSession session = new StandardSession(version, bufferPool, threadPool, scheduler, connection, connection, 2, listener, generator, flowControlStrategy);
session.setAttribute("org.eclipse.jetty.spdy.remoteAddress", endPoint.getRemoteAddr());
session.setAttribute("org.eclipse.jetty.spdy.remoteAddress", endPoint.getRemoteAddress());
session.setWindowSize(connector.getInitialWindowSize());
parser.addListener(session);
connection.setSession(session);
@ -92,13 +91,13 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
private ServerSPDYAsyncConnection(AsyncEndPoint endPoint, ByteBufferPool bufferPool, Parser parser, ServerSessionFrameListener listener, SPDYServerConnector connector)
{
super(endPoint, bufferPool, parser);
super(endPoint, bufferPool, parser, connector.findExecutor());
this.listener = listener;
this.connector = connector;
}
@Override
public Connection handle() throws IOException
public void onOpen()
{
if (!connected)
{
@ -107,7 +106,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
listener.onConnect(getSession());
connected = true;
}
return super.handle();
super.onOpen();
}
@Override

View File

@ -21,6 +21,7 @@ import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
@ -140,6 +141,7 @@ public class ClosedStreamTest extends AbstractTest
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
System.out.println("ONREPLY CLIENT CALLED");
replyReceivedLatch.countDown();
}
@ -149,11 +151,11 @@ public class ClosedStreamTest extends AbstractTest
clientReceivedDataLatch.countDown();
}
}).get();
assertThat("reply has been received by client",replyReceivedLatch.await(5,TimeUnit.SECONDS),is(true));
assertThat("reply has been received by client",replyReceivedLatch.await(500,TimeUnit.SECONDS),is(true));
assertThat("stream is half closed from server",stream.isHalfClosed(),is(true));
assertThat("client has not received any data sent after stream was half closed by server",clientReceivedDataLatch.await(1,TimeUnit.SECONDS),
is(false));
assertThat("sending data threw an exception",exceptionWhenSendingData.await(5,TimeUnit.SECONDS),is(true));
assertThat("sending data threw an exception",exceptionWhenSendingData.await(500,TimeUnit.SECONDS),is(true)); //thomas
}
@Test

View File

@ -33,6 +33,9 @@ import org.junit.Test;
public class IdleTimeoutTest extends AbstractTest
{
private final int idleTimeout = 1000;
@Test
public void testServerEnforcingIdleTimeout() throws Exception
{
@ -45,8 +48,7 @@ public class IdleTimeoutTest extends AbstractTest
return null;
}
});
int maxIdleTime = 1000;
connector.setMaxIdleTime(maxIdleTime);
connector.setIdleTimeout(idleTimeout);
final CountDownLatch latch = new CountDownLatch(1);
Session session = startClient(startServer(null), new SessionFrameListener.Adapter()
@ -60,15 +62,14 @@ public class IdleTimeoutTest extends AbstractTest
session.syn(new SynInfo(true), null);
Assert.assertTrue(latch.await(2 * maxIdleTime, TimeUnit.MILLISECONDS));
Assert.assertTrue(latch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
}
@Test
public void testServerEnforcingIdleTimeoutWithUnrespondedStream() throws Exception
{
connector = newSPDYServerConnector(null);
int maxIdleTime = 1000;
connector.setMaxIdleTime(maxIdleTime);
connector.setIdleTimeout(idleTimeout);
final CountDownLatch latch = new CountDownLatch(1);
Session session = startClient(startServer(null), new SessionFrameListener.Adapter()
@ -83,13 +84,12 @@ public class IdleTimeoutTest extends AbstractTest
// The SYN is not replied, and the server should idle timeout
session.syn(new SynInfo(true), null);
Assert.assertTrue(latch.await(2 * maxIdleTime, TimeUnit.MILLISECONDS));
Assert.assertTrue(latch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
}
@Test
public void testServerNotEnforcingIdleTimeoutWithPendingStream() throws Exception
{
final int maxIdleTime = 1000;
connector = newSPDYServerConnector(new ServerSessionFrameListener.Adapter()
{
@Override
@ -97,7 +97,7 @@ public class IdleTimeoutTest extends AbstractTest
{
try
{
Thread.sleep(2 * maxIdleTime);
Thread.sleep(2 * idleTimeout);
stream.reply(new ReplyInfo(true));
return null;
}
@ -108,7 +108,7 @@ public class IdleTimeoutTest extends AbstractTest
}
}
});
connector.setMaxIdleTime(maxIdleTime);
connector.setIdleTimeout(idleTimeout);
final CountDownLatch latch = new CountDownLatch(1);
Session session = startClient(startServer(null), new SessionFrameListener.Adapter()
@ -130,7 +130,7 @@ public class IdleTimeoutTest extends AbstractTest
}
});
Assert.assertTrue(replyLatch.await(3 * maxIdleTime, TimeUnit.MILLISECONDS));
Assert.assertTrue(replyLatch.await(3 * idleTimeout, TimeUnit.MILLISECONDS));
Assert.assertFalse(latch.await(1000, TimeUnit.MILLISECONDS));
}
@ -159,13 +159,12 @@ public class IdleTimeoutTest extends AbstractTest
clientFactory = newSPDYClientFactory(threadPool);
clientFactory.start();
SPDYClient client = clientFactory.newSPDYClient(SPDY.V2);
long maxIdleTime = 1000;
client.setMaxIdleTime(maxIdleTime);
client.setIdleTimeout(idleTimeout);
Session session = client.connect(address, null).get(5, TimeUnit.SECONDS);
session.syn(new SynInfo(true), null);
Assert.assertTrue(latch.await(2 * maxIdleTime, TimeUnit.MILLISECONDS));
Assert.assertTrue(latch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
}
@Test
@ -186,19 +185,17 @@ public class IdleTimeoutTest extends AbstractTest
clientFactory = newSPDYClientFactory(threadPool);
clientFactory.start();
SPDYClient client = clientFactory.newSPDYClient(SPDY.V2);
long maxIdleTime = 1000;
client.setMaxIdleTime(maxIdleTime);
client.setIdleTimeout(idleTimeout);
Session session = client.connect(address, null).get(5, TimeUnit.SECONDS);
session.syn(new SynInfo(true), null);
Assert.assertTrue(latch.await(2 * maxIdleTime, TimeUnit.MILLISECONDS));
Assert.assertTrue(latch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
}
@Test
public void testClientNotEnforcingIdleTimeoutWithPendingStream() throws Exception
{
final long maxIdleTime = 1000;
final CountDownLatch latch = new CountDownLatch(1);
InetSocketAddress address = startServer(new ServerSessionFrameListener.Adapter()
{
@ -221,7 +218,7 @@ public class IdleTimeoutTest extends AbstractTest
clientFactory = newSPDYClientFactory(threadPool);
clientFactory.start();
SPDYClient client = clientFactory.newSPDYClient(SPDY.V2);
client.setMaxIdleTime(maxIdleTime);
client.setIdleTimeout(idleTimeout);
Session session = client.connect(address, null).get(5, TimeUnit.SECONDS);
final CountDownLatch replyLatch = new CountDownLatch(1);
@ -232,7 +229,7 @@ public class IdleTimeoutTest extends AbstractTest
{
try
{
Thread.sleep(2 * maxIdleTime);
Thread.sleep(2 * idleTimeout);
replyLatch.countDown();
}
catch (InterruptedException e)
@ -242,7 +239,7 @@ public class IdleTimeoutTest extends AbstractTest
}
});
Assert.assertFalse(latch.await(2 * maxIdleTime, TimeUnit.MILLISECONDS));
Assert.assertTrue(replyLatch.await(3 * maxIdleTime, TimeUnit.MILLISECONDS));
Assert.assertFalse(latch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
Assert.assertTrue(replyLatch.await(3 * idleTimeout, TimeUnit.MILLISECONDS));
}
}

View File

@ -18,11 +18,11 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.util.Callback;
import org.junit.Assert;
import org.junit.Test;
@ -63,7 +63,7 @@ public class PingTest extends AbstractTest
@Override
public void onConnect(Session session)
{
session.ping(0, TimeUnit.MILLISECONDS, new Handler.Adapter<PingInfo>()
session.ping(0, TimeUnit.MILLISECONDS, new Callback.Empty<PingInfo>()
{
@Override
public void completed(PingInfo pingInfo)

View File

@ -13,9 +13,6 @@
package org.eclipse.jetty.spdy;
import static org.junit.Assert.*;
import static org.hamcrest.Matchers.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
@ -23,6 +20,7 @@ import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Headers;
@ -43,6 +41,9 @@ import org.eclipse.jetty.spdy.generator.Generator;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
public class ProtocolViolationsTest extends AbstractTest
{
@Test

View File

@ -28,10 +28,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
@ -54,6 +54,7 @@ import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.spdy.parser.Parser;
import org.eclipse.jetty.spdy.parser.Parser.Listener;
import org.eclipse.jetty.util.Callback;
import org.junit.Assert;
import org.junit.Test;
@ -237,7 +238,7 @@ public class PushStreamTest extends AbstractTest
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
stream.reply(new ReplyInfo(true));
stream.syn(new SynInfo(false),1,TimeUnit.SECONDS,new Handler.Adapter<Stream>()
stream.syn(new SynInfo(false),1,TimeUnit.SECONDS,new Callback.Empty<Stream>()
{
@Override
public void failed(Stream stream, Throwable x)

View File

@ -18,7 +18,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
@ -28,6 +27,7 @@ import org.eclipse.jetty.spdy.api.StreamStatus;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.util.Callback;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
@ -129,7 +129,7 @@ public class ResetStreamTest extends AbstractTest
});
Stream stream = session.syn(new SynInfo(false),null).get(5,TimeUnit.SECONDS);
stream.data(new StringDataInfo("data",true),5,TimeUnit.SECONDS,new Handler.Adapter<Void>()
stream.data(new StringDataInfo("data",true),5,TimeUnit.SECONDS,new Callback.Empty<Void>()
{
@Override
public void completed(Void context)
@ -179,7 +179,7 @@ public class ResetStreamTest extends AbstractTest
assertThat("syn is received by server", synLatch.await(5,TimeUnit.SECONDS),is(true));
stream.data(new StringDataInfo("data",false),5,TimeUnit.SECONDS,null);
assertThat("stream is reset",rstLatch.await(5,TimeUnit.SECONDS),is(true));
stream.data(new StringDataInfo("2nd dataframe",false),5L,TimeUnit.SECONDS,new Handler.Adapter<Void>()
stream.data(new StringDataInfo("2nd dataframe",false),5L,TimeUnit.SECONDS,new Callback.Empty<Void>()
{
@Override
public void failed(Void context, Throwable x)

View File

@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.Session;
@ -38,6 +37,7 @@ import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.util.Callback;
import org.junit.Assert;
import org.junit.Test;
@ -167,7 +167,7 @@ public class SynDataReplyDataLoadTest extends AbstractTest
latch.countDown();
}
}
}, 0, TimeUnit.SECONDS, new Handler.Adapter<Stream>()
}, 0, TimeUnit.SECONDS, new Callback.Empty<Stream>()
{
@Override
public void completed(Stream stream)

View File

@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.Session;
@ -34,6 +33,7 @@ import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.util.Callback;
import org.junit.Assert;
import org.junit.Test;
@ -194,7 +194,7 @@ public class SynReplyTest extends AbstractTest
Assert.assertTrue(stream.isHalfClosed());
stream.reply(new ReplyInfo(false));
stream.data(new StringDataInfo(data1, false), 5, TimeUnit.SECONDS, new Handler.Adapter<Void>()
stream.data(new StringDataInfo(data1, false), 5, TimeUnit.SECONDS, new Callback.Empty<Void>()
{
@Override
public void completed(Void context)
@ -273,7 +273,7 @@ public class SynReplyTest extends AbstractTest
Assert.assertEquals(clientData, data);
clientDataLatch.countDown();
}
}, 0, TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>()
}, 0, TimeUnit.MILLISECONDS, new Callback.Empty<Stream>()
{
@Override
public void completed(Stream stream)

View File

@ -19,6 +19,7 @@ import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Stream;

View File

@ -11,4 +11,6 @@ log4j.appender.CONSOLE.target=System.err
# Level tuning
log4j.logger.org.eclipse.jetty=INFO
#log4j.logger.org.eclipse.jetty.spdy=DEBUG
#log4j.logger.org.eclipse.jetty.io=DEBUG
log4j.logger.org.eclipse.jetty.spdy=DEBUG
# thomas

View File

@ -374,8 +374,8 @@
<!--<module>jetty-client</module>-->
<!--<module>jetty-proxy</module>-->
<!--
<module>jetty-spdy</module>
<!--
<module>jetty-jaspi</module>
<module>jetty-ajp</module>
<module>jetty-jndi</module>