[BAEL-2322] Code formatting

This commit is contained in:
Philippe 2020-06-14 16:54:22 -03:00
parent 0c2aedbe8f
commit 00a5903c46
4 changed files with 151 additions and 201 deletions

View File

@ -1,46 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" <project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>apache-bookkeeper</artifactId> <artifactId>apache-bookkeeper</artifactId>
<version>0.0.1-SNAPSHOT</version> <version>0.0.1-SNAPSHOT</version>
<name>apache-bookkeeper</name> <name>apache-bookkeeper</name>
<packaging>jar</packaging> <packaging>jar</packaging>
<parent> <parent>
<groupId>com.baeldung</groupId> <groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId> <artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version> <version>1.0.0-SNAPSHOT</version>
<relativePath>../../</relativePath> <relativePath>../../</relativePath>
</parent> </parent>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.apache.bookkeeper</groupId> <groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-server</artifactId> <artifactId>bookkeeper-server</artifactId>
<version>${org.apache.bookkeeper.version}</version> <version>${org.apache.bookkeeper.version}</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId> <artifactId>slf4j-log4j12</artifactId>
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.testcontainers</groupId> <groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId> <artifactId>testcontainers</artifactId>
<version>1.14.3</version> <version>1.14.3</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
</dependencies>
<properties> </dependencies>
<org.apache.bookkeeper.version>4.10.0</org.apache.bookkeeper.version>
</properties> <properties>
<org.apache.bookkeeper.version>4.10.0</org.apache.bookkeeper.version>
</properties>
</project> </project>

View File

