Updated to Jetty 9.1.1-SNAPSHOT.

Updated Flusher to use the new IteratingCallback API,
and HttpChannelOverFCGI to use the new IdleTimeout API.
This commit is contained in:
Simone Bordet 2013-12-16 12:57:19 +01:00
parent 8dddc2a3f6
commit c7511dd2c7
3 changed files with 29 additions and 55 deletions

View File

@ -19,12 +19,9 @@
package org.eclipse.jetty.fcgi.generator; package org.eclipse.jetty.fcgi.generator;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue; import java.util.Queue;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ConcurrentArrayQueue; import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -35,9 +32,8 @@ public class Flusher
private static final Logger LOG = Log.getLogger(Flusher.class); private static final Logger LOG = Log.getLogger(Flusher.class);
private final Queue<Generator.Result> queue = new ConcurrentArrayQueue<>(); private final Queue<Generator.Result> queue = new ConcurrentArrayQueue<>();
private final Callback flushCallback = new FlushCallback(); private final IteratingCallback flushCallback = new FlushCallback();
private final EndPoint endPoint; private final EndPoint endPoint;
private boolean flushing;
public Flusher(EndPoint endPoint) public Flusher(EndPoint endPoint)
{ {
@ -46,15 +42,9 @@ public class Flusher
public void flush(Generator.Result... results) public void flush(Generator.Result... results)
{ {
synchronized (queue) for (Generator.Result result : results)
{ queue.offer(result);
for (Generator.Result result : results) flushCallback.iterate();
queue.offer(result);
if (flushing)
return;
flushing = true;
}
endPoint.write(flushCallback);
} }
public void shutdown() public void shutdown()
@ -67,37 +57,35 @@ public class Flusher
private Generator.Result active; private Generator.Result active;
@Override @Override
protected boolean process() throws Exception protected Action process() throws Exception
{ {
// Look if other writes are needed. // Look if other writes are needed.
Generator.Result result; Generator.Result result = queue.poll();
synchronized (queue) if (result == null)
{ {
if (queue.isEmpty()) // No more writes to do, return.
{ return Action.IDLE;
// No more writes to do, switch to non-flushing
flushing = false;
return false;
}
result = queue.poll();
// Attempt to gather another result.
// Most often there is another result in the
// queue so this is a real optimization because
// it sends both results in just one TCP packet.
Generator.Result other = queue.poll();
if (other != null)
result = result.join(other);
} }
// Attempt to gather another result.
// Most often there is another result in the
// queue so this is a real optimization because
// it sends both results in just one TCP packet.
Generator.Result other = queue.poll();
if (other != null)
result = result.join(other);
active = result; active = result;
ByteBuffer[] buffers = result.getByteBuffers(); ByteBuffer[] buffers = result.getByteBuffers();
endPoint.write(this, buffers); endPoint.write(this, buffers);
return false; return Action.SCHEDULED;
} }
@Override @Override
protected void completed() protected void completed()
{ {
// Nothing to do, we always return false from process(). // We never return Action.SUCCEEDED, so this method is never called.
throw new IllegalStateException();
} }
@Override @Override
@ -116,20 +104,13 @@ public class Flusher
active.failed(x); active.failed(x);
active = null; active = null;
List<Generator.Result> pending = new ArrayList<>(); while (true)
synchronized (queue)
{ {
while (true) Generator.Result result = queue.poll();
{ if (result == null)
Generator.Result result = queue.poll(); break;
if (result != null)
pending.add(result);
else
break;
}
}
for (Generator.Result result : pending)
result.failed(x); result.failed(x);
}
super.failed(x); super.failed(x);
} }

View File

@ -120,7 +120,7 @@ public class HttpChannelOverFCGI extends HttpChannel
public void exchangeTerminated(Result result) public void exchangeTerminated(Result result)
{ {
super.exchangeTerminated(result); super.exchangeTerminated(result);
idle.close(); idle.onClose();
} }
protected void flush(Generator.Result... results) protected void flush(Generator.Result... results)
@ -151,12 +151,5 @@ public class HttpChannelOverFCGI extends HttpChannel
{ {
return connection.getEndPoint().isOpen(); return connection.getEndPoint().isOpen();
} }
// Overridden for visibility
@Override
protected void close()
{
super.close();
}
} }
} }

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>org.eclipse.jetty</groupId> <groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-project</artifactId> <artifactId>jetty-project</artifactId>
<version>9.1.0.v20131115</version> <version>9.1.1-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
@ -16,7 +16,7 @@
<name>Jetty :: FastCGI</name> <name>Jetty :: FastCGI</name>
<properties> <properties>
<jetty-version>9.1.0.v20131115</jetty-version> <jetty-version>9.1.1-SNAPSHOT</jetty-version>
</properties> </properties>
<modules> <modules>