diff --git a/persistence-modules/apache-bookkeeper/data/.gitignore b/persistence-modules/apache-bookkeeper/data/.gitignore new file mode 100644 index 0000000000..43a3e42263 --- /dev/null +++ b/persistence-modules/apache-bookkeeper/data/.gitignore @@ -0,0 +1,5 @@ +bk/bookkeeper/* +bk1/bookkeeper/* +bk2/bookkeeper/* +zk/* + diff --git a/persistence-modules/apache-bookkeeper/data/bk1/.gitignore b/persistence-modules/apache-bookkeeper/data/bk1/.gitignore new file mode 100644 index 0000000000..32c9297ccd --- /dev/null +++ b/persistence-modules/apache-bookkeeper/data/bk1/.gitignore @@ -0,0 +1 @@ +/bookkeeper/ diff --git a/persistence-modules/apache-bookkeeper/data/bk2/.gitignore b/persistence-modules/apache-bookkeeper/data/bk2/.gitignore new file mode 100644 index 0000000000..32c9297ccd --- /dev/null +++ b/persistence-modules/apache-bookkeeper/data/bk2/.gitignore @@ -0,0 +1 @@ +/bookkeeper/ diff --git a/persistence-modules/apache-bookkeeper/docker-compose.yml b/persistence-modules/apache-bookkeeper/docker-compose.yml new file mode 100644 index 0000000000..0ef4c41a4a --- /dev/null +++ b/persistence-modules/apache-bookkeeper/docker-compose.yml @@ -0,0 +1,71 @@ +version: '3.0' +services: + zk: + image: zookeeper:3.6.1 + restart: always + ports: + - "2181:2181" + volumes: + - ./data/zk:/data + + bookie_init: + image: apache/bookkeeper:4.10.0 + environment: + BK_zkServers: "zk:2181" + BK_advertisedAddress: ${BK_PUBLIC_IP} + restart: on-failure + depends_on: + - zk + command: /opt/bookkeeper/bin/bookkeeper shell metaformat -nonInteractive + + bookie: + image: apache/bookkeeper:4.10.0 + restart: on-failure + environment: + BK_zkServers: "zk:2181" + BK_advertisedAddress: ${BK_PUBLIC_IP} + BK_httpServerPort: 3182 + ports: + - "3181:3181" + - "3182:3182" + volumes: + - ./data/bk:/data + depends_on: + - zk + - bookie_init + + bookie1: + image: apache/bookkeeper:4.10.0 + restart: on-failure + environment: + BOOKIE_PORT: 4181 + BK_zkServers: "zk:2181" + BK_advertisedAddress: ${BK_PUBLIC_IP} + BK_httpServerPort: 3182 + ports: + - "4181:4181" + volumes: + - ./data/bk1:/data + depends_on: + - zk + - bookie_init + + bookie2: + image: apache/bookkeeper:4.10.0 + restart: on-failure + environment: + BOOKIE_PORT: 4182 + BK_zkServers: "zk:2181" + BK_advertisedAddress: ${BK_PUBLIC_IP} + BK_httpServerPort: 3182 + ports: + - "4182:4182" + volumes: + - ./data/bk2:/data + depends_on: + - zk + - bookie_init + + + + diff --git a/persistence-modules/apache-bookkeeper/pom.xml b/persistence-modules/apache-bookkeeper/pom.xml new file mode 100644 index 0000000000..46a7982b12 --- /dev/null +++ b/persistence-modules/apache-bookkeeper/pom.xml @@ -0,0 +1,47 @@ + + + + 4.0.0 + apache-bookkeeper + 0.0.1-SNAPSHOT + apache-bookkeeper + jar + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + ../../ + + + + + org.apache.bookkeeper + bookkeeper-server + ${org.apache.bookkeeper.version} + + + org.slf4j + slf4j-log4j12 + + + + + + org.testcontainers + testcontainers + 1.14.3 + test + + + + + + 4.10.0 + + + + + diff --git a/persistence-modules/apache-bookkeeper/src/main/java/com/baeldung/tutorials/bookkeeper/BkHelper.java b/persistence-modules/apache-bookkeeper/src/main/java/com/baeldung/tutorials/bookkeeper/BkHelper.java new file mode 100644 index 0000000000..7cba88af19 --- /dev/null +++ b/persistence-modules/apache-bookkeeper/src/main/java/com/baeldung/tutorials/bookkeeper/BkHelper.java @@ -0,0 +1,157 @@ +package com.baeldung.tutorials.bookkeeper; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Map.Entry; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.zookeeper.AsyncCallback; + +public class BkHelper { + + private static final Log log = LogFactory.getLog(BkHelper.class); + + public static BookKeeper createBkClient(String zkConnectionString) { + try { + return new BookKeeper(zkConnectionString); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /** + * Creates a Ledger with the given name added as custom metadata + * @param bk + * @param name + * @param password + * @return + */ + public static LedgerHandle createLedger(BookKeeper bk, String name, byte[] password) { + try { + return bk.createLedger(3, 2, 2, DigestType.MAC, password, Collections.singletonMap("name", name.getBytes())); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /** + * Iterates over all available ledgers and returns the first one that has + * a metadata key 'name' equals to the given name + * @param bk + * @param name + * @return + * @throws Exception + */ + public static Optional findLedgerByName(BookKeeper bk, String name) throws Exception { + + Map ledgers = new HashMap(); + final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK); + final CountDownLatch processDone = new CountDownLatch(1); + + // There's no standard "list" operation. Instead, BK offers a generalized way to + // iterate over all available ledgers using an async visitor callback. + // The second callback will be called when there are no more ledgers do process or if an + // error occurs. + bk.getLedgerManager() + .asyncProcessLedgers((ledgerId, cb) -> collectLedgers(bk, ledgerId, cb, ledgers), + (rc, s, obj) -> { + returnCode.set(rc); + processDone.countDown(); + }, null, BKException.Code.OK, BKException.Code.ReadException); + + processDone.await(5, TimeUnit.MINUTES); + + log.info("Ledgers collected: total found=" + ledgers.size()); + + byte[] nameBytes = name.getBytes(); + + Optional> entry = ledgers.entrySet() + .stream() + .filter((e) -> { + Map meta = e.getValue().getCustomMetadata(); + if (meta != null) { + log.info("ledger: " + e.getKey() + ", customMeta=" + meta); + byte[] data = meta.get("name"); + if (data != null && Arrays.equals(data, nameBytes)) { + return true; + } else { + return false; + } + } else { + log.info("ledger: " + e.getKey() + ", no meta"); + return false; + } + }) + .findFirst(); + + if (entry.isPresent()) { + return Optional.of(entry.get().getKey()); + } else { + return Optional.empty(); + } + } + + public static void collectLedgers(BookKeeper bk, long ledgerId, AsyncCallback.VoidCallback cb, Map ledgers) { + log.debug("ledgerId: " + ledgerId); + + try { + bk.getLedgerManager() + .readLedgerMetadata(ledgerId) + .thenAccept((v) -> { + log.debug("Got ledger metadata"); + ledgers.put(ledgerId, v.getValue()); + }) + .thenAccept((v) -> { + cb.processResult(BKException.Code.OK, null, null); + }); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /** + * Return a list with all available Ledgers + * @param bk + * @return + */ + public static List listAllLedgers(BookKeeper bk) { + + final List ledgers = Collections.synchronizedList(new ArrayList<>()); + final CountDownLatch processDone = new CountDownLatch(1); + + bk.getLedgerManager() + .asyncProcessLedgers( + (ledgerId,cb) -> { + ledgers.add(ledgerId); + cb.processResult(BKException.Code.OK, null, null); + }, + (rc, s, obj) -> { + processDone.countDown(); + }, + null, BKException.Code.OK, BKException.Code.ReadException); + + try { + processDone.await(1,TimeUnit.MINUTES); + return ledgers; + } + catch(InterruptedException ie) { + throw new RuntimeException(ie); + } + } + + +} diff --git a/persistence-modules/apache-bookkeeper/src/test/java/com/baeldung/tutorials/bookkeeper/BkHelperLiveTest.java b/persistence-modules/apache-bookkeeper/src/test/java/com/baeldung/tutorials/bookkeeper/BkHelperLiveTest.java new file mode 100644 index 0000000000..2bbf54e2b7 --- /dev/null +++ b/persistence-modules/apache-bookkeeper/src/test/java/com/baeldung/tutorials/bookkeeper/BkHelperLiveTest.java @@ -0,0 +1,229 @@ +package com.baeldung.tutorials.bookkeeper; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import org.apache.bookkeeper.client.AsyncCallback.AddCallback; +import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; +import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerEntry; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.DigestType; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.api.WriteAdvHandle; +import org.apache.bookkeeper.client.api.WriteHandle; +import org.apache.bookkeeper.tools.cli.commands.bookie.ListLedgersCommand; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.zookeeper.AsyncCallback; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import com.google.common.collect.Iterables; + +class BkHelperLiveTest extends BkHelper { + + private static BookKeeper bk; + private byte[] ledgerPassword = "SuperS3cR37".getBytes(); + + private static final Log log = LogFactory.getLog(BkHelperLiveTest.class); + + @BeforeAll + static void initBkClient() { + bk = createBkClient("192.168.99.101:2181"); + } + + @Test + void whenCreateLedger_thenSuccess() throws Exception { + + LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.MAC, ledgerPassword); + assertNotNull(lh); + assertNotNull(lh.getId()); + + log.info("[I33] Ledge created: id=" + lh.getId()); + } + + + @Test + void whenCreateLedgerAsync_thenSuccess() throws Exception { + + CompletableFuture cf = bk.newCreateLedgerOp() + .withDigestType(org.apache.bookkeeper.client.api.DigestType.MAC) + .withPassword("password".getBytes()) + .execute(); + + WriteHandle handle = cf.get(1, TimeUnit.MINUTES); + assertNotNull(handle); + handle.close(); + + } + + + @Test + void whenAsyncCreateLedger_thenSuccess() throws Exception { + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference handleRef =new AtomicReference<>(); + + bk.asyncCreateLedger(3, 2, 2, + BookKeeper.DigestType.MAC, + ledgerPassword, + (rc, lh, ctx) -> { + handleRef.set(lh); + latch.countDown(); + }, + null, + Collections.emptyMap()); + + latch.await(1, TimeUnit.MINUTES); + LedgerHandle lh = handleRef.get(); + assertNotNull(lh); + assertFalse(lh.isClosed(),"Ledger should be writeable"); + } + + + @Test + void whenListLedgers_thenSuccess() throws Exception { + + List ledgers = listAllLedgers(bk); + assertNotNull(ledgers); + } + + @Test + void whenWriteEntries_thenSuccess() throws Exception { + + LedgerHandle lh = createLedger(bk,"myledger",ledgerPassword); + + long start = System.currentTimeMillis(); + for ( int i = 0 ; i < 1000 ; i++ ) { + byte[] data = new String("message-" + i).getBytes(); + lh.append(data); + } + + lh.close(); + long elapsed = System.currentTimeMillis() - start; + log.info("Entries added to ledgerId " + lh.getId() + ". count=1000, elapsed=" + elapsed); + } + + @Test + void whenWriteEntriesAsync_thenSuccess() throws Exception { + + CompletableFuture f = bk.newCreateLedgerOp() + .withDigestType(DigestType.MAC) + .withPassword(ledgerPassword) + .execute() + .thenApply((wh) -> { + List> ops = new ArrayList<>(); + for( int i = 0; i < 1000 ; i++ ) { + byte[] data = String.format("message-%04d", i).getBytes(); + ops.add(wh.appendAsync(data)); + } + + return CompletableFuture + .allOf(ops.stream().toArray(CompletableFuture[]::new)) + .thenCompose((v) -> wh.closeAsync()); + }); + + f.get(5, TimeUnit.MINUTES); + } + + @Test + void whenWriteAndReadEntriesAsync_thenSuccess() throws Exception { + + CompletableFuture f = bk.newCreateLedgerOp() + .withDigestType(DigestType.MAC) + .withPassword(ledgerPassword) + .execute() + .thenApply((wh) -> { + List> ops = new ArrayList<>(); + for( int i = 0; i < 1000 ; i++ ) { + byte[] data = String.format("message-%04d", i).getBytes(); + ops.add(wh.appendAsync(data)); + } + + + return CompletableFuture + .allOf(ops.stream().toArray(CompletableFuture[]::new)) + .thenCompose((v) -> wh.closeAsync()) + .thenApply((v) -> wh.getId()); + }) + .thenCompose((lf) -> lf); // flatten the + + Long ledgerId = f.get(5, TimeUnit.MINUTES); + log.info("Ledger created with 1000 entries: ledgerId=" + ledgerId); + + // Now let's read data back... + CompletableFuture ef = bk.newOpenLedgerOp() + .withLedgerId(ledgerId) + .withPassword(ledgerPassword) + .withDigestType(DigestType.MAC) + .execute() + .thenCompose((rh) -> { + return rh.readLastAddConfirmedAsync() + .thenCompose((lastId) -> rh.readAsync(0, lastId)); + }); + + LedgerEntries entries = ef.get(5,TimeUnit.MINUTES); + + + long count = 0; + Iterator it = entries.iterator(); + while ( it.hasNext()) { + org.apache.bookkeeper.client.api.LedgerEntry e = it.next(); + String msg = new String(e.getEntryBytes()); + assertEquals(String.format("message-%04d", count),msg); + count++; + } + + assertEquals(1000,count); + + log.info("Got entries: count=" + count); + + } + + + @Test + void whenWriteAndReadEntries_thenSuccess() throws Exception { + + LedgerHandle lh = createLedger(bk,"myledger",ledgerPassword); + + long start = System.currentTimeMillis(); + for ( int i = 0 ; i < 1000 ; i++ ) { + byte[] data = new String("message-" + i).getBytes(); + lh.append(data); + } + + lh.close(); + long elapsed = System.currentTimeMillis() - start; + log.info("Entries added to ledgerId " + lh.getId() + ", elapsed=" + elapsed); + + Long ledgerId = findLedgerByName(bk,"myledger").orElse(null); + assertNotNull(ledgerId); + + lh = bk.openLedger(ledgerId, BookKeeper.DigestType.MAC, ledgerPassword); + long lastId = lh.readLastConfirmed(); + Enumeration entries = lh.readEntries(0, lastId); + + while(entries.hasMoreElements()) { + LedgerEntry entry = entries.nextElement(); + String msg = new String(entry.getEntry()); + log.info("Entry: id=" + entry.getEntryId() + ", data=" + msg); + } + } +} diff --git a/persistence-modules/apache-bookkeeper/src/test/resources/logback-test.xml b/persistence-modules/apache-bookkeeper/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..cea0f38eb8 --- /dev/null +++ b/persistence-modules/apache-bookkeeper/src/test/resources/logback-test.xml @@ -0,0 +1,13 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + +