[BAEL-2322] Apache BookKeeper
This commit is contained in:
parent
1fe51ed7c0
commit
37a82cd1bd
|
@ -0,0 +1,2 @@
|
|||
FROM openjdk:8
|
||||
COPY add target/apache-bookkeeper-0.0.1-SNAPSHOT.jar /app.jar
|
|
@ -0,0 +1,71 @@
|
|||
version: '3.0'
|
||||
services:
|
||||
zk:
|
||||
image: zookeeper:3.6.1
|
||||
restart: always
|
||||
ports:
|
||||
- "2181:2181"
|
||||
volumes:
|
||||
- ./data/zk:/data
|
||||
|
||||
bookie_init:
|
||||
image: apache/bookkeeper:4.10.0
|
||||
environment:
|
||||
BK_zkServers: "zk:2181"
|
||||
BK_advertisedAddress: ${BK_PUBLIC_IP}
|
||||
restart: on-failure
|
||||
depends_on:
|
||||
- zk
|
||||
command: /opt/bookkeeper/bin/bookkeeper shell metaformat -nonInteractive
|
||||
|
||||
bookie:
|
||||
image: apache/bookkeeper:4.10.0
|
||||
restart: on-failure
|
||||
environment:
|
||||
BK_zkServers: "zk:2181"
|
||||
BK_advertisedAddress: ${BK_PUBLIC_IP}
|
||||
BK_httpServerPort: 3182
|
||||
ports:
|
||||
- "3181:3181"
|
||||
- "3182:3182"
|
||||
volumes:
|
||||
- ./data/bk:/data
|
||||
depends_on:
|
||||
- zk
|
||||
- bookie_init
|
||||
|
||||
bookie1:
|
||||
image: apache/bookkeeper:4.10.0
|
||||
restart: on-failure
|
||||
environment:
|
||||
BOOKIE_PORT: 4181
|
||||
BK_zkServers: "zk:2181"
|
||||
BK_advertisedAddress: ${BK_PUBLIC_IP}
|
||||
BK_httpServerPort: 3182
|
||||
ports:
|
||||
- "4181:4181"
|
||||
volumes:
|
||||
- ./data/bk1:/data
|
||||
depends_on:
|
||||
- zk
|
||||
- bookie_init
|
||||
|
||||
bookie2:
|
||||
image: apache/bookkeeper:4.10.0
|
||||
restart: on-failure
|
||||
environment:
|
||||
BOOKIE_PORT: 4182
|
||||
BK_zkServers: "zk:2181"
|
||||
BK_advertisedAddress: ${BK_PUBLIC_IP}
|
||||
BK_httpServerPort: 3182
|
||||
ports:
|
||||
- "4182:4182"
|
||||
volumes:
|
||||
- ./data/bk2:/data
|
||||
depends_on:
|
||||
- zk
|
||||
- bookie_init
|
||||
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,69 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>apache-bookkeeper</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<name>apache-bookkeeper</name>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>parent-modules</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.bookkeeper</groupId>
|
||||
<artifactId>bookkeeper-server</artifactId>
|
||||
<version>${org.apache.bookkeeper.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.testcontainers</groupId>
|
||||
<artifactId>testcontainers</artifactId>
|
||||
<version>1.14.3</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<properties>
|
||||
<org.apache.bookkeeper.version>4.10.0</org.apache.bookkeeper.version>
|
||||
</properties>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>3.2.3</version>
|
||||
<configuration>
|
||||
<transformer
|
||||
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" />
|
||||
<mainClass>com.baeldung.tutorials.bookkeeper.Main</mainClass>
|
||||
<minimizeJar>true</minimizeJar>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
|
|
@ -0,0 +1,144 @@
|
|||
package com.baeldung.tutorials.bookkeeper;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.bookkeeper.client.BKException;
|
||||
import org.apache.bookkeeper.client.BookKeeper;
|
||||
import org.apache.bookkeeper.client.BookKeeper.DigestType;
|
||||
import org.apache.bookkeeper.client.LedgerHandle;
|
||||
import org.apache.bookkeeper.client.api.LedgerMetadata;
|
||||
import org.apache.bookkeeper.conf.ClientConfiguration;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.zookeeper.AsyncCallback;
|
||||
|
||||
public class BkHelper {
|
||||
|
||||
private static final Log log = LogFactory.getLog(BkHelper.class);
|
||||
|
||||
public static BookKeeper createBkClient(String zkConnectionString) {
|
||||
try {
|
||||
ClientConfiguration cfg = new ClientConfiguration();
|
||||
cfg.setMetadataServiceUri("zk+null://zookeeper-host:2131");
|
||||
BookKeeper.forConfig(cfg).build();
|
||||
|
||||
|
||||
return new BookKeeper(zkConnectionString);
|
||||
}
|
||||
catch(Exception ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a Ledger with the given name added as custom metadata
|
||||
* @param bk
|
||||
* @param name
|
||||
* @param password
|
||||
* @return
|
||||
*/
|
||||
public static LedgerHandle createLedger(BookKeeper bk, String name, byte[] password) {
|
||||
try {
|
||||
|
||||
return bk.createLedger(3,2,2,
|
||||
DigestType.MAC,
|
||||
password,
|
||||
Collections.singletonMap("name", name.getBytes()));
|
||||
}
|
||||
catch(Exception ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Iterates over all available ledgers and returns the first one that has
|
||||
* a metadata key 'name' equals to the given name
|
||||
* @param bk
|
||||
* @param name
|
||||
* @return
|
||||
* @throws Exception
|
||||
*/
|
||||
public static Optional<Long> findLedgerByName(BookKeeper bk, String name) throws Exception {
|
||||
|
||||
Map<Long,LedgerMetadata> ledgers = new HashMap<Long, LedgerMetadata>();
|
||||
final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
|
||||
final CountDownLatch processDone = new CountDownLatch(1);
|
||||
|
||||
// There's no standard "list" operation. Instead, BK offers a generalized way to
|
||||
// iterate over all available ledgers using an async visitor callback.
|
||||
// The second callback will be called when there are no more ledgers do process or if an
|
||||
// error occurs.
|
||||
bk.getLedgerManager()
|
||||
.asyncProcessLedgers(
|
||||
(data,cb) -> collectLedgers(bk,data,cb,ledgers),
|
||||
(rc, s, obj) -> {
|
||||
returnCode.set(rc);
|
||||
processDone.countDown();
|
||||
},
|
||||
null,
|
||||
BKException.Code.OK,
|
||||
BKException.Code.ReadException);
|
||||
|
||||
processDone.await(5, TimeUnit.MINUTES);
|
||||
|
||||
log.info("Ledgers collected: total found=" + ledgers.size());
|
||||
|
||||
byte[] nameBytes = name.getBytes();
|
||||
|
||||
Optional<Entry<Long, LedgerMetadata>> entry = ledgers.entrySet().stream().filter((e) -> {
|
||||
Map<String,byte[]> meta = e.getValue().getCustomMetadata();
|
||||
if ( meta != null ) {
|
||||
log.info("ledger: " + e.getKey() + ", customMeta=" + meta);
|
||||
byte[] data = meta.get("name");
|
||||
if ( data != null && Arrays.equals(data, nameBytes)) {
|
||||
return true;
|
||||
}
|
||||
else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
else {
|
||||
log.info("ledger: " + e.getKey() + ", no meta");
|
||||
return false;
|
||||
}
|
||||
})
|
||||
.findFirst();
|
||||
|
||||
|
||||
if (entry.isPresent()) {
|
||||
return Optional.of(entry.get().getKey());
|
||||
}
|
||||
else {
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
public static void collectLedgers(BookKeeper bk, long ledgerId, AsyncCallback.VoidCallback cb, Map<Long,LedgerMetadata> ledgers) {
|
||||
log.info("ledgerId: " + ledgerId);
|
||||
|
||||
try {
|
||||
bk.getLedgerManager()
|
||||
.readLedgerMetadata(ledgerId)
|
||||
.thenAccept((v) -> {
|
||||
log.info("Got ledger metadata");
|
||||
ledgers.put(ledgerId,v.getValue());
|
||||
})
|
||||
.thenAccept((v) -> {
|
||||
cb.processResult(BKException.Code.OK, null, null);
|
||||
});
|
||||
}
|
||||
catch(Exception ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,5 @@
|
|||
package com.baeldung.tutorials.bookkeeper;
|
||||
|
||||
public class LedgerReader {
|
||||
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
/**
|
||||
*
|
||||
*/
|
||||
package com.baeldung.tutorials.bookkeeper;
|
||||
|
||||
/**
|
||||
* @author Philippe
|
||||
*
|
||||
*/
|
||||
public class LedgerWriter {
|
||||
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
package com.baeldung.tutorials.bookkeeper;
|
||||
|
||||
import org.apache.bookkeeper.client.BookKeeper;
|
||||
|
||||
public class Main {
|
||||
|
||||
public static void main(String args[]) {
|
||||
|
||||
BookKeeper bk = BkHelper.createBkClient(args[0]);
|
||||
System.out.println("Connect OK");
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,208 @@
|
|||
package com.baeldung.tutorials.bookkeeper;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
|
||||
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
|
||||
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
|
||||
import org.apache.bookkeeper.client.BKException;
|
||||
import org.apache.bookkeeper.client.BookKeeper;
|
||||
import org.apache.bookkeeper.client.LedgerEntry;
|
||||
import org.apache.bookkeeper.client.LedgerHandle;
|
||||
import org.apache.bookkeeper.client.api.DigestType;
|
||||
import org.apache.bookkeeper.client.api.ReadHandle;
|
||||
import org.apache.bookkeeper.client.api.WriteAdvHandle;
|
||||
import org.apache.bookkeeper.client.api.WriteHandle;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.zookeeper.AsyncCallback;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class BkHelperIntegrationTest extends BkHelper {
|
||||
|
||||
private static BookKeeper bk;
|
||||
private byte[] ledgerPassword = "SuperS3cR37".getBytes();
|
||||
|
||||
private static final Log log = LogFactory.getLog(BkHelperIntegrationTest.class);
|
||||
|
||||
@BeforeAll
|
||||
static void initBkClient() {
|
||||
bk = createBkClient("192.168.99.101:2181");
|
||||
}
|
||||
|
||||
@Test
|
||||
void whenCreateLedger_thenSuccess() throws Exception {
|
||||
|
||||
LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.MAC, ledgerPassword);
|
||||
assertNotNull(lh);
|
||||
assertNotNull(lh.getId());
|
||||
|
||||
CreateCallback cb = (rc, ll, ctx) -> {
|
||||
|
||||
};
|
||||
|
||||
bk.asyncCreateLedger(3, 2, 2, BookKeeper.DigestType.MAC, "passwd".getBytes(), cb, null, Collections.emptyMap());
|
||||
//lh.get
|
||||
|
||||
// CompletableFuture<WriteAdvHandle> cf = bk.newCreateLedgerOp()
|
||||
// .withDigestType(org.apache.bookkeeper.client.api.DigestType.MAC)
|
||||
// .withPassword("password".getBytes())
|
||||
// .makeAdv()
|
||||
// .execute();
|
||||
|
||||
log.info("[I33] Ledge created: id=" + lh.getId());
|
||||
}
|
||||
|
||||
@Test
|
||||
void whenListLedgers_thenSuccess() throws Exception {
|
||||
|
||||
final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
|
||||
final CountDownLatch processDone = new CountDownLatch(1);
|
||||
|
||||
// There's no standard "list" operation. Instead, BK offers a generalized way to
|
||||
// iterate over all available ledgers using an async visitor callback.
|
||||
// The second callback will be called when there are no more ledgers do process or if an
|
||||
// error occurs.
|
||||
bk.getLedgerManager()
|
||||
.asyncProcessLedgers(
|
||||
(data,cb) -> processLedger(data,cb),
|
||||
(rc, s, obj) -> {
|
||||
returnCode.set(rc);
|
||||
processDone.countDown();
|
||||
},
|
||||
null,
|
||||
BKException.Code.OK,
|
||||
BKException.Code.ReadException);
|
||||
|
||||
processDone.await(5, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
@Test
|
||||
void whenWriteEntries_thenSuccess() throws Exception {
|
||||
|
||||
LedgerHandle lh = createLedger(bk,"myledger",ledgerPassword);
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
for ( int i = 0 ; i < 1000 ; i++ ) {
|
||||
byte[] data = new String("message-" + i).getBytes();
|
||||
lh.append(data);
|
||||
}
|
||||
|
||||
byte[] data = new byte[] {};
|
||||
|
||||
CompletableFuture<Long> f = lh.appendAsync(data);
|
||||
AddCallback cbw = (rc,ll,entryId,ctx) -> {
|
||||
|
||||
};
|
||||
|
||||
lh.asyncAddEntry(data, cbw, null);
|
||||
|
||||
lh.close();
|
||||
long elapsed = System.currentTimeMillis() - start;
|
||||
log.info("Entries added to ledgerId " + lh.getId() + ". count=1000, elapsed=" + elapsed);
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void whenWriteAndReadEntries_thenSuccess() throws Exception {
|
||||
|
||||
LedgerHandle lh = createLedger(bk,"myledger",ledgerPassword);
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
for ( int i = 0 ; i < 1000 ; i++ ) {
|
||||
byte[] data = new String("message-" + i).getBytes();
|
||||
lh.append(data);
|
||||
}
|
||||
|
||||
lh.close();
|
||||
long elapsed = System.currentTimeMillis() - start;
|
||||
log.info("Entries added to ledgerId " + lh.getId() + ", elapsed=" + elapsed);
|
||||
|
||||
Long ledgerId = findLedgerByName(bk,"myledger").orElse(null);
|
||||
assertNotNull(ledgerId);
|
||||
|
||||
lh = bk.openLedger(ledgerId, BookKeeper.DigestType.MAC, ledgerPassword);
|
||||
long lastId = lh.readLastConfirmed();
|
||||
Enumeration<LedgerEntry> entries = lh.readEntries(0, lastId);
|
||||
|
||||
ReadCallback cbr;
|
||||
lh.asyncReadEntries(0, lastId,
|
||||
(rc,ledgerHandle,ee,ctx) -> {
|
||||
while(ee.hasMoreElements()) {
|
||||
LedgerEntry e = ee.nextElement();
|
||||
}
|
||||
}, null);
|
||||
|
||||
ReadHandle rh = bk.newOpenLedgerOp()
|
||||
.withLedgerId(ledgerId)
|
||||
.withDigestType(DigestType.MAC)
|
||||
.withPassword("password".getBytes())
|
||||
.execute().get();
|
||||
|
||||
rh.read(0, lastId).forEach((entry) -> {
|
||||
|
||||
});
|
||||
|
||||
rh.readAsync(0, lastId).thenAccept((ee) -> {
|
||||
ee.forEach((entry) -> {
|
||||
// ..
|
||||
});
|
||||
});
|
||||
|
||||
while(entries.hasMoreElements()) {
|
||||
LedgerEntry entry = entries.nextElement();
|
||||
String msg = new String(entry.getEntry());
|
||||
log.info("Entry: id=" + entry.getEntryId() + ", data=" + msg);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void processLedger(long ledgerId, AsyncCallback.VoidCallback cb) {
|
||||
log.info("ledgerId: " + ledgerId);
|
||||
cb.processResult(BKException.Code.OK, null, null);
|
||||
}
|
||||
|
||||
|
||||
private CompletableFuture<List<Long>> listAllLedgers(BookKeeper bk) {
|
||||
|
||||
final List<Long> ledgers = Collections.synchronizedList(new ArrayList<>());
|
||||
final CountDownLatch processDone = new CountDownLatch(1);
|
||||
|
||||
bk.getLedgerManager()
|
||||
.asyncProcessLedgers(
|
||||
(ledgerId,cb) -> {
|
||||
ledgers.add(ledgerId);
|
||||
cb.processResult(BKException.Code.OK, null, null);
|
||||
},
|
||||
(rc, s, obj) -> {
|
||||
processDone.countDown();
|
||||
},
|
||||
null, BKException.Code.OK, BKException.Code.ReadException);
|
||||
|
||||
CompletableFuture<List<Long>> cf = CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
processDone.await(1,TimeUnit.MINUTES);
|
||||
return ledgers;
|
||||
}
|
||||
catch(InterruptedException ie) {
|
||||
throw new RuntimeException(ie);
|
||||
}
|
||||
});
|
||||
|
||||
return cf;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration>
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
|
||||
</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<root level="INFO">
|
||||
<appender-ref ref="STDOUT" />
|
||||
</root>
|
||||
</configuration>
|
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue