diff --git a/core-java/src/main/java/com/baeldung/decimalformat/DoubletoString.java b/core-java/src/main/java/com/baeldung/decimalformat/DoubletoString.java index 87d10a3548..e605c5e200 100644 --- a/core-java/src/main/java/com/baeldung/decimalformat/DoubletoString.java +++ b/core-java/src/main/java/com/baeldung/decimalformat/DoubletoString.java @@ -1,38 +1,29 @@ package com.baeldung.decimalformat; - import java.math.RoundingMode; - import java.text.DecimalFormat; - import java.text.NumberFormat; - import java.util.Locale; +import java.math.RoundingMode; +import java.text.DecimalFormat; +import java.text.NumberFormat; - public class DoubletoString { +public class DoubletoString { - public static void main(String[] args) { + public static void main(String[] args) { - double doubleValue = 345.56; + double doubleValue = 345.56; - System.out.println(String.valueOf((int) doubleValue)); + System.out.println(String.valueOf((int) doubleValue)); - System.out.println(String.format("%.0f", doubleValue)); + System.out.println(String.format("%.0f", doubleValue)); - doubleValue = Math.floor(doubleValue); - DecimalFormat df = new DecimalFormat("#"); - df.setRoundingMode(RoundingMode.FLOOR); - System.out.println(df.format(doubleValue)); - - Locale enlocale = new Locale("en", "US"); - String pattern = "###,##"; - df = (DecimalFormat) NumberFormat.getNumberInstance(enlocale); - df.applyPattern(pattern); - String format = df.format(doubleValue); - System.out.println(format); + NumberFormat nf = NumberFormat.getInstance(); + nf.setMaximumFractionDigits(0); + nf.setRoundingMode(RoundingMode.FLOOR); + System.out.println(nf.format(doubleValue)); - Locale dalocale = new Locale("da", "DK"); - df = (DecimalFormat) NumberFormat.getNumberInstance(dalocale); - df.applyPattern(pattern); - System.out.println(df.format(doubleValue)); + doubleValue = Math.floor(doubleValue); + DecimalFormat df = new DecimalFormat("#,###"); + df.setRoundingMode(RoundingMode.FLOOR); + System.out.println(df.format(doubleValue)); + } - } - - } +} diff --git a/libraries/pom.xml b/libraries/pom.xml index 1a88acfb7b..f7980f10f6 100644 --- a/libraries/pom.xml +++ b/libraries/pom.xml @@ -933,7 +933,7 @@ 2.5.5 1.23.0 v4-rev493-1.21.0 - 1.0.0 + 2.0.0 1.7.0 3.0.14 2.2.0 diff --git a/libraries/src/main/java/com/baeldung/kafka/TransactionalApp.java b/libraries/src/main/java/com/baeldung/kafka/TransactionalApp.java new file mode 100644 index 0000000000..1e95041a0d --- /dev/null +++ b/libraries/src/main/java/com/baeldung/kafka/TransactionalApp.java @@ -0,0 +1,102 @@ +package com.baeldung.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static java.time.Duration.ofSeconds; +import static java.util.Collections.singleton; +import static org.apache.kafka.clients.consumer.ConsumerConfig.*; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.*; + +public class TransactionalApp { + + private static final String CONSUMER_GROUP_ID = "test"; + private static final String OUTPUT_TOPIC = "output"; + private static final String INPUT_TOPIC = "input"; + + public static void main(String[] args) { + + KafkaConsumer consumer = initConsumer(); + KafkaProducer producer = initProducer(); + + producer.initTransactions(); + + try { + + while (true) { + + ConsumerRecords records = consumer.poll(ofSeconds(20)); + + producer.beginTransaction(); + + for (ConsumerRecord record : records) + producer.send(new ProducerRecord(OUTPUT_TOPIC, record)); + + Map offsetsToCommit = new HashMap<>(); + + for (TopicPartition partition : records.partitions()) { + List> partitionedRecords = records.records(partition); + long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset(); + + offsetsToCommit.put(partition, new OffsetAndMetadata(offset)); + } + + producer.sendOffsetsToTransaction(offsetsToCommit, CONSUMER_GROUP_ID); + producer.commitTransaction(); + + } + + } catch (KafkaException e) { + + producer.abortTransaction(); + + } + + + } + + private static KafkaConsumer initConsumer() { + Properties props = new Properties(); + props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(GROUP_ID_CONFIG, CONSUMER_GROUP_ID); + props.put(ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(singleton(INPUT_TOPIC)); + return consumer; + } + + private static KafkaProducer initProducer() { + + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ACKS_CONFIG, "all"); + props.put(RETRIES_CONFIG, 3); + props.put(BATCH_SIZE_CONFIG, 16384); + props.put(LINGER_MS_CONFIG, 1); + props.put(BUFFER_MEMORY_CONFIG, 33554432); + props.put(ENABLE_IDEMPOTENCE_CONFIG, "true"); + props.put(TRANSACTIONAL_ID_CONFIG, "prod-1"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + + return new KafkaProducer(props); + + } + +} diff --git a/libraries/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java b/libraries/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java index 4406494d30..e61f4158a7 100644 --- a/libraries/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java +++ b/libraries/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java @@ -4,10 +4,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.test.TestUtils; import org.junit.Ignore; import org.junit.Test; @@ -36,20 +38,20 @@ public class KafkaStreamsLiveTest { streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); // when - KStreamBuilder builder = new KStreamBuilder(); + StreamsBuilder builder = new StreamsBuilder(); KStream textLines = builder.stream(inputTopic); Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS); KTable wordCounts = textLines.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase()))).groupBy((key, word) -> word).count(); - wordCounts.foreach((word, count) -> System.out.println("word: " + word + " -> " + count)); + textLines.foreach((word, count) -> System.out.println("word: " + word + " -> " + count)); String outputTopic = "outputTopic"; final Serde stringSerde = Serdes.String(); - final Serde longSerde = Serdes.Long(); - wordCounts.to(stringSerde, longSerde, outputTopic); + final Serde longSerde = Serdes.String(); + textLines.to(outputTopic, Produced.with(stringSerde,longSerde)); - KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); + KafkaStreams streams = new KafkaStreams(new Topology(), streamsConfiguration); streams.start(); // then