BAEL-3352 - Websockets with the Play Framework and Akka (#7983)

* Websocket implementation

* Websocket implementation with Akka Streams

* Websocket implementation with Akka Streams

* Websocket implementation with Akka Streams

* Added configuration options for play server timeout and websocket frame lengths

* Cleaned up code for consuming http endpoint in Messenger actor

* Cleaned up code for consuming http endpoint in Messenger actor

* Cleaned up code for akka streams implementation for websocket

* Renamed unit test method

* Added Poison Pill for stopping the actor. Fixed indentations.

* Refactored the WebSocket method for readability

* Refactored the JavaScript for readability

* Code refactoring and removing unwanted comments

* Added the latest version of jQuery

* Removed .gitignore in favor of the one at the project root
This commit is contained in:
Alfred Samanga 2019-11-23 13:09:07 +02:00 committed by ashleyfrieze
parent 13c42ac4dc
commit 59ac0633fc
17 changed files with 529 additions and 0 deletions

View File

@ -0,0 +1,111 @@
package actors;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.http.javadsl.Http;
import akka.http.javadsl.marshallers.jackson.Jackson;
import akka.http.javadsl.model.HttpMessage;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.stream.Materializer;
import com.fasterxml.jackson.databind.JsonNode;
import dto.MessageDTO;
import dto.RequestDTO;
import utils.MessageConverter;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
public class Messenger extends AbstractActor {
private LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
private ActorRef out;
public Messenger(ActorRef out) {
this.out = out;
}
public static Props props(ActorRef out) {
return Props.create(Messenger.class, () -> new Messenger(out));
}
@Override
public void preStart() throws Exception {
log.info("Messenger actor started at {}",
OffsetDateTime.now().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
}
@Override
public void postStop() throws Exception {
log.info("Messenger actor stopped at {}",
OffsetDateTime.now().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
}
private void onSendMessage(JsonNode jsonNode) {
RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
String message = requestDTO.getMessage().toLowerCase();
if("stop".equals(message)) {
MessageDTO messageDTO = createMessageDTO("1", "1", "Stop", "Stopping actor");
out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf());
self().tell(PoisonPill.getInstance(), getSelf());
} else {
log.info("Actor received. {}", requestDTO);
processMessage(requestDTO);
}
}
private MessageDTO createMessageDTO(String userId, String id, String title, String message) {
MessageDTO messageDTO = new MessageDTO();
messageDTO.setUserId(UUID.randomUUID().toString());
messageDTO.setId(UUID.randomUUID().toString());
messageDTO.setTitle("Self Kill");
messageDTO.setBody("Stopping actor");
return messageDTO;
}
private void processMessage(RequestDTO requestDTO) {
CompletionStage<HttpResponse> responseFuture = getRandomMessage();
responseFuture.thenCompose(this::consumeHttpResponse)
.thenAccept(messageDTO ->
out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf()));
}
private CompletionStage<HttpResponse> getRandomMessage() {
int postId = ThreadLocalRandom.current().nextInt(0, 100);
return Http.get(getContext().getSystem()).singleRequest(
HttpRequest.create("https://jsonplaceholder.typicode.com/posts/" + postId)
);
}
private void discardEntity(HttpResponse httpResponse, Materializer materializer) {
httpResponse.discardEntityBytes(materializer)
.completionStage()
.whenComplete((done, ex) -> log.info("Entity discarded completely!"));
}
private CompletionStage<MessageDTO> consumeHttpResponse(HttpResponse httpResponse) {
Materializer materializer = Materializer.matFromSystem(getContext().getSystem());
return Jackson.unmarshaller(MessageDTO.class)
.unmarshal(httpResponse.entity(), materializer)
.thenApply(messageDTO -> {
log.info("Received message: {}", messageDTO);
discardEntity(httpResponse, materializer);
return messageDTO;
});
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(JsonNode.class, this::onSendMessage)
.matchAny(o -> log.error("Received unknown message: {}", o.getClass()))
.build();
}
}

View File

@ -0,0 +1,79 @@
package controllers;
import actors.Messenger;
import akka.actor.ActorSystem;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.fasterxml.jackson.databind.JsonNode;
import dto.MessageDTO;
import lombok.extern.slf4j.Slf4j;
import play.libs.F;
import play.libs.streams.ActorFlow;
import play.mvc.*;
import utils.MessageConverter;
import javax.inject.Inject;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@Slf4j
public class HomeController extends Controller {
private ActorSystem actorSystem;
private Materializer materializer;
@Inject
public HomeController(ActorSystem actorSystem, Materializer materializer) {
this.actorSystem = actorSystem;
this.materializer = materializer;
}
public Result index(Http.Request request) {
String url = routes.HomeController.socket().webSocketURL(request);
//To test WebSockets with akka streams, uncomment the next line and comment out the previous
//String url = routes.HomeController.akkaStreamsSocket().webSocketURL(request);
return ok(views.html.index.render(url));
}
public WebSocket socket() {
return WebSocket.Json.acceptOrResult(this::createActorFlow);
}
private CompletionStage<F.Either<Result, Flow<JsonNode, JsonNode, ?>>> createActorFlow(
Http.RequestHeader request) {
return CompletableFuture.completedFuture(F.Either.Right(createFlowForActor()));
}
private CompletionStage<F.Either<Result, Flow<JsonNode, JsonNode, ?>>>
createActorFlow2(Http.RequestHeader request) {
return CompletableFuture.completedFuture(
request.session()
.getOptional("username")
.map(username ->
F.Either.<Result, Flow<JsonNode, JsonNode, ?>>Right(
createFlowForActor()))
.orElseGet(() -> F.Either.Left(forbidden())));
}
private Flow<JsonNode, JsonNode, ?> createFlowForActor() {
return ActorFlow.actorRef(out -> Messenger.props(out), actorSystem, materializer);
}
public WebSocket akkaStreamsSocket() {
return WebSocket.Json.accept(
request -> {
Sink<JsonNode, ?> in = Sink.foreach(System.out::println);
MessageDTO messageDTO = new MessageDTO("1", "1", "Title", "Test Body");
Source<JsonNode, ?> out = Source.tick(
Duration.ofSeconds(2),
Duration.ofSeconds(2),
MessageConverter.messageToJsonNode(messageDTO)
);
return Flow.fromSinkAndSource(in, out);
});
}
}

View File

@ -0,0 +1,60 @@
package dto;
public class MessageDTO {
private String userId;
private String id;
private String title;
private String body;
public MessageDTO() {
}
public MessageDTO(String userId, String id, String title, String body) {
this.userId = userId;
this.id = id;
this.title = title;
this.body = body;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
@Override
public String toString() {
return "MessageDTO{" +
"userId='" + userId + '\'' +
", id='" + id + '\'' +
", title='" + title + '\'' +
", body='" + body + '\'' +
'}';
}
}

View File

@ -0,0 +1,27 @@
package dto;
public class RequestDTO {
private String message;
public RequestDTO() {
}
public RequestDTO(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
@Override
public String toString() {
return "RequestDTO{" +
"message='" + message + '\'' +
'}';
}
}

View File

@ -0,0 +1,24 @@
package utils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import dto.MessageDTO;
import dto.RequestDTO;
public class MessageConverter {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static MessageDTO jsonNodeToMessage(JsonNode jsonNode) {
return OBJECT_MAPPER.convertValue(jsonNode, MessageDTO.class);
}
public static JsonNode messageToJsonNode(MessageDTO messageDTO) {
return OBJECT_MAPPER.convertValue(messageDTO, JsonNode.class);
}
public static RequestDTO jsonNodeToRequest(JsonNode jsonNode) {
return OBJECT_MAPPER.convertValue(jsonNode, RequestDTO.class);
}
public static JsonNode requestToJsonNode(RequestDTO requestDTO) {
return OBJECT_MAPPER.convertValue(requestDTO, JsonNode.class);
}
}

View File

@ -0,0 +1,97 @@
@(url: String)
@main("Welcome to Play") {
<h1>Welcome to Play WebSockets!</h1>
<div id="messageContent"></div>
<form>
<textarea id="messageInput"></textarea>
<button id="sendButton">Send</button>
</form>
<script src="https://code.jquery.com/jquery-3.4.1.min.js"></script>
<script>
var webSocket;
var messageInput;
function init() {
initWebSocket();
}
function initWebSocket() {
webSocket = new WebSocket("@url");
webSocket.onopen = onOpen;
webSocket.onclose = onClose;
webSocket.onmessage = onMessage;
webSocket.onerror = onError;
}
function onOpen(evt) {
writeToScreen("CONNECTED");
}
function onClose(evt) {
writeToScreen("DISCONNECTED");
appendMessageToView(":", "DISCONNECTED");
}
function onError(evt) {
writeToScreen("ERROR: " + evt.data);
writeToScreen("ERROR: " + JSON.stringify(evt));
}
function onMessage(evt) {
var receivedData = JSON.parse(evt.data);
console.log("New Data: ", receivedData);
appendMessageToView("Server", receivedData.body);
}
function appendMessageToView(title, message) {
$("#messageContent").append("<p>" + title + ": " + message + "</p>");
}
function writeToScreen(message) {
console.log("New message: ", message);
}
function doSend(protocolMessage) {
if(webSocket.readyState == WebSocket.OPEN) {
writeToScreen("SENT: " + protocolMessage.message);
webSocket.send(JSON.stringify(protocolMessage));
} else {
writeToScreen("Could not send data. Websocket is not open.");
}
}
window.addEventListener("load", init, false);
$(".sendButton").click(function () {
console.log("Submitting.");
newMessage();
});
$(window).on("keydown", function (e) {
if (e.which == 13) {
console.log("Enter pressed.");
newMessage();
return false;
}
});
function newMessage() {
messageInput = $("#messageInput").val();
$("#messageInput").val("");
if ($.trim(messageInput) == "") {
return false;
}
appendMessageToView("Me", messageInput);
var message = {
message: messageInput
};
doSend(message);
}
</script>
}

View File

@ -0,0 +1,14 @@
@(title: String)(content: Html)
<!DOCTYPE html>
<html lang="en">
<head>
<title>@title</title>
<link rel="stylesheet" media="screen" href="@routes.Assets.versioned("stylesheets/main.css")">
<link rel="shortcut icon" type="image/png" href="@routes.Assets.versioned("images/favicon.png")">
</head>
<body>
@content
<script src="@routes.Assets.versioned("javascripts/main.js")" type="text/javascript"></script>
</body>
</html>

View File

@ -0,0 +1,22 @@
name := """websockets"""
organization := "com.baeldung"
version := "1.0-SNAPSHOT"
lazy val root = (project in file(".")).enablePlugins(PlayJava)
scalaVersion := "2.13.0"
lazy val akkaVersion = "2.6.0-M8"
lazy val akkaHttpVersion = "10.1.10"
libraryDependencies += guice
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % akkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % akkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % akkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-http-jackson" % akkaHttpVersion
libraryDependencies += "com.typesafe.akka" %% "akka-http" % akkaHttpVersion
libraryDependencies += "org.projectlombok" % "lombok" % "1.18.8" % "provided"
libraryDependencies += "junit" % "junit" % "4.12"
PlayKeys.devSettings += "play.server.http.idleTimeout" -> "infinite"

View File

@ -0,0 +1,7 @@
# This is the main configuration file for the application.
# https://www.playframework.com/documentation/latest/ConfigFile
########################################
# akka-http-core Reference Config File #
########################################
play.server.http.idleTimeout = "infinite"

View File

@ -0,0 +1,37 @@
<!-- https://www.playframework.com/documentation/latest/SettingsLogger -->
<configuration>
<conversionRule conversionWord="coloredLevel" converterClass="play.api.libs.logback.ColoredLevel" />
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>${application.home:-.}/logs/application.log</file>
<encoder>
<pattern>%date [%level] from %logger in %thread - %message%n%xException</pattern>
</encoder>
</appender>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%coloredLevel %logger{15} - %message%n%xException{10}</pattern>
</encoder>
</appender>
<appender name="ASYNCFILE" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="FILE" />
</appender>
<appender name="ASYNCSTDOUT" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="STDOUT" />
</appender>
<logger name="akka" level="INFO" />
<logger name="actors" level="INFO"/>
<logger name="play" level="INFO" />
<logger name="application" level="DEBUG" />
<root level="WARN">
<appender-ref ref="ASYNCFILE" />
<appender-ref ref="ASYNCSTDOUT" />
</root>
</configuration>

View File

@ -0,0 +1,11 @@
# Routes
# This file defines all application routes (Higher priority routes first)
# ~~~~
# An example controller showing a sample home page
GET / controllers.HomeController.index(request: Request)
GET /chat controllers.HomeController.socket
GET /chat/with/streams controllers.HomeController.akkaStreamsSocket
# Map static resources from the /public folder to the /assets URL path
GET /assets/*file controllers.Assets.versioned(path="/public", file: Asset)

View File

@ -0,0 +1 @@
sbt.version=1.2.8

View File

@ -0,0 +1,7 @@
// The Play plugin
addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.7.3")
// Defines scaffolding (found under .g8 folder)
// http://www.foundweekends.org/giter8/scaffolding.html
// sbt "g8Scaffold form"
addSbtPlugin("org.foundweekends.giter8" % "sbt-giter8-scaffold" % "0.11.0")

Binary file not shown.

After

Width:  |  Height:  |  Size: 687 B

View File

@ -0,0 +1,32 @@
package controllers;
import org.junit.Test;
import play.Application;
import play.inject.guice.GuiceApplicationBuilder;
import play.mvc.Http;
import play.mvc.Result;
import play.test.WithApplication;
import static org.junit.Assert.assertEquals;
import static play.mvc.Http.Status.OK;
import static play.test.Helpers.GET;
import static play.test.Helpers.route;
public class HomeControllerTest extends WithApplication {
@Override
protected Application provideApplication() {
return new GuiceApplicationBuilder().build();
}
@Test
public void giveRequest_whenRootPath_ThenStatusOkay() {
Http.RequestBuilder request = new Http.RequestBuilder()
.method(GET)
.uri("/");
Result result = route(app, request);
assertEquals(OK, result.status());
}
}