Server-Sent Evensts

This commit is contained in:
eelhazati 2018-07-31 20:34:50 +01:00 committed by José Carlos Valero Sánchez
parent 62c6f3434c
commit 505a9f37eb
16 changed files with 627 additions and 1 deletions

View File

@ -550,7 +550,8 @@
<module>apache-meecrowave</module>
<module>spring-reactive-kotlin</module>
<module>jnosql</module>
<module>testing-modules/junit-abstract</module>
<module>testing-modules/junit-abstract</module>
<module>sse-jaxrs</module>
</modules>
</profile>

22
sse-jaxrs/pom.xml Normal file
View File

@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.baeldung.sse</groupId>
<artifactId>sse-jaxrs</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<modules>
<module>sse-jaxrs-server</module>
<module>sse-jaxrs-client</module>
</modules>
</project>

View File

@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.baeldung.sse</groupId>
<artifactId>sse-jaxrs</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>sse-jaxrs-client</artifactId>
<properties>
<cxf-version>3.2.0</cxf-version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.6.0</version>
<executions>
<execution>
<id>singleEvent</id>
<goals>
<goal>java</goal>
</goals>
<configuration>
<mainClass>com.baeldung.sse.jaxrs.client.SseClientApp</mainClass>
</configuration>
</execution>
<execution>
<id>broadcast</id>
<goals>
<goal>java</goal>
</goals>
<configuration>
<mainClass>com.baeldung.sse.jaxrs.client.SseClientBroadcastApp</mainClass>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-rs-client</artifactId>
<version>${cxf-version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-rs-sse</artifactId>
<version>${cxf-version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,48 @@
package com.baeldung.sse.jaxrs.client;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.sse.InboundSseEvent;
import javax.ws.rs.sse.SseEventSource;
import java.util.function.Consumer;
public class SseClientApp {
private static final String url = "http://127.0.0.1:9080/sse-jaxrs-server/sse/stock/prices";
public static void main(String... args) throws Exception {
Client client = ClientBuilder.newClient();
WebTarget target = client.target(url);
try (SseEventSource eventSource = SseEventSource.target(target).build()) {
eventSource.register(onEvent, onError, onComplete);
eventSource.open();
//Consuming events for one hour
Thread.sleep(60 * 60 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
client.close();
System.out.println("End");
}
// A new event is received
private static Consumer<InboundSseEvent> onEvent = (inboundSseEvent) -> {
String data = inboundSseEvent.readData();
System.out.println(data);
};
//Error
private static Consumer<Throwable> onError = (throwable) -> {
throwable.printStackTrace();
};
//Connection close and there is nothing to receive
private static Runnable onComplete = () -> {
System.out.println("Done!");
};
}

View File

@ -0,0 +1,52 @@
package com.baeldung.sse.jaxrs.client;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.sse.InboundSseEvent;
import javax.ws.rs.sse.SseEventSource;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class SseClientBroadcastApp {
private static final String subscribeUrl = "http://localhost:9080/sse-jaxrs-server/sse/stock/subscribe";
public static void main(String... args) throws Exception {
Client client = ClientBuilder.newClient();
WebTarget target = client.target(subscribeUrl);
try (final SseEventSource eventSource = SseEventSource.target(target)
.reconnectingEvery(5, TimeUnit.SECONDS)
.build()) {
eventSource.register(onEvent, onError, onComplete);
eventSource.open();
System.out.println("Wainting for incoming event ...");
//Consuming events for one hour
Thread.sleep(60 * 60 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
client.close();
System.out.println("End");
}
// A new event is received
private static Consumer<InboundSseEvent> onEvent = (inboundSseEvent) -> {
String data = inboundSseEvent.readData();
System.out.println(data);
};
//Error
private static Consumer<Throwable> onError = (throwable) -> {
throwable.printStackTrace();
};
//Connection close and there is nothing to receive
private static Runnable onComplete = () -> {
System.out.println("Done!");
};
}

View File

@ -0,0 +1,85 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.baeldung.sse</groupId>
<artifactId>sse-jaxrs</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>sse-jaxrs-server</artifactId>
<packaging>war</packaging>
<properties>
<liberty-maven-plugin.version>2.4.2</liberty-maven-plugin.version>
<failOnMissingWebXml>false</failOnMissingWebXml>
<openliberty-version>18.0.0.2</openliberty-version>
</properties>
<build>
<finalName>${artifactId}</finalName>
<plugins>
<plugin>
<groupId>net.wasdev.wlp.maven.plugins</groupId>
<artifactId>liberty-maven-plugin</artifactId>
<version>${liberty-maven-plugin.version}</version>
<configuration>
<assemblyArtifact>
<groupId>io.openliberty</groupId>
<artifactId>openliberty-webProfile8</artifactId>
<version>${openliberty-version}</version>
<type>zip</type>
</assemblyArtifact>
<installAppPackages>project</installAppPackages>
<looseApplication>true</looseApplication>
<configFile>src/main/liberty/config/server.xml</configFile>
</configuration>
<executions>
<execution>
<id>install-server</id>
<phase>prepare-package</phase>
<goals>
<goal>install-server</goal>
<goal>create-server</goal>
<goal>install-feature</goal>
</goals>
</execution>
<execution>
<id>install-apps</id>
<phase>package</phase>
<goals>
<goal>install-apps</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.enterprise</groupId>
<artifactId>cdi-api</artifactId>
<version>2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.json.bind</groupId>
<artifactId>javax.json.bind-api</artifactId>
<version>1.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,8 @@
package com.baeldung.sse.jaxrs;
import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.Application;
@ApplicationPath("sse")
public class AppConfig extends Application {
}

View File

@ -0,0 +1,119 @@
package com.baeldung.sse.jaxrs;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.*;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.sse.SseEventSink;
@ApplicationScoped
@Path("stock")
public class SseResource {
@Inject
private StockService stockService;
private Sse sse;
private SseBroadcaster sseBroadcaster;
private OutboundSseEvent.Builder eventBuilder;
@Context
public void setSse(Sse sse) {
this.sse = sse;
this.eventBuilder = sse.newEventBuilder();
this.sseBroadcaster = sse.newBroadcaster();
}
@GET
@Path("prices")
@Produces("text/event-stream")
public void getStockPrices(@Context SseEventSink sseEventSink,
@HeaderParam(HttpHeaders.LAST_EVENT_ID_HEADER) @DefaultValue("-1") int lastReceivedId) {
int lastEventId = 1;
if (lastReceivedId != -1) {
lastEventId = ++lastReceivedId;
}
boolean running = true;
while (running) {
Stock stock = stockService.getNextTransaction(lastEventId);
if (stock != null) {
OutboundSseEvent sseEvent = this.eventBuilder
.name("stock")
.id(String.valueOf(lastEventId))
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.data(Stock.class, stock)
.reconnectDelay(3000)
.comment("price change")
.build();
sseEventSink.send(sseEvent);
lastEventId++;
}
//Simulate connection close
if (lastEventId % 5 == 0) {
sseEventSink.close();
break;
}
try {
//Wait 5 seconds
Thread.sleep(5 * 1000);
} catch (InterruptedException ex) {
// ...
}
//Simulatae a while boucle break
running = lastEventId <= 2000;
}
sseEventSink.close();
}
@GET
@Path("subscribe")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void listen(@Context SseEventSink sseEventSink) {
sseEventSink.send(sse.newEvent("Welcome !"));
this.sseBroadcaster.register(sseEventSink);
sseEventSink.send(sse.newEvent("You are registred !"));
}
@GET
@Path("publish")
public void broadcast() {
Runnable r = new Runnable() {
@Override
public void run() {
int lastEventId = 0;
boolean running = true;
while (running) {
lastEventId++;
Stock stock = stockService.getNextTransaction(lastEventId);
if (stock != null) {
OutboundSseEvent sseEvent = eventBuilder
.name("stock")
.id(String.valueOf(lastEventId))
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.data(Stock.class, stock)
.reconnectDelay(3000)
.comment("price change")
.build();
sseBroadcaster.broadcast(sseEvent);
}
try {
//Wait 5 seconds
Thread.currentThread().sleep(5 * 1000);
} catch (InterruptedException ex) {
// ...
}
//Simulatae a while boucle break
running = lastEventId <= 2000;
}
}
};
new Thread(r).start();
}
}

View File

@ -0,0 +1,50 @@
package com.baeldung.sse.jaxrs;
import java.math.BigDecimal;
import java.time.LocalDateTime;
public class Stock {
private Integer id;
private String name;
private BigDecimal price;
LocalDateTime dateTime;
public Stock(Integer id, String name, BigDecimal price, LocalDateTime dateTime) {
this.id = id;
this.name = name;
this.price = price;
this.dateTime = dateTime;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public BigDecimal getPrice() {
return price;
}
public void setPrice(BigDecimal price) {
this.price = price;
}
public LocalDateTime getDateTime() {
return dateTime;
}
public void setDateTime(LocalDateTime dateTime) {
this.dateTime = dateTime;
}
}

View File

@ -0,0 +1,78 @@
package com.baeldung.sse.jaxrs;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.Initialized;
import javax.enterprise.event.Event;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.inject.Named;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
@ApplicationScoped
@Named
public class StockService {
private static final BigDecimal UP = BigDecimal.valueOf(1.05f);
private static final BigDecimal DOWN = BigDecimal.valueOf(0.95f);
List<String> stockNames = Arrays.asList("GOOG", "IBM", "MS", "GOOG", "YAHO");
List<Stock> stocksDB = new ArrayList<>();
private AtomicInteger counter = new AtomicInteger(0);
public void init(@Observes @Initialized(ApplicationScoped.class) Object init) {
//Open price
System.out.println("@Start Init ...");
stockNames.forEach(stockName -> {
stocksDB.add(new Stock(counter.incrementAndGet(), stockName, generateOpenPrice(), LocalDateTime.now()));
});
Runnable runnable = new Runnable() {
@Override
public void run() {
//Simulate Change price and put every x seconds
while (true) {
int indx = new Random().nextInt(stockNames.size());
String stockName = stockNames.get(indx);
BigDecimal price = getLastPrice(stockName);
BigDecimal newprice = changePrice(price);
Stock stock = new Stock(counter.incrementAndGet(), stockName, newprice, LocalDateTime.now());
stocksDB.add(stock);
int r = new Random().nextInt(30);
try {
Thread.currentThread().sleep(r*1000);
} catch (InterruptedException ex) {
// ...
}
}
}
};
new Thread(runnable).start();
System.out.println("@End Init ...");
}
public Stock getNextTransaction(Integer lastEventId) {
return stocksDB.stream().filter(s -> s.getId().equals(lastEventId)).findFirst().orElse(null);
}
BigDecimal generateOpenPrice() {
float min = 70;
float max = 120;
return BigDecimal.valueOf(min + new Random().nextFloat() * (max - min)).setScale(4,RoundingMode.CEILING);
}
BigDecimal changePrice(BigDecimal price) {
return Math.random() >= 0.5 ? price.multiply(UP).setScale(4,RoundingMode.CEILING) : price.multiply(DOWN).setScale(4,RoundingMode.CEILING);
}
private BigDecimal getLastPrice(String stockName) {
return stocksDB.stream().filter(stock -> stock.getName().equals(stockName)).findFirst().get().getPrice();
}
}

View File

@ -0,0 +1,7 @@
<server description="OpenLiberty Server">
<featureManager>
<feature>jaxrs-2.1</feature>
<feature>cdi-2.0</feature>
</featureManager>
<httpEndpoint httpPort="9080" httpsPort="9443" id="defaultHttpEndpoint" host="*"/>
</server>

View File

@ -0,0 +1,6 @@
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
http://xmlns.jcp.org/xml/ns/javaee/beans_2_0.xsd"
bean-discovery-mode="all">
</beans>

View File

@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"
version="4.0">
<display-name>Hello Servlet</display-name>
<welcome-file-list>
<welcome-file>index.html</welcome-file>
</welcome-file-list>
</web-app>

View File

@ -0,0 +1 @@
index

View File

@ -0,0 +1,38 @@
<!DOCTYPE html>
<html>
<head>
<title>Server-Sent Event Broadcasting</title>
</head>
<body>
<h2>Stock prices :</h2>
<div>
<ul id="data">
</ul>
</div>
<script>
var source = new EventSource('sse/stock/subscribe');
source.onopen = function(event) {
console.log(event);
};
source.addEventListener("stock", function(event) {
append(event.data);
}, false);
source.onmessage = function(event) {
append(event.data);
};
source.onerror = function(event) {
console.log(event);
};
function append(data) {
var ul = document.getElementById("data");
var li = document.createElement("li");
li.appendChild(document.createTextNode(data));
ul.insertBefore(li, ul.childNodes[0]);
};
</script>
</body>
</html>

View File

@ -0,0 +1,38 @@
<!DOCTYPE html>
<html>
<head>
<title>Server-Sent Event</title>
</head>
<body>
<h2>Stock prices :</h2>
<div>
<ul id="data">
</ul>
</div>
<script>
var source = new EventSource('sse/stock/prices');
source.onopen = function(event) {
console.log(event);
};
source.addEventListener("stock", function(event) {
append(event.data);
}, false);
source.onmessage = function(event) {
append(event.data);
};
source.onerror = function(event) {
console.log(event);
};
function append(data) {
var ul = document.getElementById("data");
var li = document.createElement("li");
li.appendChild(document.createTextNode(data));
ul.insertBefore(li, ul.childNodes[0]);
};
</script>
</body>
</html>