[BAEL-2322] Relocate module

This commit is contained in:
Philippe 2020-06-10 00:32:08 -03:00
parent 21a471e283
commit c438f0c842
15 changed files with 312 additions and 326 deletions

View File

@ -1,5 +0,0 @@
package com.baeldung.tutorials.bookkeeper;
public class LedgerReader {
}

View File

@ -1,12 +0,0 @@
/**
*
*/
package com.baeldung.tutorials.bookkeeper;
/**
* @author Philippe
*
*/
public class LedgerWriter {
}

View File

@ -1,13 +0,0 @@
package com.baeldung.tutorials.bookkeeper;
import org.apache.bookkeeper.client.BookKeeper;
public class Main {
public static void main(String args[]) {
BookKeeper bk = BkHelper.createBkClient(args[0]);
System.out.println("Connect OK");
}
}

View File

@ -1,208 +0,0 @@
package com.baeldung.tutorials.bookkeeper;
import static org.junit.jupiter.api.Assertions.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.api.WriteAdvHandle;
import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.AsyncCallback;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
class BkHelperIntegrationTest extends BkHelper {
private static BookKeeper bk;
private byte[] ledgerPassword = "SuperS3cR37".getBytes();
private static final Log log = LogFactory.getLog(BkHelperIntegrationTest.class);
@BeforeAll
static void initBkClient() {
bk = createBkClient("192.168.99.101:2181");
}
@Test
void whenCreateLedger_thenSuccess() throws Exception {
LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.MAC, ledgerPassword);
assertNotNull(lh);
assertNotNull(lh.getId());
CreateCallback cb = (rc, ll, ctx) -> {
};
bk.asyncCreateLedger(3, 2, 2, BookKeeper.DigestType.MAC, "passwd".getBytes(), cb, null, Collections.emptyMap());
//lh.get
// CompletableFuture<WriteAdvHandle> cf = bk.newCreateLedgerOp()
// .withDigestType(org.apache.bookkeeper.client.api.DigestType.MAC)
// .withPassword("password".getBytes())
// .makeAdv()
// .execute();
log.info("[I33] Ledge created: id=" + lh.getId());
}
@Test
void whenListLedgers_thenSuccess() throws Exception {
final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
final CountDownLatch processDone = new CountDownLatch(1);
// There's no standard "list" operation. Instead, BK offers a generalized way to
// iterate over all available ledgers using an async visitor callback.
// The second callback will be called when there are no more ledgers do process or if an
// error occurs.
bk.getLedgerManager()
.asyncProcessLedgers(
(data,cb) -> processLedger(data,cb),
(rc, s, obj) -> {
returnCode.set(rc);
processDone.countDown();
},
null,
BKException.Code.OK,
BKException.Code.ReadException);
processDone.await(5, TimeUnit.MINUTES);
}
@Test
void whenWriteEntries_thenSuccess() throws Exception {
LedgerHandle lh = createLedger(bk,"myledger",ledgerPassword);
long start = System.currentTimeMillis();
for ( int i = 0 ; i < 1000 ; i++ ) {
byte[] data = new String("message-" + i).getBytes();
lh.append(data);
}
byte[] data = new byte[] {};
CompletableFuture<Long> f = lh.appendAsync(data);
AddCallback cbw = (rc,ll,entryId,ctx) -> {
};
lh.asyncAddEntry(data, cbw, null);
lh.close();
long elapsed = System.currentTimeMillis() - start;
log.info("Entries added to ledgerId " + lh.getId() + ". count=1000, elapsed=" + elapsed);
}
@Test
void whenWriteAndReadEntries_thenSuccess() throws Exception {
LedgerHandle lh = createLedger(bk,"myledger",ledgerPassword);
long start = System.currentTimeMillis();
for ( int i = 0 ; i < 1000 ; i++ ) {
byte[] data = new String("message-" + i).getBytes();
lh.append(data);
}
lh.close();
long elapsed = System.currentTimeMillis() - start;
log.info("Entries added to ledgerId " + lh.getId() + ", elapsed=" + elapsed);
Long ledgerId = findLedgerByName(bk,"myledger").orElse(null);
assertNotNull(ledgerId);
lh = bk.openLedger(ledgerId, BookKeeper.DigestType.MAC, ledgerPassword);
long lastId = lh.readLastConfirmed();
Enumeration<LedgerEntry> entries = lh.readEntries(0, lastId);
ReadCallback cbr;
lh.asyncReadEntries(0, lastId,
(rc,ledgerHandle,ee,ctx) -> {
while(ee.hasMoreElements()) {
LedgerEntry e = ee.nextElement();
}
}, null);
ReadHandle rh = bk.newOpenLedgerOp()
.withLedgerId(ledgerId)
.withDigestType(DigestType.MAC)
.withPassword("password".getBytes())
.execute().get();
rh.read(0, lastId).forEach((entry) -> {
});
rh.readAsync(0, lastId).thenAccept((ee) -> {
ee.forEach((entry) -> {
// ..
});
});
while(entries.hasMoreElements()) {
LedgerEntry entry = entries.nextElement();
String msg = new String(entry.getEntry());
log.info("Entry: id=" + entry.getEntryId() + ", data=" + msg);
}
}
private void processLedger(long ledgerId, AsyncCallback.VoidCallback cb) {
log.info("ledgerId: " + ledgerId);
cb.processResult(BKException.Code.OK, null, null);
}
private CompletableFuture<List<Long>> listAllLedgers(BookKeeper bk) {
final List<Long> ledgers = Collections.synchronizedList(new ArrayList<>());
final CountDownLatch processDone = new CountDownLatch(1);
bk.getLedgerManager()
.asyncProcessLedgers(
(ledgerId,cb) -> {
ledgers.add(ledgerId);
cb.processResult(BKException.Code.OK, null, null);
},
(rc, s, obj) -> {
processDone.countDown();
},
null, BKException.Code.OK, BKException.Code.ReadException);
CompletableFuture<List<Long>> cf = CompletableFuture.supplyAsync(() -> {
try {
processDone.await(1,TimeUnit.MINUTES);
return ledgers;
}
catch(InterruptedException ie) {
throw new RuntimeException(ie);
}
});
return cf;
}
}

