jetty-9 added ServerSentEvent support
This commit is contained in:
parent
b132663c2b
commit
ac7fbaf62b
|
@ -0,0 +1,108 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.servlets;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* <p>{@link EventSource} is the passive half of an event source connection, as defined by the
|
||||
* <a href="http://www.w3.org/TR/eventsource/">EventSource Specification</a>.</p>
|
||||
* <p>{@link EventSource.Emitter} is the active half of the connection and allows to operate on the connection.</p>
|
||||
* <p>{@link EventSource} allows applications to be notified of events happening on the connection;
|
||||
* two events are being notified: the opening of the event source connection, where method
|
||||
* {@link EventSource#onOpen(Emitter)} is invoked, and the closing of the event source connection,
|
||||
* where method {@link EventSource#onClose()} is invoked.</p>
|
||||
*
|
||||
* @see EventSourceServlet
|
||||
*/
|
||||
public interface EventSource
|
||||
{
|
||||
/**
|
||||
* <p>Callback method invoked when an event source connection is opened.</p>
|
||||
*
|
||||
* @param emitter the {@link Emitter} instance that allows to operate on the connection
|
||||
* @throws IOException if the implementation of the method throws such exception
|
||||
*/
|
||||
public void onOpen(Emitter emitter) throws IOException;
|
||||
|
||||
/**
|
||||
* <p>Callback method invoked when an event source connection is closed.</p>
|
||||
*/
|
||||
public void onClose();
|
||||
|
||||
/**
|
||||
* <p>{@link Emitter} is the active half of an event source connection, and allows applications
|
||||
* to operate on the connection by sending events, data or comments, or by closing the connection.</p>
|
||||
* <p>An {@link Emitter} instance will be created for each new event source connection.</p>
|
||||
* <p>{@link Emitter} instances are fully thread safe and can be used from multiple threads.</p>
|
||||
*/
|
||||
public interface Emitter
|
||||
{
|
||||
/**
|
||||
* <p>Sends a named event with data to the client.</p>
|
||||
* <p>When invoked as: <code>event("foo", "bar")</code>, the client will receive the lines:</p>
|
||||
* <pre>
|
||||
* event: foo
|
||||
* data: bar
|
||||
* </pre>
|
||||
*
|
||||
* @param name the event name
|
||||
* @param data the data to be sent
|
||||
* @throws IOException if an I/O failure occurred
|
||||
* @see #data(String)
|
||||
*/
|
||||
public void event(String name, String data) throws IOException;
|
||||
|
||||
/**
|
||||
* <p>Sends a default event with data to the client.</p>
|
||||
* <p>When invoked as: <code>data("baz")</code>, the client will receive the line:</p>
|
||||
* <pre>
|
||||
* data: baz
|
||||
* </pre>
|
||||
* <p>When invoked as: <code>data("foo\r\nbar\rbaz\nbax")</code>, the client will receive the lines:</p>
|
||||
* <pre>
|
||||
* data: foo
|
||||
* data: bar
|
||||
* data: baz
|
||||
* data: bax
|
||||
* </pre>
|
||||
*
|
||||
* @param data the data to be sent
|
||||
* @throws IOException if an I/O failure occurred
|
||||
*/
|
||||
public void data(String data) throws IOException;
|
||||
|
||||
/**
|
||||
* <p>Sends a comment to the client.</p>
|
||||
* <p>When invoked as: <code>comment("foo")</code>, the client will receive the line:</p>
|
||||
* <pre>
|
||||
* : foo
|
||||
* </pre>
|
||||
*
|
||||
* @param comment the comment to send
|
||||
* @throws IOException if an I/O failure occurred
|
||||
*/
|
||||
public void comment(String comment) throws IOException;
|
||||
|
||||
/**
|
||||
* <p>Closes this event source connection.</p>
|
||||
*/
|
||||
public void close();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,255 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
|
||||
package org.eclipse.jetty.servlets;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.StringReader;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.Enumeration;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.servlet.AsyncContext;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.ServletOutputStream;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
|
||||
/**
|
||||
* <p>A servlet that implements the <a href="http://www.w3.org/TR/eventsource/">event source protocol</a>,
|
||||
* also known as "server sent events".</p>
|
||||
* <p>This servlet must be subclassed to implement abstract method {@link #newEventSource(HttpServletRequest)}
|
||||
* to return an instance of {@link EventSource} that allows application to listen for event source events
|
||||
* and to emit event source events.</p>
|
||||
* <p>This servlet supports the following configuration parameters:</p>
|
||||
* <ul>
|
||||
* <li><code>heartBeatPeriod</code>, that specifies the heartbeat period, in seconds, used to check
|
||||
* whether the connection has been closed by the client; defaults to 10 seconds.</li>
|
||||
* </ul>
|
||||
*
|
||||
* <p>NOTE: there is currently no support for <code>last-event-id</code>.</p>
|
||||
*/
|
||||
public abstract class EventSourceServlet extends HttpServlet
|
||||
{
|
||||
private static final Charset UTF_8 = Charset.forName("UTF-8");
|
||||
private static final byte[] CRLF = new byte[]{'\r', '\n'};
|
||||
private static final byte[] EVENT_FIELD;
|
||||
private static final byte[] DATA_FIELD;
|
||||
private static final byte[] COMMENT_FIELD;
|
||||
static
|
||||
{
|
||||
try
|
||||
{
|
||||
EVENT_FIELD = "event: ".getBytes(UTF_8.name());
|
||||
DATA_FIELD = "data: ".getBytes(UTF_8.name());
|
||||
COMMENT_FIELD = ": ".getBytes(UTF_8.name());
|
||||
}
|
||||
catch (UnsupportedEncodingException x)
|
||||
{
|
||||
throw new RuntimeException(x);
|
||||
}
|
||||
}
|
||||
|
||||
private ScheduledExecutorService scheduler;
|
||||
private int heartBeatPeriod = 10;
|
||||
|
||||
@Override
|
||||
public void init() throws ServletException
|
||||
{
|
||||
String heartBeatPeriodParam = getServletConfig().getInitParameter("heartBeatPeriod");
|
||||
if (heartBeatPeriodParam != null)
|
||||
heartBeatPeriod = Integer.parseInt(heartBeatPeriodParam);
|
||||
scheduler = Executors.newSingleThreadScheduledExecutor();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy()
|
||||
{
|
||||
if (scheduler != null)
|
||||
scheduler.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
|
||||
{
|
||||
@SuppressWarnings("unchecked")
|
||||
Enumeration<String> acceptValues = request.getHeaders("Accept");
|
||||
while (acceptValues.hasMoreElements())
|
||||
{
|
||||
String accept = acceptValues.nextElement();
|
||||
if (accept.equals("text/event-stream"))
|
||||
{
|
||||
EventSource eventSource = newEventSource(request);
|
||||
if (eventSource == null)
|
||||
{
|
||||
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
|
||||
}
|
||||
else
|
||||
{
|
||||
respond(request, response);
|
||||
AsyncContext async = request.startAsync();
|
||||
// Infinite timeout because the continuation is never resumed,
|
||||
// but only completed on close
|
||||
async.setTimeout(0);
|
||||
EventSourceEmitter emitter = new EventSourceEmitter(eventSource, async);
|
||||
emitter.scheduleHeartBeat();
|
||||
open(eventSource, emitter);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
super.doGet(request, response);
|
||||
}
|
||||
|
||||
protected abstract EventSource newEventSource(HttpServletRequest request);
|
||||
|
||||
protected void respond(HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
response.setStatus(HttpServletResponse.SC_OK);
|
||||
response.setCharacterEncoding(UTF_8.name());
|
||||
response.setContentType("text/event-stream");
|
||||
// By adding this header, and not closing the connection,
|
||||
// we disable HTTP chunking, and we can use write()+flush()
|
||||
// to send data in the text/event-stream protocol
|
||||
response.addHeader("Connection", "close");
|
||||
response.flushBuffer();
|
||||
}
|
||||
|
||||
protected void open(EventSource eventSource, EventSource.Emitter emitter) throws IOException
|
||||
{
|
||||
eventSource.onOpen(emitter);
|
||||
}
|
||||
|
||||
protected class EventSourceEmitter implements EventSource.Emitter, Runnable
|
||||
{
|
||||
private final EventSource eventSource;
|
||||
private final AsyncContext async;
|
||||
private final ServletOutputStream output;
|
||||
private Future<?> heartBeat;
|
||||
private boolean closed;
|
||||
|
||||
public EventSourceEmitter(EventSource eventSource, AsyncContext async) throws IOException
|
||||
{
|
||||
this.eventSource = eventSource;
|
||||
this.async = async;
|
||||
this.output = async.getResponse().getOutputStream();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void event(String name, String data) throws IOException
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
output.write(EVENT_FIELD);
|
||||
output.write(name.getBytes(UTF_8.name()));
|
||||
output.write(CRLF);
|
||||
data(data);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void data(String data) throws IOException
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
BufferedReader reader = new BufferedReader(new StringReader(data));
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null)
|
||||
{
|
||||
output.write(DATA_FIELD);
|
||||
output.write(line.getBytes(UTF_8.name()));
|
||||
output.write(CRLF);
|
||||
}
|
||||
output.write(CRLF);
|
||||
flush();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void comment(String comment) throws IOException
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
output.write(COMMENT_FIELD);
|
||||
output.write(comment.getBytes(UTF_8.name()));
|
||||
output.write(CRLF);
|
||||
output.write(CRLF);
|
||||
flush();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
// If the other peer closes the connection, the first
|
||||
// flush() should generate a TCP reset that is detected
|
||||
// on the second flush()
|
||||
try
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
output.write('\r');
|
||||
flush();
|
||||
output.write('\n');
|
||||
flush();
|
||||
}
|
||||
// We could write, reschedule heartbeat
|
||||
scheduleHeartBeat();
|
||||
}
|
||||
catch (IOException x)
|
||||
{
|
||||
// The other peer closed the connection
|
||||
close();
|
||||
eventSource.onClose();
|
||||
}
|
||||
}
|
||||
|
||||
protected void flush() throws IOException
|
||||
{
|
||||
async.getResponse().flushBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
closed = true;
|
||||
heartBeat.cancel(false);
|
||||
}
|
||||
async.complete();
|
||||
}
|
||||
|
||||
private void scheduleHeartBeat()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
if (!closed)
|
||||
heartBeat = scheduler.schedule(this, heartBeatPeriod, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,348 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.servlets;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.net.Socket;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
||||
import org.eclipse.jetty.server.NetworkConnector;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class EventSourceServletTest
|
||||
{
|
||||
private Server server;
|
||||
private NetworkConnector connector;
|
||||
private ServletContextHandler context;
|
||||
|
||||
@Before
|
||||
public void startServer() throws Exception
|
||||
{
|
||||
server = new Server(0);
|
||||
connector = (NetworkConnector)server.getConnectors()[0];
|
||||
|
||||
String contextPath = "/test";
|
||||
context = new ServletContextHandler(server, contextPath, ServletContextHandler.SESSIONS);
|
||||
server.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopServer() throws Exception
|
||||
{
|
||||
if (server != null)
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBasicFunctionality() throws Exception
|
||||
{
|
||||
final AtomicReference<EventSource.Emitter> emitterRef = new AtomicReference<EventSource.Emitter>();
|
||||
final CountDownLatch emitterLatch = new CountDownLatch(1);
|
||||
final CountDownLatch closeLatch = new CountDownLatch(1);
|
||||
class S extends EventSourceServlet
|
||||
{
|
||||
@Override
|
||||
protected EventSource newEventSource(HttpServletRequest request)
|
||||
{
|
||||
return new EventSource()
|
||||
{
|
||||
public void onOpen(Emitter emitter) throws IOException
|
||||
{
|
||||
emitterRef.set(emitter);
|
||||
emitterLatch.countDown();
|
||||
}
|
||||
|
||||
public void onClose()
|
||||
{
|
||||
closeLatch.countDown();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
String servletPath = "/eventsource";
|
||||
ServletHolder servletHolder = new ServletHolder(new S());
|
||||
int heartBeatPeriod = 2;
|
||||
servletHolder.setInitParameter("heartBeatPeriod", String.valueOf(heartBeatPeriod));
|
||||
context.addServlet(servletHolder, servletPath);
|
||||
|
||||
Socket socket = new Socket("localhost", connector.getLocalPort());
|
||||
writeHTTPRequest(socket, servletPath);
|
||||
BufferedReader reader = readAndDiscardHTTPResponse(socket);
|
||||
|
||||
Assert.assertTrue(emitterLatch.await(1, TimeUnit.SECONDS));
|
||||
EventSource.Emitter emitter = emitterRef.get();
|
||||
Assert.assertNotNull(emitter);
|
||||
|
||||
String data = "foo";
|
||||
emitter.data(data);
|
||||
|
||||
String line = reader.readLine();
|
||||
String received = "";
|
||||
while (line != null)
|
||||
{
|
||||
received += line;
|
||||
if (line.length() == 0)
|
||||
break;
|
||||
line = reader.readLine();
|
||||
}
|
||||
|
||||
Assert.assertEquals("data: " + data, received);
|
||||
|
||||
socket.close();
|
||||
Assert.assertTrue(closeLatch.await(heartBeatPeriod * 2, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerSideClose() throws Exception
|
||||
{
|
||||
final AtomicReference<EventSource.Emitter> emitterRef = new AtomicReference<EventSource.Emitter>();
|
||||
final CountDownLatch emitterLatch = new CountDownLatch(1);
|
||||
class S extends EventSourceServlet
|
||||
{
|
||||
@Override
|
||||
protected EventSource newEventSource(HttpServletRequest request)
|
||||
{
|
||||
return new EventSource()
|
||||
{
|
||||
public void onOpen(Emitter emitter) throws IOException
|
||||
{
|
||||
emitterRef.set(emitter);
|
||||
emitterLatch.countDown();
|
||||
}
|
||||
|
||||
public void onClose()
|
||||
{
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
String servletPath = "/eventsource";
|
||||
context.addServlet(new ServletHolder(new S()), servletPath);
|
||||
|
||||
Socket socket = new Socket("localhost", connector.getLocalPort());
|
||||
writeHTTPRequest(socket, servletPath);
|
||||
BufferedReader reader = readAndDiscardHTTPResponse(socket);
|
||||
|
||||
Assert.assertTrue(emitterLatch.await(1, TimeUnit.SECONDS));
|
||||
EventSource.Emitter emitter = emitterRef.get();
|
||||
Assert.assertNotNull(emitter);
|
||||
|
||||
String comment = "foo";
|
||||
emitter.comment(comment);
|
||||
|
||||
String line = reader.readLine();
|
||||
String received = "";
|
||||
while (line != null)
|
||||
{
|
||||
received += line;
|
||||
if (line.length() == 0)
|
||||
break;
|
||||
line = reader.readLine();
|
||||
}
|
||||
|
||||
Assert.assertEquals(": " + comment, received);
|
||||
|
||||
emitter.close();
|
||||
|
||||
line = reader.readLine();
|
||||
Assert.assertNull(line);
|
||||
|
||||
socket.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncoding() throws Exception
|
||||
{
|
||||
// The EURO symbol
|
||||
final String data = "\u20AC";
|
||||
class S extends EventSourceServlet
|
||||
{
|
||||
@Override
|
||||
protected EventSource newEventSource(HttpServletRequest request)
|
||||
{
|
||||
return new EventSource()
|
||||
{
|
||||
public void onOpen(Emitter emitter) throws IOException
|
||||
{
|
||||
emitter.data(data);
|
||||
}
|
||||
|
||||
public void onClose()
|
||||
{
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
String servletPath = "/eventsource";
|
||||
context.addServlet(new ServletHolder(new S()), servletPath);
|
||||
|
||||
Socket socket = new Socket("localhost", connector.getLocalPort());
|
||||
writeHTTPRequest(socket, servletPath);
|
||||
BufferedReader reader = readAndDiscardHTTPResponse(socket);
|
||||
|
||||
String line = reader.readLine();
|
||||
String received = "";
|
||||
while (line != null)
|
||||
{
|
||||
received += line;
|
||||
if (line.length() == 0)
|
||||
break;
|
||||
line = reader.readLine();
|
||||
}
|
||||
|
||||
Assert.assertEquals("data: " + data, received);
|
||||
|
||||
socket.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiLineData() throws Exception
|
||||
{
|
||||
String data1 = "data1";
|
||||
String data2 = "data2";
|
||||
String data3 = "data3";
|
||||
String data4 = "data4";
|
||||
final String data = data1 + "\r\n" + data2 + "\r" + data3 + "\n" + data4;
|
||||
class S extends EventSourceServlet
|
||||
{
|
||||
@Override
|
||||
protected EventSource newEventSource(HttpServletRequest request)
|
||||
{
|
||||
return new EventSource()
|
||||
{
|
||||
public void onOpen(Emitter emitter) throws IOException
|
||||
{
|
||||
emitter.data(data);
|
||||
}
|
||||
|
||||
public void onClose()
|
||||
{
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
String servletPath = "/eventsource";
|
||||
context.addServlet(new ServletHolder(new S()), servletPath);
|
||||
|
||||
Socket socket = new Socket("localhost", connector.getLocalPort());
|
||||
writeHTTPRequest(socket, servletPath);
|
||||
BufferedReader reader = readAndDiscardHTTPResponse(socket);
|
||||
|
||||
String line1 = reader.readLine();
|
||||
Assert.assertEquals("data: " + data1, line1);
|
||||
String line2 = reader.readLine();
|
||||
Assert.assertEquals("data: " + data2, line2);
|
||||
String line3 = reader.readLine();
|
||||
Assert.assertEquals("data: " + data3, line3);
|
||||
String line4 = reader.readLine();
|
||||
Assert.assertEquals("data: " + data4, line4);
|
||||
String line5 = reader.readLine();
|
||||
Assert.assertEquals(0, line5.length());
|
||||
|
||||
socket.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEvents() throws Exception
|
||||
{
|
||||
final String name = "event1";
|
||||
final String data = "data2";
|
||||
class S extends EventSourceServlet
|
||||
{
|
||||
@Override
|
||||
protected EventSource newEventSource(HttpServletRequest request)
|
||||
{
|
||||
return new EventSource()
|
||||
{
|
||||
public void onOpen(Emitter emitter) throws IOException
|
||||
{
|
||||
emitter.event(name, data);
|
||||
}
|
||||
|
||||
public void onClose()
|
||||
{
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
String servletPath = "/eventsource";
|
||||
context.addServlet(new ServletHolder(new S()), servletPath);
|
||||
|
||||
Socket socket = new Socket("localhost", connector.getLocalPort());
|
||||
writeHTTPRequest(socket, servletPath);
|
||||
BufferedReader reader = readAndDiscardHTTPResponse(socket);
|
||||
|
||||
String line1 = reader.readLine();
|
||||
Assert.assertEquals("event: " + name, line1);
|
||||
String line2 = reader.readLine();
|
||||
Assert.assertEquals("data: " + data, line2);
|
||||
String line3 = reader.readLine();
|
||||
Assert.assertEquals(0, line3.length());
|
||||
|
||||
socket.close();
|
||||
}
|
||||
|
||||
private void writeHTTPRequest(Socket socket, String servletPath) throws IOException
|
||||
{
|
||||
int serverPort = socket.getPort();
|
||||
OutputStream output = socket.getOutputStream();
|
||||
|
||||
String handshake = "";
|
||||
handshake += "GET " + context.getContextPath() + servletPath + " HTTP/1.1\r\n";
|
||||
handshake += "Host: localhost:" + serverPort + "\r\n";
|
||||
handshake += "Accept: text/event-stream\r\n";
|
||||
handshake += "\r\n";
|
||||
output.write(handshake.getBytes("UTF-8"));
|
||||
output.flush();
|
||||
}
|
||||
|
||||
private BufferedReader readAndDiscardHTTPResponse(Socket socket) throws IOException
|
||||
{
|
||||
// Read and discard the HTTP response
|
||||
InputStream input = socket.getInputStream();
|
||||
BufferedReader reader = new BufferedReader(new InputStreamReader(input));
|
||||
String line = reader.readLine();
|
||||
while (line != null)
|
||||
{
|
||||
if (line.length() == 0)
|
||||
break;
|
||||
line = reader.readLine();
|
||||
}
|
||||
// Now we can parse the event-source stream
|
||||
return reader;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue