HDFS-3809. Make BKJM use protobufs for all serialization with ZK. Contributed by Ivan Kelly.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1403606 13f79535-47bb-0310-9956-ffa450edef68
Uma Maheswara Rao G 2012-10-30 02:25:31 +00:00
parent 59df5eb19f
commit 1cc0b55c68
13 changed files with 358 additions and 129 deletions


@ -98,6 +98,9 @@ Release 2.0.3-alpha - Unreleased
HDFS-3789. JournalManager#format() should be able to throw IOException. HDFS-3789. JournalManager#format() should be able to throw IOException.
(Ivan Kelly via umamahesh) (Ivan Kelly via umamahesh)
HDFS-3809. Make BKJM use protobufs for all serialization with ZK.
(Ivan Kelly via umamahesh)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES


@ -0,0 +1,5 @@
<FindBugsFilter>
<Match>
<Class name="~org.apache.hadoop.contrib.bkjournal.BKJournalProtos.*" />
</Match>
</FindBugsFilter>


@ -89,6 +89,90 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.build.directory}/generated-sources/java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<configuration>
<skipTests>false</skipTests>
</configuration>
<executions>
<execution>
<id>compile-proto</id>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<echo file="target/compile-proto.sh">
PROTO_DIR=src/main/proto
INCLUDE_DIR=../../main/proto
JAVA_DIR=target/generated-sources/java
which cygpath 2&gt; /dev/null
if [ $? = 1 ]; then
IS_WIN=false
else
IS_WIN=true
WIN_PROTO_DIR=`cygpath --windows $PROTO_DIR`
WIN_JAVA_DIR=`cygpath --windows $JAVA_DIR`
WIN_INCLUDE_DIR=`cygpath --windows $INCLUDE_DIR`
fi
mkdir -p $JAVA_DIR 2&gt; /dev/null
for PROTO_FILE in `ls $PROTO_DIR/*.proto 2&gt; /dev/null`
do
if [ "$IS_WIN" = "true" ]; then
protoc -I$WIN_PROTO_DIR -I$WIN_INCLUDE_DIR --java_out=$WIN_JAVA_DIR $PROTO_FILE
else
protoc -I$PROTO_DIR -I$INCLUDE_DIR --java_out=$JAVA_DIR $PROTO_FILE
fi
done
</echo>
<exec executable="sh" dir="${basedir}" failonerror="true">
<arg line="target/compile-proto.sh"/>
</exec>
</target>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<configuration>
<excludeFilterFile>${basedir}/dev-support/findbugsExcludeFile.xml</excludeFilterFile>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<exclude>dev-support/findbugsExcludeFile.xml</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
<profiles> <profiles>
<profile> <profile>
<id>dist</id> <id>dist</id>
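The build changes above wire protoc into the generate-sources phase: the antrun script compiles src/main/proto against the shared HDFS protos, and build-helper registers target/generated-sources/java as a source root. What that buys the Java code is the stock protobuf 2.x builder and TextFormat API on the new BKJournalProtos messages. A minimal, illustrative round trip with the simplest of them (MaxTxIdProto, defined later in this commit):

// Illustrative sketch: round-trip the simplest new message through protobuf's
// human-readable text format, which is what BKJM now stores in its znodes.
import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.MaxTxIdProto;
import com.google.protobuf.TextFormat;

public class MaxTxIdTextFormat {
  public static void main(String[] args) throws Exception {
    MaxTxIdProto written = MaxTxIdProto.newBuilder().setTxId(42L).build();
    String znodeData = TextFormat.printToString(written);
    System.out.println(znodeData);                  // prints: txId: 42

    MaxTxIdProto.Builder readBack = MaxTxIdProto.newBuilder();
    TextFormat.merge(znodeData, readBack);          // throws ParseException on garbage
    System.out.println(readBack.isInitialized());   // true: the required txId is set
    System.out.println(readBack.getTxId());         // 42
  }
}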


@ -70,7 +70,7 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
this.lh = lh; this.lh = lh;
this.firstTxId = metadata.getFirstTxId(); this.firstTxId = metadata.getFirstTxId();
this.lastTxId = metadata.getLastTxId(); this.lastTxId = metadata.getLastTxId();
this.logVersion = metadata.getVersion(); this.logVersion = metadata.getDataLayoutVersion();
this.inProgress = metadata.isInProgress(); this.inProgress = metadata.isInProgress();
if (firstBookKeeperEntry < 0 if (firstBookKeeperEntry < 0


@ -50,6 +50,11 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.VersionProto;
import com.google.protobuf.TextFormat;
import static com.google.common.base.Charsets.UTF_8;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -143,36 +148,17 @@ public class BookKeeperJournalManager implements JournalManager {
private final int quorumSize; private final int quorumSize;
private final String digestpw; private final String digestpw;
private final CountDownLatch zkConnectLatch; private final CountDownLatch zkConnectLatch;
private final NamespaceInfo nsInfo;
private LedgerHandle currentLedger = null; private LedgerHandle currentLedger = null;
private int bytesToInt(byte[] b) {
assert b.length >= 4;
return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3];
}
private byte[] intToBytes(int i) {
return new byte[] {
(byte)(i >> 24),
(byte)(i >> 16),
(byte)(i >> 8),
(byte)(i) };
}
BookKeeperJournalManager(Configuration conf, URI uri) throws IOException {
this(conf, uri, null);
// TODO(ivank): update BookKeeperJournalManager to do something
// with the NamespaceInfo. This constructor has been added
// for compatibility with the old tests, and may be removed
// when the tests are updated.
}
/** /**
* Construct a Bookkeeper journal manager. * Construct a Bookkeeper journal manager.
*/ */
public BookKeeperJournalManager(Configuration conf, URI uri, public BookKeeperJournalManager(Configuration conf, URI uri,
NamespaceInfo nsInfo) throws IOException { NamespaceInfo nsInfo) throws IOException {
this.conf = conf; this.conf = conf;
this.nsInfo = nsInfo;
String zkConnect = uri.getAuthority().replace(";", ","); String zkConnect = uri.getAuthority().replace(";", ",");
String zkPath = uri.getPath(); String zkPath = uri.getPath();
ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE, ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
@ -202,10 +188,32 @@ public class BookKeeperJournalManager implements JournalManager {
Stat versionStat = zkc.exists(versionPath, false); Stat versionStat = zkc.exists(versionPath, false);
if (versionStat != null) { if (versionStat != null) {
byte[] d = zkc.getData(versionPath, false, versionStat); byte[] d = zkc.getData(versionPath, false, versionStat);
VersionProto.Builder builder = VersionProto.newBuilder();
TextFormat.merge(new String(d, UTF_8), builder);
if (!builder.isInitialized()) {
throw new IOException("Invalid/Incomplete data in znode");
}
VersionProto vp = builder.build();
// There's only one version at the moment // There's only one version at the moment
assert bytesToInt(d) == BKJM_LAYOUT_VERSION; assert vp.getLayoutVersion() == BKJM_LAYOUT_VERSION;
} else {
zkc.create(versionPath, intToBytes(BKJM_LAYOUT_VERSION), NamespaceInfo readns = PBHelper.convert(vp.getNamespaceInfo());
if (nsInfo.getNamespaceID() != readns.getNamespaceID() ||
!nsInfo.clusterID.equals(readns.getClusterID()) ||
!nsInfo.getBlockPoolID().equals(readns.getBlockPoolID())) {
String err = String.format("Environment mismatch. Running process %s"
+", stored in ZK %s", nsInfo, readns);
LOG.error(err);
throw new IOException(err);
}
} else if (nsInfo.getNamespaceID() > 0) {
VersionProto.Builder builder = VersionProto.newBuilder();
builder.setNamespaceInfo(PBHelper.convert(nsInfo))
.setLayoutVersion(BKJM_LAYOUT_VERSION);
byte[] data = TextFormat.printToString(builder.build()).getBytes(UTF_8);
zkc.create(versionPath, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} }
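Two details of the read path above drive the updated test expectations later in this commit: TextFormat.merge accepts an empty payload but leaves required fields unset, so isInitialized() returns false and BKJM reports "Invalid/Incomplete data in znode", while non-protobuf garbage fails the parse itself. A hedged sketch of both failure modes against VersionProto:

import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.VersionProto;
import com.google.protobuf.TextFormat;

public class VersionZNodeParsing {
  public static void main(String[] args) throws Exception {
    // Empty znode data: the merge succeeds, but the required layoutVersion is
    // missing, so isInitialized() is false and BKJM raises an IOException.
    VersionProto.Builder empty = VersionProto.newBuilder();
    TextFormat.merge("", empty);
    System.out.println("empty initialized? " + empty.isInitialized()); // false

    // Corrupt znode data: TextFormat.merge throws a ParseException along the
    // lines of 'has no field named "WholeLottaJunk"', matching the corrupt-node test.
    try {
      TextFormat.merge("WholeLottaJunk", VersionProto.newBuilder());
    } catch (TextFormat.ParseException e) {
      System.out.println("parse failed: " + e.getMessage());
    }
  }
}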
@ -214,11 +222,11 @@ public class BookKeeperJournalManager implements JournalManager {
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} }
prepareBookKeeperEnv(); prepareBookKeeperEnv();
bkc = new BookKeeper(new ClientConfiguration(), bkc = new BookKeeper(new ClientConfiguration(), zkc);
zkc);
} catch (KeeperException e) { } catch (KeeperException e) {
throw new IOException("Error initializing zk", e); throw new IOException("Error initializing zk", e);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while initializing bk journal manager", throw new IOException("Interrupted while initializing bk journal manager",
ie); ie);
} }
@ -322,13 +330,14 @@ public class BookKeeperJournalManager implements JournalManager {
} catch (KeeperException ke) { } catch (KeeperException ke) {
throw new IOException("Error in zookeeper while creating ledger", ke); throw new IOException("Error in zookeeper while creating ledger", ke);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted creating ledger", ie); throw new IOException("Interrupted creating ledger", ie);
} }
try { try {
String znodePath = inprogressZNode(txId); String znodePath = inprogressZNode(txId);
EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath, EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath,
HdfsConstants.LAYOUT_VERSION, currentLedger.getId(), txId); HdfsConstants.LAYOUT_VERSION, currentLedger.getId(), txId);
/* Write the ledger metadata out to the inprogress ledger znode /* Write the ledger metadata out to the inprogress ledger znode
* This can fail if for some reason our write lock has * This can fail if for some reason our write lock has
* expired (@see WriteLock) and another process has managed to * expired (@see WriteLock) and another process has managed to
@ -356,6 +365,7 @@ public class BookKeeperJournalManager implements JournalManager {
//log & ignore, an IOException will be thrown soon //log & ignore, an IOException will be thrown soon
LOG.error("Error closing ledger", bke); LOG.error("Error closing ledger", bke);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted while closing ledger", ie); LOG.warn("Interrupted while closing ledger", ie);
} }
} }
@ -425,6 +435,7 @@ public class BookKeeperJournalManager implements JournalManager {
} catch (KeeperException e) { } catch (KeeperException e) {
throw new IOException("Error finalising ledger", e); throw new IOException("Error finalising ledger", e);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Error finalising ledger", ie); throw new IOException("Error finalising ledger", ie);
} }
} }
@ -454,6 +465,7 @@ public class BookKeeperJournalManager implements JournalManager {
} catch (BKException e) { } catch (BKException e) {
throw new IOException("Could not open ledger for " + fromTxId, e); throw new IOException("Could not open ledger for " + fromTxId, e);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted opening ledger for " throw new IOException("Interrupted opening ledger for "
+ fromTxId, ie); + fromTxId, ie);
} }
@ -567,6 +579,7 @@ public class BookKeeperJournalManager implements JournalManager {
} catch (KeeperException ke) { } catch (KeeperException ke) {
throw new IOException("Couldn't get list of inprogress segments", ke); throw new IOException("Couldn't get list of inprogress segments", ke);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted getting list of inprogress segments", throw new IOException("Interrupted getting list of inprogress segments",
ie); ie);
} }
@ -583,6 +596,7 @@ public class BookKeeperJournalManager implements JournalManager {
zkc.delete(l.getZkPath(), stat.getVersion()); zkc.delete(l.getZkPath(), stat.getVersion());
bkc.deleteLedger(l.getLedgerId()); bkc.deleteLedger(l.getLedgerId());
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while purging " + l, ie); LOG.error("Interrupted while purging " + l, ie);
} catch (BKException bke) { } catch (BKException bke) {
LOG.error("Couldn't delete ledger from bookkeeper", bke); LOG.error("Couldn't delete ledger from bookkeeper", bke);
@ -601,6 +615,7 @@ public class BookKeeperJournalManager implements JournalManager {
} catch (BKException bke) { } catch (BKException bke) {
throw new IOException("Couldn't close bookkeeper client", bke); throw new IOException("Couldn't close bookkeeper client", bke);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while closing journal manager", ie); throw new IOException("Interrupted while closing journal manager", ie);
} }
} }
@ -635,6 +650,7 @@ public class BookKeeperJournalManager implements JournalManager {
} catch (BKException bke) { } catch (BKException bke) {
throw new IOException("Exception opening ledger for " + l, bke); throw new IOException("Exception opening ledger for " + l, bke);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted opening ledger for " + l, ie); throw new IOException("Interrupted opening ledger for " + l, ie);
} }
@ -692,6 +708,7 @@ public class BookKeeperJournalManager implements JournalManager {
} catch (KeeperException e) { } catch (KeeperException e) {
throw new IOException("Exception reading ledger list from zk", e); throw new IOException("Exception reading ledger list from zk", e);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted getting list of ledgers from zk", ie); throw new IOException("Interrupted getting list of ledgers from zk", ie);
} }


@ -29,6 +29,10 @@ import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.CurrentInprogressProto;
import com.google.protobuf.TextFormat;
import static com.google.common.base.Charsets.UTF_8;
/** /**
* Distributed write permission lock, using ZooKeeper. Read the version number * Distributed write permission lock, using ZooKeeper. Read the version number
* and return the current inprogress node path available in CurrentInprogress * and return the current inprogress node path available in CurrentInprogress
@ -42,29 +46,28 @@ import org.apache.zookeeper.data.Stat;
*/ */
class CurrentInprogress { class CurrentInprogress {
private static final String CONTENT_DELIMITER = ",";
static final Log LOG = LogFactory.getLog(CurrentInprogress.class); static final Log LOG = LogFactory.getLog(CurrentInprogress.class);
private final ZooKeeper zkc; private final ZooKeeper zkc;
private final String currentInprogressNode; private final String currentInprogressNode;
private volatile int versionNumberForPermission = -1; private volatile int versionNumberForPermission = -1;
private static final int CURRENT_INPROGRESS_LAYOUT_VERSION = -1;
private final String hostName = InetAddress.getLocalHost().toString(); private final String hostName = InetAddress.getLocalHost().toString();
CurrentInprogress(ZooKeeper zkc, String lockpath) throws IOException { CurrentInprogress(ZooKeeper zkc, String lockpath) throws IOException {
this.currentInprogressNode = lockpath; this.currentInprogressNode = lockpath;
this.zkc = zkc; this.zkc = zkc;
try { try {
Stat isCurrentInprogressNodeExists = zkc.exists(lockpath, false); Stat isCurrentInprogressNodeExists = zkc.exists(currentInprogressNode,
false);
if (isCurrentInprogressNodeExists == null) { if (isCurrentInprogressNodeExists == null) {
try { try {
zkc.create(lockpath, null, Ids.OPEN_ACL_UNSAFE, zkc.create(currentInprogressNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT); CreateMode.PERSISTENT);
} catch (NodeExistsException e) { } catch (NodeExistsException e) {
// Node might be created by another process at the same time. Ignore it. // Node might be created by another process at the same time. Ignore it.
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(lockpath + " already created by other process.", e); LOG.debug(currentInprogressNode + " already created by other process.",
e);
} }
} }
} }
@ -83,10 +86,13 @@ class CurrentInprogress {
* @throws IOException * @throws IOException
*/ */
void update(String path) throws IOException { void update(String path) throws IOException {
String content = CURRENT_INPROGRESS_LAYOUT_VERSION CurrentInprogressProto.Builder builder = CurrentInprogressProto.newBuilder();
+ CONTENT_DELIMITER + hostName + CONTENT_DELIMITER + path; builder.setPath(path).setHostname(hostName);
String content = TextFormat.printToString(builder.build());
try { try {
zkc.setData(this.currentInprogressNode, content.getBytes(), zkc.setData(this.currentInprogressNode, content.getBytes(UTF_8),
this.versionNumberForPermission); this.versionNumberForPermission);
} catch (KeeperException e) { } catch (KeeperException e) {
throw new IOException("Exception when setting the data " throw new IOException("Exception when setting the data "
@ -123,23 +129,12 @@ class CurrentInprogress {
} }
this.versionNumberForPermission = stat.getVersion(); this.versionNumberForPermission = stat.getVersion();
if (data != null) { if (data != null) {
String stringData = new String(data); CurrentInprogressProto.Builder builder = CurrentInprogressProto.newBuilder();
LOG.info("Read data[layout version number,hostname,inprogressNode path]" TextFormat.merge(new String(data, UTF_8), builder);
+ "= [" + stringData + "] from CurrentInprogress"); if (!builder.isInitialized()) {
String[] contents = stringData.split(CONTENT_DELIMITER); throw new IOException("Invalid/Incomplete data in znode");
assert contents.length == 3 : "As per the current data format, "
+ "CurrentInprogress node data should contain 3 fields. "
+ "i.e layout version number,hostname,inprogressNode path";
String layoutVersion = contents[0];
if (Long.valueOf(layoutVersion) > CURRENT_INPROGRESS_LAYOUT_VERSION) {
throw new IOException(
"Supported layout version of CurrentInprogress node is : "
+ CURRENT_INPROGRESS_LAYOUT_VERSION
+ " . Layout version of CurrentInprogress node in ZK is : "
+ layoutVersion);
} }
String inprogressNodePath = contents[2]; return builder.build().getPath();
return inprogressNodePath;
} else { } else {
LOG.info("No data available in CurrentInprogress"); LOG.info("No data available in CurrentInprogress");
} }
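In the old comma-delimited format the layout version and hostname were positional and mandatory; in CurrentInprogressProto only path is required, and the hostname is informational. A small sketch with a made-up znode payload, showing that read() can return the path regardless of whether a hostname was recorded:

import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.CurrentInprogressProto;
import com.google.protobuf.TextFormat;

public class CurrentInprogressPayload {
  public static void main(String[] args) throws Exception {
    CurrentInprogressProto.Builder b = CurrentInprogressProto.newBuilder();
    TextFormat.merge("path: \"/ledgers/inprogress_0000000000000000001\"", b);
    System.out.println(b.isInitialized());   // true: path is the only required field
    System.out.println(b.build().getPath()); // /ledgers/inprogress_0000000000000000001
  }
}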


@ -29,6 +29,10 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.EditLogLedgerProto;
import com.google.protobuf.TextFormat;
import static com.google.common.base.Charsets.UTF_8;
/** /**
* Utility class for storing the metadata associated * Utility class for storing the metadata associated
* with a single edit log segment, stored in a single ledger * with a single edit log segment, stored in a single ledger
@ -37,8 +41,8 @@ public class EditLogLedgerMetadata {
static final Log LOG = LogFactory.getLog(EditLogLedgerMetadata.class); static final Log LOG = LogFactory.getLog(EditLogLedgerMetadata.class);
private String zkPath; private String zkPath;
private final int dataLayoutVersion;
private final long ledgerId; private final long ledgerId;
private final int version;
private final long firstTxId; private final long firstTxId;
private long lastTxId; private long lastTxId;
private boolean inprogress; private boolean inprogress;
@ -57,21 +61,22 @@ public class EditLogLedgerMetadata {
} }
}; };
EditLogLedgerMetadata(String zkPath, int version, EditLogLedgerMetadata(String zkPath, int dataLayoutVersion,
long ledgerId, long firstTxId) { long ledgerId, long firstTxId) {
this.zkPath = zkPath; this.zkPath = zkPath;
this.dataLayoutVersion = dataLayoutVersion;
this.ledgerId = ledgerId; this.ledgerId = ledgerId;
this.version = version;
this.firstTxId = firstTxId; this.firstTxId = firstTxId;
this.lastTxId = HdfsConstants.INVALID_TXID; this.lastTxId = HdfsConstants.INVALID_TXID;
this.inprogress = true; this.inprogress = true;
} }
EditLogLedgerMetadata(String zkPath, int version, long ledgerId, EditLogLedgerMetadata(String zkPath, int dataLayoutVersion,
long firstTxId, long lastTxId) { long ledgerId, long firstTxId,
long lastTxId) {
this.zkPath = zkPath; this.zkPath = zkPath;
this.dataLayoutVersion = dataLayoutVersion;
this.ledgerId = ledgerId; this.ledgerId = ledgerId;
this.version = version;
this.firstTxId = firstTxId; this.firstTxId = firstTxId;
this.lastTxId = lastTxId; this.lastTxId = lastTxId;
this.inprogress = false; this.inprogress = false;
@ -93,14 +98,14 @@ public class EditLogLedgerMetadata {
return ledgerId; return ledgerId;
} }
int getVersion() {
return version;
}
boolean isInProgress() { boolean isInProgress() {
return this.inprogress; return this.inprogress;
} }
int getDataLayoutVersion() {
return this.dataLayoutVersion;
}
void finalizeLedger(long newLastTxId) { void finalizeLedger(long newLastTxId) {
assert this.lastTxId == HdfsConstants.INVALID_TXID; assert this.lastTxId == HdfsConstants.INVALID_TXID;
this.lastTxId = newLastTxId; this.lastTxId = newLastTxId;
@ -111,22 +116,27 @@ public class EditLogLedgerMetadata {
throws IOException, KeeperException.NoNodeException { throws IOException, KeeperException.NoNodeException {
try { try {
byte[] data = zkc.getData(path, false, null); byte[] data = zkc.getData(path, false, null);
String[] parts = new String(data).split(";");
if (parts.length == 3) { EditLogLedgerProto.Builder builder = EditLogLedgerProto.newBuilder();
int version = Integer.valueOf(parts[0]); if (LOG.isDebugEnabled()) {
long ledgerId = Long.valueOf(parts[1]); LOG.debug("Reading " + path + " data: " + new String(data, UTF_8));
long txId = Long.valueOf(parts[2]); }
return new EditLogLedgerMetadata(path, version, ledgerId, txId); TextFormat.merge(new String(data, UTF_8), builder);
} else if (parts.length == 4) { if (!builder.isInitialized()) {
int version = Integer.valueOf(parts[0]); throw new IOException("Invalid/Incomplete data in znode");
long ledgerId = Long.valueOf(parts[1]); }
long firstTxId = Long.valueOf(parts[2]); EditLogLedgerProto ledger = builder.build();
long lastTxId = Long.valueOf(parts[3]);
return new EditLogLedgerMetadata(path, version, ledgerId, int dataLayoutVersion = ledger.getDataLayoutVersion();
firstTxId, lastTxId); long ledgerId = ledger.getLedgerId();
long firstTxId = ledger.getFirstTxId();
if (ledger.hasLastTxId()) {
long lastTxId = ledger.getLastTxId();
return new EditLogLedgerMetadata(path, dataLayoutVersion,
ledgerId, firstTxId, lastTxId);
} else { } else {
throw new IOException("Invalid ledger entry, " return new EditLogLedgerMetadata(path, dataLayoutVersion,
+ new String(data)); ledgerId, firstTxId);
} }
} catch(KeeperException.NoNodeException nne) { } catch(KeeperException.NoNodeException nne) {
throw nne; throw nne;
@ -140,17 +150,17 @@ public class EditLogLedgerMetadata {
void write(ZooKeeper zkc, String path) void write(ZooKeeper zkc, String path)
throws IOException, KeeperException.NodeExistsException { throws IOException, KeeperException.NodeExistsException {
this.zkPath = path; this.zkPath = path;
String finalisedData;
if (inprogress) { EditLogLedgerProto.Builder builder = EditLogLedgerProto.newBuilder();
finalisedData = String.format("%d;%d;%d", builder.setDataLayoutVersion(dataLayoutVersion)
version, ledgerId, firstTxId); .setLedgerId(ledgerId).setFirstTxId(firstTxId);
} else {
finalisedData = String.format("%d;%d;%d;%d", if (!inprogress) {
version, ledgerId, firstTxId, lastTxId); builder.setLastTxId(lastTxId);
} }
try { try {
zkc.create(path, finalisedData.getBytes(), Ids.OPEN_ACL_UNSAFE, zkc.create(path, TextFormat.printToString(builder.build()).getBytes(UTF_8),
CreateMode.PERSISTENT); Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException nee) { } catch (KeeperException.NodeExistsException nee) {
throw nee; throw nee;
} catch (KeeperException e) { } catch (KeeperException e) {
@ -183,9 +193,9 @@ public class EditLogLedgerMetadata {
} }
EditLogLedgerMetadata ol = (EditLogLedgerMetadata)o; EditLogLedgerMetadata ol = (EditLogLedgerMetadata)o;
return ledgerId == ol.ledgerId return ledgerId == ol.ledgerId
&& dataLayoutVersion == ol.dataLayoutVersion
&& firstTxId == ol.firstTxId && firstTxId == ol.firstTxId
&& lastTxId == ol.lastTxId && lastTxId == ol.lastTxId;
&& version == ol.version;
} }
public int hashCode() { public int hashCode() {
@ -193,15 +203,15 @@ public class EditLogLedgerMetadata {
hash = hash * 31 + (int) ledgerId; hash = hash * 31 + (int) ledgerId;
hash = hash * 31 + (int) firstTxId; hash = hash * 31 + (int) firstTxId;
hash = hash * 31 + (int) lastTxId; hash = hash * 31 + (int) lastTxId;
hash = hash * 31 + (int) version; hash = hash * 31 + (int) dataLayoutVersion;
return hash; return hash;
} }
public String toString() { public String toString() {
return "[LedgerId:"+ledgerId + return "[LedgerId:"+ledgerId +
", firstTxId:" + firstTxId + ", firstTxId:" + firstTxId +
", lastTxId:" + lastTxId + ", lastTxId:" + lastTxId +
", version:" + version + "]"; ", dataLayoutVersion:" + dataLayoutVersion + "]";
} }
} }
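Where the old string format distinguished in-progress from finalized segments by whether it carried three or four semicolon-separated fields, the proto simply leaves the optional lastTxId unset until finalization, and read() branches on hasLastTxId(). An illustrative sketch (the layout version and ledger id values are made up):

import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.EditLogLedgerProto;
import com.google.protobuf.TextFormat;

public class LedgerZNodeFormat {
  public static void main(String[] args) throws Exception {
    // In-progress segment: lastTxId is simply absent, replacing the old
    // three-field "version;ledgerId;firstTxId" string.
    EditLogLedgerProto inprogress = EditLogLedgerProto.newBuilder()
        .setDataLayoutVersion(-40)   // illustrative layout version
        .setLedgerId(7L)
        .setFirstTxId(1L)
        .build();
    System.out.println(inprogress.hasLastTxId()); // false

    // Finalized segment: the same message with the optional field set.
    EditLogLedgerProto finalized = inprogress.toBuilder().setLastTxId(100L).build();
    System.out.println(TextFormat.printToString(finalized));
    // dataLayoutVersion: -40
    // ledgerId: 7
    // firstTxId: 1
    // lastTxId: 100
  }
}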


@ -27,6 +27,10 @@ import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.MaxTxIdProto;
import com.google.protobuf.TextFormat;
import static com.google.common.base.Charsets.UTF_8;
/** /**
* Utility class for storing and reading * Utility class for storing and reading
* the max seen txid in zookeeper * the max seen txid in zookeeper
@ -55,14 +59,16 @@ class MaxTxId {
} }
synchronized void reset(long maxTxId) throws IOException { synchronized void reset(long maxTxId) throws IOException {
String txidStr = Long.toString(maxTxId);
try { try {
MaxTxIdProto.Builder builder = MaxTxIdProto.newBuilder().setTxId(maxTxId);
byte[] data = TextFormat.printToString(builder.build()).getBytes(UTF_8);
if (currentStat != null) { if (currentStat != null) {
currentStat = zkc.setData(path, txidStr.getBytes("UTF-8"), currentStat currentStat = zkc.setData(path, data, currentStat
.getVersion()); .getVersion());
} else { } else {
zkc.create(path, txidStr.getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, zkc.create(path, data, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT); CreateMode.PERSISTENT);
} }
} catch (KeeperException e) { } catch (KeeperException e) {
throw new IOException("Error writing max tx id", e); throw new IOException("Error writing max tx id", e);
@ -77,9 +83,16 @@ class MaxTxId {
if (currentStat == null) { if (currentStat == null) {
return 0; return 0;
} else { } else {
byte[] bytes = zkc.getData(path, false, currentStat); byte[] bytes = zkc.getData(path, false, currentStat);
String txidString = new String(bytes, "UTF-8");
return Long.valueOf(txidString); MaxTxIdProto.Builder builder = MaxTxIdProto.newBuilder();
TextFormat.merge(new String(bytes, UTF_8), builder);
if (!builder.isInitialized()) {
throw new IOException("Invalid/Incomplete data in znode");
}
return builder.build().getTxId();
} }
} catch (KeeperException e) { } catch (KeeperException e) {
throw new IOException("Error reading the max tx id from zk", e); throw new IOException("Error reading the max tx id from zk", e);


@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// This file contains protocol buffers that are used by bkjournal
// mostly for storing data in zookeeper
option java_package = "org.apache.hadoop.contrib.bkjournal";
option java_outer_classname = "BKJournalProtos";
option java_generate_equals_and_hash = true;
import "hdfs.proto";
message VersionProto {
required int32 layoutVersion = 1;
optional NamespaceInfoProto namespaceInfo = 2;
}
message EditLogLedgerProto {
required int32 dataLayoutVersion = 1;
required int64 ledgerId = 2;
required int64 firstTxId = 3;
optional int64 lastTxId = 4;
}
message MaxTxIdProto {
required int64 txId = 1;
}
message CurrentInprogressProto {
required string path = 1;
optional string hostname = 2;
}
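VersionProto embeds NamespaceInfoProto from hdfs.proto, which is why the build script passes the shared ../../main/proto directory to protoc as an include path; on the Java side, PBHelper converts between NamespaceInfo and its protobuf form in both directions. A sketch of that round trip with made-up namespace values, mirroring the environment-mismatch check in BookKeeperJournalManager above:

import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.VersionProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;

public class VersionProtoSketch {
  public static void main(String[] args) {
    // Namespace values are made up, in the style of the tests below.
    NamespaceInfo nsInfo = new NamespaceInfo(12345, "testCluster", "TestBPID", -1);

    // Write side: embed the namespace in the version znode payload.
    VersionProto vp = VersionProto.newBuilder()
        .setLayoutVersion(-1)                       // placeholder for BKJM_LAYOUT_VERSION
        .setNamespaceInfo(PBHelper.convert(nsInfo))
        .build();

    // Read side: convert back and compare against the running namenode's view.
    NamespaceInfo stored = PBHelper.convert(vp.getNamespaceInfo());
    boolean matches = nsInfo.getNamespaceID() == stored.getNamespaceID()
        && nsInfo.getClusterID().equals(stored.getClusterID())
        && nsInfo.getBlockPoolID().equals(stored.getBlockPoolID());
    System.out.println("namespace matches: " + matches);
  }
}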


@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.Random;
import org.apache.bookkeeper.util.LocalBookKeeper; import org.apache.bookkeeper.util.LocalBookKeeper;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -42,6 +43,8 @@ import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
public class TestBookKeeperConfiguration { public class TestBookKeeperConfiguration {
private static final Log LOG = LogFactory private static final Log LOG = LogFactory
.getLog(TestBookKeeperConfiguration.class); .getLog(TestBookKeeperConfiguration.class);
@ -73,6 +76,11 @@ public class TestBookKeeperConfiguration {
return zkc; return zkc;
} }
private NamespaceInfo newNSInfo() {
Random r = new Random();
return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1);
}
@BeforeClass @BeforeClass
public static void setupZooKeeper() throws Exception { public static void setupZooKeeper() throws Exception {
// create a ZooKeeper server(dataDir, dataLogDir, port) // create a ZooKeeper server(dataDir, dataLogDir, port)
@ -137,8 +145,10 @@ public class TestBookKeeperConfiguration {
bkAvailablePath); bkAvailablePath);
Assert.assertNull(bkAvailablePath + " already exists", zkc.exists( Assert.assertNull(bkAvailablePath + " already exists", zkc.exists(
bkAvailablePath, false)); bkAvailablePath, false));
bkjm = new BookKeeperJournalManager(conf, URI.create("bookkeeper://" NamespaceInfo nsi = newNSInfo();
+ HOSTPORT + "/hdfsjournal-WithBKPath")); bkjm = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-WithBKPath"),
nsi);
Assert.assertNotNull("Bookie available path : " + bkAvailablePath Assert.assertNotNull("Bookie available path : " + bkAvailablePath
+ " doesn't exists", zkc.exists(bkAvailablePath, false)); + " doesn't exists", zkc.exists(bkAvailablePath, false));
} }
@ -152,8 +162,10 @@ public class TestBookKeeperConfiguration {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
Assert.assertNull(BK_ROOT_PATH + " already exists", zkc.exists( Assert.assertNull(BK_ROOT_PATH + " already exists", zkc.exists(
BK_ROOT_PATH, false)); BK_ROOT_PATH, false));
new BookKeeperJournalManager(conf, URI.create("bookkeeper://" + HOSTPORT NamespaceInfo nsi = newNSInfo();
+ "/hdfsjournal-DefaultBKPath")); bkjm = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-DefaultBKPath"),
nsi);
Assert.assertNotNull("Bookie available path : " + BK_ROOT_PATH Assert.assertNotNull("Bookie available path : " + BK_ROOT_PATH
+ " doesn't exists", zkc.exists(BK_ROOT_PATH, false)); + " doesn't exists", zkc.exists(BK_ROOT_PATH, false));
} }


@ -29,6 +29,7 @@ import org.mockito.Mockito;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.List; import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -37,6 +38,7 @@ import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil; import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
import org.apache.hadoop.hdfs.server.namenode.JournalManager; import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.bookkeeper.proto.BookieServer; import org.apache.bookkeeper.proto.BookieServer;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
@ -78,10 +80,17 @@ public class TestBookKeeperJournalManager {
zkc.close(); zkc.close();
} }
private NamespaceInfo newNSInfo() {
Random r = new Random();
return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1);
}
@Test @Test
public void testSimpleWrite() throws Exception { public void testSimpleWrite() throws Exception {
NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-simplewrite")); BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi);
EditLogOutputStream out = bkjm.startLogSegment(1); EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= 100; i++) { for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@ -99,8 +108,10 @@ public class TestBookKeeperJournalManager {
@Test @Test
public void testNumberOfTransactions() throws Exception { public void testNumberOfTransactions() throws Exception {
NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-txncount")); BKJMUtil.createJournalURI("/hdfsjournal-txncount"), nsi);
EditLogOutputStream out = bkjm.startLogSegment(1); EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= 100; i++) { for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@ -116,8 +127,10 @@ public class TestBookKeeperJournalManager {
@Test @Test
public void testNumberOfTransactionsWithGaps() throws Exception { public void testNumberOfTransactionsWithGaps() throws Exception {
NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-gaps")); BKJMUtil.createJournalURI("/hdfsjournal-gaps"), nsi);
long txid = 1; long txid = 1;
for (long i = 0; i < 3; i++) { for (long i = 0; i < 3; i++) {
long start = txid; long start = txid;
@ -151,8 +164,10 @@ public class TestBookKeeperJournalManager {
@Test @Test
public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception { public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd")); BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"), nsi);
long txid = 1; long txid = 1;
for (long i = 0; i < 3; i++) { for (long i = 0; i < 3; i++) {
long start = txid; long start = txid;
@ -190,8 +205,10 @@ public class TestBookKeeperJournalManager {
*/ */
@Test @Test
public void testWriteRestartFrom1() throws Exception { public void testWriteRestartFrom1() throws Exception {
NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1")); BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"), nsi);
long txid = 1; long txid = 1;
long start = txid; long start = txid;
EditLogOutputStream out = bkjm.startLogSegment(txid); EditLogOutputStream out = bkjm.startLogSegment(txid);
@ -245,11 +262,15 @@ public class TestBookKeeperJournalManager {
@Test @Test
public void testTwoWriters() throws Exception { public void testTwoWriters() throws Exception {
long start = 1; long start = 1;
NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf, BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-dualWriter")); BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf, BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-dualWriter")); BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
EditLogOutputStream out1 = bkjm1.startLogSegment(start); EditLogOutputStream out1 = bkjm1.startLogSegment(start);
try { try {
bkjm2.startLogSegment(start); bkjm2.startLogSegment(start);
@ -263,8 +284,11 @@ public class TestBookKeeperJournalManager {
@Test @Test
public void testSimpleRead() throws Exception { public void testSimpleRead() throws Exception {
NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-simpleread")); BKJMUtil.createJournalURI("/hdfsjournal-simpleread"),
nsi);
final long numTransactions = 10000; final long numTransactions = 10000;
EditLogOutputStream out = bkjm.startLogSegment(1); EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= numTransactions; i++) { for (long i = 1 ; i <= numTransactions; i++) {
@ -287,8 +311,11 @@ public class TestBookKeeperJournalManager {
@Test @Test
public void testSimpleRecovery() throws Exception { public void testSimpleRecovery() throws Exception {
NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery")); BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"),
nsi);
EditLogOutputStream out = bkjm.startLogSegment(1); EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= 100; i++) { for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@ -334,8 +361,10 @@ public class TestBookKeeperJournalManager {
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE, conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
ensembleSize); ensembleSize);
long txid = 1; long txid = 1;
NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure")); BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"),
nsi);
EditLogOutputStream out = bkjm.startLogSegment(txid); EditLogOutputStream out = bkjm.startLogSegment(txid);
for (long i = 1 ; i <= 3; i++) { for (long i = 1 ; i <= 3; i++) {
@ -416,8 +445,12 @@ public class TestBookKeeperJournalManager {
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE, conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
ensembleSize); ensembleSize);
long txid = 1; long txid = 1;
NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure")); BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"),
nsi);
EditLogOutputStream out = bkjm.startLogSegment(txid); EditLogOutputStream out = bkjm.startLogSegment(txid);
for (long i = 1 ; i <= 3; i++) { for (long i = 1 ; i <= 3; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@ -464,7 +497,9 @@ public class TestBookKeeperJournalManager {
@Test @Test
public void testEmptyInprogressNode() throws Exception { public void testEmptyInprogressNode() throws Exception {
URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogress"); URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogress");
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri); NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
nsi);
EditLogOutputStream out = bkjm.startLogSegment(1); EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1; i <= 100; i++) { for (long i = 1; i <= 100; i++) {
@ -481,7 +516,7 @@ public class TestBookKeeperJournalManager {
String inprogressZNode = bkjm.inprogressZNode(101); String inprogressZNode = bkjm.inprogressZNode(101);
zkc.setData(inprogressZNode, new byte[0], -1); zkc.setData(inprogressZNode, new byte[0], -1);
bkjm = new BookKeeperJournalManager(conf, uri); bkjm = new BookKeeperJournalManager(conf, uri, nsi);
try { try {
bkjm.recoverUnfinalizedSegments(); bkjm.recoverUnfinalizedSegments();
fail("Should have failed. There should be no way of creating" fail("Should have failed. There should be no way of creating"
@ -489,7 +524,7 @@ public class TestBookKeeperJournalManager {
} catch (IOException e) { } catch (IOException e) {
// correct behaviour // correct behaviour
assertTrue("Exception different than expected", e.getMessage().contains( assertTrue("Exception different than expected", e.getMessage().contains(
"Invalid ledger entry,")); "Invalid/Incomplete data in znode"));
} finally { } finally {
bkjm.close(); bkjm.close();
} }
@ -503,7 +538,9 @@ public class TestBookKeeperJournalManager {
@Test @Test
public void testCorruptInprogressNode() throws Exception { public void testCorruptInprogressNode() throws Exception {
URI uri = BKJMUtil.createJournalURI("/hdfsjournal-corruptInprogress"); URI uri = BKJMUtil.createJournalURI("/hdfsjournal-corruptInprogress");
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri); NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
nsi);
EditLogOutputStream out = bkjm.startLogSegment(1); EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1; i <= 100; i++) { for (long i = 1; i <= 100; i++) {
@ -521,7 +558,7 @@ public class TestBookKeeperJournalManager {
String inprogressZNode = bkjm.inprogressZNode(101); String inprogressZNode = bkjm.inprogressZNode(101);
zkc.setData(inprogressZNode, "WholeLottaJunk".getBytes(), -1); zkc.setData(inprogressZNode, "WholeLottaJunk".getBytes(), -1);
bkjm = new BookKeeperJournalManager(conf, uri); bkjm = new BookKeeperJournalManager(conf, uri, nsi);
try { try {
bkjm.recoverUnfinalizedSegments(); bkjm.recoverUnfinalizedSegments();
fail("Should have failed. There should be no way of creating" fail("Should have failed. There should be no way of creating"
@ -529,8 +566,7 @@ public class TestBookKeeperJournalManager {
} catch (IOException e) { } catch (IOException e) {
// correct behaviour // correct behaviour
assertTrue("Exception different than expected", e.getMessage().contains( assertTrue("Exception different than expected", e.getMessage().contains(
"Invalid ledger entry,")); "has no field named"));
} finally { } finally {
bkjm.close(); bkjm.close();
} }
@ -544,7 +580,9 @@ public class TestBookKeeperJournalManager {
@Test @Test
public void testEmptyInprogressLedger() throws Exception { public void testEmptyInprogressLedger() throws Exception {
URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogressLedger"); URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogressLedger");
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri); NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
nsi);
EditLogOutputStream out = bkjm.startLogSegment(1); EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1; i <= 100; i++) { for (long i = 1; i <= 100; i++) {
@ -559,7 +597,7 @@ public class TestBookKeeperJournalManager {
out.close(); out.close();
bkjm.close(); bkjm.close();
bkjm = new BookKeeperJournalManager(conf, uri); bkjm = new BookKeeperJournalManager(conf, uri, nsi);
bkjm.recoverUnfinalizedSegments(); bkjm.recoverUnfinalizedSegments();
out = bkjm.startLogSegment(101); out = bkjm.startLogSegment(101);
for (long i = 1; i <= 100; i++) { for (long i = 1; i <= 100; i++) {
@ -581,7 +619,9 @@ public class TestBookKeeperJournalManager {
public void testRefinalizeAlreadyFinalizedInprogress() throws Exception { public void testRefinalizeAlreadyFinalizedInprogress() throws Exception {
URI uri = BKJMUtil URI uri = BKJMUtil
.createJournalURI("/hdfsjournal-refinalizeInprogressLedger"); .createJournalURI("/hdfsjournal-refinalizeInprogressLedger");
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri); NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
nsi);
EditLogOutputStream out = bkjm.startLogSegment(1); EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1; i <= 100; i++) { for (long i = 1; i <= 100; i++) {
@ -601,7 +641,7 @@ public class TestBookKeeperJournalManager {
byte[] inprogressData = zkc.getData(inprogressZNode, false, null); byte[] inprogressData = zkc.getData(inprogressZNode, false, null);
// finalize // finalize
bkjm = new BookKeeperJournalManager(conf, uri); bkjm = new BookKeeperJournalManager(conf, uri, nsi);
bkjm.recoverUnfinalizedSegments(); bkjm.recoverUnfinalizedSegments();
bkjm.close(); bkjm.close();
@ -613,7 +653,7 @@ public class TestBookKeeperJournalManager {
CreateMode.PERSISTENT); CreateMode.PERSISTENT);
// should work fine // should work fine
bkjm = new BookKeeperJournalManager(conf, uri); bkjm = new BookKeeperJournalManager(conf, uri, nsi);
bkjm.recoverUnfinalizedSegments(); bkjm.recoverUnfinalizedSegments();
bkjm.close(); bkjm.close();
} }
@ -626,7 +666,10 @@ public class TestBookKeeperJournalManager {
@Test @Test
public void testEditLogFileNotExistsWhenReadingMetadata() throws Exception { public void testEditLogFileNotExistsWhenReadingMetadata() throws Exception {
URI uri = BKJMUtil.createJournalURI("/hdfsjournal-editlogfile"); URI uri = BKJMUtil.createJournalURI("/hdfsjournal-editlogfile");
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri); NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
nsi);
try { try {
// start new inprogress log segment with txid=1 // start new inprogress log segment with txid=1
// and write transactions till txid=50 // and write transactions till txid=50