Merge branch 'master' into stream-foreach-ifelse-logic
This commit is contained in:
commit
348edfb1a9
|
@ -44,4 +44,12 @@ public class Employee {
|
|||
result = 31 * result + name.hashCode();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Employee{" +
|
||||
"id=" + id +
|
||||
", name='" + name + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
package com.baeldung.sort;
|
||||
|
||||
public class Employee implements Comparable<Employee> {
|
||||
|
||||
private Long id;
|
||||
private String name;
|
||||
|
||||
public Employee(Long id, String name) {
|
||||
this.name = name;
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public Long getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(Long id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
Employee employee = (Employee) o;
|
||||
|
||||
if (!id.equals(employee.id)) return false;
|
||||
return name.equals(employee.name);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = id.hashCode();
|
||||
result = 31 * result + name.hashCode();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Employee{" +
|
||||
"id=" + id +
|
||||
", name='" + name + '\'' +
|
||||
'}';
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Employee employee) {
|
||||
return (int)(this.id - employee.getId());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,104 @@
|
|||
package com.baeldung.sort;
|
||||
|
||||
import com.google.common.base.Functions;
|
||||
import com.google.common.collect.ImmutableSortedMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Ordering;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class SortHashMap {
|
||||
|
||||
private static Map<String, Employee> map = new HashMap<>();
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
initialize();
|
||||
|
||||
treeMapSortByKey();
|
||||
|
||||
arrayListSortByValue();
|
||||
arrayListSortByKey();
|
||||
|
||||
sortStream();
|
||||
|
||||
sortGuava();
|
||||
|
||||
addDuplicates();
|
||||
|
||||
treeSetByKey();
|
||||
treeSetByValue();
|
||||
|
||||
}
|
||||
|
||||
private static void sortGuava() {
|
||||
final Ordering naturalOrdering =
|
||||
Ordering.natural().onResultOf(Functions.forMap(map, null));
|
||||
|
||||
System.out.println(ImmutableSortedMap.copyOf(map, naturalOrdering));
|
||||
}
|
||||
|
||||
private static void sortStream() {
|
||||
map.entrySet().stream()
|
||||
.sorted(Map.Entry.<String, Employee>comparingByKey().reversed())
|
||||
.forEach(System.out::println);
|
||||
|
||||
Map<String, Employee> result = map.entrySet().stream()
|
||||
.sorted(Map.Entry.comparingByValue())
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
|
||||
(oldValue, newValue) -> oldValue, LinkedHashMap::new));
|
||||
|
||||
result.entrySet().forEach(System.out::println);
|
||||
}
|
||||
|
||||
private static void treeSetByValue() {
|
||||
SortedSet<Employee> values = new TreeSet<>(map.values());
|
||||
System.out.println(values);
|
||||
}
|
||||
|
||||
private static void treeSetByKey() {
|
||||
SortedSet<String> keysSet = new TreeSet<>(map.keySet());
|
||||
System.out.println(keysSet);
|
||||
}
|
||||
|
||||
private static void treeMapSortByKey() {
|
||||
TreeMap<String, Employee> sorted = new TreeMap<>(map);
|
||||
sorted.putAll(map);
|
||||
|
||||
sorted.entrySet().forEach(System.out::println);
|
||||
|
||||
}
|
||||
|
||||
private static void arrayListSortByValue() {
|
||||
List<Employee> employeeById = new ArrayList<>(map.values());
|
||||
|
||||
Collections.sort(employeeById);
|
||||
|
||||
System.out.println(employeeById);
|
||||
}
|
||||
|
||||
private static void arrayListSortByKey() {
|
||||
List<String> employeeByKey = new ArrayList<>(map.keySet());
|
||||
Collections.sort(employeeByKey);
|
||||
System.out.println(employeeByKey);
|
||||
}
|
||||
|
||||
private static void initialize() {
|
||||
Employee employee1 = new Employee(1L, "Mher");
|
||||
map.put(employee1.getName(), employee1);
|
||||
Employee employee2 = new Employee(22L, "Annie");
|
||||
map.put(employee2.getName(), employee2);
|
||||
Employee employee3 = new Employee(8L, "John");
|
||||
map.put(employee3.getName(), employee3);
|
||||
Employee employee4 = new Employee(2L, "George");
|
||||
map.put(employee4.getName(), employee4);
|
||||
}
|
||||
|
||||
private static void addDuplicates() {
|
||||
Employee employee5 = new Employee(1L, "Mher");
|
||||
map.put(employee5.getName(), employee5);
|
||||
Employee employee6 = new Employee(22L, "Annie");
|
||||
map.put(employee6.getName(), employee6);
|
||||
}
|
||||
}
|
|
@ -8,12 +8,30 @@
|
|||
|
||||
<parent>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>parent-modules</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
<artifactId>parent-boot-1</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<relativePath>../parent-boot-1</relativePath>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.security.oauth</groupId>
|
||||
<artifactId>spring-security-oauth2</artifactId>
|
||||
<version>2.3.3.RELEASE</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.github.scribejava</groupId>
|
||||
<artifactId>scribejava-apis</artifactId>
|
||||
<version>${scribejava.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
|
@ -25,6 +43,9 @@
|
|||
|
||||
<properties>
|
||||
<junit.version>4.12</junit.version>
|
||||
<spring-boot-maven-plugin.version>2.0.4.RELEASE</spring-boot-maven-plugin.version>
|
||||
<scribejava.version>5.6.0</scribejava.version>
|
||||
</properties>
|
||||
|
||||
|
||||
</project>
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
package com.baeldung.scribejava;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
|
||||
@SpringBootApplication
|
||||
public class ScribejavaApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ScribejavaApplication.class, args);
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
package com.baeldung.scribejava.api;
|
||||
|
||||
import com.github.scribejava.core.builder.api.DefaultApi20;
|
||||
|
||||
public class MyApi extends DefaultApi20 {
|
||||
|
||||
private MyApi() {
|
||||
}
|
||||
|
||||
private static class InstanceHolder {
|
||||
private static final MyApi INSTANCE = new MyApi();
|
||||
}
|
||||
|
||||
public static MyApi instance() {
|
||||
return InstanceHolder.INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getAccessTokenEndpoint() {
|
||||
return "http://localhost:8080/oauth/token";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getAuthorizationBaseUrl() {
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
package com.baeldung.scribejava.controller;
|
||||
|
||||
import com.baeldung.scribejava.service.GoogleService;
|
||||
import com.github.scribejava.core.model.OAuth2AccessToken;
|
||||
import com.github.scribejava.core.model.OAuthRequest;
|
||||
import com.github.scribejava.core.model.Response;
|
||||
import com.github.scribejava.core.model.Verb;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
@RestController
|
||||
public class GoogleController {
|
||||
|
||||
@Autowired
|
||||
private GoogleService service;
|
||||
|
||||
|
||||
@GetMapping(value ="/me/google")
|
||||
public void me(HttpServletResponse response){
|
||||
String auth = service.getService().getAuthorizationUrl();
|
||||
|
||||
response.setHeader("Location", auth);
|
||||
response.setStatus(302);
|
||||
|
||||
}
|
||||
|
||||
@GetMapping(value = "/auth/google")
|
||||
public String google(@RequestParam String code, HttpServletResponse servletResponse){
|
||||
|
||||
try {
|
||||
OAuth2AccessToken token = service.getService().getAccessToken(code);
|
||||
|
||||
OAuthRequest request = new OAuthRequest(Verb.GET, "https://www.googleapis.com/oauth2/v1/userinfo?alt=json");
|
||||
service.getService().signRequest(token, request);
|
||||
Response response = service.getService().execute(request);
|
||||
return response.getBody();
|
||||
|
||||
}catch (Exception e){
|
||||
servletResponse.setStatus(HttpServletResponse.SC_BAD_REQUEST);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
package com.baeldung.scribejava.controller;
|
||||
|
||||
import com.baeldung.scribejava.service.TwitterService;
|
||||
import com.github.scribejava.core.model.*;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import java.io.IOException;
|
||||
import java.util.Scanner;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
@RestController
|
||||
public class TwitterController {
|
||||
|
||||
@Autowired
|
||||
private TwitterService service;
|
||||
|
||||
|
||||
@GetMapping(value ="/me/twitter")
|
||||
public String me(HttpServletResponse servletResponse){
|
||||
try {
|
||||
OAuth1RequestToken requestToken = service.getService().getRequestToken();
|
||||
|
||||
String auth = service.getService().getAuthorizationUrl(requestToken);
|
||||
|
||||
Runtime runtime = Runtime.getRuntime();
|
||||
try {
|
||||
runtime.exec("rundll32 url.dll,FileProtocolHandler " + auth);
|
||||
} catch (IOException e) {
|
||||
servletResponse.setStatus(HttpServletResponse.SC_BAD_REQUEST);
|
||||
return null;
|
||||
}
|
||||
|
||||
System.out.println("Insert twitter code:");
|
||||
Scanner in = new Scanner(System.in);
|
||||
|
||||
String oauthverifier = in.nextLine();
|
||||
|
||||
final OAuth1AccessToken accessToken = service.getService().getAccessToken(requestToken,oauthverifier);
|
||||
|
||||
OAuthRequest request = new OAuthRequest(Verb.GET, "https://api.twitter.com/1.1/account/verify_credentials.json");
|
||||
service.getService().signRequest(accessToken, request);
|
||||
Response response = service.getService().execute(request);
|
||||
return response.getBody();
|
||||
|
||||
} catch (IOException | InterruptedException | ExecutionException e) {
|
||||
servletResponse.setStatus(HttpServletResponse.SC_BAD_REQUEST);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
package com.baeldung.scribejava.controller;
|
||||
|
||||
import com.baeldung.scribejava.service.MyService;
|
||||
import com.github.scribejava.core.model.OAuth2AccessToken;
|
||||
import com.github.scribejava.core.model.OAuthRequest;
|
||||
import com.github.scribejava.core.model.Response;
|
||||
import com.github.scribejava.core.model.Verb;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import java.security.Principal;
|
||||
|
||||
@RestController(value = "/user")
|
||||
public class UserController {
|
||||
|
||||
@Autowired
|
||||
private MyService service;
|
||||
|
||||
@GetMapping("/me/myapi")
|
||||
public String me(@RequestParam String username, @RequestParam String password, HttpServletResponse responsehttp) {
|
||||
|
||||
try {
|
||||
OAuth2AccessToken token = service.getService().getAccessTokenPasswordGrant(username, password);
|
||||
|
||||
OAuthRequest request = new OAuthRequest(Verb.GET, "http://localhost:8080/me");
|
||||
service.getService().signRequest(token, request);
|
||||
Response response = service.getService().execute(request);
|
||||
|
||||
return response.getBody();
|
||||
|
||||
} catch (Exception e) {
|
||||
responsehttp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
|
||||
}
|
||||
|
||||
return null;
|
||||
|
||||
}
|
||||
|
||||
@GetMapping("/me")
|
||||
public Principal user(Principal principal) {
|
||||
return principal;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
package com.baeldung.scribejava.oauth;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.security.authentication.AuthenticationManager;
|
||||
import org.springframework.security.oauth2.config.annotation.configurers.ClientDetailsServiceConfigurer;
|
||||
import org.springframework.security.oauth2.config.annotation.web.configuration.AuthorizationServerConfigurerAdapter;
|
||||
import org.springframework.security.oauth2.config.annotation.web.configuration.EnableAuthorizationServer;
|
||||
import org.springframework.security.oauth2.config.annotation.web.configurers.AuthorizationServerEndpointsConfigurer;
|
||||
import org.springframework.security.oauth2.config.annotation.web.configurers.AuthorizationServerSecurityConfigurer;
|
||||
|
||||
|
||||
@Configuration
|
||||
@EnableAuthorizationServer
|
||||
public class AuthServiceConfig extends AuthorizationServerConfigurerAdapter {
|
||||
|
||||
@Autowired
|
||||
@Qualifier("authenticationManagerBean")
|
||||
private AuthenticationManager authenticationManager;
|
||||
|
||||
@Override
|
||||
public void configure(AuthorizationServerSecurityConfigurer oauthServer) throws Exception {
|
||||
oauthServer.tokenKeyAccess("permitAll()")
|
||||
.checkTokenAccess("isAuthenticated()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(ClientDetailsServiceConfigurer clients) throws Exception {
|
||||
clients.inMemory()
|
||||
.withClient("baeldung_api_key")
|
||||
.secret("baeldung_api_secret")
|
||||
.authorizedGrantTypes("password","refresh_token")
|
||||
.scopes("read","write").autoApprove(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(AuthorizationServerEndpointsConfigurer endpoints) throws Exception {
|
||||
endpoints
|
||||
.authenticationManager(authenticationManager)
|
||||
.allowedTokenEndpointRequestMethods(HttpMethod.GET, HttpMethod.POST);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
package com.baeldung.scribejava.oauth;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.security.authentication.AuthenticationManager;
|
||||
import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder;
|
||||
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
|
||||
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
|
||||
import org.springframework.security.oauth2.config.annotation.web.configuration.EnableResourceServer;
|
||||
import org.springframework.security.oauth2.config.annotation.web.configuration.ResourceServerConfigurerAdapter;
|
||||
|
||||
@Configuration
|
||||
@EnableResourceServer
|
||||
public class WebSecurityConfig extends WebSecurityConfigurerAdapter {
|
||||
|
||||
@Override
|
||||
protected void configure(HttpSecurity http) throws Exception {
|
||||
http
|
||||
.headers().frameOptions().disable()
|
||||
.and()
|
||||
.csrf().disable();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configure(AuthenticationManagerBuilder auth) throws Exception {
|
||||
auth.inMemoryAuthentication()
|
||||
.withUser("baeldung")
|
||||
.password("scribejava")
|
||||
.roles("USER");
|
||||
}
|
||||
|
||||
@Override
|
||||
@Bean
|
||||
public AuthenticationManager authenticationManagerBean() throws Exception {
|
||||
return super.authenticationManagerBean();
|
||||
}
|
||||
|
||||
|
||||
@EnableResourceServer
|
||||
@Configuration
|
||||
public class ResourceServerConfig extends ResourceServerConfigurerAdapter {
|
||||
|
||||
@Override
|
||||
public void configure(HttpSecurity http) throws Exception {
|
||||
http
|
||||
.authorizeRequests()
|
||||
.antMatchers("/user/me").authenticated()
|
||||
.and()
|
||||
.csrf().disable();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
package com.baeldung.scribejava.service;
|
||||
|
||||
import com.github.scribejava.apis.GoogleApi20;
|
||||
import com.github.scribejava.core.builder.ServiceBuilder;
|
||||
import com.github.scribejava.core.oauth.OAuth20Service;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
@Component
|
||||
public class GoogleService {
|
||||
|
||||
private OAuth20Service service;
|
||||
private final String API_KEY = "api_key";
|
||||
private final String API_SECRET = "api_secret";
|
||||
private final String SCOPE = "https://www.googleapis.com/auth/userinfo.email";
|
||||
private final String CALLBACK = "http://localhost:8080/auth/google";
|
||||
|
||||
@PostConstruct
|
||||
private void init(){
|
||||
this.service = new ServiceBuilder(API_KEY)
|
||||
.apiSecret(API_SECRET)
|
||||
.scope(SCOPE)
|
||||
.callback(CALLBACK)
|
||||
.build(GoogleApi20.instance());
|
||||
}
|
||||
|
||||
|
||||
public OAuth20Service getService() {
|
||||
return service;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package com.baeldung.scribejava.service;
|
||||
|
||||
import com.baeldung.scribejava.api.MyApi;
|
||||
import com.github.scribejava.core.builder.ServiceBuilder;
|
||||
import com.github.scribejava.core.oauth.OAuth20Service;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
@Component
|
||||
public class MyService {
|
||||
|
||||
private OAuth20Service service;
|
||||
private final String API_KEY = "baeldung_api_key";
|
||||
private final String API_SECRET = "baeldung_api_secret";
|
||||
|
||||
@PostConstruct
|
||||
private void init(){
|
||||
this.service = new ServiceBuilder(API_KEY)
|
||||
.apiSecret(API_SECRET)
|
||||
.scope("read write")
|
||||
.build(MyApi.instance());
|
||||
}
|
||||
|
||||
|
||||
public OAuth20Service getService() {
|
||||
return service;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package com.baeldung.scribejava.service;
|
||||
|
||||
import com.github.scribejava.apis.TwitterApi;
|
||||
import com.github.scribejava.core.builder.ServiceBuilder;
|
||||
import com.github.scribejava.core.oauth.OAuth10aService;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
@Component
|
||||
public class TwitterService {
|
||||
|
||||
private final String API_KEY = "api_key";
|
||||
private final String API_SECRET = "api_secret";
|
||||
private OAuth10aService service;
|
||||
|
||||
@PostConstruct
|
||||
private void init(){
|
||||
this.service = new ServiceBuilder(API_KEY)
|
||||
.apiSecret(API_SECRET)
|
||||
.build(TwitterApi.instance());
|
||||
}
|
||||
|
||||
public OAuth10aService getService(){
|
||||
return service;
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
security.oauth2.resource.filter-order = 3
|
|
@ -0,0 +1,17 @@
|
|||
package com.baeldung.scribejava;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest
|
||||
public class ScribejavaUnitTest {
|
||||
|
||||
@Test
|
||||
public void contextLoad(){
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,56 @@
|
|||
package com.baeldung.kafka;
|
||||
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.KafkaException;
|
||||
|
||||
import java.util.Properties;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
|
||||
import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
|
||||
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
|
||||
import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
|
||||
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
|
||||
|
||||
public class TransactionalMessageProducer {
|
||||
|
||||
private static final String DATA_MESSAGE_1 = "Put any space separated data here for count";
|
||||
private static final String DATA_MESSAGE_2 = "Output will contain count of every word in the message";
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
KafkaProducer<String, String> producer = createKafkaProducer();
|
||||
|
||||
producer.initTransactions();
|
||||
|
||||
try{
|
||||
|
||||
producer.beginTransaction();
|
||||
|
||||
Stream.of(DATA_MESSAGE_1, DATA_MESSAGE_2).forEach(s -> producer.send(
|
||||
new ProducerRecord<String, String>("input", null, s)));
|
||||
|
||||
producer.commitTransaction();
|
||||
|
||||
}catch (KafkaException e){
|
||||
|
||||
producer.abortTransaction();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static KafkaProducer<String, String> createKafkaProducer() {
|
||||
|
||||
Properties props = new Properties();
|
||||
props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(ENABLE_IDEMPOTENCE_CONFIG, "true");
|
||||
props.put(TRANSACTIONAL_ID_CONFIG, "prod-0");
|
||||
props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
||||
props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
||||
|
||||
return new KafkaProducer(props);
|
||||
|
||||
}
|
||||
}
|
|
@ -14,6 +14,8 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static java.time.Duration.ofSeconds;
|
||||
import static java.util.Collections.singleton;
|
||||
|
@ -21,16 +23,16 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
|
|||
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
|
||||
import static org.apache.kafka.clients.producer.ProducerConfig.*;
|
||||
|
||||
public class TransactionalApp {
|
||||
public class TransactionalWordCount {
|
||||
|
||||
private static final String CONSUMER_GROUP_ID = "test";
|
||||
private static final String CONSUMER_GROUP_ID = "my-group-id";
|
||||
private static final String OUTPUT_TOPIC = "output";
|
||||
private static final String INPUT_TOPIC = "input";
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
KafkaConsumer<String, String> consumer = initConsumer();
|
||||
KafkaProducer<String, String> producer = initProducer();
|
||||
KafkaConsumer<String, String> consumer = createKafkaConsumer();
|
||||
KafkaProducer<String, String> producer = createKafkaProducer();
|
||||
|
||||
producer.initTransactions();
|
||||
|
||||
|
@ -38,12 +40,17 @@ public class TransactionalApp {
|
|||
|
||||
while (true) {
|
||||
|
||||
ConsumerRecords<String, String> records = consumer.poll(ofSeconds(20));
|
||||
ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60));
|
||||
|
||||
Map<String, Integer> wordCountMap = records.records(new TopicPartition(INPUT_TOPIC, 0))
|
||||
.stream()
|
||||
.flatMap(record -> Stream.of(record.value().split(" ")))
|
||||
.map(word -> Tuple.of(word, 1))
|
||||
.collect(Collectors.toMap(tuple -> tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2));
|
||||
|
||||
producer.beginTransaction();
|
||||
|
||||
for (ConsumerRecord record : records)
|
||||
producer.send(new ProducerRecord(OUTPUT_TOPIC, record));
|
||||
wordCountMap.forEach((key, value) -> producer.send(new ProducerRecord<String, String>(OUTPUT_TOPIC, key, value.toString())));
|
||||
|
||||
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
|
||||
|
||||
|
@ -51,7 +58,7 @@ public class TransactionalApp {
|
|||
List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);
|
||||
long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
|
||||
|
||||
offsetsToCommit.put(partition, new OffsetAndMetadata(offset));
|
||||
offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
|
||||
}
|
||||
|
||||
producer.sendOffsetsToTransaction(offsetsToCommit, CONSUMER_GROUP_ID);
|
||||
|
@ -68,11 +75,12 @@ public class TransactionalApp {
|
|||
|
||||
}
|
||||
|
||||
private static KafkaConsumer<String, String> initConsumer() {
|
||||
private static KafkaConsumer<String, String> createKafkaConsumer() {
|
||||
Properties props = new Properties();
|
||||
props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
|
||||
props.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
|
||||
props.put(ISOLATION_LEVEL_CONFIG, "read_committed");
|
||||
props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
|
||||
|
@ -81,19 +89,14 @@ public class TransactionalApp {
|
|||
return consumer;
|
||||
}
|
||||
|
||||
private static KafkaProducer<String, String> initProducer() {
|
||||
private static KafkaProducer<String, String> createKafkaProducer() {
|
||||
|
||||
Properties props = new Properties();
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(ACKS_CONFIG, "all");
|
||||
props.put(RETRIES_CONFIG, 3);
|
||||
props.put(BATCH_SIZE_CONFIG, 16384);
|
||||
props.put(LINGER_MS_CONFIG, 1);
|
||||
props.put(BUFFER_MEMORY_CONFIG, 33554432);
|
||||
props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(ENABLE_IDEMPOTENCE_CONFIG, "true");
|
||||
props.put(TRANSACTIONAL_ID_CONFIG, "prod-1");
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
||||
props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
||||
props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
||||
|
||||
return new KafkaProducer(props);
|
||||
|
|
@ -0,0 +1,24 @@
|
|||
package com.baeldung.kafka;
|
||||
|
||||
public class Tuple {
|
||||
|
||||
private String key;
|
||||
private Integer value;
|
||||
|
||||
private Tuple(String key, Integer value) {
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public static Tuple of(String key, Integer value){
|
||||
return new Tuple(key,value);
|
||||
}
|
||||
|
||||
public String getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
public Integer getValue() {
|
||||
return value;
|
||||
}
|
||||
}
|
|
@ -22,9 +22,9 @@ public class TodoDao implements Dao<Todo> {
|
|||
|
||||
@Override
|
||||
public Collection<Todo> getAll() {
|
||||
return Collections.unmodifiableCollection(todoList.stream()
|
||||
return todoList.stream()
|
||||
.filter(Objects::nonNull)
|
||||
.collect(Collectors.toList()));
|
||||
.collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
package com.baeldung.definition;
|
||||
|
||||
import com.baeldung.definition.domain.Address;
|
||||
import com.baeldung.definition.domain.Company;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
@ComponentScan(basePackageClasses = Company.class)
|
||||
public class Config {
|
||||
@Bean
|
||||
public Address getAddress() {
|
||||
return new Address("High Street", 1000);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
package com.baeldung.definition.domain;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class Address {
|
||||
private String street;
|
||||
private int number;
|
||||
|
||||
public Address(String street, int number) {
|
||||
this.street = street;
|
||||
this.number = number;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
package com.baeldung.definition.domain;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Data
|
||||
@Component
|
||||
public class Company {
|
||||
private Address address;
|
||||
|
||||
public Company(Address address) {
|
||||
this.address = address;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
package com.baeldung.definition;
|
||||
|
||||
import com.baeldung.definition.domain.Company;
|
||||
import org.junit.Test;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class SpringBeanIntegrationTest {
|
||||
@Test
|
||||
public void whenUsingIoC_thenDependenciesAreInjected() {
|
||||
ApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
|
||||
Company company = context.getBean("company", Company.class);
|
||||
assertEquals("High Street", company.getAddress().getStreet());
|
||||
assertEquals(1000, company.getAddress().getNumber());
|
||||
}
|
||||
}
|
|
@ -7,4 +7,4 @@ To login, use credentials from the data.sql file in src/main/resource, eg: user/
|
|||
|
||||
### Relevant Articles:
|
||||
- [Intro to Security and WebSockets](http://www.baeldung.com/spring-security-websockets)
|
||||
- [Spring WebSockets: Specific User Chat](http://www.baeldung.com/spring-websocket-specific-user-chat)
|
||||
- [Spring WebSockets: Specific User Chat](https://www.baeldung.com/spring-websockets-send-message-to-user)
|
||||
|
|
Loading…
Reference in New Issue