From 0be91964dfda73817a65d747e1d885cc838d4be1 Mon Sep 17 00:00:00 2001
From: Shreyash <shreyash.thakare@gmail.com>
Date: Tue, 7 Aug 2018 13:14:21 +0530
Subject: [PATCH] BAEL-1164 hzaelcast Jet.

---
 hazelcast/pom.xml                             | 20 +++-----
 .../baeldung/hazelcast/jet/WordCounter.java   | 51 +++++++++++++++++++
 .../hazelcast/jet/WordCounterUnitTest.java    | 21 ++++++++
 3 files changed, 80 insertions(+), 12 deletions(-)
 create mode 100644 hazelcast/src/main/java/com/baeldung/hazelcast/jet/WordCounter.java
 create mode 100644 hazelcast/src/test/java/com/baeldung/hazelcast/jet/WordCounterUnitTest.java

diff --git a/hazelcast/pom.xml b/hazelcast/pom.xml
index cc366cd0a6..705792ad05 100644
--- a/hazelcast/pom.xml
+++ b/hazelcast/pom.xml
@@ -13,16 +13,12 @@
     </parent>
 
     <dependencies>
-        <dependency>
-            <groupId>com.hazelcast</groupId>
-            <artifactId>hazelcast</artifactId>
-            <version>${hazelcast.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.hazelcast</groupId>
-            <artifactId>hazelcast-client</artifactId>
-            <version>${hazelcast.version}</version>
-        </dependency>
+        <!-- Hazelcast Jet -->
+		<dependency>
+          <groupId>com.hazelcast.jet</groupId>
+          <artifactId>hazelcast-jet</artifactId>
+          <version>${hazelcast.jet.version}</version>
+		</dependency>
     </dependencies>
 
     <build>
@@ -36,8 +32,8 @@
     </build>
 
     <properties>
-        <!-- hazelcast -->
-        <hazelcast.version>3.8.4</hazelcast.version>
+        <!-- hazelcast jet-->
+        <hazelcast.jet.version>0.6</hazelcast.jet.version>
     </properties>
 
 </project>
\ No newline at end of file
diff --git a/hazelcast/src/main/java/com/baeldung/hazelcast/jet/WordCounter.java b/hazelcast/src/main/java/com/baeldung/hazelcast/jet/WordCounter.java
new file mode 100644
index 0000000000..971986bcae
--- /dev/null
+++ b/hazelcast/src/main/java/com/baeldung/hazelcast/jet/WordCounter.java
@@ -0,0 +1,51 @@
+package com.baeldung.hazelcast.jet;
+
+import java.util.List;
+import java.util.Map;
+
+import static com.hazelcast.jet.Traversers.traverseArray;
+import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
+import static com.hazelcast.jet.function.DistributedFunctions.wholeItem;
+
+import com.hazelcast.jet.Jet;
+import com.hazelcast.jet.JetInstance;
+import com.hazelcast.jet.pipeline.Pipeline;
+import com.hazelcast.jet.pipeline.Sinks;
+import com.hazelcast.jet.pipeline.Sources;
+
+public class WordCounter {
+
+    private static final String LIST_NAME = "textList";
+
+    private static final String MAP_NAME = "countMap";
+
+    private Pipeline createPipeLine() {
+        Pipeline p = Pipeline.create();
+        p.drawFrom(Sources.<String> list(LIST_NAME))
+            .flatMap(word -> traverseArray(word.toLowerCase()
+                .split("\\W+")))
+            .filter(word -> !word.isEmpty())
+            .groupingKey(wholeItem())
+            .aggregate(counting())
+            .drainTo(Sinks.map(MAP_NAME));
+        return p;
+    }
+
+    public Long countWord(List<String> sentences, String word) {
+        long count = 0;
+        JetInstance jet = Jet.newJetInstance();
+        try {
+            List<String> textList = jet.getList(LIST_NAME);
+            textList.addAll(sentences);
+            Pipeline p = createPipeLine();
+            jet.newJob(p)
+                .join();
+            Map<String, Long> counts = jet.getMap(MAP_NAME);
+            count = counts.get(word);
+        } finally {
+            Jet.shutdownAll();
+        }
+        return count;
+    }
+
+}
diff --git a/hazelcast/src/test/java/com/baeldung/hazelcast/jet/WordCounterUnitTest.java b/hazelcast/src/test/java/com/baeldung/hazelcast/jet/WordCounterUnitTest.java
new file mode 100644
index 0000000000..95596b3860
--- /dev/null
+++ b/hazelcast/src/test/java/com/baeldung/hazelcast/jet/WordCounterUnitTest.java
@@ -0,0 +1,21 @@
+package com.baeldung.hazelcast.jet;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
+public class WordCounterUnitTest {
+
+    @Test
+    public void whenGivenSentencesAndWord_ThenReturnCountOfWord() {
+        List<String> sentences = new ArrayList<>();
+        sentences.add("The first second was alright, but the second second was tough.");
+        WordCounter wordCounter = new WordCounter();
+        long countSecond = wordCounter.countWord(sentences, "second");
+        assertTrue(countSecond == 3);
+    }
+
+}