@ -6,8 +6,8 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -23,7 +23,7 @@ import org.apache.zookeeper.AsyncCallback;
public class BkHelper { public class BkHelper {
private static final Log log = LogFactory.getLog(BkHelper.class); private static final Log LOG = LogFactory.getLog(BkHelper.class);
public static BookKeeper createBkClient(String zkConnectionString) { public static BookKeeper createBkClient(String zkConnectionString) {
try { try {
@ -57,7 +57,6 @@ public class BkHelper {
* @throws Exception * @throws Exception
*/ */
public static Optional<Long> findLedgerByName(BookKeeper bk, String name) throws Exception { public static Optional<Long> findLedgerByName(BookKeeper bk, String name) throws Exception {
Map<Long, LedgerMetadata> ledgers = new HashMap<Long, LedgerMetadata>(); Map<Long, LedgerMetadata> ledgers = new HashMap<Long, LedgerMetadata>();
final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK); final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
final CountDownLatch processDone = new CountDownLatch(1); final CountDownLatch processDone = new CountDownLatch(1);
@ -67,52 +66,51 @@ public class BkHelper {
// The second callback will be called when there are no more ledgers do process or if an // The second callback will be called when there are no more ledgers do process or if an
// error occurs. // error occurs.
bk.getLedgerManager() bk.getLedgerManager()
.asyncProcessLedgers((ledgerId, cb) -> collectLedgers(bk, ledgerId, cb, ledgers), .asyncProcessLedgers(
(rc, s, obj) -> { (ledgerId, cb) -> collectLedgers(bk, ledgerId, cb, ledgers),
returnCode.set(rc); (rc, s, obj) -> {
processDone.countDown(); returnCode.set(rc);
}, null, BKException.Code.OK, BKException.Code.ReadException); processDone.countDown();
},
null,
BKException.Code.OK, BKException.Code.ReadException);
processDone.await(5, TimeUnit.MINUTES); processDone.await(5, TimeUnit.MINUTES);
LOG.info("Ledgers collected: total found=" + ledgers.size());
log.info("Ledgers collected: total found=" + ledgers.size());
byte[] nameBytes = name.getBytes(); byte[] nameBytes = name.getBytes();
Optional<Entry<Long, LedgerMetadata>> entry = ledgers.entrySet() Optional<Entry<Long, LedgerMetadata>> entry = ledgers.entrySet()
.stream() .stream()
.filter((e) -> { .filter((e) -> {
Map<String, byte[]> meta = e.getValue().getCustomMetadata(); Map<String, byte[]> meta = e.getValue()
if (meta != null) { .getCustomMetadata();
log.info("ledger: " + e.getKey() + ", customMeta=" + meta); if (meta != null) {
byte[] data = meta.get("name"); LOG.info("ledger: " + e.getKey() + ", customMeta=" + meta);
if (data != null && Arrays.equals(data, nameBytes)) { byte[] data = meta.get("name");
return true; if (data != null && Arrays.equals(data, nameBytes)) {
} else { return true;
return false; } else {
} return false;
} else { }
log.info("ledger: " + e.getKey() + ", no meta"); } else {
return false; LOG.info("ledger: " + e.getKey() + ", no meta");
} return false;
}) }
.findFirst(); })
.findFirst();
if (entry.isPresent()) { if (entry.isPresent()) {
return Optional.of(entry.get().getKey()); return Optional.of(entry.get()
.getKey());
} else { } else {
return Optional.empty(); return Optional.empty();
} }
} }
public static void collectLedgers(BookKeeper bk, long ledgerId, AsyncCallback.VoidCallback cb, Map<Long, LedgerMetadata> ledgers) { public static void collectLedgers(BookKeeper bk, long ledgerId, AsyncCallback.VoidCallback cb, Map<Long, LedgerMetadata> ledgers) {
log.debug("ledgerId: " + ledgerId);
try { try {
bk.getLedgerManager() bk.getLedgerManager()
.readLedgerMetadata(ledgerId) .readLedgerMetadata(ledgerId)
.thenAccept((v) -> { .thenAccept((v) -> {
log.debug("Got ledger metadata"); LOG.debug("Got ledger metadata");
ledgers.put(ledgerId, v.getValue()); ledgers.put(ledgerId, v.getValue());
}) })
.thenAccept((v) -> { .thenAccept((v) -> {
@ -122,36 +120,30 @@ public class BkHelper {
throw new RuntimeException(ex); throw new RuntimeException(ex);
} }
} }
/** /**
* Return a list with all available Ledgers * Return a list with all available Ledgers
* @param bk * @param bk
* @return * @return
*/ */
public static List<Long> listAllLedgers(BookKeeper bk) { public static List<Long> listAllLedgers(BookKeeper bk) {
final List<Long> ledgers = Collections.synchronizedList(new ArrayList<>()); final List<Long> ledgers = Collections.synchronizedList(new ArrayList<>());
final CountDownLatch processDone = new CountDownLatch(1); final CountDownLatch processDone = new CountDownLatch(1);
bk.getLedgerManager() bk.getLedgerManager()
.asyncProcessLedgers( .asyncProcessLedgers((ledgerId, cb) -> {
(ledgerId,cb) -> { ledgers.add(ledgerId);
ledgers.add(ledgerId); cb.processResult(BKException.Code.OK, null, null);
cb.processResult(BKException.Code.OK, null, null); },
}, (rc, s, obj) -> {
(rc, s, obj) -> { processDone.countDown();
processDone.countDown(); }, null, BKException.Code.OK, BKException.Code.ReadException);
},
null, BKException.Code.OK, BKException.Code.ReadException); try {
processDone.await(1, TimeUnit.MINUTES);
try { return ledgers;
processDone.await(1,TimeUnit.MINUTES); } catch (InterruptedException ie) {
return ledgers; throw new RuntimeException(ie);
} }
catch(InterruptedException ie) {
throw new RuntimeException(ie);
}
} }
} }

View File

@ -1,9 +1,10 @@
package com.baeldung.tutorials.bookkeeper; package com.baeldung.tutorials.bookkeeper;
import static org.junit.jupiter.api.Assertions.*; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.Iterator; import java.util.Iterator;
@ -11,163 +12,128 @@ import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.api.WriteAdvHandle;
import org.apache.bookkeeper.client.api.WriteHandle; import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.tools.cli.commands.bookie.ListLedgersCommand;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.AsyncCallback;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import com.google.common.collect.Iterables;
class BkHelperLiveTest extends BkHelper { class BkHelperLiveTest extends BkHelper {
private static BookKeeper bk; private static BookKeeper bk;
private byte[] ledgerPassword = "SuperS3cR37".getBytes(); private byte[] ledgerPassword = "SuperS3cR37".getBytes();
private static final Log LOG = LogFactory.getLog(BkHelperLiveTest.class);
private static final Log log = LogFactory.getLog(BkHelperLiveTest.class);
@BeforeAll @BeforeAll
static void initBkClient() { static void initBkClient() {
bk = createBkClient("192.168.99.101:2181"); bk = createBkClient("192.168.99.101:2181");
} }
@Test @Test
void whenCreateLedger_thenSuccess() throws Exception { void whenCreateLedger_thenSuccess() throws Exception {
LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.MAC, ledgerPassword); LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.MAC, ledgerPassword);
assertNotNull(lh); assertNotNull(lh);
assertNotNull(lh.getId()); assertNotNull(lh.getId());
LOG.info("[I33] Ledge created: id=" + lh.getId());
log.info("[I33] Ledge created: id=" + lh.getId());
} }
@Test @Test
void whenCreateLedgerAsync_thenSuccess() throws Exception { void whenCreateLedgerAsync_thenSuccess() throws Exception {
CompletableFuture<WriteHandle> cf = bk.newCreateLedgerOp() CompletableFuture<WriteHandle> cf = bk.newCreateLedgerOp()
.withDigestType(org.apache.bookkeeper.client.api.DigestType.MAC) .withDigestType(org.apache.bookkeeper.client.api.DigestType.MAC)
.withPassword("password".getBytes()) .withPassword("password".getBytes())
.execute(); .execute();
WriteHandle handle = cf.get(1, TimeUnit.MINUTES); WriteHandle handle = cf.get(1, TimeUnit.MINUTES);
assertNotNull(handle); assertNotNull(handle);
handle.close(); handle.close();
} }
@Test @Test
void whenAsyncCreateLedger_thenSuccess() throws Exception { void whenAsyncCreateLedger_thenSuccess() throws Exception {
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
AtomicReference<LedgerHandle> handleRef =new AtomicReference<>(); AtomicReference<LedgerHandle> handleRef = new AtomicReference<>();
bk.asyncCreateLedger(3, 2, 2, bk.asyncCreateLedger(3, 2, 2, BookKeeper.DigestType.MAC, ledgerPassword,
BookKeeper.DigestType.MAC,
ledgerPassword,
(rc, lh, ctx) -> { (rc, lh, ctx) -> {
handleRef.set(lh); handleRef.set(lh);
latch.countDown(); latch.countDown();
}, }, null, Collections.emptyMap());
null,
Collections.emptyMap());
latch.await(1, TimeUnit.MINUTES); latch.await(1, TimeUnit.MINUTES);
LedgerHandle lh = handleRef.get(); LedgerHandle lh = handleRef.get();
assertNotNull(lh); assertNotNull(lh);
assertFalse(lh.isClosed(),"Ledger should be writeable"); assertFalse(lh.isClosed(), "Ledger should be writeable");
} }
@Test @Test
void whenListLedgers_thenSuccess() throws Exception { void whenListLedgers_thenSuccess() throws Exception {
List<Long> ledgers = listAllLedgers(bk); List<Long> ledgers = listAllLedgers(bk);
assertNotNull(ledgers); assertNotNull(ledgers);
} }
@Test @Test
void whenWriteEntries_thenSuccess() throws Exception { void whenWriteEntries_thenSuccess() throws Exception {
LedgerHandle lh = createLedger(bk, "myledger", ledgerPassword);
LedgerHandle lh = createLedger(bk,"myledger",ledgerPassword);
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
for ( int i = 0 ; i < 1000 ; i++ ) { for (int i = 0; i < 1000; i++) {
byte[] data = new String("message-" + i).getBytes(); byte[] data = new String("message-" + i).getBytes();
lh.append(data); lh.append(data);
} }
lh.close(); lh.close();
long elapsed = System.currentTimeMillis() - start; long elapsed = System.currentTimeMillis() - start;
log.info("Entries added to ledgerId " + lh.getId() + ". count=1000, elapsed=" + elapsed); LOG.info("Entries added to ledgerId " + lh.getId() + ". count=1000, elapsed=" + elapsed);
} }
@Test @Test
void whenWriteEntriesAsync_thenSuccess() throws Exception { void whenWriteEntriesAsync_thenSuccess() throws Exception {
CompletableFuture<Object> f = bk.newCreateLedgerOp() CompletableFuture<Object> f = bk.newCreateLedgerOp()
.withDigestType(DigestType.MAC) .withDigestType(DigestType.MAC)
.withPassword(ledgerPassword) .withPassword(ledgerPassword)
.execute() .execute()
.thenApply((wh) -> { .thenApply((wh) -> {
List<CompletableFuture<Long>> ops = new ArrayList<>(); List<CompletableFuture<Long>> ops = new ArrayList<>();
for( int i = 0; i < 1000 ; i++ ) { for (int i = 0; i < 1000; i++) {
byte[] data = String.format("message-%04d", i).getBytes(); byte[] data = String.format("message-%04d", i)
.getBytes();
ops.add(wh.appendAsync(data)); ops.add(wh.appendAsync(data));
} }
return CompletableFuture.allOf(ops.stream()
return CompletableFuture .toArray(CompletableFuture[]::new))
.allOf(ops.stream().toArray(CompletableFuture[]::new)) .thenCompose((v) -> wh.closeAsync());
.thenCompose((v) -> wh.closeAsync()); });
});
f.get(5, TimeUnit.MINUTES); f.get(5, TimeUnit.MINUTES);
} }
@Test @Test
void whenWriteAndReadEntriesAsync_thenSuccess() throws Exception { void whenWriteAndReadEntriesAsync_thenSuccess() throws Exception {
CompletableFuture<Long> f = bk.newCreateLedgerOp() CompletableFuture<Long> f = bk.newCreateLedgerOp()
.withDigestType(DigestType.MAC) .withDigestType(DigestType.MAC)
.withPassword(ledgerPassword) .withPassword(ledgerPassword)
.execute() .execute()
.thenApply((wh) -> { .thenApply((wh) -> {
List<CompletableFuture<Long>> ops = new ArrayList<>(); List<CompletableFuture<Long>> ops = new ArrayList<>();
for( int i = 0; i < 1000 ; i++ ) { for (int i = 0; i < 1000; i++) {
byte[] data = String.format("message-%04d", i).getBytes(); byte[] data = String.format("message-%04d", i)
.getBytes();
ops.add(wh.appendAsync(data)); ops.add(wh.appendAsync(data));
} }
return CompletableFuture.allOf(ops.stream()
.toArray(CompletableFuture[]::new))
return CompletableFuture
.allOf(ops.stream().toArray(CompletableFuture[]::new))
.thenCompose((v) -> wh.closeAsync()) .thenCompose((v) -> wh.closeAsync())
.thenApply((v) -> wh.getId()); .thenApply((v) -> wh.getId());
}) })
.thenCompose((lf) -> lf); // flatten the .thenCompose((lf) -> lf); // flatten the futures
Long ledgerId = f.get(5, TimeUnit.MINUTES); Long ledgerId = f.get(5, TimeUnit.MINUTES);
log.info("Ledger created with 1000 entries: ledgerId=" + ledgerId); LOG.info("Ledger created with 1000 entries: ledgerId=" + ledgerId);
// Now let's read data back... // Now let's read data back...
CompletableFuture<LedgerEntries> ef = bk.newOpenLedgerOp() CompletableFuture<LedgerEntries> ef = bk.newOpenLedgerOp()
.withLedgerId(ledgerId) .withLedgerId(ledgerId)
@ -175,55 +141,45 @@ class BkHelperLiveTest extends BkHelper {
.withDigestType(DigestType.MAC) .withDigestType(DigestType.MAC)
.execute() .execute()
.thenCompose((rh) -> { .thenCompose((rh) -> {
return rh.readLastAddConfirmedAsync() return rh.readLastAddConfirmedAsync()
.thenCompose((lastId) -> rh.readAsync(0, lastId)); .thenCompose((lastId) -> rh.readAsync(0, lastId));
}); });
LedgerEntries entries = ef.get(5, TimeUnit.MINUTES);
LedgerEntries entries = ef.get(5,TimeUnit.MINUTES); // Check all writes where OK
long count = 0; long count = 0;
Iterator<org.apache.bookkeeper.client.api.LedgerEntry> it = entries.iterator(); Iterator<org.apache.bookkeeper.client.api.LedgerEntry> it = entries.iterator();
while ( it.hasNext()) { while (it.hasNext()) {
org.apache.bookkeeper.client.api.LedgerEntry e = it.next(); org.apache.bookkeeper.client.api.LedgerEntry e = it.next();
String msg = new String(e.getEntryBytes()); String msg = new String(e.getEntryBytes());
assertEquals(String.format("message-%04d", count),msg); assertEquals(String.format("message-%04d", count), msg);
count++; count++;
} }
assertEquals(1000, count);
assertEquals(1000,count); LOG.info("Got entries: count=" + count);
log.info("Got entries: count=" + count);
} }
@Test @Test
void whenWriteAndReadEntries_thenSuccess() throws Exception { void whenWriteAndReadEntries_thenSuccess() throws Exception {
LedgerHandle lh = createLedger(bk, "myledger", ledgerPassword);
LedgerHandle lh = createLedger(bk,"myledger",ledgerPassword);
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
for ( int i = 0 ; i < 1000 ; i++ ) { for (int i = 0; i < 1000; i++) {
byte[] data = new String("message-" + i).getBytes(); byte[] data = new String("message-" + i).getBytes();
lh.append(data); lh.append(data);
} }
lh.close(); lh.close();
long elapsed = System.currentTimeMillis() - start; long elapsed = System.currentTimeMillis() - start;
log.info("Entries added to ledgerId " + lh.getId() + ", elapsed=" + elapsed); LOG.info("Entries added to ledgerId " + lh.getId() + ", elapsed=" + elapsed);
Long ledgerId = findLedgerByName(bk,"myledger").orElse(null); Long ledgerId = findLedgerByName(bk, "myledger").orElse(null);
assertNotNull(ledgerId); assertNotNull(ledgerId);
lh = bk.openLedger(ledgerId, BookKeeper.DigestType.MAC, ledgerPassword); lh = bk.openLedger(ledgerId, BookKeeper.DigestType.MAC, ledgerPassword);
long lastId = lh.readLastConfirmed(); long lastId = lh.readLastConfirmed();
Enumeration<LedgerEntry> entries = lh.readEntries(0, lastId); Enumeration<LedgerEntry> entries = lh.readEntries(0, lastId);
while (entries.hasMoreElements()) {
while(entries.hasMoreElements()) {
LedgerEntry entry = entries.nextElement(); LedgerEntry entry = entries.nextElement();
String msg = new String(entry.getEntry()); String msg = new String(entry.getEntry());
log.info("Entry: id=" + entry.getEntryId() + ", data=" + msg); LOG.info("Entry: id=" + entry.getEntryId() + ", data=" + msg);
} }
} }
} }

View File

@ -1,8 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<configuration> <configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <appender name="STDOUT"
class="ch.qos.logback.core.ConsoleAppender">
<encoder> <encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} -
%msg%n
</pattern> </pattern>
</encoder> </encoder>
</appender> </appender>