Merge pull request #4914 from shreyasht/master
BAEL-1164 Changes for hazelcast jet.
This commit is contained in:
commit
e35e84291e
@ -13,16 +13,12 @@
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.hazelcast</groupId>
|
||||
<artifactId>hazelcast</artifactId>
|
||||
<version>${hazelcast.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.hazelcast</groupId>
|
||||
<artifactId>hazelcast-client</artifactId>
|
||||
<version>${hazelcast.version}</version>
|
||||
</dependency>
|
||||
<!-- Hazelcast Jet -->
|
||||
<dependency>
|
||||
<groupId>com.hazelcast.jet</groupId>
|
||||
<artifactId>hazelcast-jet</artifactId>
|
||||
<version>${hazelcast.jet.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
@ -36,8 +32,8 @@
|
||||
</build>
|
||||
|
||||
<properties>
|
||||
<!-- hazelcast -->
|
||||
<hazelcast.version>3.8.4</hazelcast.version>
|
||||
<!-- hazelcast jet-->
|
||||
<hazelcast.jet.version>0.6</hazelcast.jet.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
@ -0,0 +1,51 @@
|
||||
package com.baeldung.hazelcast.jet;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static com.hazelcast.jet.Traversers.traverseArray;
|
||||
import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
|
||||
import static com.hazelcast.jet.function.DistributedFunctions.wholeItem;
|
||||
|
||||
import com.hazelcast.jet.Jet;
|
||||
import com.hazelcast.jet.JetInstance;
|
||||
import com.hazelcast.jet.pipeline.Pipeline;
|
||||
import com.hazelcast.jet.pipeline.Sinks;
|
||||
import com.hazelcast.jet.pipeline.Sources;
|
||||
|
||||
public class WordCounter {
|
||||
|
||||
private static final String LIST_NAME = "textList";
|
||||
|
||||
private static final String MAP_NAME = "countMap";
|
||||
|
||||
private Pipeline createPipeLine() {
|
||||
Pipeline p = Pipeline.create();
|
||||
p.drawFrom(Sources.<String> list(LIST_NAME))
|
||||
.flatMap(word -> traverseArray(word.toLowerCase()
|
||||
.split("\\W+")))
|
||||
.filter(word -> !word.isEmpty())
|
||||
.groupingKey(wholeItem())
|
||||
.aggregate(counting())
|
||||
.drainTo(Sinks.map(MAP_NAME));
|
||||
return p;
|
||||
}
|
||||
|
||||
public Long countWord(List<String> sentences, String word) {
|
||||
long count = 0;
|
||||
JetInstance jet = Jet.newJetInstance();
|
||||
try {
|
||||
List<String> textList = jet.getList(LIST_NAME);
|
||||
textList.addAll(sentences);
|
||||
Pipeline p = createPipeLine();
|
||||
jet.newJob(p)
|
||||
.join();
|
||||
Map<String, Long> counts = jet.getMap(MAP_NAME);
|
||||
count = counts.get(word);
|
||||
} finally {
|
||||
Jet.shutdownAll();
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,21 @@
|
||||
package com.baeldung.hazelcast.jet;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class WordCounterUnitTest {
|
||||
|
||||
@Test
|
||||
public void whenGivenSentencesAndWord_ThenReturnCountOfWord() {
|
||||
List<String> sentences = new ArrayList<>();
|
||||
sentences.add("The first second was alright, but the second second was tough.");
|
||||
WordCounter wordCounter = new WordCounter();
|
||||
long countSecond = wordCounter.countWord(sentences, "second");
|
||||
assertTrue(countSecond == 3);
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user