View File

@ -13,6 +13,7 @@
<groupId>com.baeldung</groupId> <groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId> <artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version> <version>1.0.0-SNAPSHOT</version>
<relativePath>../../</relativePath>
</parent> </parent>
<dependencies> <dependencies>
@ -41,29 +42,6 @@
<org.apache.bookkeeper.version>4.10.0</org.apache.bookkeeper.version> <org.apache.bookkeeper.version>4.10.0</org.apache.bookkeeper.version>
</properties> </properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.3</version>
<configuration>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" />
<mainClass>com.baeldung.tutorials.bookkeeper.Main</mainClass>
<minimizeJar>true</minimizeJar>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project> </project>

View File

@ -1,8 +1,10 @@
package com.baeldung.tutorials.bookkeeper; package com.baeldung.tutorials.bookkeeper;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -15,7 +17,6 @@ import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.AsyncCallback;
@ -26,14 +27,8 @@ public class BkHelper {
public static BookKeeper createBkClient(String zkConnectionString) { public static BookKeeper createBkClient(String zkConnectionString) {
try { try {
ClientConfiguration cfg = new ClientConfiguration();
cfg.setMetadataServiceUri("zk+null://zookeeper-host:2131");
BookKeeper.forConfig(cfg).build();
return new BookKeeper(zkConnectionString); return new BookKeeper(zkConnectionString);
} } catch (Exception ex) {
catch(Exception ex) {
throw new RuntimeException(ex); throw new RuntimeException(ex);
} }
} }
@ -47,13 +42,8 @@ public class BkHelper {
*/ */
public static LedgerHandle createLedger(BookKeeper bk, String name, byte[] password) { public static LedgerHandle createLedger(BookKeeper bk, String name, byte[] password) {
try { try {
return bk.createLedger(3, 2, 2, DigestType.MAC, password, Collections.singletonMap("name", name.getBytes()));
return bk.createLedger(3,2,2, } catch (Exception ex) {
DigestType.MAC,
password,
Collections.singletonMap("name", name.getBytes()));
}
catch(Exception ex) {
throw new RuntimeException(ex); throw new RuntimeException(ex);
} }
} }
@ -68,7 +58,7 @@ public class BkHelper {
*/ */
public static Optional<Long> findLedgerByName(BookKeeper bk, String name) throws Exception { public static Optional<Long> findLedgerByName(BookKeeper bk, String name) throws Exception {
Map<Long,LedgerMetadata> ledgers = new HashMap<Long, LedgerMetadata>(); Map<Long, LedgerMetadata> ledgers = new HashMap<Long, LedgerMetadata>();
final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK); final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
final CountDownLatch processDone = new CountDownLatch(1); final CountDownLatch processDone = new CountDownLatch(1);
@ -77,15 +67,11 @@ public class BkHelper {
// The second callback will be called when there are no more ledgers do process or if an // The second callback will be called when there are no more ledgers do process or if an
// error occurs. // error occurs.
bk.getLedgerManager() bk.getLedgerManager()
.asyncProcessLedgers( .asyncProcessLedgers((ledgerId, cb) -> collectLedgers(bk, ledgerId, cb, ledgers),
(data,cb) -> collectLedgers(bk,data,cb,ledgers), (rc, s, obj) -> {
(rc, s, obj) -> { returnCode.set(rc);
returnCode.set(rc); processDone.countDown();
processDone.countDown(); }, null, BKException.Code.OK, BKException.Code.ReadException);
},
null,
BKException.Code.OK,
BKException.Code.ReadException);
processDone.await(5, TimeUnit.MINUTES); processDone.await(5, TimeUnit.MINUTES);
@ -93,52 +79,79 @@ public class BkHelper {
byte[] nameBytes = name.getBytes(); byte[] nameBytes = name.getBytes();
Optional<Entry<Long, LedgerMetadata>> entry = ledgers.entrySet().stream().filter((e) -> { Optional<Entry<Long, LedgerMetadata>> entry = ledgers.entrySet()
Map<String,byte[]> meta = e.getValue().getCustomMetadata(); .stream()
if ( meta != null ) { .filter((e) -> {
log.info("ledger: " + e.getKey() + ", customMeta=" + meta); Map<String, byte[]> meta = e.getValue().getCustomMetadata();
byte[] data = meta.get("name"); if (meta != null) {
if ( data != null && Arrays.equals(data, nameBytes)) { log.info("ledger: " + e.getKey() + ", customMeta=" + meta);
return true; byte[] data = meta.get("name");
} if (data != null && Arrays.equals(data, nameBytes)) {
else { return true;
} else {
return false;
}
} else {
log.info("ledger: " + e.getKey() + ", no meta");
return false; return false;
} }
} })
else { .findFirst();
log.info("ledger: " + e.getKey() + ", no meta");
return false;
}
})
.findFirst();
if (entry.isPresent()) { if (entry.isPresent()) {
return Optional.of(entry.get().getKey()); return Optional.of(entry.get().getKey());
} } else {
else {
return Optional.empty(); return Optional.empty();
} }
} }
public static void collectLedgers(BookKeeper bk, long ledgerId, AsyncCallback.VoidCallback cb, Map<Long,LedgerMetadata> ledgers) { public static void collectLedgers(BookKeeper bk, long ledgerId, AsyncCallback.VoidCallback cb, Map<Long, LedgerMetadata> ledgers) {
log.info("ledgerId: " + ledgerId); log.debug("ledgerId: " + ledgerId);
try { try {
bk.getLedgerManager() bk.getLedgerManager()
.readLedgerMetadata(ledgerId) .readLedgerMetadata(ledgerId)
.thenAccept((v) -> { .thenAccept((v) -> {
log.info("Got ledger metadata"); log.debug("Got ledger metadata");
ledgers.put(ledgerId,v.getValue()); ledgers.put(ledgerId, v.getValue());
}) })
.thenAccept((v) -> { .thenAccept((v) -> {
cb.processResult(BKException.Code.OK, null, null); cb.processResult(BKException.Code.OK, null, null);
}); });
} } catch (Exception ex) {
catch(Exception ex) {
throw new RuntimeException(ex); throw new RuntimeException(ex);
} }
} }
/**
* Return a list with all available Ledgers
* @param bk
* @return
*/
public static List<Long> listAllLedgers(BookKeeper bk) {
final List<Long> ledgers = Collections.synchronizedList(new ArrayList<>());
final CountDownLatch processDone = new CountDownLatch(1);
bk.getLedgerManager()
.asyncProcessLedgers(
(ledgerId,cb) -> {
ledgers.add(ledgerId);
cb.processResult(BKException.Code.OK, null, null);
},
(rc, s, obj) -> {
processDone.countDown();
},
null, BKException.Code.OK, BKException.Code.ReadException);
try {
processDone.await(1,TimeUnit.MINUTES);
return ledgers;
}
catch(InterruptedException ie) {
throw new RuntimeException(ie);
}
}
} }

View File

@ -0,0 +1,229 @@
package com.baeldung.tutorials.bookkeeper;
import static org.junit.jupiter.api.Assertions.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.api.WriteAdvHandle;
import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.tools.cli.commands.bookie.ListLedgersCommand;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.AsyncCallback;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import com.google.common.collect.Iterables;
class BkHelperLiveTest extends BkHelper {
private static BookKeeper bk;
private byte[] ledgerPassword = "SuperS3cR37".getBytes();
private static final Log log = LogFactory.getLog(BkHelperLiveTest.class);
@BeforeAll
static void initBkClient() {
bk = createBkClient("192.168.99.101:2181");
}
@Test
void whenCreateLedger_thenSuccess() throws Exception {
LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.MAC, ledgerPassword);
assertNotNull(lh);
assertNotNull(lh.getId());
log.info("[I33] Ledge created: id=" + lh.getId());
}
@Test
void whenCreateLedgerAsync_thenSuccess() throws Exception {
CompletableFuture<WriteHandle> cf = bk.newCreateLedgerOp()
.withDigestType(org.apache.bookkeeper.client.api.DigestType.MAC)
.withPassword("password".getBytes())
.execute();
WriteHandle handle = cf.get(1, TimeUnit.MINUTES);
assertNotNull(handle);
handle.close();
}
@Test
void whenAsyncCreateLedger_thenSuccess() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<LedgerHandle> handleRef =new AtomicReference<>();
bk.asyncCreateLedger(3, 2, 2,
BookKeeper.DigestType.MAC,
ledgerPassword,
(rc, lh, ctx) -> {
handleRef.set(lh);
latch.countDown();
},
null,
Collections.emptyMap());
latch.await(1, TimeUnit.MINUTES);
LedgerHandle lh = handleRef.get();
assertNotNull(lh);
assertFalse(lh.isClosed(),"Ledger should be writeable");
}
@Test
void whenListLedgers_thenSuccess() throws Exception {
List<Long> ledgers = listAllLedgers(bk);
assertNotNull(ledgers);
}
@Test
void whenWriteEntries_thenSuccess() throws Exception {
LedgerHandle lh = createLedger(bk,"myledger",ledgerPassword);
long start = System.currentTimeMillis();
for ( int i = 0 ; i < 1000 ; i++ ) {
byte[] data = new String("message-" + i).getBytes();
lh.append(data);
}
lh.close();
long elapsed = System.currentTimeMillis() - start;
log.info("Entries added to ledgerId " + lh.getId() + ". count=1000, elapsed=" + elapsed);
}
@Test
void whenWriteEntriesAsync_thenSuccess() throws Exception {
CompletableFuture<Object> f = bk.newCreateLedgerOp()
.withDigestType(DigestType.MAC)
.withPassword(ledgerPassword)
.execute()
.thenApply((wh) -> {
List<CompletableFuture<Long>> ops = new ArrayList<>();
for( int i = 0; i < 1000 ; i++ ) {
byte[] data = String.format("message-%04d", i).getBytes();
ops.add(wh.appendAsync(data));
}
return CompletableFuture
.allOf(ops.stream().toArray(CompletableFuture[]::new))
.thenCompose((v) -> wh.closeAsync());
});
f.get(5, TimeUnit.MINUTES);
}
@Test
void whenWriteAndReadEntriesAsync_thenSuccess() throws Exception {
CompletableFuture<Long> f = bk.newCreateLedgerOp()
.withDigestType(DigestType.MAC)
.withPassword(ledgerPassword)
.execute()
.thenApply((wh) -> {
List<CompletableFuture<Long>> ops = new ArrayList<>();
for( int i = 0; i < 1000 ; i++ ) {
byte[] data = String.format("message-%04d", i).getBytes();
ops.add(wh.appendAsync(data));
}
return CompletableFuture
.allOf(ops.stream().toArray(CompletableFuture[]::new))
.thenCompose((v) -> wh.closeAsync())
.thenApply((v) -> wh.getId());
})
.thenCompose((lf) -> lf); // flatten the
Long ledgerId = f.get(5, TimeUnit.MINUTES);
log.info("Ledger created with 1000 entries: ledgerId=" + ledgerId);
// Now let's read data back...
CompletableFuture<LedgerEntries> ef = bk.newOpenLedgerOp()
.withLedgerId(ledgerId)
.withPassword(ledgerPassword)
.withDigestType(DigestType.MAC)
.execute()
.thenCompose((rh) -> {
return rh.readLastAddConfirmedAsync()
.thenCompose((lastId) -> rh.readAsync(0, lastId));
});
LedgerEntries entries = ef.get(5,TimeUnit.MINUTES);
long count = 0;
Iterator<org.apache.bookkeeper.client.api.LedgerEntry> it = entries.iterator();
while ( it.hasNext()) {
org.apache.bookkeeper.client.api.LedgerEntry e = it.next();
String msg = new String(e.getEntryBytes());
assertEquals(String.format("message-%04d", count),msg);
count++;
}
assertEquals(1000,count);
log.info("Got entries: count=" + count);
}
@Test
void whenWriteAndReadEntries_thenSuccess() throws Exception {
LedgerHandle lh = createLedger(bk,"myledger",ledgerPassword);
long start = System.currentTimeMillis();
for ( int i = 0 ; i < 1000 ; i++ ) {
byte[] data = new String("message-" + i).getBytes();
lh.append(data);
}
lh.close();
long elapsed = System.currentTimeMillis() - start;
log.info("Entries added to ledgerId " + lh.getId() + ", elapsed=" + elapsed);
Long ledgerId = findLedgerByName(bk,"myledger").orElse(null);
assertNotNull(ledgerId);
lh = bk.openLedger(ledgerId, BookKeeper.DigestType.MAC, ledgerPassword);
long lastId = lh.readLastConfirmed();
Enumeration<LedgerEntry> entries = lh.readEntries(0, lastId);
while(entries.hasMoreElements()) {
LedgerEntry entry = entries.nextElement();
String msg = new String(entry.getEntry());
log.info("Entry: id=" + entry.getEntryId() + ", data=" + msg);
}
}
}

View File

@ -217,4 +217,8 @@
</plugins> </plugins>
</build> </build>
<modules>
<module>apache-bookkeeper</module>
</modules>
</project> </project>