Reviewed patch for #307457 (Exchanges are left unhandled when connection is lost): improved and cleaned up the fix.
git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@1874 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
parent
222826d2f7
commit
6adb41cf5d
|
@ -229,9 +229,10 @@ public class HttpDestination
|
||||||
if (connection==null)
|
if (connection==null)
|
||||||
return null;
|
return null;
|
||||||
|
|
||||||
|
// Check if the connection was idle,
|
||||||
|
// but it expired just a moment ago
|
||||||
if (connection.cancelIdleTimeout())
|
if (connection.cancelIdleTimeout())
|
||||||
return connection;
|
return connection;
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -328,13 +329,7 @@ public class HttpDestination
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
HttpExchange ex = _queue.removeFirst();
|
HttpExchange ex = _queue.removeFirst();
|
||||||
|
send(connection, ex);
|
||||||
// If server closes the connection, put the exchange back
|
|
||||||
// to the idle queue and recycle the connection
|
|
||||||
if(!connection.send(ex)) {
|
|
||||||
_queue.addFirst(ex);
|
|
||||||
recycleConnection(connection);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -383,13 +378,7 @@ public class HttpDestination
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
HttpExchange ex = _queue.removeFirst();
|
HttpExchange ex = _queue.removeFirst();
|
||||||
|
send(connection, ex);
|
||||||
// If server closes the connection, put the exchange back
|
|
||||||
// to the idle queue and recycle the connection
|
|
||||||
if(!connection.send(ex)) {
|
|
||||||
_queue.addFirst(ex);
|
|
||||||
recycleConnection(connection);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
this.notifyAll();
|
this.notifyAll();
|
||||||
}
|
}
|
||||||
|
@ -405,7 +394,7 @@ public class HttpDestination
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void returnIdleConnection(HttpConnection connection) throws IOException
|
public void returnIdleConnection(HttpConnection connection)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -426,29 +415,6 @@ public class HttpDestination
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void recycleConnection(HttpConnection connection)
|
|
||||||
{
|
|
||||||
connection.cancelIdleTimeout();
|
|
||||||
try
|
|
||||||
{
|
|
||||||
connection.close();
|
|
||||||
}
|
|
||||||
catch (IOException e)
|
|
||||||
{
|
|
||||||
Log.ignore(e);
|
|
||||||
}
|
|
||||||
synchronized (this)
|
|
||||||
{
|
|
||||||
// If a connection had failed, need to remove it from _idle
|
|
||||||
// and _connections queues and start a new connection
|
|
||||||
_idle.remove(connection);
|
|
||||||
_connections.remove(connection);
|
|
||||||
|
|
||||||
if (!_queue.isEmpty() && _client.isStarted())
|
|
||||||
startNewConnection();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void send(HttpExchange ex) throws IOException
|
public void send(HttpExchange ex) throws IOException
|
||||||
{
|
{
|
||||||
LinkedList<String> listeners = _client.getRegisteredListeners();
|
LinkedList<String> listeners = _client.getRegisteredListeners();
|
||||||
|
@ -523,16 +489,7 @@ public class HttpDestination
|
||||||
HttpConnection connection = getIdleConnection();
|
HttpConnection connection = getIdleConnection();
|
||||||
if (connection != null)
|
if (connection != null)
|
||||||
{
|
{
|
||||||
synchronized (this)
|
send(connection, ex);
|
||||||
{
|
|
||||||
// Send could fail due to server closes the connection.
|
|
||||||
// put the exchange back to the idle queue and recycle the connection
|
|
||||||
if(!connection.send(ex))
|
|
||||||
{
|
|
||||||
_queue.add(ex);
|
|
||||||
recycleConnection(connection);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -547,6 +504,20 @@ public class HttpDestination
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void send(HttpConnection connection, HttpExchange exchange) throws IOException
|
||||||
|
{
|
||||||
|
synchronized (this)
|
||||||
|
{
|
||||||
|
// If server closes the connection, put the exchange back
|
||||||
|
// to the exchange queue and recycle the connection
|
||||||
|
if(!connection.send(exchange))
|
||||||
|
{
|
||||||
|
_queue.addFirst(exchange);
|
||||||
|
returnIdleConnection(connection);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized String toString()
|
public synchronized String toString()
|
||||||
{
|
{
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
|
|
||||||
package org.eclipse.jetty.client;
|
package org.eclipse.jetty.client;
|
||||||
|
|
||||||
|
import java.io.OutputStream;
|
||||||
import java.net.ServerSocket;
|
import java.net.ServerSocket;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
@ -29,6 +30,62 @@ import static org.junit.Assert.assertTrue;
|
||||||
*/
|
*/
|
||||||
public class ConnectionTest
|
public class ConnectionTest
|
||||||
{
|
{
|
||||||
|
@Test
|
||||||
|
public void testServerClosedConnection() throws Exception
|
||||||
|
{
|
||||||
|
ServerSocket serverSocket = new ServerSocket();
|
||||||
|
serverSocket.bind(null);
|
||||||
|
int port=serverSocket.getLocalPort();
|
||||||
|
|
||||||
|
HttpClient httpClient = new HttpClient();
|
||||||
|
httpClient.setMaxConnectionsPerAddress(1);
|
||||||
|
httpClient.start();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
HttpExchange exchange = new ConnectionExchange(latch);
|
||||||
|
exchange.setAddress(new Address("localhost", port));
|
||||||
|
exchange.setURI("/");
|
||||||
|
httpClient.send(exchange);
|
||||||
|
|
||||||
|
Socket remote = serverSocket.accept();
|
||||||
|
OutputStream output = remote.getOutputStream();
|
||||||
|
output.write("HTTP/1.1 200 OK\r\n".getBytes("UTF-8"));
|
||||||
|
output.write("Content-Length: 0\r\n".getBytes("UTF-8"));
|
||||||
|
output.write("\r\n".getBytes("UTF-8"));
|
||||||
|
output.flush();
|
||||||
|
|
||||||
|
assertEquals(HttpExchange.STATUS_COMPLETED, exchange.waitForDone());
|
||||||
|
|
||||||
|
remote.close();
|
||||||
|
|
||||||
|
// Need to wait a bit to allow the client to detect
|
||||||
|
// that the server has closed the connection
|
||||||
|
Thread.sleep(500);
|
||||||
|
|
||||||
|
// The server has closed the connection and another attempt to send
|
||||||
|
// with the same connection would fail because the connection has been
|
||||||
|
// closed by the client as well.
|
||||||
|
// The client must open a new connection in this case, and we check
|
||||||
|
// that the new request completes correctly
|
||||||
|
exchange.reset();
|
||||||
|
httpClient.send(exchange);
|
||||||
|
|
||||||
|
remote = serverSocket.accept();
|
||||||
|
output = remote.getOutputStream();
|
||||||
|
output.write("HTTP/1.1 200 OK\r\n".getBytes("UTF-8"));
|
||||||
|
output.write("Content-Length: 0\r\n".getBytes("UTF-8"));
|
||||||
|
output.write("\r\n".getBytes("UTF-8"));
|
||||||
|
output.flush();
|
||||||
|
|
||||||
|
assertEquals(HttpExchange.STATUS_COMPLETED, exchange.waitForDone());
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
httpClient.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConnectionFailed() throws Exception
|
public void testConnectionFailed() throws Exception
|
||||||
{
|
{
|
||||||
|
@ -67,6 +124,50 @@ public class ConnectionTest
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleConnectionsFailed() throws Exception
|
||||||
|
{
|
||||||
|
ServerSocket serverSocket = new ServerSocket();
|
||||||
|
serverSocket.bind(null);
|
||||||
|
int port=serverSocket.getLocalPort();
|
||||||
|
serverSocket.close();
|
||||||
|
|
||||||
|
HttpClient httpClient = new HttpClient();
|
||||||
|
httpClient.setMaxConnectionsPerAddress(1);
|
||||||
|
httpClient.start();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
HttpExchange[] exchanges = new HttpExchange[20];
|
||||||
|
final CountDownLatch latch = new CountDownLatch(exchanges.length);
|
||||||
|
for (int i = 0; i < exchanges.length; ++i)
|
||||||
|
{
|
||||||
|
HttpExchange exchange = new HttpExchange()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected void onConnectionFailed(Throwable x)
|
||||||
|
{
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
exchange.setAddress(new Address("localhost", port));
|
||||||
|
exchange.setURI("/");
|
||||||
|
exchanges[i] = exchange;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (HttpExchange exchange : exchanges)
|
||||||
|
httpClient.send(exchange);
|
||||||
|
|
||||||
|
for (HttpExchange exchange : exchanges)
|
||||||
|
assertEquals(HttpExchange.STATUS_EXCEPTED, exchange.waitForDone());
|
||||||
|
|
||||||
|
assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
httpClient.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConnectionTimeoutWithSocketConnector() throws Exception
|
public void testConnectionTimeoutWithSocketConnector() throws Exception
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue