Merge pull request #9472 from psevestre/master

[BAEL-2322] Test Code
This commit is contained in:
Jonathan Cook 2020-06-26 13:28:16 +02:00 committed by GitHub
commit df6c2ff1b8
10 changed files with 475 additions and 1 deletions

View File

@ -222,5 +222,4 @@
</plugin> </plugin>
</plugins> </plugins>
</build> </build>
</project> </project>

View File

@ -0,0 +1,5 @@
bk/bookkeeper/*
bk1/bookkeeper/*
bk2/bookkeeper/*
zk/*

View File

@ -0,0 +1 @@
/bookkeeper/

View File

@ -0,0 +1 @@
/bookkeeper/

View File

@ -0,0 +1,71 @@
version: '3.0'
services:
zk:
image: zookeeper:3.6.1
restart: always
ports:
- "2181:2181"
volumes:
- ./data/zk:/data
bookie_init:
image: apache/bookkeeper:4.10.0
environment:
BK_zkServers: "zk:2181"
BK_advertisedAddress: ${BK_PUBLIC_IP}
restart: on-failure
depends_on:
- zk
command: /opt/bookkeeper/bin/bookkeeper shell metaformat -nonInteractive
bookie:
image: apache/bookkeeper:4.10.0
restart: on-failure
environment:
BK_zkServers: "zk:2181"
BK_advertisedAddress: ${BK_PUBLIC_IP}
BK_httpServerPort: 3182
ports:
- "3181:3181"
- "3182:3182"
volumes:
- ./data/bk:/data
depends_on:
- zk
- bookie_init
bookie1:
image: apache/bookkeeper:4.10.0
restart: on-failure
environment:
BOOKIE_PORT: 4181
BK_zkServers: "zk:2181"
BK_advertisedAddress: ${BK_PUBLIC_IP}
BK_httpServerPort: 3182
ports:
- "4181:4181"
volumes:
- ./data/bk1:/data
depends_on:
- zk
- bookie_init
bookie2:
image: apache/bookkeeper:4.10.0
restart: on-failure
environment:
BOOKIE_PORT: 4182
BK_zkServers: "zk:2181"
BK_advertisedAddress: ${BK_PUBLIC_IP}
BK_httpServerPort: 3182
ports:
- "4182:4182"
volumes:
- ./data/bk2:/data
depends_on:
- zk
- bookie_init

View File

@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>apache-bookkeeper</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>apache-bookkeeper</name>
<packaging>jar</packaging>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../../</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-server</artifactId>
<version>${org.apache.bookkeeper.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.14.3</version>
<scope>test</scope>
</dependency>
</dependencies>
<properties>
<org.apache.bookkeeper.version>4.10.0</org.apache.bookkeeper.version>
</properties>
</project>

View File

@ -0,0 +1,149 @@
package com.baeldung.tutorials.bookkeeper;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.AsyncCallback;
public class BkHelper {
private static final Log LOG = LogFactory.getLog(BkHelper.class);
public static BookKeeper createBkClient(String zkConnectionString) {
try {
return new BookKeeper(zkConnectionString);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
/**
* Creates a Ledger with the given name added as custom metadata
* @param bk
* @param name
* @param password
* @return
*/
public static LedgerHandle createLedger(BookKeeper bk, String name, byte[] password) {
try {
return bk.createLedger(3, 2, 2, DigestType.MAC, password, Collections.singletonMap("name", name.getBytes()));
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
/**
* Iterates over all available ledgers and returns the first one that has
* a metadata key 'name' equals to the given name
* @param bk
* @param name
* @return
* @throws Exception
*/
public static Optional<Long> findLedgerByName(BookKeeper bk, String name) throws Exception {
Map<Long, LedgerMetadata> ledgers = new HashMap<Long, LedgerMetadata>();
final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
final CountDownLatch processDone = new CountDownLatch(1);
// There's no standard "list" operation. Instead, BK offers a generalized way to
// iterate over all available ledgers using an async visitor callback.
// The second callback will be called when there are no more ledgers do process or if an
// error occurs.
bk.getLedgerManager()
.asyncProcessLedgers(
(ledgerId, cb) -> collectLedgers(bk, ledgerId, cb, ledgers),
(rc, s, obj) -> {
returnCode.set(rc);
processDone.countDown();
},
null,
BKException.Code.OK, BKException.Code.ReadException);
processDone.await(5, TimeUnit.MINUTES);
LOG.info("Ledgers collected: total found=" + ledgers.size());
byte[] nameBytes = name.getBytes();
Optional<Entry<Long, LedgerMetadata>> entry = ledgers.entrySet()
.stream()
.filter((e) -> {
Map<String, byte[]> meta = e.getValue()
.getCustomMetadata();
if (meta != null) {
LOG.info("ledger: " + e.getKey() + ", customMeta=" + meta);
byte[] data = meta.get("name");
if (data != null && Arrays.equals(data, nameBytes)) {
return true;
} else {
return false;
}
} else {
LOG.info("ledger: " + e.getKey() + ", no meta");
return false;
}
})
.findFirst();
if (entry.isPresent()) {
return Optional.of(entry.get()
.getKey());
} else {
return Optional.empty();
}
}
public static void collectLedgers(BookKeeper bk, long ledgerId, AsyncCallback.VoidCallback cb, Map<Long, LedgerMetadata> ledgers) {
try {
bk.getLedgerManager()
.readLedgerMetadata(ledgerId)
.thenAccept((v) -> {
LOG.debug("Got ledger metadata");
ledgers.put(ledgerId, v.getValue());
})
.thenAccept((v) -> {
cb.processResult(BKException.Code.OK, null, null);
});
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
/**
* Return a list with all available Ledgers
* @param bk
* @return
*/
public static List<Long> listAllLedgers(BookKeeper bk) {
final List<Long> ledgers = Collections.synchronizedList(new ArrayList<>());
final CountDownLatch processDone = new CountDownLatch(1);
bk.getLedgerManager()
.asyncProcessLedgers((ledgerId, cb) -> {
ledgers.add(ledgerId);
cb.processResult(BKException.Code.OK, null, null);
},
(rc, s, obj) -> {
processDone.countDown();
}, null, BKException.Code.OK, BKException.Code.ReadException);
try {
processDone.await(1, TimeUnit.MINUTES);
return ledgers;
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
}
}

View File

@ -0,0 +1,185 @@
package com.baeldung.tutorials.bookkeeper;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
class BkHelperLiveTest extends BkHelper {
private static BookKeeper bk;
private byte[] ledgerPassword = "SuperS3cR37".getBytes();
private static final Log LOG = LogFactory.getLog(BkHelperLiveTest.class);
@BeforeAll
static void initBkClient() {
bk = createBkClient("192.168.99.101:2181");
}
@Test
void whenCreateLedger_thenSuccess() throws Exception {
LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.MAC, ledgerPassword);
assertNotNull(lh);
assertNotNull(lh.getId());
LOG.info("[I33] Ledge created: id=" + lh.getId());
}
@Test
void whenCreateLedgerAsync_thenSuccess() throws Exception {
CompletableFuture<WriteHandle> cf = bk.newCreateLedgerOp()
.withDigestType(org.apache.bookkeeper.client.api.DigestType.MAC)
.withPassword("password".getBytes())
.execute();
WriteHandle handle = cf.get(1, TimeUnit.MINUTES);
assertNotNull(handle);
handle.close();
}
@Test
void whenAsyncCreateLedger_thenSuccess() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<LedgerHandle> handleRef = new AtomicReference<>();
bk.asyncCreateLedger(3, 2, 2, BookKeeper.DigestType.MAC, ledgerPassword,
(rc, lh, ctx) -> {
handleRef.set(lh);
latch.countDown();
}, null, Collections.emptyMap());
latch.await(1, TimeUnit.MINUTES);
LedgerHandle lh = handleRef.get();
assertNotNull(lh);
assertFalse(lh.isClosed(), "Ledger should be writeable");
}
@Test
void whenListLedgers_thenSuccess() throws Exception {
List<Long> ledgers = listAllLedgers(bk);
assertNotNull(ledgers);
}
@Test
void whenWriteEntries_thenSuccess() throws Exception {
LedgerHandle lh = createLedger(bk, "myledger", ledgerPassword);
long start = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
byte[] data = new String("message-" + i).getBytes();
lh.append(data);
}
lh.close();
long elapsed = System.currentTimeMillis() - start;
LOG.info("Entries added to ledgerId " + lh.getId() + ". count=1000, elapsed=" + elapsed);
}
@Test
void whenWriteEntriesAsync_thenSuccess() throws Exception {
CompletableFuture<Object> f = bk.newCreateLedgerOp()
.withDigestType(DigestType.MAC)
.withPassword(ledgerPassword)
.execute()
.thenApply((wh) -> {
List<CompletableFuture<Long>> ops = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
byte[] data = String.format("message-%04d", i)
.getBytes();
ops.add(wh.appendAsync(data));
}
return CompletableFuture.allOf(ops.stream()
.toArray(CompletableFuture[]::new))
.thenCompose((v) -> wh.closeAsync());
});
f.get(5, TimeUnit.MINUTES);
}
@Test
void whenWriteAndReadEntriesAsync_thenSuccess() throws Exception {
CompletableFuture<Long> f = bk.newCreateLedgerOp()
.withDigestType(DigestType.MAC)
.withPassword(ledgerPassword)
.execute()
.thenApply((wh) -> {
List<CompletableFuture<Long>> ops = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
byte[] data = String.format("message-%04d", i)
.getBytes();
ops.add(wh.appendAsync(data));
}
return CompletableFuture.allOf(ops.stream()
.toArray(CompletableFuture[]::new))
.thenCompose((v) -> wh.closeAsync())
.thenApply((v) -> wh.getId());
})
.thenCompose((lf) -> lf); // flatten the futures
Long ledgerId = f.get(5, TimeUnit.MINUTES);
LOG.info("Ledger created with 1000 entries: ledgerId=" + ledgerId);
// Now let's read data back...
CompletableFuture<LedgerEntries> ef = bk.newOpenLedgerOp()
.withLedgerId(ledgerId)
.withPassword(ledgerPassword)
.withDigestType(DigestType.MAC)
.execute()
.thenCompose((rh) -> {
return rh.readLastAddConfirmedAsync()
.thenCompose((lastId) -> rh.readAsync(0, lastId));
});
LedgerEntries entries = ef.get(5, TimeUnit.MINUTES);
// Check all writes where OK
long count = 0;
Iterator<org.apache.bookkeeper.client.api.LedgerEntry> it = entries.iterator();
while (it.hasNext()) {
org.apache.bookkeeper.client.api.LedgerEntry e = it.next();
String msg = new String(e.getEntryBytes());
assertEquals(String.format("message-%04d", count), msg);
count++;
}
assertEquals(1000, count);
LOG.info("Got entries: count=" + count);
}
@Test
void whenWriteAndReadEntries_thenSuccess() throws Exception {
LedgerHandle lh = createLedger(bk, "myledger", ledgerPassword);
long start = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
byte[] data = new String("message-" + i).getBytes();
lh.append(data);
}
lh.close();
long elapsed = System.currentTimeMillis() - start;
LOG.info("Entries added to ledgerId " + lh.getId() + ", elapsed=" + elapsed);
Long ledgerId = findLedgerByName(bk, "myledger").orElse(null);
assertNotNull(ledgerId);
lh = bk.openLedger(ledgerId, BookKeeper.DigestType.MAC, ledgerPassword);
long lastId = lh.readLastConfirmed();
Enumeration<LedgerEntry> entries = lh.readEntries(0, lastId);
while (entries.hasMoreElements()) {
LedgerEntry entry = entries.nextElement();
String msg = new String(entry.getEntry());
LOG.info("Entry: id=" + entry.getEntryId() + ", data=" + msg);
}
}
}

View File

@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT"
class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} -
%msg%n
</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>

View File

@ -14,6 +14,7 @@
<modules> <modules>
<module>activejdbc</module> <module>activejdbc</module>
<module>apache-bookkeeper</module><!-- BAEL-2322 -->
<module>apache-cayenne</module> <module>apache-cayenne</module>
<module>core-java-persistence</module> <module>core-java-persistence</module>
<module>deltaspike</module> <module>deltaspike</module>