Switched fields to be static to remove initialization in method.

This commit is contained in:
DomWos 2018-09-03 12:08:45 +02:00
parent 1921b585a6
commit be81f87c3c
3 changed files with 11 additions and 6 deletions

View File

@ -48,7 +48,7 @@ public static void createBackup () throws Exception {
String inputTopic = "flink_input"; String inputTopic = "flink_input";
String outputTopic = "flink_output"; String outputTopic = "flink_output";
String consumerGroup = "baeldung"; String consumerGroup = "baeldung";
String kafkaAddress = "192.168.99.100:9092"; String kafkaAddress = "localhost:9092";
StreamExecutionEnvironment environment = StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment.getExecutionEnvironment();

View File

@ -1,6 +1,8 @@
package com.baeldung.flink.schema; package com.baeldung.flink.schema;
import com.baeldung.flink.model.Backup; import com.baeldung.flink.model.Backup;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema;
@ -10,12 +12,14 @@ import org.slf4j.LoggerFactory;
public class BackupSerializationSchema public class BackupSerializationSchema
implements SerializationSchema<Backup> { implements SerializationSchema<Backup> {
ObjectMapper objectMapper; static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class); Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class);
@Override @Override
public byte[] serialize(Backup backupMessage) { public byte[] serialize(Backup backupMessage) {
if(objectMapper == null) { if(objectMapper == null) {
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
} }
try { try {

View File

@ -1,6 +1,8 @@
package com.baeldung.flink.schema; package com.baeldung.flink.schema;
import com.baeldung.flink.model.InputMessage; import com.baeldung.flink.model.InputMessage;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.DeserializationSchema;
@ -11,13 +13,12 @@ import java.io.IOException;
public class InputMessageDeserializationSchema implements public class InputMessageDeserializationSchema implements
DeserializationSchema<InputMessage> { DeserializationSchema<InputMessage> {
ObjectMapper objectMapper; static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
@Override @Override
public InputMessage deserialize(byte[] bytes) throws IOException { public InputMessage deserialize(byte[] bytes) throws IOException {
if(objectMapper == null) {
objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
}
return objectMapper.readValue(bytes, InputMessage.class); return objectMapper.readValue(bytes, InputMessage.class);
} }