diff --git a/libraries-data-2/pom.xml b/libraries-data-2/pom.xml
index ac23747caa..be776282e9 100644
--- a/libraries-data-2/pom.xml
+++ b/libraries-data-2/pom.xml
@@ -222,5 +222,4 @@
-
\ No newline at end of file
diff --git a/persistence-modules/apache-bookkeeper/data/.gitignore b/persistence-modules/apache-bookkeeper/data/.gitignore
new file mode 100644
index 0000000000..43a3e42263
--- /dev/null
+++ b/persistence-modules/apache-bookkeeper/data/.gitignore
@@ -0,0 +1,5 @@
+bk/bookkeeper/*
+bk1/bookkeeper/*
+bk2/bookkeeper/*
+zk/*
+
diff --git a/persistence-modules/apache-bookkeeper/data/bk1/.gitignore b/persistence-modules/apache-bookkeeper/data/bk1/.gitignore
new file mode 100644
index 0000000000..32c9297ccd
--- /dev/null
+++ b/persistence-modules/apache-bookkeeper/data/bk1/.gitignore
@@ -0,0 +1 @@
+/bookkeeper/
diff --git a/persistence-modules/apache-bookkeeper/data/bk2/.gitignore b/persistence-modules/apache-bookkeeper/data/bk2/.gitignore
new file mode 100644
index 0000000000..32c9297ccd
--- /dev/null
+++ b/persistence-modules/apache-bookkeeper/data/bk2/.gitignore
@@ -0,0 +1 @@
+/bookkeeper/
diff --git a/persistence-modules/apache-bookkeeper/docker-compose.yml b/persistence-modules/apache-bookkeeper/docker-compose.yml
new file mode 100644
index 0000000000..0ef4c41a4a
--- /dev/null
+++ b/persistence-modules/apache-bookkeeper/docker-compose.yml
@@ -0,0 +1,71 @@
+version: '3.0'
+services:
+ zk:
+ image: zookeeper:3.6.1
+ restart: always
+ ports:
+ - "2181:2181"
+ volumes:
+ - ./data/zk:/data
+
+ bookie_init:
+ image: apache/bookkeeper:4.10.0
+ environment:
+ BK_zkServers: "zk:2181"
+ BK_advertisedAddress: ${BK_PUBLIC_IP}
+ restart: on-failure
+ depends_on:
+ - zk
+ command: /opt/bookkeeper/bin/bookkeeper shell metaformat -nonInteractive
+
+ bookie:
+ image: apache/bookkeeper:4.10.0
+ restart: on-failure
+ environment:
+ BK_zkServers: "zk:2181"
+ BK_advertisedAddress: ${BK_PUBLIC_IP}
+ BK_httpServerPort: 3182
+ ports:
+ - "3181:3181"
+ - "3182:3182"
+ volumes:
+ - ./data/bk:/data
+ depends_on:
+ - zk
+ - bookie_init
+
+ bookie1:
+ image: apache/bookkeeper:4.10.0
+ restart: on-failure
+ environment:
+ BOOKIE_PORT: 4181
+ BK_zkServers: "zk:2181"
+ BK_advertisedAddress: ${BK_PUBLIC_IP}
+ BK_httpServerPort: 3182
+ ports:
+ - "4181:4181"
+ volumes:
+ - ./data/bk1:/data
+ depends_on:
+ - zk
+ - bookie_init
+
+ bookie2:
+ image: apache/bookkeeper:4.10.0
+ restart: on-failure
+ environment:
+ BOOKIE_PORT: 4182
+ BK_zkServers: "zk:2181"
+ BK_advertisedAddress: ${BK_PUBLIC_IP}
+ BK_httpServerPort: 3182
+ ports:
+ - "4182:4182"
+ volumes:
+ - ./data/bk2:/data
+ depends_on:
+ - zk
+ - bookie_init
+
+
+
+
diff --git a/persistence-modules/apache-bookkeeper/pom.xml b/persistence-modules/apache-bookkeeper/pom.xml
new file mode 100644
index 0000000000..0beea7f1fc
--- /dev/null
+++ b/persistence-modules/apache-bookkeeper/pom.xml
@@ -0,0 +1,47 @@
+
+
+
+ 4.0.0
+ apache-bookkeeper
+ 0.0.1-SNAPSHOT
+ apache-bookkeeper
+ jar
+
+
+ com.baeldung
+ parent-modules
+ 1.0.0-SNAPSHOT
+ ../../
+
+
+
+
+ org.apache.bookkeeper
+ bookkeeper-server
+ ${org.apache.bookkeeper.version}
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
+
+
+ org.testcontainers
+ testcontainers
+ 1.14.3
+ test
+
+
+
+
+
+ 4.10.0
+
+
+
+
+
diff --git a/persistence-modules/apache-bookkeeper/src/main/java/com/baeldung/tutorials/bookkeeper/BkHelper.java b/persistence-modules/apache-bookkeeper/src/main/java/com/baeldung/tutorials/bookkeeper/BkHelper.java
new file mode 100644
index 0000000000..55f5d7b09f
--- /dev/null
+++ b/persistence-modules/apache-bookkeeper/src/main/java/com/baeldung/tutorials/bookkeeper/BkHelper.java
@@ -0,0 +1,149 @@
+package com.baeldung.tutorials.bookkeeper;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.zookeeper.AsyncCallback;
+
+public class BkHelper {
+
+ private static final Log LOG = LogFactory.getLog(BkHelper.class);
+
+ public static BookKeeper createBkClient(String zkConnectionString) {
+ try {
+ return new BookKeeper(zkConnectionString);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * Creates a Ledger with the given name added as custom metadata
+ * @param bk
+ * @param name
+ * @param password
+ * @return
+ */
+ public static LedgerHandle createLedger(BookKeeper bk, String name, byte[] password) {
+ try {
+ return bk.createLedger(3, 2, 2, DigestType.MAC, password, Collections.singletonMap("name", name.getBytes()));
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * Iterates over all available ledgers and returns the first one that has
+ * a metadata key 'name' equals to the given name
+ * @param bk
+ * @param name
+ * @return
+ * @throws Exception
+ */
+ public static Optional findLedgerByName(BookKeeper bk, String name) throws Exception {
+ Map ledgers = new HashMap();
+ final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
+ final CountDownLatch processDone = new CountDownLatch(1);
+
+ // There's no standard "list" operation. Instead, BK offers a generalized way to
+ // iterate over all available ledgers using an async visitor callback.
+ // The second callback will be called when there are no more ledgers do process or if an
+ // error occurs.
+ bk.getLedgerManager()
+ .asyncProcessLedgers(
+ (ledgerId, cb) -> collectLedgers(bk, ledgerId, cb, ledgers),
+ (rc, s, obj) -> {
+ returnCode.set(rc);
+ processDone.countDown();
+ },
+ null,
+ BKException.Code.OK, BKException.Code.ReadException);
+ processDone.await(5, TimeUnit.MINUTES);
+ LOG.info("Ledgers collected: total found=" + ledgers.size());
+
+ byte[] nameBytes = name.getBytes();
+ Optional> entry = ledgers.entrySet()
+ .stream()
+ .filter((e) -> {
+ Map meta = e.getValue()
+ .getCustomMetadata();
+ if (meta != null) {
+ LOG.info("ledger: " + e.getKey() + ", customMeta=" + meta);
+ byte[] data = meta.get("name");
+ if (data != null && Arrays.equals(data, nameBytes)) {
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ LOG.info("ledger: " + e.getKey() + ", no meta");
+ return false;
+ }
+ })
+ .findFirst();
+ if (entry.isPresent()) {
+ return Optional.of(entry.get()
+ .getKey());
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ public static void collectLedgers(BookKeeper bk, long ledgerId, AsyncCallback.VoidCallback cb, Map ledgers) {
+ try {
+ bk.getLedgerManager()
+ .readLedgerMetadata(ledgerId)
+ .thenAccept((v) -> {
+ LOG.debug("Got ledger metadata");
+ ledgers.put(ledgerId, v.getValue());
+ })
+ .thenAccept((v) -> {
+ cb.processResult(BKException.Code.OK, null, null);
+ });
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * Return a list with all available Ledgers
+ * @param bk
+ * @return
+ */
+ public static List listAllLedgers(BookKeeper bk) {
+ final List ledgers = Collections.synchronizedList(new ArrayList<>());
+ final CountDownLatch processDone = new CountDownLatch(1);
+
+ bk.getLedgerManager()
+ .asyncProcessLedgers((ledgerId, cb) -> {
+ ledgers.add(ledgerId);
+ cb.processResult(BKException.Code.OK, null, null);
+ },
+ (rc, s, obj) -> {
+ processDone.countDown();
+ }, null, BKException.Code.OK, BKException.Code.ReadException);
+
+ try {
+ processDone.await(1, TimeUnit.MINUTES);
+ return ledgers;
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ }
+ }
+}
diff --git a/persistence-modules/apache-bookkeeper/src/test/java/com/baeldung/tutorials/bookkeeper/BkHelperLiveTest.java b/persistence-modules/apache-bookkeeper/src/test/java/com/baeldung/tutorials/bookkeeper/BkHelperLiveTest.java
new file mode 100644
index 0000000000..84a8ce3db8
--- /dev/null
+++ b/persistence-modules/apache-bookkeeper/src/test/java/com/baeldung/tutorials/bookkeeper/BkHelperLiveTest.java
@@ -0,0 +1,185 @@
+package com.baeldung.tutorials.bookkeeper;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.WriteHandle;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+class BkHelperLiveTest extends BkHelper {
+ private static BookKeeper bk;
+ private byte[] ledgerPassword = "SuperS3cR37".getBytes();
+ private static final Log LOG = LogFactory.getLog(BkHelperLiveTest.class);
+
+ @BeforeAll
+ static void initBkClient() {
+ bk = createBkClient("192.168.99.101:2181");
+ }
+
+ @Test
+ void whenCreateLedger_thenSuccess() throws Exception {
+ LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.MAC, ledgerPassword);
+ assertNotNull(lh);
+ assertNotNull(lh.getId());
+ LOG.info("[I33] Ledge created: id=" + lh.getId());
+ }
+
+ @Test
+ void whenCreateLedgerAsync_thenSuccess() throws Exception {
+
+ CompletableFuture cf = bk.newCreateLedgerOp()
+ .withDigestType(org.apache.bookkeeper.client.api.DigestType.MAC)
+ .withPassword("password".getBytes())
+ .execute();
+
+ WriteHandle handle = cf.get(1, TimeUnit.MINUTES);
+ assertNotNull(handle);
+ handle.close();
+
+ }
+
+ @Test
+ void whenAsyncCreateLedger_thenSuccess() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference handleRef = new AtomicReference<>();
+
+ bk.asyncCreateLedger(3, 2, 2, BookKeeper.DigestType.MAC, ledgerPassword,
+ (rc, lh, ctx) -> {
+ handleRef.set(lh);
+ latch.countDown();
+ }, null, Collections.emptyMap());
+ latch.await(1, TimeUnit.MINUTES);
+ LedgerHandle lh = handleRef.get();
+ assertNotNull(lh);
+ assertFalse(lh.isClosed(), "Ledger should be writeable");
+ }
+
+ @Test
+ void whenListLedgers_thenSuccess() throws Exception {
+ List ledgers = listAllLedgers(bk);
+ assertNotNull(ledgers);
+ }
+
+ @Test
+ void whenWriteEntries_thenSuccess() throws Exception {
+ LedgerHandle lh = createLedger(bk, "myledger", ledgerPassword);
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < 1000; i++) {
+ byte[] data = new String("message-" + i).getBytes();
+ lh.append(data);
+ }
+ lh.close();
+ long elapsed = System.currentTimeMillis() - start;
+ LOG.info("Entries added to ledgerId " + lh.getId() + ". count=1000, elapsed=" + elapsed);
+ }
+
+ @Test
+ void whenWriteEntriesAsync_thenSuccess() throws Exception {
+ CompletableFuture