From 42e793aa7fa7bed6f3b603fc4fe14e8901a164be Mon Sep 17 00:00:00 2001 From: psevestre Date: Wed, 14 Apr 2021 18:35:32 -0300 Subject: [PATCH] [BAEL-4863] Article Code (#10656) * [BAEL-4863] Inicial code * [BAEL-4863] Article code --- .../baeldung/kubernetes/intro/WatchPods.java | 72 ++++++++++++ .../intro/WatchPodsUsingBookmarks.java | 87 ++++++++++++++ .../intro/WatchPodsUsingResourceVersions.java | 111 ++++++++++++++++++ .../kubernetes/intro/WatchPodsLiveTest.java | 10 ++ .../WatchPodsUsingBookmarksLiveTest.java | 10 ++ ...atchPodsUsingResourceVersionsLiveTest.java | 10 ++ 6 files changed, 300 insertions(+) create mode 100644 kubernetes/k8s-intro/src/main/java/com/baeldung/kubernetes/intro/WatchPods.java create mode 100644 kubernetes/k8s-intro/src/main/java/com/baeldung/kubernetes/intro/WatchPodsUsingBookmarks.java create mode 100644 kubernetes/k8s-intro/src/main/java/com/baeldung/kubernetes/intro/WatchPodsUsingResourceVersions.java create mode 100644 kubernetes/k8s-intro/src/test/java/com/baeldung/kubernetes/intro/WatchPodsLiveTest.java create mode 100644 kubernetes/k8s-intro/src/test/java/com/baeldung/kubernetes/intro/WatchPodsUsingBookmarksLiveTest.java create mode 100644 kubernetes/k8s-intro/src/test/java/com/baeldung/kubernetes/intro/WatchPodsUsingResourceVersionsLiveTest.java diff --git a/kubernetes/k8s-intro/src/main/java/com/baeldung/kubernetes/intro/WatchPods.java b/kubernetes/k8s-intro/src/main/java/com/baeldung/kubernetes/intro/WatchPods.java new file mode 100644 index 0000000000..7df972b738 --- /dev/null +++ b/kubernetes/k8s-intro/src/main/java/com/baeldung/kubernetes/intro/WatchPods.java @@ -0,0 +1,72 @@ +package com.baeldung.kubernetes.intro; + +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.reflect.TypeToken; + +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1Pod; +import io.kubernetes.client.util.Config; +import io.kubernetes.client.util.Watch; +import io.kubernetes.client.util.Watch.Response; +import okhttp3.OkHttpClient; +import okhttp3.logging.HttpLoggingInterceptor; + +public class WatchPods { + + private static Logger log = LoggerFactory.getLogger(WatchPods.class); + + public static void main(String[] args) throws Exception { + + ApiClient client = Config.defaultClient(); + + // Optional, put helpful during tests: disable client timeout and enable + // HTTP wire-level logs + HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor(message -> log.info(message)); + interceptor.setLevel(HttpLoggingInterceptor.Level.BODY); + OkHttpClient newClient = client.getHttpClient() + .newBuilder() + .addInterceptor(interceptor) + .readTimeout(0, TimeUnit.SECONDS) + .build(); + + client.setHttpClient(newClient); + CoreV1Api api = new CoreV1Api(client); + + // Create the watch object that monitors pod creation/deletion/update events + while (true) { + log.info("[I46] Creating watch..."); + try (Watch watch = Watch.createWatch( + client, + api.listPodForAllNamespacesCall(false, null, null, null, null, "false", null, null, 10, true, null), + new TypeToken>(){}.getType())) { + + log.info("[I52] Receiving events:"); + for (Response event : watch) { + V1Pod pod = event.object; + V1ObjectMeta meta = pod.getMetadata(); + switch (event.type) { + case "ADDED": + case "MODIFIED": + case "DELETED": + log.info("event.type: {}, namespace={}, name={}", + event.type, + meta.getNamespace(), + meta.getName()); + break; + default: + log.warn("[W66] Unknown event type: {}", event.type); + } + } + } catch (ApiException ex) { + log.error("[E70] ApiError", ex); + } + } + } +} diff --git a/kubernetes/k8s-intro/src/main/java/com/baeldung/kubernetes/intro/WatchPodsUsingBookmarks.java b/kubernetes/k8s-intro/src/main/java/com/baeldung/kubernetes/intro/WatchPodsUsingBookmarks.java new file mode 100644 index 0000000000..9dfccfec08 --- /dev/null +++ b/kubernetes/k8s-intro/src/main/java/com/baeldung/kubernetes/intro/WatchPodsUsingBookmarks.java @@ -0,0 +1,87 @@ +package com.baeldung.kubernetes.intro; + +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.reflect.TypeToken; + +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1Pod; +import io.kubernetes.client.openapi.models.V1PodList; +import io.kubernetes.client.util.Config; +import io.kubernetes.client.util.Watch; +import io.kubernetes.client.util.Watch.Response; +import okhttp3.OkHttpClient; +import okhttp3.logging.HttpLoggingInterceptor; + +public class WatchPodsUsingBookmarks { + + private static Logger log = LoggerFactory.getLogger(WatchPodsUsingBookmarks.class); + + public static void main(String[] args) throws Exception { + + ApiClient client = Config.defaultClient(); + + // Optional, put helpful during tests: disable client timeout and enable + // HTTP wire-level logs + HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor(message -> log.info(message)); + interceptor.setLevel(HttpLoggingInterceptor.Level.BODY); + OkHttpClient newClient = client.getHttpClient() + .newBuilder() + .addInterceptor(interceptor) + .readTimeout(0, TimeUnit.SECONDS) + .build(); + + client.setHttpClient(newClient); + CoreV1Api api = new CoreV1Api(client); + + String resourceVersion = null; + while (true) { + // Get a fresh list only we need to resync + if ( resourceVersion == null ) { + log.info("[I48] Creating initial POD list..."); + V1PodList podList = api.listPodForAllNamespaces(true, null, null, null, null, "false", resourceVersion, null, null, null); + resourceVersion = podList.getMetadata().getResourceVersion(); + } + + while (true) { + log.info("[I54] Creating watch: resourceVersion={}", resourceVersion); + try (Watch watch = Watch.createWatch( + client, + api.listPodForAllNamespacesCall(true, null, null, null, null, "false", resourceVersion, null, 10, true, null), + new TypeToken>(){}.getType())) { + + log.info("[I60] Receiving events:"); + for (Response event : watch) { + V1Pod pod = event.object; + V1ObjectMeta meta = pod.getMetadata(); + switch (event.type) { + case "BOOKMARK": + resourceVersion = meta.getResourceVersion(); + log.info("[I67] event.type: {}, resourceVersion={}", event.type,resourceVersion); + break; + case "ADDED": + case "MODIFIED": + case "DELETED": + log.info("event.type: {}, namespace={}, name={}", + event.type, + meta.getNamespace(), + meta.getName()); + break; + default: + log.warn("[W76] Unknown event type: {}", event.type); + } + } + } catch (ApiException ex) { + log.error("[E80] ApiError", ex); + resourceVersion = null; + } + } + } + } +} diff --git a/kubernetes/k8s-intro/src/main/java/com/baeldung/kubernetes/intro/WatchPodsUsingResourceVersions.java b/kubernetes/k8s-intro/src/main/java/com/baeldung/kubernetes/intro/WatchPodsUsingResourceVersions.java new file mode 100644 index 0000000000..2165d7fc0b --- /dev/null +++ b/kubernetes/k8s-intro/src/main/java/com/baeldung/kubernetes/intro/WatchPodsUsingResourceVersions.java @@ -0,0 +1,111 @@ +package com.baeldung.kubernetes.intro; + +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1Pod; +import io.kubernetes.client.openapi.models.V1PodList; +import io.kubernetes.client.util.Config; +import io.kubernetes.client.util.Watch; +import io.kubernetes.client.util.Watch.Response; +import okhttp3.OkHttpClient; +import okhttp3.logging.HttpLoggingInterceptor; + +public class WatchPodsUsingResourceVersions { + + private static Logger log = LoggerFactory.getLogger(WatchPodsUsingResourceVersions.class); + + public static void main(String[] args) throws Exception { + + ApiClient client = Config.defaultClient(); + + // Optional, put helpful during tests: disable client timeout and enable + // HTTP wire-level logs + HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor(message -> log.info(message)); + interceptor.setLevel(HttpLoggingInterceptor.Level.BODY); + OkHttpClient newClient = client.getHttpClient() + .newBuilder() + .addInterceptor(interceptor) + .readTimeout(0, TimeUnit.SECONDS) + .build(); + + client.setHttpClient(newClient); + CoreV1Api api = new CoreV1Api(client); + + String resourceVersion = null; + while (true) { + try { + if ( resourceVersion == null ) { + V1PodList podList = api.listPodForAllNamespaces(null, null, null, null, null, null, resourceVersion, null, null, null); + resourceVersion = podList.getMetadata().getResourceVersion(); + } + + log.info("[I59] Creating watch: resourceVersion={}", resourceVersion); + try (Watch watch = Watch.createWatch( + client, + api.listPodForAllNamespacesCall(null, null, null, null, null, null, resourceVersion, null, 60, true, null), + new TypeToken>(){}.getType())) { + + log.info("[I65] Receiving events:"); + for (Response event : watch) { + V1Pod pod = event.object; + V1ObjectMeta meta = pod.getMetadata(); + switch (event.type) { + case "ADDED": + case "MODIFIED": + case "DELETED": + log.info("event: type={}, namespace={}, name={}", + event.type, + meta.getNamespace(), + meta.getName()); + break; + default: + log.warn("[W76] Unknown event type: {}", event.type); + } + } + } + } + catch (ApiException ex) { + if ( ex.getCode() == 504 || ex.getCode() == 410 ) { + resourceVersion = extractResourceVersionFromException(ex); + } + else { + // Reset resource version + resourceVersion = null; + } + } + } + } + + private static String extractResourceVersionFromException(ApiException ex) { + + String body = ex.getResponseBody(); + if (body == null) { + return null; + } + + Gson gson = new Gson(); + Map st = gson.fromJson(body, Map.class); + Pattern p = Pattern.compile("Timeout: Too large resource version: (\\d+), current: (\\d+)"); + String msg = (String)st.get("message"); + Matcher m = p.matcher(msg); + if (!m.matches()) { + return null; + } + + return m.group(2); + } + +} diff --git a/kubernetes/k8s-intro/src/test/java/com/baeldung/kubernetes/intro/WatchPodsLiveTest.java b/kubernetes/k8s-intro/src/test/java/com/baeldung/kubernetes/intro/WatchPodsLiveTest.java new file mode 100644 index 0000000000..37828d7a2d --- /dev/null +++ b/kubernetes/k8s-intro/src/test/java/com/baeldung/kubernetes/intro/WatchPodsLiveTest.java @@ -0,0 +1,10 @@ +package com.baeldung.kubernetes.intro; + +import org.junit.jupiter.api.Test; + +class WatchPodsLiveTest { + @Test + void whenWatchPods_thenSuccess() throws Exception { + WatchPods.main(new String[] {}); + } +} diff --git a/kubernetes/k8s-intro/src/test/java/com/baeldung/kubernetes/intro/WatchPodsUsingBookmarksLiveTest.java b/kubernetes/k8s-intro/src/test/java/com/baeldung/kubernetes/intro/WatchPodsUsingBookmarksLiveTest.java new file mode 100644 index 0000000000..ea111f22a2 --- /dev/null +++ b/kubernetes/k8s-intro/src/test/java/com/baeldung/kubernetes/intro/WatchPodsUsingBookmarksLiveTest.java @@ -0,0 +1,10 @@ +package com.baeldung.kubernetes.intro; + +import org.junit.jupiter.api.Test; + +class WatchPodsUsingBookmarksLiveTest { + @Test + void whenWatchPods_thenSuccess() throws Exception { + WatchPodsUsingBookmarks.main(new String[] {}); + } +} diff --git a/kubernetes/k8s-intro/src/test/java/com/baeldung/kubernetes/intro/WatchPodsUsingResourceVersionsLiveTest.java b/kubernetes/k8s-intro/src/test/java/com/baeldung/kubernetes/intro/WatchPodsUsingResourceVersionsLiveTest.java new file mode 100644 index 0000000000..79c3a13eb2 --- /dev/null +++ b/kubernetes/k8s-intro/src/test/java/com/baeldung/kubernetes/intro/WatchPodsUsingResourceVersionsLiveTest.java @@ -0,0 +1,10 @@ +package com.baeldung.kubernetes.intro; + +import org.junit.jupiter.api.Test; + +class WatchPodsUsingResourceVersionsLiveTest { + @Test + void whenWatchPods_thenSuccess() throws Exception { + WatchPodsUsingResourceVersions.main(new String[] {}); + } +}