only handle early EOF if exchange is not done
This commit is contained in:
parent
4e79ea0dca
commit
8cfa671253
|
@ -346,10 +346,13 @@ public abstract class AbstractHttpConnection extends AbstractConnection implemen
|
||||||
HttpExchange exchange = _exchange;
|
HttpExchange exchange = _exchange;
|
||||||
if (exchange!=null)
|
if (exchange!=null)
|
||||||
{
|
{
|
||||||
exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
|
if (!exchange.isDone())
|
||||||
|
{
|
||||||
|
if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
|
||||||
exchange.getEventListener().onException(new EofException("early EOF"));
|
exchange.getEventListener().onException(new EofException("early EOF"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -389,7 +392,7 @@ public abstract class AbstractHttpConnection extends AbstractConnection implemen
|
||||||
default:
|
default:
|
||||||
String exch= exchange.toString();
|
String exch= exchange.toString();
|
||||||
String reason = _endp.isOpen()?(_endp.isInputShutdown()?"half closed: ":"local close: "):"closed: ";
|
String reason = _endp.isOpen()?(_endp.isInputShutdown()?"half closed: ":"local close: "):"closed: ";
|
||||||
exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
|
if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
|
||||||
exchange.getEventListener().onException(new EofException(reason+exch));
|
exchange.getEventListener().onException(new EofException(reason+exch));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -161,7 +161,7 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
|
||||||
exchange.getStatus() != HttpExchange.STATUS_CANCELLED &&
|
exchange.getStatus() != HttpExchange.STATUS_CANCELLED &&
|
||||||
!exchange.isDone())
|
!exchange.isDone())
|
||||||
{
|
{
|
||||||
exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
|
if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
|
||||||
exchange.getEventListener().onException(e);
|
exchange.getEventListener().onException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -162,7 +162,7 @@ public class BlockingHttpConnection extends AbstractHttpConnection
|
||||||
exchange.getStatus() != HttpExchange.STATUS_CANCELLED &&
|
exchange.getStatus() != HttpExchange.STATUS_CANCELLED &&
|
||||||
!exchange.isDone())
|
!exchange.isDone())
|
||||||
{
|
{
|
||||||
exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
|
if(exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
|
||||||
exchange.getEventListener().onException(e);
|
exchange.getEventListener().onException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -294,7 +294,7 @@ public class HttpDestination implements Dumpable
|
||||||
else if (_queue.size() > 0)
|
else if (_queue.size() > 0)
|
||||||
{
|
{
|
||||||
HttpExchange ex = _queue.remove(0);
|
HttpExchange ex = _queue.remove(0);
|
||||||
ex.setStatus(HttpExchange.STATUS_EXCEPTED);
|
if (ex.setStatus(HttpExchange.STATUS_EXCEPTED))
|
||||||
ex.getEventListener().onConnectionFailed(throwable);
|
ex.getEventListener().onConnectionFailed(throwable);
|
||||||
|
|
||||||
// Since an existing connection had failed, we need to create a
|
// Since an existing connection had failed, we need to create a
|
||||||
|
@ -328,7 +328,7 @@ public class HttpDestination implements Dumpable
|
||||||
if (_queue.size() > 0)
|
if (_queue.size() > 0)
|
||||||
{
|
{
|
||||||
HttpExchange ex = _queue.remove(0);
|
HttpExchange ex = _queue.remove(0);
|
||||||
ex.setStatus(HttpExchange.STATUS_EXCEPTED);
|
if(ex.setStatus(HttpExchange.STATUS_EXCEPTED))
|
||||||
ex.getEventListener().onException(throwable);
|
ex.getEventListener().onException(throwable);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -428,7 +428,6 @@ public class HttpDestination implements Dumpable
|
||||||
synchronized (this)
|
synchronized (this)
|
||||||
{
|
{
|
||||||
_connections.remove(connection);
|
_connections.remove(connection);
|
||||||
System.err.println("Q "+_queue);
|
|
||||||
if (!_queue.isEmpty())
|
if (!_queue.isEmpty())
|
||||||
startConnection = true;
|
startConnection = true;
|
||||||
}
|
}
|
||||||
|
@ -717,7 +716,7 @@ public class HttpDestination implements Dumpable
|
||||||
protected void onException(Throwable x)
|
protected void onException(Throwable x)
|
||||||
{
|
{
|
||||||
_queue.remove(exchange);
|
_queue.remove(exchange);
|
||||||
exchange.setStatus(STATUS_EXCEPTED);
|
if (exchange.setStatus(STATUS_EXCEPTED))
|
||||||
exchange.getEventListener().onException(x);
|
exchange.getEventListener().onException(x);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -725,7 +724,7 @@ public class HttpDestination implements Dumpable
|
||||||
protected void onExpire()
|
protected void onExpire()
|
||||||
{
|
{
|
||||||
_queue.remove(exchange);
|
_queue.remove(exchange);
|
||||||
exchange.setStatus(STATUS_EXPIRED);
|
if (exchange.setStatus(STATUS_EXPIRED))
|
||||||
exchange.getEventListener().onExpire();
|
exchange.getEventListener().onExpire();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -181,12 +181,18 @@ public class HttpExchange
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void setStatus(int newStatus)
|
/* ------------------------------------------------------------ */
|
||||||
|
/**
|
||||||
|
* @param newStatus
|
||||||
|
* @return True if the status was actually set.
|
||||||
|
*/
|
||||||
|
boolean setStatus(int newStatus)
|
||||||
{
|
{
|
||||||
|
boolean set = false;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
int oldStatus = _status.get();
|
int oldStatus = _status.get();
|
||||||
boolean set = false;
|
boolean ignored = false;
|
||||||
if (oldStatus != newStatus)
|
if (oldStatus != newStatus)
|
||||||
{
|
{
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
|
@ -313,7 +319,7 @@ public class HttpExchange
|
||||||
case STATUS_CANCELLING:
|
case STATUS_CANCELLING:
|
||||||
case STATUS_EXPIRED:
|
case STATUS_EXPIRED:
|
||||||
// Don't change the status, it's too late
|
// Don't change the status, it's too late
|
||||||
set = true;
|
ignored = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -327,7 +333,7 @@ public class HttpExchange
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
// Ignore other statuses, we're cancelling
|
// Ignore other statuses, we're cancelling
|
||||||
set = true;
|
ignored = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -341,12 +347,12 @@ public class HttpExchange
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case STATUS_COMPLETED:
|
case STATUS_COMPLETED:
|
||||||
set = true;
|
ignored = true;
|
||||||
done();
|
done();
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
set = true;
|
ignored = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -355,7 +361,7 @@ public class HttpExchange
|
||||||
throw new AssertionError(oldStatus + " => " + newStatus);
|
throw new AssertionError(oldStatus + " => " + newStatus);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!set)
|
if (!set && !ignored)
|
||||||
throw new IllegalStateException(toState(oldStatus) + " => " + toState(newStatus));
|
throw new IllegalStateException(toState(oldStatus) + " => " + toState(newStatus));
|
||||||
LOG.debug("setStatus {} {}",newStatus,this);
|
LOG.debug("setStatus {} {}",newStatus,this);
|
||||||
}
|
}
|
||||||
|
@ -363,6 +369,7 @@ public class HttpExchange
|
||||||
{
|
{
|
||||||
LOG.warn(x);
|
LOG.warn(x);
|
||||||
}
|
}
|
||||||
|
return set;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean setStatusExpired(int newStatus, int oldStatus)
|
private boolean setStatusExpired(int newStatus, int oldStatus)
|
||||||
|
|
|
@ -76,13 +76,13 @@ public abstract class AbstractHttpExchangeCancelTest
|
||||||
TestHttpExchange exchange = new TestHttpExchange()
|
TestHttpExchange exchange = new TestHttpExchange()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
void setStatus(int status)
|
boolean setStatus(int status)
|
||||||
{
|
{
|
||||||
// Cancel before setting the new status
|
// Cancel before setting the new status
|
||||||
if (getStatus() == HttpExchange.STATUS_START &&
|
if (getStatus() == HttpExchange.STATUS_START &&
|
||||||
status == STATUS_WAITING_FOR_CONNECTION)
|
status == STATUS_WAITING_FOR_CONNECTION)
|
||||||
cancel();
|
cancel();
|
||||||
super.setStatus(status);
|
return super.setStatus(status);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
exchange.setAddress(newAddress());
|
exchange.setAddress(newAddress());
|
||||||
|
@ -113,14 +113,15 @@ public abstract class AbstractHttpExchangeCancelTest
|
||||||
TestHttpExchange exchange = new TestHttpExchange()
|
TestHttpExchange exchange = new TestHttpExchange()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
void setStatus(int status)
|
boolean setStatus(int status)
|
||||||
{
|
{
|
||||||
// Cancel after setting the new status
|
// Cancel after setting the new status
|
||||||
int oldStatus = getStatus();
|
int oldStatus = getStatus();
|
||||||
super.setStatus(status);
|
boolean set = super.setStatus(status);
|
||||||
if (oldStatus == STATUS_START &&
|
if (oldStatus == STATUS_START &&
|
||||||
getStatus() == HttpExchange.STATUS_WAITING_FOR_CONNECTION)
|
getStatus() == HttpExchange.STATUS_WAITING_FOR_CONNECTION)
|
||||||
cancel();
|
cancel();
|
||||||
|
return set;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
exchange.setAddress(newAddress());
|
exchange.setAddress(newAddress());
|
||||||
|
|
|
@ -51,4 +51,10 @@ public class NonBlockingHttpExchangeCancelTest extends AbstractHttpExchangeCance
|
||||||
{
|
{
|
||||||
return httpClient;
|
return httpClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* ------------------------------------------------------------ */
|
||||||
|
public void testHttpExchangeCancelOnRequestComplete() throws Exception
|
||||||
|
{
|
||||||
|
super.testHttpExchangeCancelOnRequestComplete();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue