java-tutorials/web-modules/play-modules/websockets/app/actors/Messenger.java

112 lines
4.0 KiB
Java
Raw Normal View History

package actors;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.http.javadsl.Http;
import akka.http.javadsl.marshallers.jackson.Jackson;
import akka.http.javadsl.model.HttpMessage;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.stream.Materializer;
import com.fasterxml.jackson.databind.JsonNode;
import dto.MessageDTO;
import dto.RequestDTO;
import utils.MessageConverter;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
public class Messenger extends AbstractActor {
private LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
private ActorRef out;
public Messenger(ActorRef out) {
this.out = out;
}
public static Props props(ActorRef out) {
return Props.create(Messenger.class, () -> new Messenger(out));
}
@Override
public void preStart() throws Exception {
log.info("Messenger actor started at {}",
OffsetDateTime.now().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
}
@Override
public void postStop() throws Exception {
log.info("Messenger actor stopped at {}",
OffsetDateTime.now().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
}
private void onSendMessage(JsonNode jsonNode) {
RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
String message = requestDTO.getMessage().toLowerCase();
if("stop".equals(message)) {
MessageDTO messageDTO = createMessageDTO("1", "1", "Stop", "Stopping actor");
out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf());
self().tell(PoisonPill.getInstance(), getSelf());
} else {
log.info("Actor received. {}", requestDTO);
processMessage(requestDTO);
}
}
private MessageDTO createMessageDTO(String userId, String id, String title, String message) {
MessageDTO messageDTO = new MessageDTO();
messageDTO.setUserId(UUID.randomUUID().toString());
messageDTO.setId(UUID.randomUUID().toString());
messageDTO.setTitle("Self Kill");
messageDTO.setBody("Stopping actor");
return messageDTO;
}
private void processMessage(RequestDTO requestDTO) {
CompletionStage<HttpResponse> responseFuture = getRandomMessage();
responseFuture.thenCompose(this::consumeHttpResponse)
.thenAccept(messageDTO ->
out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf()));
}
private CompletionStage<HttpResponse> getRandomMessage() {
int postId = ThreadLocalRandom.current().nextInt(0, 100);
return Http.get(getContext().getSystem()).singleRequest(
HttpRequest.create("https://jsonplaceholder.typicode.com/posts/" + postId)
);
}
private void discardEntity(HttpResponse httpResponse, Materializer materializer) {
httpResponse.discardEntityBytes(materializer)
.completionStage()
.whenComplete((done, ex) -> log.info("Entity discarded completely!"));
}
private CompletionStage<MessageDTO> consumeHttpResponse(HttpResponse httpResponse) {
Materializer materializer = Materializer.matFromSystem(getContext().getSystem());
return Jackson.unmarshaller(MessageDTO.class)
.unmarshal(httpResponse.entity(), materializer)
.thenApply(messageDTO -> {
log.info("Received message: {}", messageDTO);
discardEntity(httpResponse, materializer);
return messageDTO;
});
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(JsonNode.class, this::onSendMessage)
.matchAny(o -> log.error("Received unknown message: {}", o.getClass()))
.build();
}
}