Improved connection/destination close, so that a connection can
always notify its destination that it has been closed.
This commit is contained in:
parent
c705bb9480
commit
8d9fd6ad71
|
@ -218,6 +218,10 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
|
||||||
LOG.debug("Closed {}", this);
|
LOG.debug("Closed {}", this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void close(Connection connection)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Aborts all the {@link HttpExchange}s queued in this destination.
|
* Aborts all the {@link HttpExchange}s queued in this destination.
|
||||||
*
|
*
|
||||||
|
|
|
@ -126,6 +126,19 @@ public abstract class MultiplexHttpDestination<C extends Connection> extends Htt
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close(Connection connection)
|
||||||
|
{
|
||||||
|
super.close(connection);
|
||||||
|
assert this.connection == connection;
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
ConnectState current = connect.get();
|
||||||
|
if (connect.compareAndSet(current, ConnectState.DISCONNECTED))
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected abstract void send(C connection, HttpExchange exchange);
|
protected abstract void send(C connection, HttpExchange exchange);
|
||||||
|
|
||||||
private enum ConnectState
|
private enum ConnectState
|
||||||
|
|
|
@ -96,7 +96,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
|
||||||
if (exchange != null)
|
if (exchange != null)
|
||||||
return exchange.getRequest().abort(new TimeoutException());
|
return exchange.getRequest().abort(new TimeoutException());
|
||||||
|
|
||||||
getHttpDestination().remove(this);
|
getHttpDestination().close(this);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -126,7 +126,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
|
||||||
@Override
|
@Override
|
||||||
public void close()
|
public void close()
|
||||||
{
|
{
|
||||||
getHttpDestination().remove(this);
|
getHttpDestination().close(this);
|
||||||
getEndPoint().shutdownOutput();
|
getEndPoint().shutdownOutput();
|
||||||
LOG.debug("{} oshut", this);
|
LOG.debug("{} oshut", this);
|
||||||
getEndPoint().close();
|
getEndPoint().close();
|
||||||
|
|
|
@ -155,23 +155,25 @@ public class HttpDestinationOverHTTP extends HttpDestination implements Promise<
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOG.debug("{} is stopped", client);
|
LOG.debug("{} is stopped", client);
|
||||||
remove(connection);
|
close(connection);
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void remove(HttpConnectionOverHTTP connection)
|
@Override
|
||||||
|
public void close(Connection oldConnection)
|
||||||
{
|
{
|
||||||
connectionPool.remove(connection);
|
super.close(oldConnection);
|
||||||
|
connectionPool.remove(oldConnection);
|
||||||
|
|
||||||
// We need to execute queued requests even if this connection failed.
|
// We need to execute queued requests even if this connection failed.
|
||||||
// We may create a connection that is not needed, but it will eventually
|
// We may create a connection that is not needed, but it will eventually
|
||||||
// idle timeout, so no worries
|
// idle timeout, so no worries
|
||||||
if (!getHttpExchanges().isEmpty())
|
if (!getHttpExchanges().isEmpty())
|
||||||
{
|
{
|
||||||
connection = acquire();
|
HttpConnectionOverHTTP newConnection = acquire();
|
||||||
if (connection != null)
|
if (newConnection != null)
|
||||||
process(connection, false);
|
process(newConnection, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue