diff --git a/libraries-data/pom.xml b/libraries-data/pom.xml
index bbf0c7832f..54d24edbf6 100644
--- a/libraries-data/pom.xml
+++ b/libraries-data/pom.xml
@@ -259,6 +259,11 @@
slf4j-api
${slf4j.version}
+
+ org.apache.storm
+ storm-core
+ ${storm.version}
+
ch.qos.logback
@@ -432,6 +437,7 @@
+ 1.2.2
4.0.1
1.4.196
16.5.1
diff --git a/libraries-data/src/main/java/com/baeldung/storm/TopologyRunner.java b/libraries-data/src/main/java/com/baeldung/storm/TopologyRunner.java
new file mode 100644
index 0000000000..326f53c0b8
--- /dev/null
+++ b/libraries-data/src/main/java/com/baeldung/storm/TopologyRunner.java
@@ -0,0 +1,34 @@
+package com.baeldung.storm;
+
+import com.baeldung.storm.bolt.AggregatingBolt;
+import com.baeldung.storm.bolt.FileWritingBolt;
+import com.baeldung.storm.bolt.FilteringBolt;
+import com.baeldung.storm.bolt.PrintingBolt;
+import com.baeldung.storm.spout.RandomNumberSpout;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+
+public class TopologyRunner {
+ public static void main(String[] args) {
+ runTopology();
+ }
+
+ public static void runTopology() {
+ String filePath = "./src/main/resources/operations.txt";
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("randomNumberSpout", new RandomNumberSpout());
+ builder.setBolt("filteringBolt", new FilteringBolt()).shuffleGrouping("randomNumberSpout");
+ builder.setBolt("aggregatingBolt", new AggregatingBolt()
+ .withTimestampField("timestamp")
+ .withLag(BaseWindowedBolt.Duration.seconds(1))
+ .withWindow(BaseWindowedBolt.Duration.seconds(5))).shuffleGrouping("filteringBolt");
+ builder.setBolt("fileBolt", new FileWritingBolt(filePath)).shuffleGrouping("aggregatingBolt");
+
+ Config config = new Config();
+ config.setDebug(false);
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("Test", config, builder.createTopology());
+ }
+}
diff --git a/libraries-data/src/main/java/com/baeldung/storm/bolt/AggregatingBolt.java b/libraries-data/src/main/java/com/baeldung/storm/bolt/AggregatingBolt.java
new file mode 100644
index 0000000000..555ba7e692
--- /dev/null
+++ b/libraries-data/src/main/java/com/baeldung/storm/bolt/AggregatingBolt.java
@@ -0,0 +1,39 @@
+package com.baeldung.storm.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+public class AggregatingBolt extends BaseWindowedBolt {
+ OutputCollector outputCollector;
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.outputCollector = collector;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp"));
+ }
+
+ @Override
+ public void execute(TupleWindow tupleWindow) {
+ List tuples = tupleWindow.get();
+ tuples.sort(Comparator.comparing(a -> a.getLongByField("timestamp")));
+ //This is safe since the window is calculated basing on Tuple's timestamp, thus it can't really be empty
+ Long beginningTimestamp = tuples.get(0).getLongByField("timestamp");
+ Long endTimestamp = tuples.get(tuples.size() - 1).getLongByField("timestamp");
+ int sumOfOperations = tuples.stream().mapToInt(tuple -> tuple.getIntegerByField("operation")).sum();
+ Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp);
+ outputCollector.emit(values);
+ }
+}
diff --git a/libraries-data/src/main/java/com/baeldung/storm/bolt/FileWritingBolt.java b/libraries-data/src/main/java/com/baeldung/storm/bolt/FileWritingBolt.java
new file mode 100644
index 0000000000..a35ff3aaf5
--- /dev/null
+++ b/libraries-data/src/main/java/com/baeldung/storm/bolt/FileWritingBolt.java
@@ -0,0 +1,63 @@
+package com.baeldung.storm.bolt;
+
+import com.baeldung.storm.model.AggregatedWindow;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Map;
+
+public class FileWritingBolt extends BaseRichBolt {
+ public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class);
+ BufferedWriter writer;
+ String filePath;
+ ObjectMapper objectMapper;
+ @Override
+ public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+ objectMapper = new ObjectMapper();
+ objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+ try {
+ writer = new BufferedWriter(new FileWriter(filePath));
+ } catch (IOException e) {
+ logger.error("Failed to open a file for writing.", e);
+ }
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ int sumOfOperations = tuple.getIntegerByField("sumOfOperations");
+ long beginningTimestamp = tuple.getLongByField("beginningTimestamp");
+ long endTimestamp = tuple.getLongByField("endTimestamp");
+
+ if(sumOfOperations > 200) {
+ AggregatedWindow aggregatedWindow = new AggregatedWindow(sumOfOperations, beginningTimestamp, endTimestamp);
+ try {
+ writer.write(objectMapper.writeValueAsString(aggregatedWindow));
+ writer.write("\n");
+ writer.flush();
+ } catch (IOException e) {
+ logger.error("Failed to write data to file.", e);
+ }
+ }
+ }
+
+ public FileWritingBolt(String filePath) {
+ this.filePath = filePath;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+
+ }
+}
diff --git a/libraries-data/src/main/java/com/baeldung/storm/bolt/FilteringBolt.java b/libraries-data/src/main/java/com/baeldung/storm/bolt/FilteringBolt.java
new file mode 100644
index 0000000000..a2e80deb33
--- /dev/null
+++ b/libraries-data/src/main/java/com/baeldung/storm/bolt/FilteringBolt.java
@@ -0,0 +1,22 @@
+package com.baeldung.storm.bolt;
+
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+
+public class FilteringBolt extends BaseBasicBolt {
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
+ int operation = tuple.getIntegerByField("operation");
+ if(operation >= 0 ) {
+ basicOutputCollector.emit(tuple.getValues());
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
+ }
+}
diff --git a/libraries-data/src/main/java/com/baeldung/storm/bolt/PrintingBolt.java b/libraries-data/src/main/java/com/baeldung/storm/bolt/PrintingBolt.java
new file mode 100644
index 0000000000..efd2c9b1d9
--- /dev/null
+++ b/libraries-data/src/main/java/com/baeldung/storm/bolt/PrintingBolt.java
@@ -0,0 +1,18 @@
+package com.baeldung.storm.bolt;
+
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+public class PrintingBolt extends BaseBasicBolt {
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
+ System.out.println(tuple);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+
+ }
+}
diff --git a/libraries-data/src/main/java/com/baeldung/storm/model/AggregatedWindow.java b/libraries-data/src/main/java/com/baeldung/storm/model/AggregatedWindow.java
new file mode 100644
index 0000000000..beaf54d34c
--- /dev/null
+++ b/libraries-data/src/main/java/com/baeldung/storm/model/AggregatedWindow.java
@@ -0,0 +1,16 @@
+package com.baeldung.storm.model;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize
+public class AggregatedWindow {
+ int sumOfOperations;
+ long beginningTimestamp;
+ long endTimestamp;
+
+ public AggregatedWindow(int sumOfOperations, long beginningTimestamp, long endTimestamp) {
+ this.sumOfOperations = sumOfOperations;
+ this.beginningTimestamp = beginningTimestamp;
+ this.endTimestamp = endTimestamp;
+ }
+}
diff --git a/libraries-data/src/main/java/com/baeldung/storm/model/User.java b/libraries-data/src/main/java/com/baeldung/storm/model/User.java
new file mode 100644
index 0000000000..62b9ac639b
--- /dev/null
+++ b/libraries-data/src/main/java/com/baeldung/storm/model/User.java
@@ -0,0 +1,40 @@
+package com.baeldung.storm.model;
+
+public class User {
+ String username;
+ String password;
+ String email;
+ int age;
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getEmail() {
+ return email;
+ }
+
+ public void setEmail(String email) {
+ this.email = email;
+ }
+
+ public int getAge() {
+ return age;
+ }
+
+ public void setAge(int age) {
+ this.age = age;
+ }
+}
diff --git a/libraries-data/src/main/java/com/baeldung/storm/serialization/UserSerializer.java b/libraries-data/src/main/java/com/baeldung/storm/serialization/UserSerializer.java
new file mode 100644
index 0000000000..6199a203da
--- /dev/null
+++ b/libraries-data/src/main/java/com/baeldung/storm/serialization/UserSerializer.java
@@ -0,0 +1,30 @@
+package com.baeldung.storm.serialization;
+
+
+import com.baeldung.storm.model.User;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+public class UserSerializer extends Serializer{
+ @Override
+ public void write(Kryo kryo, Output output, User user) {
+ output.writeString(user.getEmail());
+ output.writeString(user.getUsername());
+ output.write(user.getAge());
+ }
+
+ @Override
+ public User read(Kryo kryo, Input input, Class aClass) {
+ User user = new User();
+ String email = input.readString();
+ String name = input.readString();
+ int age = input.read();
+ user.setAge(age);
+ user.setEmail(email);
+ user.setUsername(name);
+
+ return user;
+ }
+}
diff --git a/libraries-data/src/main/java/com/baeldung/storm/spout/RandomIntSpout.java b/libraries-data/src/main/java/com/baeldung/storm/spout/RandomIntSpout.java
new file mode 100644
index 0000000000..669eb4f897
--- /dev/null
+++ b/libraries-data/src/main/java/com/baeldung/storm/spout/RandomIntSpout.java
@@ -0,0 +1,35 @@
+package com.baeldung.storm.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+import java.util.Random;
+
+public class RandomIntSpout extends BaseRichSpout {
+
+ Random random;
+ SpoutOutputCollector outputCollector;
+
+ @Override
+ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+ random = new Random();
+ outputCollector = spoutOutputCollector;
+ }
+
+ @Override
+ public void nextTuple() {
+ Utils.sleep(1000);
+ outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis()));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp"));
+ }
+}
diff --git a/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java b/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java
new file mode 100644
index 0000000000..5d7d3cc53e
--- /dev/null
+++ b/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java
@@ -0,0 +1,40 @@
+package com.baeldung.storm.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+import java.util.Random;
+
+public class RandomNumberSpout extends BaseRichSpout {
+ Random random;
+ SpoutOutputCollector collector;
+
+ @Override
+ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+ random = new Random();
+ collector = spoutOutputCollector;
+ }
+
+ @Override
+ public void nextTuple() {
+ Utils.sleep(1000);
+ //This will select random int from the range (-1000, 1000)
+ int operation = random.nextInt(1000 + 1 + 1000) - 1000;
+ long timestamp = System.currentTimeMillis();
+
+ Values values = new Values(operation, timestamp);
+ collector.emit(values);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
+ }
+}