360051 SocketConnectionTest.testServerClosedConnection is excluded.
Reworked StreamEndPoint methods isInputShutdown(), isOutputShutdown(), shutdownInput(), shutdownOutput(), and updated subclasses to call super where appropriate. The test had to be modified to work properly with SocketConnector, because behavior is different from SelectConnector, but the CPU spinning is fixed.
This commit is contained in:
parent
8d61ce374b
commit
e741b98510
|
@ -73,7 +73,7 @@ public class HttpConnection extends AbstractConnection implements Dumpable
|
|||
HttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
|
||||
{
|
||||
super(endp);
|
||||
|
||||
|
||||
_generator = new HttpGenerator(requestBuffers,endp);
|
||||
_parser = new HttpParser(responseBuffers,endp,new Handler());
|
||||
}
|
||||
|
@ -277,6 +277,9 @@ public class HttpConnection extends AbstractConnection implements Dumpable
|
|||
{
|
||||
long filled = _parser.parseAvailable();
|
||||
io += filled;
|
||||
|
||||
if (_parser.isIdle() && _endp.isInputShutdown())
|
||||
throw new EOFException();
|
||||
}
|
||||
|
||||
if (io > 0)
|
||||
|
@ -353,7 +356,7 @@ public class HttpConnection extends AbstractConnection implements Dumpable
|
|||
complete = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// if the endpoint is closed, but the parser incomplete
|
||||
if (!_endp.isOpen() && !(_parser.isComplete()||_parser.isIdle()))
|
||||
{
|
||||
|
@ -373,8 +376,7 @@ public class HttpConnection extends AbstractConnection implements Dumpable
|
|||
}
|
||||
}
|
||||
|
||||
// TODO should not need this
|
||||
if (_endp.isInputShutdown() && !_parser.isComplete())
|
||||
if (_endp.isInputShutdown() && !_parser.isComplete() && !_parser.isIdle())
|
||||
{
|
||||
if (_exchange!=null && !_exchange.isDone())
|
||||
{
|
||||
|
@ -450,7 +452,7 @@ public class HttpConnection extends AbstractConnection implements Dumpable
|
|||
finally
|
||||
{
|
||||
_parser.returnBuffers();
|
||||
|
||||
|
||||
// Do we have more stuff to write?
|
||||
if (!_generator.isComplete() && _generator.getBytesBuffered()>0 && _endp.isOpen() && _endp instanceof AsyncEndPoint)
|
||||
{
|
||||
|
@ -568,6 +570,8 @@ public class HttpConnection extends AbstractConnection implements Dumpable
|
|||
|
||||
private boolean shouldClose()
|
||||
{
|
||||
if (_endp.isInputShutdown())
|
||||
return true;
|
||||
if (_connectionHeader!=null)
|
||||
{
|
||||
if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
|
||||
|
@ -746,7 +750,7 @@ public class HttpConnection extends AbstractConnection implements Dumpable
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @see org.eclipse.jetty.util.component.Dumpable#dump()
|
||||
|
@ -768,7 +772,7 @@ public class HttpConnection extends AbstractConnection implements Dumpable
|
|||
AggregateLifeCycle.dump(out,indent,Collections.singletonList(_endp));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
private class ConnectionIdleTask extends Timeout.Task
|
||||
{
|
||||
|
@ -783,14 +787,14 @@ public class HttpConnection extends AbstractConnection implements Dumpable
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
private class NonFinalResponseListener implements HttpEventListener
|
||||
{
|
||||
final HttpExchange _exchange;
|
||||
final HttpEventListener _next;
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public NonFinalResponseListener(HttpExchange exchange)
|
||||
{
|
||||
|
@ -835,7 +839,7 @@ public class HttpConnection extends AbstractConnection implements Dumpable
|
|||
{
|
||||
_exchange.setEventListener(_next);
|
||||
_exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
|
||||
_parser.reset();
|
||||
_parser.reset();
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
|
|
|
@ -386,16 +386,16 @@ public abstract class AbstractConnectionTest
|
|||
}
|
||||
}
|
||||
|
||||
private class ConnectionExchange extends HttpExchange
|
||||
protected class ConnectionExchange extends HttpExchange
|
||||
{
|
||||
private final CountDownLatch latch;
|
||||
|
||||
private ConnectionExchange()
|
||||
protected ConnectionExchange()
|
||||
{
|
||||
this.latch = null;
|
||||
}
|
||||
|
||||
private ConnectionExchange(CountDownLatch latch)
|
||||
protected ConnectionExchange(CountDownLatch latch)
|
||||
{
|
||||
this.latch = latch;
|
||||
}
|
||||
|
|
|
@ -13,6 +13,16 @@
|
|||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class SocketConnectionTest extends AbstractConnectionTest
|
||||
{
|
||||
protected HttpClient newHttpClient()
|
||||
|
@ -21,10 +31,61 @@ public class SocketConnectionTest extends AbstractConnectionTest
|
|||
httpClient.setConnectorType(HttpClient.CONNECTOR_SOCKET);
|
||||
return httpClient;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void testServerClosedConnection()
|
||||
public void testServerClosedConnection() throws Exception
|
||||
{
|
||||
// TODO work out why this does not work
|
||||
// Differently from the SelectConnector, the SocketConnector cannot detect server closes.
|
||||
// Therefore, upon a second send, the exchange will fail.
|
||||
// Applications needs to retry it explicitly.
|
||||
|
||||
ServerSocket serverSocket = new ServerSocket();
|
||||
serverSocket.bind(null);
|
||||
int port=serverSocket.getLocalPort();
|
||||
|
||||
HttpClient httpClient = this.newHttpClient();
|
||||
httpClient.setMaxConnectionsPerAddress(1);
|
||||
httpClient.start();
|
||||
try
|
||||
{
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
HttpExchange exchange = new ConnectionExchange(latch);
|
||||
exchange.setAddress(new Address("localhost", port));
|
||||
exchange.setRequestURI("/");
|
||||
httpClient.send(exchange);
|
||||
|
||||
Socket remote = serverSocket.accept();
|
||||
|
||||
// HttpClient.send() above is async, so if we write the response immediately
|
||||
// there is a chance that it arrives before the request is being sent, so we
|
||||
// read the request before sending the response to avoid the race
|
||||
InputStream input = remote.getInputStream();
|
||||
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "UTF-8"));
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null)
|
||||
{
|
||||
if (line.length() == 0)
|
||||
break;
|
||||
}
|
||||
|
||||
OutputStream output = remote.getOutputStream();
|
||||
output.write("HTTP/1.1 200 OK\r\n".getBytes("UTF-8"));
|
||||
output.write("Content-Length: 0\r\n".getBytes("UTF-8"));
|
||||
output.write("\r\n".getBytes("UTF-8"));
|
||||
output.flush();
|
||||
|
||||
assertEquals(HttpExchange.STATUS_COMPLETED, exchange.waitForDone());
|
||||
|
||||
remote.close();
|
||||
|
||||
exchange.reset();
|
||||
httpClient.send(exchange);
|
||||
|
||||
assertEquals(HttpExchange.STATUS_EXCEPTED, exchange.waitForDone());
|
||||
}
|
||||
finally
|
||||
{
|
||||
httpClient.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,11 +4,11 @@
|
|||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
// The Eclipse Public License is available at
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
|
||||
package org.eclipse.jetty.io.bio;
|
||||
|
@ -17,6 +17,7 @@ import java.io.IOException;
|
|||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import javax.net.ssl.SSLSocket;
|
||||
|
||||
import org.eclipse.jetty.util.StringUtil;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
|
@ -34,13 +35,13 @@ public class SocketEndPoint extends StreamEndPoint
|
|||
final Socket _socket;
|
||||
final InetSocketAddress _local;
|
||||
final InetSocketAddress _remote;
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
*
|
||||
*
|
||||
*/
|
||||
public SocketEndPoint(Socket socket)
|
||||
throws IOException
|
||||
throws IOException
|
||||
{
|
||||
super(socket.getInputStream(),socket.getOutputStream());
|
||||
_socket=socket;
|
||||
|
@ -48,13 +49,13 @@ public class SocketEndPoint extends StreamEndPoint
|
|||
_remote=(InetSocketAddress)_socket.getRemoteSocketAddress();
|
||||
super.setMaxIdleTime(_socket.getSoTimeout());
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
*
|
||||
*
|
||||
*/
|
||||
protected SocketEndPoint(Socket socket, int maxIdleTime)
|
||||
throws IOException
|
||||
throws IOException
|
||||
{
|
||||
super(socket.getInputStream(),socket.getOutputStream());
|
||||
_socket=socket;
|
||||
|
@ -63,7 +64,7 @@ public class SocketEndPoint extends StreamEndPoint
|
|||
_socket.setSoTimeout(maxIdleTime>0?maxIdleTime:0);
|
||||
super.setMaxIdleTime(maxIdleTime);
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/* (non-Javadoc)
|
||||
* @see org.eclipse.io.BufferIO#isClosed()
|
||||
|
@ -73,19 +74,19 @@ public class SocketEndPoint extends StreamEndPoint
|
|||
{
|
||||
return super.isOpen() && _socket!=null && !_socket.isClosed();
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
@Override
|
||||
public boolean isInputShutdown()
|
||||
{
|
||||
return !super.isOpen() || _socket!=null && _socket.isInputShutdown();
|
||||
return !isOpen() || super.isInputShutdown();
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
@Override
|
||||
public boolean isOutputShutdown()
|
||||
{
|
||||
return !super.isOpen() || _socket!=null && _socket.isOutputShutdown();
|
||||
return !isOpen() || super.isOutputShutdown();
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
|
@ -94,9 +95,13 @@ public class SocketEndPoint extends StreamEndPoint
|
|||
*/
|
||||
@Override
|
||||
public void shutdownOutput() throws IOException
|
||||
{
|
||||
if (!_socket.isClosed() && !_socket.isOutputShutdown())
|
||||
_socket.shutdownOutput();
|
||||
{
|
||||
if (!isOutputShutdown())
|
||||
{
|
||||
super.shutdownOutput();
|
||||
if (!(_socket instanceof SSLSocket))
|
||||
_socket.shutdownOutput();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -106,11 +111,15 @@ public class SocketEndPoint extends StreamEndPoint
|
|||
*/
|
||||
@Override
|
||||
public void shutdownInput() throws IOException
|
||||
{
|
||||
if (!_socket.isClosed() && !_socket.isInputShutdown())
|
||||
_socket.shutdownInput();
|
||||
{
|
||||
if (!isInputShutdown())
|
||||
{
|
||||
super.shutdownInput();
|
||||
if (!(_socket instanceof SSLSocket))
|
||||
_socket.shutdownInput();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/* (non-Javadoc)
|
||||
* @see org.eclipse.io.BufferIO#close()
|
||||
|
@ -122,10 +131,10 @@ public class SocketEndPoint extends StreamEndPoint
|
|||
_in=null;
|
||||
_out=null;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/*
|
||||
/*
|
||||
* @see org.eclipse.io.EndPoint#getLocalAddr()
|
||||
*/
|
||||
@Override
|
||||
|
@ -133,12 +142,12 @@ public class SocketEndPoint extends StreamEndPoint
|
|||
{
|
||||
if (_local==null || _local.getAddress()==null || _local.getAddress().isAnyLocalAddress())
|
||||
return StringUtil.ALL_INTERFACES;
|
||||
|
||||
|
||||
return _local.getAddress().getHostAddress();
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/*
|
||||
/*
|
||||
* @see org.eclipse.io.EndPoint#getLocalHost()
|
||||
*/
|
||||
@Override
|
||||
|
@ -146,12 +155,12 @@ public class SocketEndPoint extends StreamEndPoint
|
|||
{
|
||||
if (_local==null || _local.getAddress()==null || _local.getAddress().isAnyLocalAddress())
|
||||
return StringUtil.ALL_INTERFACES;
|
||||
|
||||
|
||||
return _local.getAddress().getCanonicalHostName();
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/*
|
||||
/*
|
||||
* @see org.eclipse.io.EndPoint#getLocalPort()
|
||||
*/
|
||||
@Override
|
||||
|
@ -163,7 +172,7 @@ public class SocketEndPoint extends StreamEndPoint
|
|||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/*
|
||||
/*
|
||||
* @see org.eclipse.io.EndPoint#getRemoteAddr()
|
||||
*/
|
||||
@Override
|
||||
|
@ -176,7 +185,7 @@ public class SocketEndPoint extends StreamEndPoint
|
|||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/*
|
||||
/*
|
||||
* @see org.eclipse.io.EndPoint#getRemoteHost()
|
||||
*/
|
||||
@Override
|
||||
|
@ -188,7 +197,7 @@ public class SocketEndPoint extends StreamEndPoint
|
|||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/*
|
||||
/*
|
||||
* @see org.eclipse.io.EndPoint#getRemotePort()
|
||||
*/
|
||||
@Override
|
||||
|
@ -200,7 +209,7 @@ public class SocketEndPoint extends StreamEndPoint
|
|||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/*
|
||||
/*
|
||||
* @see org.eclipse.io.EndPoint#getConnection()
|
||||
*/
|
||||
@Override
|
||||
|
@ -237,5 +246,5 @@ public class SocketEndPoint extends StreamEndPoint
|
|||
_socket.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -22,17 +22,13 @@ import java.net.SocketTimeoutException;
|
|||
import org.eclipse.jetty.io.Buffer;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* To change the template for this generated type comment go to
|
||||
* Window - Preferences - Java - Code Generation - Code and Comments
|
||||
*/
|
||||
public class StreamEndPoint implements EndPoint
|
||||
{
|
||||
InputStream _in;
|
||||
OutputStream _out;
|
||||
int _maxIdleTime;
|
||||
boolean _ishut;
|
||||
boolean _oshut;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -75,23 +71,33 @@ public class StreamEndPoint implements EndPoint
|
|||
}
|
||||
|
||||
public void shutdownOutput() throws IOException
|
||||
{
|
||||
{
|
||||
if (_oshut)
|
||||
return;
|
||||
_oshut = true;
|
||||
if (_out!=null)
|
||||
_out.close();
|
||||
}
|
||||
|
||||
|
||||
public boolean isInputShutdown()
|
||||
{
|
||||
return !isOpen();
|
||||
return _ishut;
|
||||
}
|
||||
|
||||
public void shutdownInput() throws IOException
|
||||
{
|
||||
{
|
||||
if (_ishut)
|
||||
return;
|
||||
_ishut = true;
|
||||
if (_in!=null)
|
||||
_in.close();
|
||||
}
|
||||
|
||||
|
||||
public boolean isOutputShutdown()
|
||||
{
|
||||
return !isOpen();
|
||||
return _oshut;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* @see org.eclipse.io.BufferIO#close()
|
||||
*/
|
||||
|
@ -107,35 +113,43 @@ public class StreamEndPoint implements EndPoint
|
|||
|
||||
protected void idleExpired() throws IOException
|
||||
{
|
||||
_in.close();
|
||||
if (_in!=null)
|
||||
_in.close();
|
||||
}
|
||||
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.eclipse.io.BufferIO#fill(org.eclipse.io.Buffer)
|
||||
*/
|
||||
public int fill(Buffer buffer) throws IOException
|
||||
{
|
||||
// TODO handle null array()
|
||||
if (_in==null)
|
||||
return 0;
|
||||
|
||||
int space=buffer.space();
|
||||
if (space<=0)
|
||||
{
|
||||
if (buffer.hasContent())
|
||||
return 0;
|
||||
throw new IOException("FULL");
|
||||
}
|
||||
int space=buffer.space();
|
||||
if (space<=0)
|
||||
{
|
||||
if (buffer.hasContent())
|
||||
return 0;
|
||||
throw new IOException("FULL");
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
return buffer.readFrom(_in,space);
|
||||
}
|
||||
catch(SocketTimeoutException e)
|
||||
{
|
||||
idleExpired();
|
||||
return -1;
|
||||
}
|
||||
try
|
||||
{
|
||||
int read=buffer.readFrom(_in, space);
|
||||
if (read<0 && isOpen())
|
||||
{
|
||||
if (!isInputShutdown())
|
||||
shutdownInput();
|
||||
else if (isOutputShutdown())
|
||||
close();
|
||||
}
|
||||
return read;
|
||||
}
|
||||
catch(SocketTimeoutException e)
|
||||
{
|
||||
idleExpired();
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
|
@ -143,7 +157,6 @@ public class StreamEndPoint implements EndPoint
|
|||
*/
|
||||
public int flush(Buffer buffer) throws IOException
|
||||
{
|
||||
// TODO handle null array()
|
||||
if (_out==null)
|
||||
return -1;
|
||||
int length=buffer.length();
|
||||
|
@ -314,13 +327,13 @@ public class StreamEndPoint implements EndPoint
|
|||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public int getMaxIdleTime()
|
||||
{
|
||||
return _maxIdleTime;
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public void setMaxIdleTime(int timeMs) throws IOException
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue