Implemented notification of session failure events.

This commit is contained in:
Simone Bordet 2015-02-13 13:13:54 +01:00
parent 9677988960
commit ff7e0e626a
4 changed files with 160 additions and 8 deletions

View File

@ -0,0 +1,123 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.junit.Assert;
import org.junit.Test;
public class SessionFailureTest extends AbstractTest
{
@Test
public void testWrongPreface() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
@Override
public void onFailure(Session session, Throwable failure)
{
latch.countDown();
}
});
try (Socket socket = new Socket("localhost", connector.getLocalPort()))
{
// Preface starts with byte 0x50, send something different.
OutputStream output = socket.getOutputStream();
output.write(0x0);
output.flush();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
// The server will reply with a GOAWAY frame, and then shutdown.
// Read until EOF.
socket.setSoTimeout(1000);
InputStream input = socket.getInputStream();
while (true)
{
if (input.read() < 0)
break;
}
}
}
@Test
public void testWriteFailure() throws Exception
{
final CountDownLatch writeLatch = new CountDownLatch(1);
final CountDownLatch serverFailureLatch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
// Forcibly close the connection.
((HTTP2Session)stream.getSession()).getEndPoint().close();
// Now try to write something: it should fail.
stream.headers(frame, new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
writeLatch.countDown();
}
});
return null;
}
@Override
public void onFailure(Session session, Throwable failure)
{
serverFailureLatch.countDown();
}
});
final CountDownLatch clientFailureLatch = new CountDownLatch(1);
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onFailure(Session session, Throwable failure)
{
clientFailureLatch.countDown();
}
});
HeadersFrame frame = new HeadersFrame(0, newRequest("GET", new HttpFields()), null, true);
Promise<Stream> promise = new Promise.Adapter<>();
session.newStream(frame, promise, null);
Assert.assertTrue(writeLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(serverFailureLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(clientFailureLatch.await(5, TimeUnit.SECONDS));
Assert.assertFalse(((HTTP2Session)session).getEndPoint().isOpen());
}
}

View File

@ -231,7 +231,7 @@ public class HTTP2Flusher extends IteratingCallback
if (actives.isEmpty())
{
if (isClosed())
terminate(new ClosedChannelException());
abort(new ClosedChannelException());
if (LOG.isDebugEnabled())
LOG.debug("Flushed {}", session);
@ -291,7 +291,7 @@ public class HTTP2Flusher extends IteratingCallback
protected void onCompleteFailure(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug(x);
LOG.debug("Failed", x);
lease.recycle();
@ -307,10 +307,10 @@ public class HTTP2Flusher extends IteratingCallback
entry.failed(x);
}
terminate(x);
abort(x);
}
private void terminate(Throwable x)
private void abort(Throwable x)
{
Queue<Entry> queued;
synchronized (this)
@ -320,10 +320,12 @@ public class HTTP2Flusher extends IteratingCallback
}
if (LOG.isDebugEnabled())
LOG.debug("Terminating, queued={}", queued.size());
LOG.debug("Aborting, queued={}", queued.size());
for (Entry entry : queued)
closed(entry, x);
session.abort(x);
}
private void closed(Entry entry, Throwable failure)

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.http2;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
@ -26,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -394,6 +397,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
public void onConnectionFailure(int error, String reason)
{
close(error, reason, Callback.Adapter.INSTANCE);
notifyFailure(this, new IOException(String.format("%d/%s", error, reason)));
}
@Override
@ -754,7 +758,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
// The other peer did not send a GO_AWAY, no need to be gentle.
if (LOG.isDebugEnabled())
LOG.debug("Abrupt close for {}", this);
terminate();
abort(new ClosedChannelException());
break;
}
case LOCALLY_CLOSED:
@ -811,7 +815,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
case LOCALLY_CLOSED:
case REMOTELY_CLOSED:
{
terminate();
abort(new TimeoutException());
break;
}
default:
@ -856,6 +860,12 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
}
protected void abort(Throwable failure)
{
terminate();
notifyFailure(this, failure);
}
public boolean isDisconnected()
{
return !endPoint.isOpen();
@ -927,6 +937,18 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
}
protected void notifyFailure(Session session, Throwable failure)
{
try
{
listener.onFailure(session, failure);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
}
}
@Override
public String toString()
{

View File

@ -179,7 +179,12 @@ public interface Session
*/
public void onClose(Session session, GoAwayFrame frame);
// TODO: how come this is not called ???
/**
* <p>Callback method invoked when a failure has been detected for this session.</p>
*
* @param session the session
* @param failure the failure
*/
public void onFailure(Session session, Throwable failure);
/**