BAEL-1463: Apache Storm Introduction

This commit is contained in:
DomWos 2018-10-13 23:55:53 +02:00
parent 90978bf35e
commit 99a840fd03
11 changed files with 343 additions and 0 deletions

View File

@ -259,6 +259,11 @@
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version> <version>${slf4j.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency> <dependency>
<groupId>ch.qos.logback</groupId> <groupId>ch.qos.logback</groupId>
@ -432,6 +437,7 @@
</repositories> </repositories>
<properties> <properties>
<storm.version>1.2.2</storm.version>
<kryo.version>4.0.1</kryo.version> <kryo.version>4.0.1</kryo.version>
<h2.version>1.4.196</h2.version> <h2.version>1.4.196</h2.version>
<reladomo.version>16.5.1</reladomo.version> <reladomo.version>16.5.1</reladomo.version>

View File

@ -0,0 +1,34 @@
package com.baeldung.storm;
import com.baeldung.storm.bolt.AggregatingBolt;
import com.baeldung.storm.bolt.FileWritingBolt;
import com.baeldung.storm.bolt.FilteringBolt;
import com.baeldung.storm.bolt.PrintingBolt;
import com.baeldung.storm.spout.RandomNumberSpout;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
/**
 * Assembles the demo topology (random-number spout, then filtering, aggregating
 * and file-writing bolts) and submits it to an in-process {@link LocalCluster}.
 */
public class TopologyRunner {

    /**
     * Program entry point; simply delegates to {@link #runTopology()}.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        runTopology();
    }

    /**
     * Builds the spout/bolt chain, each stage shuffle-grouped on the previous one,
     * and submits it to a new local cluster under the name "Test".
     */
    public static void runTopology() {
        final String outputFile = "./src/main/resources/operations.txt";
        final String spoutId = "randomNumberSpout";
        final String filterId = "filteringBolt";
        final String aggregatorId = "aggregatingBolt";

        TopologyBuilder topology = new TopologyBuilder();
        topology.setSpout(spoutId, new RandomNumberSpout());
        topology.setBolt(filterId, new FilteringBolt()).shuffleGrouping(spoutId);

        // Windows are 5 seconds long, use the tuples' "timestamp" field and allow a 1 second lag.
        BaseWindowedBolt windowedAggregator = new AggregatingBolt()
            .withTimestampField("timestamp")
            .withLag(BaseWindowedBolt.Duration.seconds(1))
            .withWindow(BaseWindowedBolt.Duration.seconds(5));
        topology.setBolt(aggregatorId, windowedAggregator).shuffleGrouping(filterId);

        topology.setBolt("fileBolt", new FileWritingBolt(outputFile)).shuffleGrouping(aggregatorId);

        Config topologyConfig = new Config();
        topologyConfig.setDebug(false);

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("Test", topologyConfig, topology.createTopology());
    }
}

View File

@ -0,0 +1,39 @@
package com.baeldung.storm.bolt;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
/**
 * Windowed bolt that collapses every window into a single tuple holding the sum
 * of the "operation" values plus the earliest and latest "timestamp" seen.
 */
public class AggregatingBolt extends BaseWindowedBolt {

    OutputCollector outputCollector;

    /**
     * Stores the collector so that {@link #execute(TupleWindow)} can emit through it.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.outputCollector = collector;
    }

    /**
     * Declares the three output fields, in the order they are emitted.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp"));
    }

    /**
     * Aggregates one window and emits (sumOfOperations, beginningTimestamp, endTimestamp).
     *
     * <p>The window is scanned once without being modified: the list returned by
     * {@link TupleWindow#get()} is not owned by this bolt, so it is neither sorted
     * nor otherwise mutated here.
     *
     * @param tupleWindow the tuples of the current window; each must carry an
     *                    Integer "operation" field and a Long "timestamp" field
     */
    @Override
    public void execute(TupleWindow tupleWindow) {
        List<Tuple> tuples = tupleWindow.get();
        // An empty window is not expected, but there would be no timestamps to report,
        // so skip it instead of failing on the first element access.
        if (tuples == null || tuples.isEmpty()) {
            return;
        }

        long beginningTimestamp = Long.MAX_VALUE;
        long endTimestamp = Long.MIN_VALUE;
        int sumOfOperations = 0;
        for (Tuple tuple : tuples) {
            long timestamp = tuple.getLongByField("timestamp");
            beginningTimestamp = Math.min(beginningTimestamp, timestamp);
            endTimestamp = Math.max(endTimestamp, timestamp);
            sumOfOperations += tuple.getIntegerByField("operation");
        }

        // Autoboxing yields (Integer, Long, Long), the types the downstream bolt reads back.
        outputCollector.emit(new Values(sumOfOperations, beginningTimestamp, endTimestamp));
    }
}

View File

@ -0,0 +1,63 @@
package com.baeldung.storm.bolt;
import com.baeldung.storm.model.AggregatedWindow;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
/**
 * Terminal bolt that writes aggregated windows whose sum of operations exceeds
 * 200 to a file, one JSON document per line.
 *
 * <p>The writer and the {@link ObjectMapper} are created in
 * {@link #prepare(Map, TopologyContext, OutputCollector)}, not in the constructor.
 * Incoming tuples are not acked by this bolt.
 */
public class FileWritingBolt extends BaseRichBolt {

    public static final Logger logger = LoggerFactory.getLogger(FileWritingBolt.class);

    BufferedWriter writer;
    String filePath;
    ObjectMapper objectMapper;

    /**
     * @param filePath path of the file to write to; it is opened (and truncated) in
     *                 {@code prepare}
     */
    public FileWritingBolt(String filePath) {
        this.filePath = filePath;
    }

    /**
     * Creates the JSON mapper (serializing fields regardless of their visibility)
     * and opens the output file.
     *
     * @throws IllegalStateException if the file cannot be opened; without a writer
     *                               every later write would fail with a
     *                               NullPointerException, so fail here with the
     *                               real cause attached
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        try {
            writer = new BufferedWriter(new FileWriter(filePath));
        } catch (IOException e) {
            logger.error("Failed to open a file for writing.", e);
            throw new IllegalStateException("Failed to open " + filePath + " for writing.", e);
        }
    }

    /**
     * Writes the tuple as a JSON line when its "sumOfOperations" is greater than 200;
     * other tuples are dropped. Write failures are logged and the tuple is skipped.
     *
     * @param tuple must carry an Integer "sumOfOperations" and Long
     *              "beginningTimestamp" / "endTimestamp" fields
     */
    @Override
    public void execute(Tuple tuple) {
        int sumOfOperations = tuple.getIntegerByField("sumOfOperations");
        long beginningTimestamp = tuple.getLongByField("beginningTimestamp");
        long endTimestamp = tuple.getLongByField("endTimestamp");

        if (sumOfOperations > 200) {
            AggregatedWindow aggregatedWindow = new AggregatedWindow(sumOfOperations, beginningTimestamp, endTimestamp);
            try {
                writer.write(objectMapper.writeValueAsString(aggregatedWindow));
                writer.write("\n");
                // Flush after every record so that written lines reach the file
                // even if the writer is never closed cleanly.
                writer.flush();
            } catch (IOException e) {
                logger.error("Failed to write data to file.", e);
            }
        }
    }

    /**
     * Closes the output file so the underlying file handle is released when the
     * bolt is shut down.
     */
    @Override
    public void cleanup() {
        if (writer == null) {
            return;
        }
        try {
            writer.close();
        } catch (IOException e) {
            logger.error("Failed to close the file.", e);
        }
    }

    /**
     * Declares nothing: this bolt does not emit tuples.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}

View File

@ -0,0 +1,22 @@
package com.baeldung.storm.bolt;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
/**
 * Bolt that forwards only tuples whose "operation" value is zero or positive.
 */
public class FilteringBolt extends BaseBasicBolt {

    /**
     * Output tuples carry the same two fields as the input: "operation" and "timestamp".
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
    }

    /**
     * Drops tuples with a negative "operation"; re-emits the values of all others unchanged.
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        final int value = tuple.getIntegerByField("operation");
        if (value < 0) {
            return;
        }
        basicOutputCollector.emit(tuple.getValues());
    }
}

View File

@ -0,0 +1,18 @@
package com.baeldung.storm.bolt;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
/**
 * Terminal bolt that prints every received tuple to standard output.
 */
public class PrintingBolt extends BaseBasicBolt {

    /**
     * Declares nothing: this bolt does not emit tuples.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    /**
     * Prints the tuple's string representation followed by a line break.
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        final String rendered = String.valueOf(tuple);
        System.out.println(rendered);
    }
}

View File

@ -0,0 +1,16 @@
package com.baeldung.storm.model;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
/**
 * Value holder for one aggregated window: the sum of its operations and the
 * first and last timestamps it covers. It exposes no accessors; its fields are
 * package-private.
 */
@JsonSerialize
public class AggregatedWindow {

    int sumOfOperations;
    long beginningTimestamp;
    long endTimestamp;

    /**
     * @param sumOfOperations    sum of the operations in the window
     * @param beginningTimestamp first timestamp of the window
     * @param endTimestamp       last timestamp of the window
     */
    public AggregatedWindow(int sumOfOperations, long beginningTimestamp, long endTimestamp) {
        this.sumOfOperations = sumOfOperations;
        this.beginningTimestamp = beginningTimestamp;
        this.endTimestamp = endTimestamp;
    }
}

View File

@ -0,0 +1,40 @@
package com.baeldung.storm.model;
/**
 * Mutable bean describing a user: username, password, email and age.
 */
public class User {

    String username;
    String password;
    String email;
    int age;

    /** @return the username, or {@code null} if none has been set */
    public String getUsername() {
        return this.username;
    }

    /** @param username the new username */
    public void setUsername(String username) {
        this.username = username;
    }

    /** @return the password, or {@code null} if none has been set */
    public String getPassword() {
        return this.password;
    }

    /** @param password the new password */
    public void setPassword(String password) {
        this.password = password;
    }

    /** @return the email address, or {@code null} if none has been set */
    public String getEmail() {
        return this.email;
    }

    /** @param email the new email address */
    public void setEmail(String email) {
        this.email = email;
    }

    /** @return the age; {@code 0} until one has been set */
    public int getAge() {
        return this.age;
    }

    /** @param age the new age */
    public void setAge(int age) {
        this.age = age;
    }
}

View File

@ -0,0 +1,30 @@
package com.baeldung.storm.serialization;
import com.baeldung.storm.model.User;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
 * Kryo serializer for {@link User}.
 *
 * <p>The wire layout is: email (string), username (string), age (int). The
 * password is not written, so a deserialized user has no password set.
 */
public class UserSerializer extends Serializer<User> {

    /**
     * Writes email, username and age, in that order.
     */
    @Override
    public void write(Kryo kryo, Output output, User user) {
        output.writeString(user.getEmail());
        output.writeString(user.getUsername());
        // writeInt stores the full int. Output.write(int) is the OutputStream method
        // and keeps only the lowest byte, which corrupts any age outside 0..255.
        output.writeInt(user.getAge());
    }

    /**
     * Reads the fields back in the order {@link #write(Kryo, Output, User)} stored them.
     *
     * @return a new user with email, username and age populated
     */
    @Override
    public User read(Kryo kryo, Input input, Class<User> aClass) {
        String email = input.readString();
        String username = input.readString();
        // Must mirror writeInt above; Input.read() would consume a single byte only.
        int age = input.readInt();

        User user = new User();
        user.setAge(age);
        user.setEmail(email);
        user.setUsername(username);
        return user;
    }
}

View File

@ -0,0 +1,35 @@
package com.baeldung.storm.spout;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.Random;
/**
 * Spout that emits one random int together with the current time once per call
 * to {@link #nextTuple()}, pausing before each emission.
 */
public class RandomIntSpout extends BaseRichSpout {

    Random random;
    SpoutOutputCollector outputCollector;

    /**
     * Declares the two emitted fields: "randomInt" and "timestamp".
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp"));
    }

    /**
     * Creates the random generator and keeps the collector for later emits.
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.random = new Random();
        this.outputCollector = spoutOutputCollector;
    }

    /**
     * Sleeps for 1000 ms, then emits (random int, current time in milliseconds).
     */
    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        final int randomInt = random.nextInt();
        final long emittedAt = System.currentTimeMillis();
        outputCollector.emit(new Values(randomInt, emittedAt));
    }
}

View File

@ -0,0 +1,40 @@
package com.baeldung.storm.spout;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.Random;
/**
 * Spout that emits a random "operation" value in the inclusive range
 * [-1000, 1000] together with the current time, pausing before each emission.
 */
public class RandomNumberSpout extends BaseRichSpout {

    /** Largest absolute value an emitted operation can take. */
    private static final int MAX_MAGNITUDE = 1000;

    Random random;
    SpoutOutputCollector collector;

    /**
     * Declares the two emitted fields: "operation" and "timestamp".
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
    }

    /**
     * Creates the random generator and keeps the collector for later emits.
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.random = new Random();
        this.collector = spoutOutputCollector;
    }

    /**
     * Sleeps for 1000 ms, then emits (operation, current time in milliseconds).
     */
    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        // nextInt(2001) yields 0..2000; shifting by 1000 gives -1000..1000, both ends included.
        final int operation = random.nextInt(2 * MAX_MAGNITUDE + 1) - MAX_MAGNITUDE;
        final long timestamp = System.currentTimeMillis();
        collector.emit(new Values(operation, timestamp));
    }
}