diff --git a/libraries-data/pom.xml b/libraries-data/pom.xml
index bbf0c7832f..54d24edbf6 100644
--- a/libraries-data/pom.xml
+++ b/libraries-data/pom.xml
@@ -259,6 +259,11 @@
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${storm.version}</version>
+ </dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
@@ -432,6 +437,7 @@
+ <storm.version>1.2.2</storm.version>
4.0.1
1.4.196
16.5.1
diff --git a/libraries-data/src/main/java/com/baeldung/storm/TopologyRunner.java b/libraries-data/src/main/java/com/baeldung/storm/TopologyRunner.java
new file mode 100644
index 0000000000..326f53c0b8
--- /dev/null
+++ b/libraries-data/src/main/java/com/baeldung/storm/TopologyRunner.java
@@ -0,0 +1,34 @@
+package com.baeldung.storm;
+
+import com.baeldung.storm.bolt.AggregatingBolt;
+import com.baeldung.storm.bolt.FileWritingBolt;
+import com.baeldung.storm.bolt.FilteringBolt;
+import com.baeldung.storm.spout.RandomNumberSpout;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+
+public class TopologyRunner {
+ public static void main(String[] args) {
+ runTopology();
+ }
+
+ public static void runTopology() {
+ String filePath = "./src/main/resources/operations.txt";
+ TopologyBuilder builder = new TopologyBuilder();
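+ // wiring: RandomNumberSpout -> FilteringBolt -> AggregatingBolt (5 s windows with 1 s lag) -> FileWritingBolt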
+ builder.setSpout("randomNumberSpout", new RandomNumberSpout());
+ builder.setBolt("filteringBolt", new FilteringBolt()).shuffleGrouping("randomNumberSpout");
+ builder.setBolt("aggregatingBolt", new AggregatingBolt()
+ .withTimestampField("timestamp")
+ .withLag(BaseWindowedBolt.Duration.seconds(1))
+ .withWindow(BaseWindowedBolt.Duration.seconds(5))).shuffleGrouping("filteringBolt");
+ builder.setBolt("fileBolt", new FileWritingBolt(filePath)).shuffleGrouping("aggregatingBolt");
+
+ Config config = new Config();
+ config.setDebug(false);
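+ // LocalCluster runs the whole topology inside the current JVM, which is convenient for local testing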
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("Test", config, builder.createTopology());
+ }
+}
diff --git a/libraries-data/src/main/java/com/baeldung/storm/bolt/AggregatingBolt.java b/libraries-data/src/main/java/com/baeldung/storm/bolt/AggregatingBolt.java
new file mode 100644
index 0000000000..c7263cd8d5
--- /dev/null
+++ b/libraries-data/src/main/java/com/baeldung/storm/bolt/AggregatingBolt.java
@@ -0,0 +1,39 @@
+package com.baeldung.storm.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+public class AggregatingBolt extends BaseWindowedBolt {
+ private OutputCollector outputCollector;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.outputCollector = collector;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp"));
+ }
+
+ @Override
+ public void execute(TupleWindow tupleWindow) {
+ // the window hands us every tuple that fell into the current time interval
+ List<Tuple> tuples = tupleWindow.get();
+ tuples.sort(Comparator.comparing(tuple -> tuple.getLongByField("timestamp")));
+ // this is safe: the window is built from the tuples' own timestamps, so it can never be empty
+ Long beginningTimestamp = tuples.get(0).getLongByField("timestamp");
+ Long endTimestamp = tuples.get(tuples.size() - 1).getLongByField("timestamp");
+ int sumOfOperations = tuples.stream().mapToInt(tuple -> tuple.getIntegerByField("operation")).sum();
+ Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp);
+ outputCollector.emit(values);
+ }
+}
diff --git a/libraries-data/src/main/java/com/baeldung/storm/bolt/FileWritingBolt.java b/libraries-data/src/main/java/com/baeldung/storm/bolt/FileWritingBolt.java
new file mode 100644
index 0000000000..339e0dc055
--- /dev/null
+++ b/libraries-data/src/main/java/com/baeldung/storm/bolt/FileWritingBolt.java
@@ -0,0 +1,72 @@
+package com.baeldung.storm.bolt;
+
+import com.baeldung.storm.model.AggregatedWindow;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Map;
+
+public class FileWritingBolt extends BaseRichBolt {
+ private static final Logger logger = LoggerFactory.getLogger(FileWritingBolt.class);
+ private BufferedWriter writer;
+ private String filePath;
+ private ObjectMapper objectMapper;
+
+ @Override
+ public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+ objectMapper = new ObjectMapper();
+ objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
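+ // let Jackson access AggregatedWindow's non-public fields directly so they show up in the JSON output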
+ try {
+ writer = new BufferedWriter(new FileWriter(filePath));
+ } catch (IOException e) {
+ logger.error("Failed to open a file for writing.", e);
+ }
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ int sumOfOperations = tuple.getIntegerByField("sumOfOperations");
+ long beginningTimestamp = tuple.getLongByField("beginningTimestamp");
+ long endTimestamp = tuple.getLongByField("endTimestamp");
+
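+ // persist only the windows whose total sum of operations exceeds 200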
+ if(sumOfOperations > 200) {
+ AggregatedWindow aggregatedWindow = new AggregatedWindow(sumOfOperations, beginningTimestamp, endTimestamp);
+ try {
+ writer.write(objectMapper.writeValueAsString(aggregatedWindow));
+ writer.write("\n");
+ writer.flush();
+ } catch (IOException e) {
+ logger.error("Failed to write data to file.", e);
+ }
+ }
+ }
+
+ public FileWritingBolt(String filePath) {
+ this.filePath = filePath;
+ }
+
+ @Override
+ public void cleanup() {
+ try {
+ if (writer != null) {
+ writer.close();
+ }
+ } catch (IOException e) {
+ logger.error("Failed to close the writer!", e);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
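+ // this is the final bolt of the topology, so there are no output fields to declare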
+
+ }
+}
diff --git a/libraries-data/src/main/java/com/baeldung/storm/bolt/FilteringBolt.java b/libraries-data/src/main/java/com/baeldung/storm/bolt/FilteringBolt.java
new file mode 100644
index 0000000000..564076a1df
--- /dev/null
+++ b/libraries-data/src/main/java/com/baeldung/storm/bolt/FilteringBolt.java
@@ -0,0 +1,22 @@
+package com.baeldung.storm.bolt;
+
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+
+public class FilteringBolt extends BaseBasicBolt {
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
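+ // forward only the tuples that describe positive operations and silently drop the rest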
+ int operation = tuple.getIntegerByField("operation");
+ if (operation > 0) {
+ basicOutputCollector.emit(tuple.getValues());
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
+ }
+}
diff --git a/libraries-data/src/main/java/com/baeldung/storm/bolt/PrintingBolt.java b/libraries-data/src/main/java/com/baeldung/storm/bolt/PrintingBolt.java
new file mode 100644
index 0000000000..efd2c9b1d9
--- /dev/null
+++ b/libraries-data/src/main/java/com/baeldung/storm/bolt/PrintingBolt.java
@@ -0,0 +1,18 @@
+package com.baeldung.storm.bolt;
+
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
+
+public class PrintingBolt extends BaseBasicBolt {
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
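+ // simply dump every incoming tuple to standard output - handy when debugging a topology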
+ System.out.println(tuple);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
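+ // nothing to declare: this bolt only prints and never emits tuples downstream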
+
+ }
+}
diff --git a/libraries-data/src/main/java/com/baeldung/storm/model/AggregatedWindow.java b/libraries-data/src/main/java/com/baeldung/storm/model/AggregatedWindow.java
new file mode 100644
index 0000000000..beaf54d34c
--- /dev/null
+++ b/libraries-data/src/main/java/com/baeldung/storm/model/AggregatedWindow.java
@@ -0,0 +1,16 @@
+package com.baeldung.storm.model;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize
+public class AggregatedWindow {
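+ // package-private fields are serialized directly by Jackson thanks to the field visibility configured in FileWritingBolt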
+ int sumOfOperations;
+ long beginningTimestamp;
+ long endTimestamp;
+
+ public AggregatedWindow(int sumOfOperations, long beginningTimestamp, long endTimestamp) {
+ this.sumOfOperations = sumOfOperations;
+ this.beginningTimestamp = beginningTimestamp;
+ this.endTimestamp = endTimestamp;
+ }
+}
diff --git a/libraries-data/src/main/java/com/baeldung/storm/model/User.java b/libraries-data/src/main/java/com/baeldung/storm/model/User.java
new file mode 100644
index 0000000000..eafbf0e1eb
--- /dev/null
+++ b/libraries-data/src/main/java/com/baeldung/storm/model/User.java
@@ -0,0 +1,40 @@
+package com.baeldung.storm.model;
+
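+// simple POJO used to demonstrate custom Kryo serialization (see UserSerializer)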
+public class User {
+ private String username;
+ private String password;
+ private String email;
+ private int age;
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getEmail() {
+ return email;
+ }
+
+ public void setEmail(String email) {
+ this.email = email;
+ }
+
+ public int getAge() {
+ return age;
+ }
+
+ public void setAge(int age) {
+ this.age = age;
+ }
+}
diff --git a/libraries-data/src/main/java/com/baeldung/storm/serialization/UserSerializer.java b/libraries-data/src/main/java/com/baeldung/storm/serialization/UserSerializer.java
new file mode 100644
index 0000000000..6199a203da
--- /dev/null
+++ b/libraries-data/src/main/java/com/baeldung/storm/serialization/UserSerializer.java
@@ -0,0 +1,30 @@
+package com.baeldung.storm.serialization;
+
+
+import com.baeldung.storm.model.User;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
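+// Custom Kryo serializer for User. To take effect it needs to be registered with the topology Config,
+// for example: config.registerSerialization(User.class, UserSerializer.class);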
+public class UserSerializer extends Serializer<User> {
+ @Override
+ public void write(Kryo kryo, Output output, User user) {
+ // the fields must be read back in exactly this order; note that the password field is not serialized at all
+ output.writeString(user.getEmail());
+ output.writeString(user.getUsername());
+ output.writeInt(user.getAge());
+ }
+
+ @Override
+ public User read(Kryo kryo, Input input, Class<User> aClass) {
+ User user = new User();
+ // read the fields back in the same order in which write() produced them
+ String email = input.readString();
+ String name = input.readString();
+ int age = input.readInt();
+ user.setAge(age);
+ user.setEmail(email);
+ user.setUsername(name);
+
+ return user;
+ }
+}
diff --git a/libraries-data/src/main/java/com/baeldung/storm/spout/RandomIntSpout.java b/libraries-data/src/main/java/com/baeldung/storm/spout/RandomIntSpout.java
new file mode 100644
index 0000000000..4a8ef76598
--- /dev/null
+++ b/libraries-data/src/main/java/com/baeldung/storm/spout/RandomIntSpout.java
@@ -0,0 +1,35 @@
+package com.baeldung.storm.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+import java.util.Random;
+
+public class RandomIntSpout extends BaseRichSpout {
+
+ private Random random;
+ private SpoutOutputCollector outputCollector;
+
+ @Override
+ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+ random = new Random();
+ outputCollector = spoutOutputCollector;
+ }
+
+ @Override
+ public void nextTuple() {
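+ // emit roughly one tuple per second: a random integer plus the timestamp of its emission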
+ Utils.sleep(1000);
+ outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis()));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp"));
+ }
+}
diff --git a/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java b/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java
new file mode 100644
index 0000000000..c9291cdc9d
--- /dev/null
+++ b/libraries-data/src/main/java/com/baeldung/storm/spout/RandomNumberSpout.java
@@ -0,0 +1,40 @@
+package com.baeldung.storm.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+import java.util.Random;
+
+public class RandomNumberSpout extends BaseRichSpout {
+ private Random random;
+ private SpoutOutputCollector collector;
+
+ @Override
+ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+ random = new Random();
+ collector = spoutOutputCollector;
+ }
+
+ @Override
+ public void nextTuple() {
+ Utils.sleep(1000);
+ // nextInt(101) picks a random operation value between 0 and 100, both ends inclusive
+ int operation = random.nextInt(101);
+ long timestamp = System.currentTimeMillis();
+
+ Values values = new Values(operation, timestamp);
+ collector.emit(values);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
+ }
+}