jetty-9 working on ByteArrayEndPoint

This commit is contained in:
Greg Wilkins 2012-05-03 15:17:23 +02:00
parent 6a5fdefa07
commit 60f9d7faff
14 changed files with 366 additions and 350 deletions

View File

@ -16,8 +16,10 @@ package org.eclipse.jetty.io;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -26,9 +28,9 @@ import org.eclipse.jetty.util.BufferUtil;
*/ */
public class ByteArrayEndPoint extends AbstractEndPoint public class ByteArrayEndPoint extends AbstractEndPoint
{ {
protected byte[] _inBytes;
protected ByteBuffer _in; protected ByteBuffer _in;
protected ByteBuffer _out; protected ByteBuffer _out;
protected boolean _oshut;
protected boolean _closed; protected boolean _closed;
protected boolean _growOutput; protected boolean _growOutput;
@ -39,6 +41,8 @@ public class ByteArrayEndPoint extends AbstractEndPoint
public ByteArrayEndPoint() public ByteArrayEndPoint()
{ {
super(null,null); super(null,null);
_in=BufferUtil.EMPTY_BUFFER;
_out=BufferUtil.allocate(1024);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -48,10 +52,21 @@ public class ByteArrayEndPoint extends AbstractEndPoint
public ByteArrayEndPoint(byte[] input, int outputSize) public ByteArrayEndPoint(byte[] input, int outputSize)
{ {
super(null,null); super(null,null);
_inBytes=input; _in=input==null?null:ByteBuffer.wrap(input);
_in=ByteBuffer.wrap(input); _out=BufferUtil.allocate(outputSize);
_out=ByteBuffer.allocate(outputSize);
} }
/* ------------------------------------------------------------ */
/**
*
*/
public ByteArrayEndPoint(String input, int outputSize)
{
super(null,null);
setInput(input);
_out=BufferUtil.allocate(outputSize);
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
@ -61,30 +76,68 @@ public class ByteArrayEndPoint extends AbstractEndPoint
{ {
return _in; return _in;
} }
/* ------------------------------------------------------------ */
/**
*/
public void setInputEOF()
{
_in = null;
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* @param in The in to set. * @param in The in to set.
*/ */
public void setIn(ByteBuffer in) public void setInput(ByteBuffer in)
{ {
_in = in; _in = in;
} }
/* ------------------------------------------------------------ */
public void setInput(String s)
{
setInput(BufferUtil.toBuffer(s,StringUtil.__UTF8_CHARSET));
}
/* ------------------------------------------------------------ */
public void setInput(String s,Charset charset)
{
setInput(BufferUtil.toBuffer(s,charset));
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* @return Returns the out. * @return Returns the out.
*/ */
public ByteBuffer getOut() public ByteBuffer getOutput()
{ {
return _out; return _out;
} }
/* ------------------------------------------------------------ */
/**
* @return Returns the out.
*/
public String getOutputString()
{
return BufferUtil.toString(_out,StringUtil.__UTF8_CHARSET);
}
/* ------------------------------------------------------------ */
/**
* @return Returns the out.
*/
public String getOutputString(Charset charset)
{
return BufferUtil.toString(_out,charset);
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* @param out The out to set. * @param out The out to set.
*/ */
public void setOut(ByteBuffer out) public void setOutput(ByteBuffer out)
{ {
_out = out; _out = out;
} }
@ -106,7 +159,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
@Override @Override
public boolean isOutputShutdown() public boolean isOutputShutdown()
{ {
return _closed; return _oshut||_closed;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -116,7 +169,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
@Override @Override
public void shutdownOutput() throws IOException public void shutdownOutput() throws IOException
{ {
close(); _oshut=true;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -128,7 +181,16 @@ public class ByteArrayEndPoint extends AbstractEndPoint
{ {
_closed=true; _closed=true;
} }
/* ------------------------------------------------------------ */
/**
* @return <code>true</code> if there are bytes remaining to be read from the encoded input
*/
public boolean hasMore()
{
return getOutput().position()>0;
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/* /*
* @see org.eclipse.io.EndPoint#fill(org.eclipse.io.Buffer) * @see org.eclipse.io.EndPoint#fill(org.eclipse.io.Buffer)
@ -138,10 +200,9 @@ public class ByteArrayEndPoint extends AbstractEndPoint
{ {
if (_closed) if (_closed)
throw new IOException("CLOSED"); throw new IOException("CLOSED");
if (_in!=null) if (_in==null)
return BufferUtil.append(_in,buffer); return -1;
return BufferUtil.append(_in,buffer);
return 0;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -151,34 +212,31 @@ public class ByteArrayEndPoint extends AbstractEndPoint
@Override @Override
public int flush(ByteBuffer... buffers) throws IOException public int flush(ByteBuffer... buffers) throws IOException
{ {
if (_oshut)
throw new IOException("oshut");
if (_closed) if (_closed)
throw new IOException("CLOSED"); throw new IOException("CLOSED");
int len=0; int len=0;
for (ByteBuffer b : buffers) for (ByteBuffer b : buffers)
{ {
if (b.hasRemaining()) if (BufferUtil.hasContent(b))
{ {
if (_growOutput && b.remaining()>_out.remaining()) if (_growOutput && b.remaining()>BufferUtil.space(_out))
{ {
_out.compact(); BufferUtil.compact(_out);
if (b.remaining()>BufferUtil.space(_out))
if (b.remaining()>_out.remaining())
{ {
ByteBuffer n = ByteBuffer.allocate(_out.capacity()+b.remaining()*2); ByteBuffer n = BufferUtil.allocate(_out.capacity()+b.remaining()*2);
n.put(_out); BufferUtil.append(_out,n);
_out=n; _out=n;
} }
} }
int put=b.remaining(); len+=BufferUtil.append(b,_out);
if (put>0) if (BufferUtil.hasContent(b))
{
_out.put(b);
len+=put;
}
else
break; break;
} }
} }
@ -191,9 +249,10 @@ public class ByteArrayEndPoint extends AbstractEndPoint
*/ */
public void reset() public void reset()
{ {
_oshut=false;
_closed=false; _closed=false;
_in.rewind(); _in=null;
_out.clear(); BufferUtil.clear(_out);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -203,7 +262,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
@Override @Override
public Object getTransport() public Object getTransport()
{ {
return _inBytes; return null;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */

View File

@ -1,12 +1,21 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import java.io.IOException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
public class RecycledIOFuture implements IOFuture
/* ------------------------------------------------------------ */
/** Dispatched IOFuture.
* <p>An implementation of IOFuture that can be extended to implement the
* {@link #dispatch(Runnable)} method so that callbacks can be dispatched.
* By default, the callbacks are called by the thread that called {@link #ready()} or
* {@link #fail(Throwable)}
*/
public class DispatchedIOFuture implements IOFuture
{ {
private final Lock _lock; private final Lock _lock;
private final Condition _block; private final Condition _block;
@ -17,21 +26,21 @@ public class RecycledIOFuture implements IOFuture
private Callback _callback; private Callback _callback;
public RecycledIOFuture() public DispatchedIOFuture()
{ {
// System.err.println(this);new Throwable().printStackTrace(); // System.err.println(this);new Throwable().printStackTrace();
_lock = new ReentrantLock(); _lock = new ReentrantLock();
_block = _lock.newCondition(); _block = _lock.newCondition();
} }
public RecycledIOFuture(Lock lock) public DispatchedIOFuture(Lock lock)
{ {
// System.err.println(this);new Throwable().printStackTrace(); // System.err.println(this);new Throwable().printStackTrace();
_lock = lock; _lock = lock;
_block = _lock.newCondition(); _block = _lock.newCondition();
} }
public RecycledIOFuture(boolean ready,Lock lock) public DispatchedIOFuture(boolean ready,Lock lock)
{ {
_ready=ready; _ready=ready;
_complete=ready; _complete=ready;
@ -262,4 +271,11 @@ public class RecycledIOFuture implements IOFuture
_ready, _ready,
_cause); _cause);
} }
public static void rethrow(ExecutionException e) throws IOException
{
if (e.getCause() instanceof IOException)
throw (IOException)e.getCause();
throw new RuntimeException(e);
}
} }

View File

@ -57,7 +57,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
private volatile long _lastNotIdleTimestamp; private volatile long _lastNotIdleTimestamp;
private volatile AbstractAsyncConnection _connection; private volatile AbstractAsyncConnection _connection;
private RecycledIOFuture _readFuture = new RecycledIOFuture(true,_lock) private DispatchedIOFuture _readFuture = new DispatchedIOFuture(true,_lock)
{ {
@Override @Override
protected void dispatch(Runnable task) protected void dispatch(Runnable task)
@ -83,7 +83,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
}; };
private ByteBuffer[] _writeBuffers; private ByteBuffer[] _writeBuffers;
private RecycledIOFuture _writeFuture = new RecycledIOFuture(true,_lock) private DispatchedIOFuture _writeFuture = new DispatchedIOFuture(true,_lock)
{ {
@Override @Override
protected void dispatch(Runnable task) protected void dispatch(Runnable task)

View File

@ -52,7 +52,7 @@ public class SslConnection extends AbstractAsyncConnection
private final Lock _lock = new ReentrantLock(); private final Lock _lock = new ReentrantLock();
private final RecycledIOFuture _appReadFuture = new RecycledIOFuture(true,_lock) private final DispatchedIOFuture _appReadFuture = new DispatchedIOFuture(true,_lock)
{ {
@Override @Override
protected void dispatch(Runnable callback) protected void dispatch(Runnable callback)
@ -63,7 +63,7 @@ public class SslConnection extends AbstractAsyncConnection
} }
}; };
private IOFuture.Callback _writeable = new IOFuture.Callback() private IOFuture.Callback _writeCallback = new IOFuture.Callback()
{ {
@Override @Override
public void onReady() public void onReady()
@ -82,7 +82,7 @@ public class SslConnection extends AbstractAsyncConnection
} }
}; };
private final RecycledIOFuture _appWriteFuture = new RecycledIOFuture(true,_lock); private final DispatchedIOFuture _appWriteFuture = new DispatchedIOFuture(true,_lock);
private Runnable _appReadTask; private Runnable _appReadTask;
private final SSLEngine _engine; private final SSLEngine _engine;
@ -489,7 +489,7 @@ public class SslConnection extends AbstractAsyncConnection
return true; return true;
_netWriteFuture=write; _netWriteFuture=write;
_netWriteFuture.setCallback(_writeable); _netWriteFuture.setCallback(_writeCallback);
} }
return result.bytesConsumed()>0 || result.bytesProduced()>0 ; return result.bytesConsumed()>0 || result.bytesProduced()>0 ;

View File

@ -1,71 +0,0 @@
// ========================================================================
// Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
/**
*
*
* To change the template for this generated type comment go to
* Window - Preferences - Java - Code Generation - Code and Comments
*/
public class StringEndPoint extends ByteArrayEndPoint
{
Charset _charset=StringUtil.__UTF8_CHARSET;
public StringEndPoint()
{
}
public StringEndPoint(String encoding)
{
this();
if (encoding!=null)
_charset=Charset.forName(encoding);
}
public void setInput(String s)
{
try
{
super.setIn(BufferUtil.toBuffer(s,_charset));
}
catch(Exception e)
{
throw new IllegalStateException(e.toString());
}
}
public String getOutput()
{
ByteBuffer b = getOut();
b.flip();
String s=BufferUtil.toString(b,_charset);
b.clear();
return s;
}
/**
* @return <code>true</code> if there are bytes remaining to be read from the encoded input
*/
public boolean hasMore()
{
return getOut().position()>0;
}
}

View File

@ -128,7 +128,7 @@ public class IOFutureTest
@Test @Test
public void testInCompleted() throws Exception public void testInCompleted() throws Exception
{ {
IOFuture future = new RecycledIOFuture(); IOFuture future = new DispatchedIOFuture();
assertFalse(future.isComplete()); assertFalse(future.isComplete());
assertFalse(future.isReady()); assertFalse(future.isReady());
@ -163,7 +163,7 @@ public class IOFutureTest
@Test @Test
public void testReady() throws Exception public void testReady() throws Exception
{ {
final RecycledIOFuture future = new RecycledIOFuture(); final DispatchedIOFuture future = new DispatchedIOFuture();
assertFalse(future.isComplete()); assertFalse(future.isComplete());
assertFalse(future.isReady()); assertFalse(future.isReady());
@ -243,7 +243,7 @@ public class IOFutureTest
@Test @Test
public void testFail() throws Exception public void testFail() throws Exception
{ {
final RecycledIOFuture future = new RecycledIOFuture(); final DispatchedIOFuture future = new DispatchedIOFuture();
final Exception ex=new Exception("failed"); final Exception ex=new Exception("failed");
assertFalse(future.isComplete()); assertFalse(future.isComplete());

View File

@ -229,31 +229,6 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Ht
_lowResourceMaxIdleTime = maxIdleTime; _lowResourceMaxIdleTime = maxIdleTime;
} }
/* ------------------------------------------------------------ */
/**
* @return Returns the maxIdleTime when resources are low.
* @deprecated
*/
@Deprecated
@Override
public final int getLowResourceMaxIdleTime()
{
return getLowResourcesMaxIdleTime();
}
/* ------------------------------------------------------------ */
/**
* @param maxIdleTime
* The maxIdleTime to set when resources are low.
* @deprecated
*/
@Deprecated
@Override
public final void setLowResourceMaxIdleTime(int maxIdleTime)
{
setLowResourcesMaxIdleTime(maxIdleTime);
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* @return Returns the soLingerTime. * @return Returns the soLingerTime.
@ -1187,7 +1162,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Ht
if (_statsStartedAt.get() == -1) if (_statsStartedAt.get() == -1)
return; return;
long duration = System.currentTimeMillis() - connection.getCreatedTimeStamp(); long duration = System.currentTimeMillis() - connection.getEndPoint().getCreatedTimeStamp();
int requests = (connection instanceof HttpConnection)?((HttpConnection)connection).getHttpChannel().getRequests():0; int requests = (connection instanceof HttpConnection)?((HttpConnection)connection).getHttpChannel().getRequests():0;
_requestStats.set(requests); _requestStats.set(requests);
_connectionStats.decrement(); _connectionStats.decrement();
@ -1235,14 +1210,6 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Ht
_reuseAddress = reuseAddress; _reuseAddress = reuseAddress;
} }
/* ------------------------------------------------------------ */
@Override
public boolean isLowResources()
{
if (_threadPool != null)
return _threadPool.isLowOnThreads();
return _server.getThreadPool().isLowOnThreads();
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
private void updateNotEqual(AtomicLong valueHolder, long compare, long value) private void updateNotEqual(AtomicLong valueHolder, long compare, long value)

View File

@ -863,7 +863,7 @@ public abstract class HttpChannel
protected abstract int write(ByteBuffer content) throws IOException; protected abstract int write(ByteBuffer content) throws IOException;
protected abstract int send(ByteBuffer content) throws IOException; protected abstract void send(ByteBuffer content) throws IOException;
protected abstract void sendError(int status, String reason, String content, boolean close) throws IOException; protected abstract void sendError(int status, String reason, String content, boolean close) throws IOException;

View File

@ -15,6 +15,9 @@ package org.eclipse.jetty.server;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.http.HttpException; import org.eclipse.jetty.http.HttpException;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
@ -26,7 +29,10 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AbstractAsyncConnection; import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncConnection; import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.CompleteIOFuture;
import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.IOFuture;
import org.eclipse.jetty.io.DispatchedIOFuture;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
@ -46,13 +52,14 @@ public class HttpConnection extends AbstractAsyncConnection
private final HttpParser _parser; private final HttpParser _parser;
private final HttpGenerator _generator; private final HttpGenerator _generator;
private final HttpChannel _channel; private final HttpChannel _channel;
private final Lock _lock = new ReentrantLock();
private IOFuture _writeFuture;
int _toFlush;
ByteBuffer _requestBuffer=null; ByteBuffer _requestBuffer=null;
ByteBuffer _responseHeader=null; ByteBuffer _responseHeader=null;
ByteBuffer _chunk=null; ByteBuffer _chunk=null;
ByteBuffer _responseBuffer=null; ByteBuffer _responseBuffer=null;
ByteBuffer _content=null;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -251,201 +258,208 @@ public class HttpConnection extends AbstractAsyncConnection
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
private int send(HttpGenerator.ResponseInfo info, ByteBuffer content) throws IOException private void send(HttpGenerator.ResponseInfo info, ByteBuffer content) throws IOException
{ {
if (_generator.isCommitted() || BufferUtil.hasContent(_responseBuffer) || _toFlush!=0) _lock.lock();
throw new IllegalStateException("!send after append"); try
if (_generator.isComplete())
throw new EofException();
long prepared=_generator.getContentPrepared();
do
{ {
if (LOG.isDebugEnabled()) if (_generator.isCommitted() || BufferUtil.hasContent(_responseBuffer))
LOG.debug("{}: send({},{},{})@{}", throw new IllegalStateException("!send after append");
this, if (_generator.isComplete())
BufferUtil.toSummaryString(_responseHeader), throw new EofException();
BufferUtil.toSummaryString(_responseBuffer),
BufferUtil.toSummaryString(content),
_generator.getState());
HttpGenerator.Result result=_generator.generate(info,_responseHeader,null,_responseBuffer,content,Action.COMPLETE);
if (LOG.isDebugEnabled())
LOG.debug("{}: {} ({},{},{})@{}",
this,
result,
BufferUtil.toSummaryString(_responseHeader),
BufferUtil.toSummaryString(_responseBuffer),
BufferUtil.toSummaryString(content),
_generator.getState());
switch(result) do
{ {
case NEED_HEADER: if (LOG.isDebugEnabled())
_responseHeader=_connector.getResponseBuffers().getHeader(); LOG.debug("{}: send({},{},{})@{}",
break; this,
BufferUtil.toSummaryString(_responseHeader),
BufferUtil.toSummaryString(_responseBuffer),
BufferUtil.toSummaryString(content),
_generator.getState());
case NEED_BUFFER: HttpGenerator.Result result=_generator.generate(info,_responseHeader,null,_responseBuffer,content,Action.COMPLETE);
_responseBuffer=_connector.getResponseBuffers().getBuffer(); if (LOG.isDebugEnabled())
break; LOG.debug("{}: {} ({},{},{})@{}",
this,
result,
BufferUtil.toSummaryString(_responseHeader),
BufferUtil.toSummaryString(_responseBuffer),
BufferUtil.toSummaryString(content),
_generator.getState());
case NEED_CHUNK: switch(result)
throw new IllegalStateException("!chunk when content length known"); {
case NEED_HEADER:
case FLUSH: _responseHeader=_connector.getResponseBuffers().getHeader();
_toFlush= break;
(BufferUtil.hasContent(_responseHeader)?8:0)+
(BufferUtil.hasContent(_chunk)?4:0)+ case NEED_BUFFER:
(BufferUtil.hasContent(_responseBuffer)?2:0); _responseBuffer=_connector.getResponseBuffers().getBuffer();
flush(true); break;
break;
case NEED_CHUNK:
case FLUSH_CONTENT: throw new IllegalStateException("!chunk when content length known");
_content=content;
_toFlush= case FLUSH:
(BufferUtil.hasContent(_responseHeader)?8:0)+ write(_responseHeader,_chunk,_responseBuffer).block();
(BufferUtil.hasContent(_chunk)?4:0)+ break;
(BufferUtil.hasContent(_content)?1:0);
flush(false); case FLUSH_CONTENT:
break; _writeFuture=write(_responseHeader,_chunk,content);
return;
case SHUTDOWN_OUT:
getEndPoint().shutdownOutput(); case SHUTDOWN_OUT:
break; getEndPoint().shutdownOutput();
break;
case OK:
break; case OK:
break;
}
}
while(BufferUtil.hasContent(content));
}
catch(InterruptedException e)
{
LOG.debug(e);
}
catch(ExecutionException e)
{
LOG.debug(e);
DispatchedIOFuture.rethrow(e);
}
finally
{
_lock.unlock();
}
}
/* ------------------------------------------------------------ */
private int generate(HttpGenerator.ResponseInfo info, ByteBuffer content, Action action) throws IOException
{
boolean hasContent=BufferUtil.hasContent(content);
long preparedBefore=0;
long preparedAfter;
_lock.lock();
try
{
preparedBefore=_generator.getContentPrepared();
if (_generator.isComplete())
throw new EofException();
do
{
// block if the last write is not complete
if (_writeFuture!=null && !_writeFuture.isComplete())
_writeFuture.block();
if (LOG.isDebugEnabled())
LOG.debug("{}: generate({},{},{},{},{})@{}",
this,
BufferUtil.toSummaryString(_responseHeader),
BufferUtil.toSummaryString(_chunk),
BufferUtil.toSummaryString(_responseBuffer),
BufferUtil.toSummaryString(content),
action,_generator.getState());
HttpGenerator.Result result=_generator.generate(info,_responseHeader,_chunk,_responseBuffer,content,action);
if (LOG.isDebugEnabled())
LOG.debug("{}: {} ({},{},{},{},{})@{}",
this,
result,
BufferUtil.toSummaryString(_responseHeader),
BufferUtil.toSummaryString(_chunk),
BufferUtil.toSummaryString(_responseBuffer),
BufferUtil.toSummaryString(content),
action,_generator.getState());
switch(result)
{
case NEED_HEADER:
_responseHeader=_connector.getResponseBuffers().getHeader();
break;
case NEED_BUFFER:
_responseBuffer=_connector.getResponseBuffers().getBuffer();
break;
case NEED_CHUNK:
_responseHeader=null;
_chunk=_connector.getResponseBuffers().getBuffer(HttpGenerator.CHUNK_SIZE);
break;
case FLUSH:
if (hasContent)
write(_responseHeader,_chunk,_responseBuffer).block();
else
_writeFuture=write(_responseHeader,_chunk,_responseBuffer);
break;
case FLUSH_CONTENT:
write(_responseHeader,_chunk,content).block();
break;
case SHUTDOWN_OUT:
getEndPoint().shutdownOutput();
break;
case OK:
break;
}
}
while(BufferUtil.hasContent(content));
}
catch(InterruptedException e)
{
LOG.debug(e);
}
catch(ExecutionException e)
{
LOG.debug(e);
if (e.getCause() instanceof IOException)
throw (IOException)e.getCause();
throw new RuntimeException(e);
}
finally
{
preparedAfter=_generator.getContentPrepared();
_lock.unlock();
}
return (int)(preparedAfter-preparedBefore);
}
private IOFuture write(ByteBuffer b0,ByteBuffer b1,ByteBuffer b2)
{
if (BufferUtil.hasContent(b0))
{
if (BufferUtil.hasContent(b1))
{
if (BufferUtil.hasContent(b2))
return _endp.write(b0,b1,b2);
return _endp.write(b0,b1);
}
else
{
if (BufferUtil.hasContent(b2))
return _endp.write(b0,b2);
return _endp.write(b0);
} }
} }
while(BufferUtil.hasContent(content)); else
return (int)(prepared-_generator.getContentPrepared());
}
/* ------------------------------------------------------------ */
private int write(HttpGenerator.ResponseInfo info, ByteBuffer content, Action action) throws IOException
{
if (_generator.isComplete())
throw new EofException();
long prepared=_generator.getContentPrepared();
do
{ {
if (_toFlush!=0) if (BufferUtil.hasContent(b1))
flush(true);
if (LOG.isDebugEnabled())
LOG.debug("{}: generate({},{},{},{},{})@{}",
this,
BufferUtil.toSummaryString(_responseHeader),
BufferUtil.toSummaryString(_chunk),
BufferUtil.toSummaryString(_responseBuffer),
BufferUtil.toSummaryString(content),
action,_generator.getState());
HttpGenerator.Result result=_generator.generate(info,_responseHeader,_chunk,_responseBuffer,content,action);
if (LOG.isDebugEnabled())
LOG.debug("{}: {} ({},{},{},{},{})@{}",
this,
result,
BufferUtil.toSummaryString(_responseHeader),
BufferUtil.toSummaryString(_chunk),
BufferUtil.toSummaryString(_responseBuffer),
BufferUtil.toSummaryString(content),
action,_generator.getState());
switch(result)
{ {
case NEED_HEADER: if (BufferUtil.hasContent(b2))
_responseHeader=_connector.getResponseBuffers().getHeader(); return _endp.write(b1,b2);
break; return _endp.write(b1);
case NEED_BUFFER:
_responseBuffer=_connector.getResponseBuffers().getBuffer();
break;
case NEED_CHUNK:
_responseHeader=null;
_chunk=_connector.getResponseBuffers().getBuffer(HttpGenerator.CHUNK_SIZE);
break;
case FLUSH:
_toFlush=
(BufferUtil.hasContent(_responseHeader)?8:0)+
(BufferUtil.hasContent(_chunk)?4:0)+
(BufferUtil.hasContent(_responseBuffer)?2:0);
flush(false);
break;
case FLUSH_CONTENT:
_content=content;
_toFlush=
(BufferUtil.hasContent(_responseHeader)?8:0)+
(BufferUtil.hasContent(_chunk)?4:0)+
(BufferUtil.hasContent(_content)?1:0);
flush(false);
break;
case SHUTDOWN_OUT:
getEndPoint().shutdownOutput();
break;
case OK:
break;
} }
} else
while(BufferUtil.hasContent(content));
return (int)(prepared-_generator.getContentPrepared());
}
/* ------------------------------------------------------------ */
private void flush(boolean block) throws IOException
{
while (_toFlush>0)
{
switch(_toFlush)
{ {
case 10: if (BufferUtil.hasContent(b2))
_endp.write(_responseHeader,_responseBuffer); return _endp.write(b2);
break; return CompleteIOFuture.COMPLETE;
case 9:
_endp.write(_responseHeader,_content);
_content=null;
break;
case 8:
_endp.write(_responseHeader);
break;
case 6:
_endp.write(_chunk,_responseBuffer);
break;
case 5:
_endp.write(_chunk,_content);
_content=null;
break;
case 4:
_endp.write(_chunk);
break;
case 2:
_endp.write(_responseBuffer);
break;
case 1:
_endp.write(_content);
_content=null;
break;
case 0:
default:
throw new IllegalStateException();
} }
if (!block)
break;
if (_toFlush>0)
blockWriteable();
} }
} }
@ -519,13 +533,13 @@ public class HttpConnection extends AbstractAsyncConnection
@Override @Override
protected int write(ByteBuffer content) throws IOException protected int write(ByteBuffer content) throws IOException
{ {
return HttpConnection.this.write(getResponseInfo(),content,Action.PREPARE); return HttpConnection.this.generate(getResponseInfo(),content,Action.PREPARE);
} }
@Override @Override
protected int send(ByteBuffer content) throws IOException protected void send(ByteBuffer content) throws IOException
{ {
return HttpConnection.this.send(getResponseInfo(),content); HttpConnection.this.send(getResponseInfo(),content);
} }
@Override @Override
@ -622,25 +636,25 @@ public class HttpConnection extends AbstractAsyncConnection
@Override @Override
protected void flushResponse() throws IOException protected void flushResponse() throws IOException
{ {
HttpConnection.this.write(getResponseInfo(),null,Action.FLUSH); HttpConnection.this.generate(getResponseInfo(),null,Action.FLUSH);
} }
@Override @Override
protected void completeResponse() throws IOException protected void completeResponse() throws IOException
{ {
HttpConnection.this.write(getResponseInfo(),null,Action.COMPLETE); HttpConnection.this.generate(getResponseInfo(),null,Action.COMPLETE);
} }
@Override @Override
protected void blockForContent() throws IOException protected void blockForContent() throws IOException
{ {
// While progress and the connection has not changed // While progress and the connection has not changed
while (!_endp.isInputShutdown()) while (_endp.isOpen())
{ {
try try
{ {
// Wait until we can read // Wait until we can read
getEndPoint().blockReadable(); getEndPoint().read().block();
// We will need a buffer to read into // We will need a buffer to read into
if (_requestBuffer==null) if (_requestBuffer==null)
@ -650,6 +664,15 @@ public class HttpConnection extends AbstractAsyncConnection
if (BufferUtil.hasContent(_requestBuffer) && _parser.parseNext(_requestBuffer)) if (BufferUtil.hasContent(_requestBuffer) && _parser.parseNext(_requestBuffer))
return; return;
} }
catch (InterruptedException e)
{
LOG.debug(e);
}
catch (ExecutionException e)
{
LOG.debug(e);
DispatchedIOFuture.rethrow(e);
}
finally finally
{ {
// Return empty request buffer // Return empty request buffer

View File

@ -152,7 +152,7 @@ public class LocalConnector extends AbstractConnector
{ {
if (!leaveOpen) if (!leaveOpen)
connectionClosed(connection); connectionClosed(connection);
_responsesBuffer = endPoint.getOut(); _responsesBuffer = endPoint.getOutput();
} }
} }
finally finally

View File

@ -154,7 +154,7 @@ public class HttpConnectionTest
{ {
try try
{ {
((StdErrLog)Log.getLogger(AbstractHttpConnection.class)).setHideStacks(true); ((StdErrLog)Log.getLogger(HttpConnection.class)).setHideStacks(true);
String response; String response;
@ -185,7 +185,7 @@ public class HttpConnectionTest
} }
finally finally
{ {
((StdErrLog)Log.getLogger(AbstractHttpConnection.class)).setHideStacks(false); ((StdErrLog)Log.getLogger(HttpConnection.class)).setHideStacks(false);
} }
} }
@ -335,7 +335,7 @@ public class HttpConnectionTest
Logger logger=null; Logger logger=null;
try try
{ {
((StdErrLog)Log.getLogger(AbstractHttpConnection.class)).setHideStacks(true); ((StdErrLog)Log.getLogger(HttpConnection.class)).setHideStacks(true);
response=connector.getResponses(requests); response=connector.getResponses(requests);
offset = checkContains(response,offset,"HTTP/1.1 500"); offset = checkContains(response,offset,"HTTP/1.1 500");
offset = checkContains(response,offset,"Connection: close"); offset = checkContains(response,offset,"Connection: close");
@ -343,7 +343,7 @@ public class HttpConnectionTest
} }
finally finally
{ {
((StdErrLog)Log.getLogger(AbstractHttpConnection.class)).setHideStacks(false); ((StdErrLog)Log.getLogger(HttpConnection.class)).setHideStacks(false);
} }
} }
@ -443,7 +443,7 @@ public class HttpConnectionTest
try try
{ {
baseRequest.setHandled(true); baseRequest.setHandled(true);
response.setHeader(HttpHeader.CONTENT_TYPE,MimeTypes.TEXT_HTML); response.setHeader(HttpHeader.CONTENT_TYPE.toString(),MimeTypes.Type.TEXT_HTML.toString());
response.setHeader("LongStr", longstr); response.setHeader("LongStr", longstr);
PrintWriter writer = response.getWriter(); PrintWriter writer = response.getWriter();
writer.write("<html><h1>FOO</h1></html>"); writer.write("<html><h1>FOO</h1></html>");

View File

@ -188,7 +188,7 @@ public class HttpWriterTest
hb.completeHeader(fields,true); hb.completeHeader(fields,true);
hb.flush(10000); hb.flush(10000);
String response = new String(endp.getOut().asArray(),StringUtil.__UTF8); String response = new String(endp.getOutput().asArray(),StringUtil.__UTF8);
assertTrue(response.startsWith("HTTP/1.1 200 OK\r\nContent-Length: 1025\r\n\r\n\u05531234567890")); assertTrue(response.startsWith("HTTP/1.1 200 OK\r\nContent-Length: 1025\r\n\r\n\u05531234567890"));
} }

View File

@ -411,7 +411,7 @@ public class ResponseTest
response.sendRedirect(tests[i][0]); response.sendRedirect(tests[i][0]);
String location = out.getOut().toString(); String location = out.getOutput().toString();
int l=location.indexOf("Location: "); int l=location.indexOf("Location: ");
int e=location.indexOf('\n',l); int e=location.indexOf('\n',l);
location=location.substring(l+10,e).trim(); location=location.substring(l+10,e).trim();
@ -504,7 +504,7 @@ public class ResponseTest
private Response newResponse() private Response newResponse()
{ {
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
endPoint.setOut(new ByteArrayBuffer(1024)); endPoint.setOutput(new ByteArrayBuffer(1024));
endPoint.setGrowOutput(true); endPoint.setGrowOutput(true);
AbstractHttpConnection connection=new TestHttpConnection(connector, endPoint, connector.getServer()); AbstractHttpConnection connection=new TestHttpConnection(connector, endPoint, connector.getServer());
connection.getGenerator().reset(); connection.getGenerator().reset();

View File

@ -225,6 +225,27 @@ public class BufferUtil
return buffer==null?0:buffer.remaining(); return buffer==null?0:buffer.remaining();
} }
/* ------------------------------------------------------------ */
/** Get the space from the limit to the capacity
* @param buffer
* @return space
*/
public static int space(ByteBuffer buffer)
{
if (buffer==null)
return 0;
return buffer.capacity()-buffer.limit();
}
/* ------------------------------------------------------------ */
/** Compact the buffer
* @param buffer
*/
public static void compact(ByteBuffer buffer)
{
buffer.compact().flip();
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* Put data from one buffer into another, avoiding over/under flows * Put data from one buffer into another, avoiding over/under flows
@ -733,4 +754,5 @@ public class BufferUtil
} }
} }