112 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Java
		
	
	
	
	
	
		
		
			
		
	
	
			112 lines
		
	
	
		
			4.0 KiB
		
	
	
	
		
			Java
		
	
	
	
	
	
|  | package actors;
 | ||
|  | 
 | ||
|  | import akka.actor.AbstractActor;
 | ||
|  | import akka.actor.ActorRef;
 | ||
|  | import akka.actor.PoisonPill;
 | ||
|  | import akka.actor.Props;
 | ||
|  | import akka.event.Logging;
 | ||
|  | import akka.event.LoggingAdapter;
 | ||
|  | import akka.http.javadsl.Http;
 | ||
|  | import akka.http.javadsl.marshallers.jackson.Jackson;
 | ||
|  | import akka.http.javadsl.model.HttpMessage;
 | ||
|  | import akka.http.javadsl.model.HttpRequest;
 | ||
|  | import akka.http.javadsl.model.HttpResponse;
 | ||
|  | import akka.stream.Materializer;
 | ||
|  | import com.fasterxml.jackson.databind.JsonNode;
 | ||
|  | import dto.MessageDTO;
 | ||
|  | import dto.RequestDTO;
 | ||
|  | import utils.MessageConverter;
 | ||
|  | 
 | ||
|  | import java.time.OffsetDateTime;
 | ||
|  | import java.time.format.DateTimeFormatter;
 | ||
|  | import java.util.UUID;
 | ||
|  | import java.util.concurrent.CompletionStage;
 | ||
|  | import java.util.concurrent.ThreadLocalRandom;
 | ||
|  | 
 | ||
|  | public class Messenger extends AbstractActor {
 | ||
|  |     private LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
 | ||
|  | 
 | ||
|  |     private ActorRef out;
 | ||
|  | 
 | ||
|  |     public Messenger(ActorRef out) {
 | ||
|  |         this.out = out;
 | ||
|  |     }
 | ||
|  | 
 | ||
|  |     public static Props props(ActorRef out) {
 | ||
|  |         return Props.create(Messenger.class, () -> new Messenger(out));
 | ||
|  |     }
 | ||
|  | 
 | ||
|  |     @Override
 | ||
|  |     public void preStart() throws Exception {
 | ||
|  |         log.info("Messenger actor started at {}",
 | ||
|  |           OffsetDateTime.now().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
 | ||
|  |     }
 | ||
|  | 
 | ||
|  |     @Override
 | ||
|  |     public void postStop() throws Exception {
 | ||
|  |         log.info("Messenger actor stopped at {}",
 | ||
|  |           OffsetDateTime.now().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
 | ||
|  |     }
 | ||
|  | 
 | ||
|  |     private void onSendMessage(JsonNode jsonNode) {
 | ||
|  |         RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
 | ||
|  |         String message = requestDTO.getMessage().toLowerCase();
 | ||
|  |         if("stop".equals(message)) {
 | ||
|  |             MessageDTO messageDTO = createMessageDTO("1", "1", "Stop", "Stopping actor");
 | ||
|  |             out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf());
 | ||
|  |             self().tell(PoisonPill.getInstance(), getSelf());
 | ||
|  |         } else {
 | ||
|  |             log.info("Actor received. {}", requestDTO);
 | ||
|  |             processMessage(requestDTO);
 | ||
|  |         }
 | ||
|  |     }
 | ||
|  | 
 | ||
|  |     private MessageDTO createMessageDTO(String userId, String id, String title, String message) {
 | ||
|  |         MessageDTO messageDTO = new MessageDTO();
 | ||
|  |         messageDTO.setUserId(UUID.randomUUID().toString());
 | ||
|  |         messageDTO.setId(UUID.randomUUID().toString());
 | ||
|  |         messageDTO.setTitle("Self Kill");
 | ||
|  |         messageDTO.setBody("Stopping actor");
 | ||
|  |         return messageDTO;
 | ||
|  |     }
 | ||
|  | 
 | ||
|  |     private void processMessage(RequestDTO requestDTO) {
 | ||
|  |         CompletionStage<HttpResponse> responseFuture = getRandomMessage();
 | ||
|  |         responseFuture.thenCompose(this::consumeHttpResponse)
 | ||
|  |           .thenAccept(messageDTO ->
 | ||
|  |             out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf()));
 | ||
|  |     }
 | ||
|  | 
 | ||
|  |     private CompletionStage<HttpResponse> getRandomMessage() {
 | ||
|  |         int postId = ThreadLocalRandom.current().nextInt(0, 100);
 | ||
|  |         return Http.get(getContext().getSystem()).singleRequest(
 | ||
|  |           HttpRequest.create("https://jsonplaceholder.typicode.com/posts/" + postId)
 | ||
|  |         );
 | ||
|  |     }
 | ||
|  | 
 | ||
|  |     private void discardEntity(HttpResponse httpResponse, Materializer materializer) {
 | ||
|  |         httpResponse.discardEntityBytes(materializer)
 | ||
|  |           .completionStage()
 | ||
|  |           .whenComplete((done, ex) -> log.info("Entity discarded completely!"));
 | ||
|  |     }
 | ||
|  | 
 | ||
|  |     private CompletionStage<MessageDTO> consumeHttpResponse(HttpResponse httpResponse) {
 | ||
|  |         Materializer materializer = Materializer.matFromSystem(getContext().getSystem());
 | ||
|  |         return Jackson.unmarshaller(MessageDTO.class)
 | ||
|  |           .unmarshal(httpResponse.entity(), materializer)
 | ||
|  |           .thenApply(messageDTO -> {
 | ||
|  |               log.info("Received message: {}", messageDTO);
 | ||
|  |               discardEntity(httpResponse, materializer);
 | ||
|  |               return messageDTO;
 | ||
|  |           });
 | ||
|  |     }
 | ||
|  | 
 | ||
|  |     @Override
 | ||
|  |     public Receive createReceive() {
 | ||
|  |         return receiveBuilder()
 | ||
|  |           .match(JsonNode.class, this::onSendMessage)
 | ||
|  |           .matchAny(o -> log.error("Received unknown message: {}", o.getClass()))
 | ||
|  |           .build();
 | ||
|  |     }
 | ||
|  | }
 |