HDFS-234. Integration with BookKeeper logging system. Contributed by Ivan Kelly.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1336103 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Uma Maheswara Rao G 2012-05-09 11:32:40 +00:00
parent 16bdd99dfd
commit 1c52a01422
13 changed files with 2057 additions and 0 deletions

View File

@ -83,6 +83,9 @@ Release 2.0.0 - UNRELEASED
HDFS-3298. Add HdfsDataOutputStream as a public API. (szetszwo)
HDFS-234. Integration with BookKeeper logging system. (Ivan Kelly
via umamahesh)
IMPROVEMENTS
HDFS-2018. Move all journal stream management code into one place.

View File

@ -0,0 +1,66 @@
This module provides a BookKeeper backend for HFDS Namenode write
ahead logging.
BookKeeper is a highly available distributed write ahead logging
system. For more details, see
http://zookeeper.apache.org/bookkeeper
-------------------------------------------------------------------------------
How do I build?
To generate the distribution packages for BK journal, do the
following.
$ mvn clean package -Pdist
This will generate a jar with all the dependencies needed by the journal
manager,
target/hadoop-hdfs-bkjournal-<VERSION>.jar
Note that the -Pdist part of the build command is important, as otherwise
the dependencies would not be packaged in the jar.
-------------------------------------------------------------------------------
How do I use the BookKeeper Journal?
To run a HDFS namenode using BookKeeper as a backend, copy the bkjournal
jar, generated above, into the lib directory of hdfs. In the standard
distribution of HDFS, this is at $HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/
cp target/hadoop-hdfs-bkjournal-<VERSION>.jar \
$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/
Then, in hdfs-site.xml, set the following properties.
<property>
<name>dfs.namenode.edits.dir</name>
<value>bookkeeper://localhost:2181/bkjournal,file:///path/for/edits</value>
</property>
<property>
<name>dfs.namenode.edits.journal-plugin.bookkeeper</name>
<value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
</property>
In this example, the namenode is configured to use 2 write ahead
logging devices. One writes to BookKeeper and the other to a local
file system. At the moment is is not possible to only write to
BookKeeper, as the resource checker explicitly checked for local
disks currently.
The given example, configures the namenode to look for the journal
metadata at the path /bkjournal on the a standalone zookeeper ensemble
at localhost:2181. To configure a multiple host zookeeper ensemble,
separate the hosts with semicolons. For example, if you have 3
zookeeper servers, zk1, zk2 & zk3, each listening on port 2181, you
would specify this with
bookkeeper://zk1:2181;zk2:2181;zk3:2181/bkjournal
The final part /bkjournal specifies the znode in zookeeper where
ledger metadata will be store. Administrators can set this to anything
they wish.

View File

@ -0,0 +1,114 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project>
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-project</artifactId>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../../../../../hadoop-project</relativePath>
</parent>
<groupId>org.apache.hadoop.contrib</groupId>
<artifactId>hadoop-hdfs-bkjournal</artifactId>
<version>2.0.0-SNAPSHOT</version>
<description>Apache Hadoop HDFS BookKeeper Journal</description>
<name>Apache Hadoop HDFS BookKeeper Journal</name>
<packaging>jar</packaging>
<properties>
<hadoop.component>hdfs</hadoop.component>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-server</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<profile>
<id>dist</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<includes>
<include>org.apache.bookkeeper:bookkeeper-server</include>
<include>org.apache.zookeeper:zookeeper</include>
<include>org.jboss.netty:netty</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>org.apache.bookkeeper</pattern>
<shadedPattern>hidden.bkjournal.org.apache.bookkeeper</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.zookeeper</pattern>
<shadedPattern>hidden.bkjournal.org.apache.zookeeper</shadedPattern>
</relocation>
<relocation>
<pattern>org.jboss.netty</pattern>
<shadedPattern>hidden.bkjournal.org.jboss.netty</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,221 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Enumeration;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Input stream which reads from a BookKeeper ledger.
*/
class BookKeeperEditLogInputStream extends EditLogInputStream {
static final Log LOG = LogFactory.getLog(BookKeeperEditLogInputStream.class);
private final long firstTxId;
private final long lastTxId;
private final int logVersion;
private final LedgerHandle lh;
private final FSEditLogOp.Reader reader;
private final FSEditLogLoader.PositionTrackingInputStream tracker;
/**
* Construct BookKeeper edit log input stream.
* Starts reading from the first entry of the ledger.
*/
BookKeeperEditLogInputStream(final LedgerHandle lh,
final EditLogLedgerMetadata metadata)
throws IOException {
this(lh, metadata, 0);
}
/**
* Construct BookKeeper edit log input stream.
* Starts reading from firstBookKeeperEntry. This allows the stream
* to take a shortcut during recovery, as it doesn't have to read
* every edit log transaction to find out what the last one is.
*/
BookKeeperEditLogInputStream(LedgerHandle lh, EditLogLedgerMetadata metadata,
long firstBookKeeperEntry)
throws IOException {
this.lh = lh;
this.firstTxId = metadata.getFirstTxId();
this.lastTxId = metadata.getLastTxId();
this.logVersion = metadata.getVersion();
BufferedInputStream bin = new BufferedInputStream(
new LedgerInputStream(lh, firstBookKeeperEntry));
tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
DataInputStream in = new DataInputStream(tracker);
reader = new FSEditLogOp.Reader(in, logVersion);
}
@Override
public long getFirstTxId() throws IOException {
return firstTxId;
}
@Override
public long getLastTxId() throws IOException {
return lastTxId;
}
@Override
public int getVersion() throws IOException {
return logVersion;
}
@Override
protected FSEditLogOp nextOp() throws IOException {
return reader.readOp(false);
}
@Override
public void close() throws IOException {
try {
lh.close();
} catch (Exception e) {
throw new IOException("Exception closing ledger", e);
}
}
@Override
public long getPosition() {
return tracker.getPos();
}
@Override
public long length() throws IOException {
return lh.getLength();
}
@Override
public String getName() {
return String.format("BookKeeper[%s,first=%d,last=%d]",
lh.toString(), firstTxId, lastTxId);
}
// TODO(HA): Test this.
@Override
public boolean isInProgress() {
return true;
}
/**
* Input stream implementation which can be used by
* FSEditLogOp.Reader
*/
private static class LedgerInputStream extends InputStream {
private long readEntries;
private InputStream entryStream = null;
private final LedgerHandle lh;
private final long maxEntry;
/**
* Construct ledger input stream
* @param lh the ledger handle to read from
* @param firstBookKeeperEntry ledger entry to start reading from
*/
LedgerInputStream(LedgerHandle lh, long firstBookKeeperEntry)
throws IOException {
this.lh = lh;
readEntries = firstBookKeeperEntry;
try {
maxEntry = lh.getLastAddConfirmed();
} catch (Exception e) {
throw new IOException("Error reading last entry id", e);
}
}
/**
* Get input stream representing next entry in the
* ledger.
* @return input stream, or null if no more entries
*/
private InputStream nextStream() throws IOException {
try {
if (readEntries > maxEntry) {
return null;
}
Enumeration<LedgerEntry> entries
= lh.readEntries(readEntries, readEntries);
readEntries++;
if (entries.hasMoreElements()) {
LedgerEntry e = entries.nextElement();
assert !entries.hasMoreElements();
return e.getEntryInputStream();
}
} catch (Exception e) {
throw new IOException("Error reading entries from bookkeeper", e);
}
return null;
}
@Override
public int read() throws IOException {
byte[] b = new byte[1];
if (read(b, 0, 1) != 1) {
return -1;
} else {
return b[0];
}
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
try {
int read = 0;
if (entryStream == null) {
entryStream = nextStream();
if (entryStream == null) {
return read;
}
}
while (read < len) {
int thisread = entryStream.read(b, off+read, (len-read));
if (thisread == -1) {
entryStream = nextStream();
if (entryStream == null) {
return read;
}
} else {
read += thisread;
}
}
return read;
} catch (IOException e) {
throw e;
}
}
}
}

View File

@ -0,0 +1,177 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;
import java.util.Arrays;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.io.DataOutputBuffer;
import java.io.IOException;
/**
* Output stream for BookKeeper Journal.
* Multiple complete edit log entries are packed into a single bookkeeper
* entry before sending it over the network. The fact that the edit log entries
* are complete in the bookkeeper entries means that each bookkeeper log entry
*can be read as a complete edit log. This is useful for recover, as we don't
* need to read through the entire edit log segment to get the last written
* entry.
*/
class BookKeeperEditLogOutputStream
extends EditLogOutputStream implements AddCallback {
private final DataOutputBuffer bufCurrent;
private final AtomicInteger outstandingRequests;
private final int transmissionThreshold;
private final LedgerHandle lh;
private CountDownLatch syncLatch;
private final WriteLock wl;
private final Writer writer;
/**
* Construct an edit log output stream which writes to a ledger.
*/
protected BookKeeperEditLogOutputStream(Configuration conf,
LedgerHandle lh, WriteLock wl)
throws IOException {
super();
bufCurrent = new DataOutputBuffer();
outstandingRequests = new AtomicInteger(0);
syncLatch = null;
this.lh = lh;
this.wl = wl;
this.wl.acquire();
this.writer = new Writer(bufCurrent);
this.transmissionThreshold
= conf.getInt(BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE,
BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE_DEFAULT);
}
@Override
public void create() throws IOException {
// noop
}
@Override
public void close() throws IOException {
setReadyToFlush();
flushAndSync();
try {
lh.close();
} catch (InterruptedException ie) {
throw new IOException("Interrupted waiting on close", ie);
} catch (BKException bke) {
throw new IOException("BookKeeper error during close", bke);
}
}
@Override
public void abort() throws IOException {
try {
lh.close();
} catch (InterruptedException ie) {
throw new IOException("Interrupted waiting on close", ie);
} catch (BKException bke) {
throw new IOException("BookKeeper error during abort", bke);
}
wl.release();
}
@Override
public void writeRaw(final byte[] data, int off, int len) throws IOException {
throw new IOException("Not supported for BK");
}
@Override
public void write(FSEditLogOp op) throws IOException {
wl.checkWriteLock();
writer.writeOp(op);
if (bufCurrent.getLength() > transmissionThreshold) {
transmit();
}
}
@Override
public void setReadyToFlush() throws IOException {
wl.checkWriteLock();
transmit();
synchronized(this) {
syncLatch = new CountDownLatch(outstandingRequests.get());
}
}
@Override
public void flushAndSync() throws IOException {
wl.checkWriteLock();
assert(syncLatch != null);
try {
syncLatch.await();
} catch (InterruptedException ie) {
throw new IOException("Interrupted waiting on latch", ie);
}
syncLatch = null;
// wait for whatever we wait on
}
/**
* Transmit the current buffer to bookkeeper.
* Synchronised at the FSEditLog level. #write() and #setReadyToFlush()
* are never called at the same time.
*/
private void transmit() throws IOException {
wl.checkWriteLock();
if (bufCurrent.getLength() > 0) {
byte[] entry = Arrays.copyOf(bufCurrent.getData(),
bufCurrent.getLength());
lh.asyncAddEntry(entry, this, null);
bufCurrent.reset();
outstandingRequests.incrementAndGet();
}
}
@Override
public void addComplete(int rc, LedgerHandle handle,
long entryId, Object ctx) {
synchronized(this) {
outstandingRequests.decrementAndGet();
CountDownLatch l = syncLatch;
if (l != null) {
l.countDown();
}
}
}
}

View File

@ -0,0 +1,512 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.conf.Configuration;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.io.IOException;
import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* BookKeeper Journal Manager
*
* To use, add the following to hdfs-site.xml.
* <pre>
* {@code
* <property>
* <name>dfs.namenode.edits.dir</name>
* <value>bookkeeper://zk1:2181;zk2:2181;zk3:2181/hdfsjournal</value>
* </property>
*
* <property>
* <name>dfs.namenode.edits.journalPlugin.bookkeeper</name>
* <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
* </property>
* }
* </pre>
* The URI format for bookkeeper is bookkeeper://[zkEnsemble]/[rootZnode]
* [zookkeeper ensemble] is a list of semi-colon separated, zookeeper host:port
* pairs. In the example above there are 3 servers, in the ensemble,
* zk1, zk2 &amp; zk3, each one listening on port 2181.
*
* [root znode] is the path of the zookeeper znode, under which the editlog
* information will be stored.
*
* Other configuration options are:
* <ul>
* <li><b>dfs.namenode.bookkeeperjournal.output-buffer-size</b>
* Number of bytes a bookkeeper journal stream will buffer before
* forcing a flush. Default is 1024.</li>
* <li><b>dfs.namenode.bookkeeperjournal.ensemble-size</b>
* Number of bookkeeper servers in edit log ledger ensembles. This
* is the number of bookkeeper servers which need to be available
* for the ledger to be writable. Default is 3.</li>
* <li><b>dfs.namenode.bookkeeperjournal.quorum-size</b>
* Number of bookkeeper servers in the write quorum. This is the
* number of bookkeeper servers which must have acknowledged the
* write of an entry before it is considered written.
* Default is 2.</li>
* <li><b>dfs.namenode.bookkeeperjournal.digestPw</b>
* Password to use when creating ledgers. </li>
* </ul>
*/
public class BookKeeperJournalManager implements JournalManager {
static final Log LOG = LogFactory.getLog(BookKeeperJournalManager.class);
public static final String BKJM_OUTPUT_BUFFER_SIZE
= "dfs.namenode.bookkeeperjournal.output-buffer-size";
public static final int BKJM_OUTPUT_BUFFER_SIZE_DEFAULT = 1024;
public static final String BKJM_BOOKKEEPER_ENSEMBLE_SIZE
= "dfs.namenode.bookkeeperjournal.ensemble-size";
public static final int BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT = 3;
public static final String BKJM_BOOKKEEPER_QUORUM_SIZE
= "dfs.namenode.bookkeeperjournal.quorum-size";
public static final int BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT = 2;
public static final String BKJM_BOOKKEEPER_DIGEST_PW
= "dfs.namenode.bookkeeperjournal.digestPw";
public static final String BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT = "";
private static final int BKJM_LAYOUT_VERSION = -1;
private final ZooKeeper zkc;
private final Configuration conf;
private final BookKeeper bkc;
private final WriteLock wl;
private final String ledgerPath;
private final MaxTxId maxTxId;
private final int ensembleSize;
private final int quorumSize;
private final String digestpw;
private final CountDownLatch zkConnectLatch;
private LedgerHandle currentLedger = null;
private int bytesToInt(byte[] b) {
assert b.length >= 4;
return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3];
}
private byte[] intToBytes(int i) {
return new byte[] {
(byte)(i >> 24),
(byte)(i >> 16),
(byte)(i >> 8),
(byte)(i) };
}
/**
* Construct a Bookkeeper journal manager.
*/
public BookKeeperJournalManager(Configuration conf, URI uri)
throws IOException {
this.conf = conf;
String zkConnect = uri.getAuthority().replace(";", ",");
String zkPath = uri.getPath();
ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
ledgerPath = zkPath + "/ledgers";
String maxTxIdPath = zkPath + "/maxtxid";
String lockPath = zkPath + "/lock";
String versionPath = zkPath + "/version";
digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW,
BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT);
try {
zkConnectLatch = new CountDownLatch(1);
zkc = new ZooKeeper(zkConnect, 3000, new ZkConnectionWatcher());
if (!zkConnectLatch.await(6000, TimeUnit.MILLISECONDS)) {
throw new IOException("Error connecting to zookeeper");
}
if (zkc.exists(zkPath, false) == null) {
zkc.create(zkPath, new byte[] {'0'},
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
Stat versionStat = zkc.exists(versionPath, false);
if (versionStat != null) {
byte[] d = zkc.getData(versionPath, false, versionStat);
// There's only one version at the moment
assert bytesToInt(d) == BKJM_LAYOUT_VERSION;
} else {
zkc.create(versionPath, intToBytes(BKJM_LAYOUT_VERSION),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
if (zkc.exists(ledgerPath, false) == null) {
zkc.create(ledgerPath, new byte[] {'0'},
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
bkc = new BookKeeper(new ClientConfiguration(),
zkc);
} catch (Exception e) {
throw new IOException("Error initializing zk", e);
}
wl = new WriteLock(zkc, lockPath);
maxTxId = new MaxTxId(zkc, maxTxIdPath);
}
/**
* Start a new log segment in a BookKeeper ledger.
* First ensure that we have the write lock for this journal.
* Then create a ledger and stream based on that ledger.
* The ledger id is written to the inprogress znode, so that in the
* case of a crash, a recovery process can find the ledger we were writing
* to when we crashed.
* @param txId First transaction id to be written to the stream
*/
@Override
public EditLogOutputStream startLogSegment(long txId) throws IOException {
wl.acquire();
if (txId <= maxTxId.get()) {
throw new IOException("We've already seen " + txId
+ ". A new stream cannot be created with it");
}
if (currentLedger != null) {
throw new IOException("Already writing to a ledger, id="
+ currentLedger.getId());
}
try {
currentLedger = bkc.createLedger(ensembleSize, quorumSize,
BookKeeper.DigestType.MAC,
digestpw.getBytes());
String znodePath = inprogressZNode();
EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath,
HdfsConstants.LAYOUT_VERSION, currentLedger.getId(), txId);
/* Write the ledger metadata out to the inprogress ledger znode
* This can fail if for some reason our write lock has
* expired (@see WriteLock) and another process has managed to
* create the inprogress znode.
* In this case, throw an exception. We don't want to continue
* as this would lead to a split brain situation.
*/
l.write(zkc, znodePath);
return new BookKeeperEditLogOutputStream(conf, currentLedger, wl);
} catch (Exception e) {
if (currentLedger != null) {
try {
currentLedger.close();
} catch (Exception e2) {
//log & ignore, an IOException will be thrown soon
LOG.error("Error closing ledger", e2);
}
}
throw new IOException("Error creating ledger", e);
}
}
/**
* Finalize a log segment. If the journal manager is currently
* writing to a ledger, ensure that this is the ledger of the log segment
* being finalized.
*
* Otherwise this is the recovery case. In the recovery case, ensure that
* the firstTxId of the ledger matches firstTxId for the segment we are
* trying to finalize.
*/
@Override
public void finalizeLogSegment(long firstTxId, long lastTxId)
throws IOException {
String inprogressPath = inprogressZNode();
try {
Stat inprogressStat = zkc.exists(inprogressPath, false);
if (inprogressStat == null) {
throw new IOException("Inprogress znode " + inprogressPath
+ " doesn't exist");
}
wl.checkWriteLock();
EditLogLedgerMetadata l
= EditLogLedgerMetadata.read(zkc, inprogressPath);
if (currentLedger != null) { // normal, non-recovery case
if (l.getLedgerId() == currentLedger.getId()) {
try {
currentLedger.close();
} catch (BKException bke) {
LOG.error("Error closing current ledger", bke);
}
currentLedger = null;
} else {
throw new IOException(
"Active ledger has different ID to inprogress. "
+ l.getLedgerId() + " found, "
+ currentLedger.getId() + " expected");
}
}
if (l.getFirstTxId() != firstTxId) {
throw new IOException("Transaction id not as expected, "
+ l.getFirstTxId() + " found, " + firstTxId + " expected");
}
l.finalizeLedger(lastTxId);
String finalisedPath = finalizedLedgerZNode(firstTxId, lastTxId);
try {
l.write(zkc, finalisedPath);
} catch (KeeperException.NodeExistsException nee) {
if (!l.verify(zkc, finalisedPath)) {
throw new IOException("Node " + finalisedPath + " already exists"
+ " but data doesn't match");
}
}
maxTxId.store(lastTxId);
zkc.delete(inprogressPath, inprogressStat.getVersion());
} catch (KeeperException e) {
throw new IOException("Error finalising ledger", e);
} catch (InterruptedException ie) {
throw new IOException("Error finalising ledger", ie);
} finally {
wl.release();
}
}
// TODO(HA): Handle inProgressOk
@Override
public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
throws IOException {
for (EditLogLedgerMetadata l : getLedgerList()) {
if (l.getFirstTxId() == fromTxnId) {
try {
LedgerHandle h = bkc.openLedger(l.getLedgerId(),
BookKeeper.DigestType.MAC,
digestpw.getBytes());
return new BookKeeperEditLogInputStream(h, l);
} catch (Exception e) {
throw new IOException("Could not open ledger for " + fromTxnId, e);
}
}
}
throw new IOException("No ledger for fromTxnId " + fromTxnId + " found.");
}
// TODO(HA): Handle inProgressOk
@Override
public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
throws IOException {
long count = 0;
long expectedStart = 0;
for (EditLogLedgerMetadata l : getLedgerList()) {
if (l.isInProgress()) {
long endTxId = recoverLastTxId(l);
if (endTxId == HdfsConstants.INVALID_TXID) {
break;
}
count += (endTxId - l.getFirstTxId()) + 1;
break;
}
if (l.getFirstTxId() < fromTxnId) {
continue;
} else if (l.getFirstTxId() == fromTxnId) {
count = (l.getLastTxId() - l.getFirstTxId()) + 1;
expectedStart = l.getLastTxId() + 1;
} else {
if (expectedStart != l.getFirstTxId()) {
if (count == 0) {
throw new CorruptionException("StartTxId " + l.getFirstTxId()
+ " is not as expected " + expectedStart
+ ". Gap in transaction log?");
} else {
break;
}
}
count += (l.getLastTxId() - l.getFirstTxId()) + 1;
expectedStart = l.getLastTxId() + 1;
}
}
return count;
}
@Override
public void recoverUnfinalizedSegments() throws IOException {
wl.acquire();
synchronized (this) {
try {
EditLogLedgerMetadata l
= EditLogLedgerMetadata.read(zkc, inprogressZNode());
long endTxId = recoverLastTxId(l);
if (endTxId == HdfsConstants.INVALID_TXID) {
LOG.error("Unrecoverable corruption has occurred in segment "
+ l.toString() + " at path " + inprogressZNode()
+ ". Unable to continue recovery.");
throw new IOException("Unrecoverable corruption, please check logs.");
}
finalizeLogSegment(l.getFirstTxId(), endTxId);
} catch (KeeperException.NoNodeException nne) {
// nothing to recover, ignore
} finally {
if (wl.haveLock()) {
wl.release();
}
}
}
}
@Override
public void purgeLogsOlderThan(long minTxIdToKeep)
throws IOException {
for (EditLogLedgerMetadata l : getLedgerList()) {
if (!l.isInProgress()
&& l.getLastTxId() < minTxIdToKeep) {
try {
Stat stat = zkc.exists(l.getZkPath(), false);
zkc.delete(l.getZkPath(), stat.getVersion());
bkc.deleteLedger(l.getLedgerId());
} catch (InterruptedException ie) {
LOG.error("Interrupted while purging " + l, ie);
} catch (BKException bke) {
LOG.error("Couldn't delete ledger from bookkeeper", bke);
} catch (KeeperException ke) {
LOG.error("Error deleting ledger entry in zookeeper", ke);
}
}
}
}
@Override
public void close() throws IOException {
try {
bkc.close();
zkc.close();
} catch (Exception e) {
throw new IOException("Couldn't close zookeeper client", e);
}
}
/**
* Set the amount of memory that this stream should use to buffer edits.
* Setting this will only affect future output stream. Streams
* which have currently be created won't be affected.
*/
@Override
public void setOutputBufferCapacity(int size) {
conf.getInt(BKJM_OUTPUT_BUFFER_SIZE, size);
}
/**
* Find the id of the last edit log transaction writen to a edit log
* ledger.
*/
private long recoverLastTxId(EditLogLedgerMetadata l) throws IOException {
try {
LedgerHandle lh = bkc.openLedger(l.getLedgerId(),
BookKeeper.DigestType.MAC,
digestpw.getBytes());
long lastAddConfirmed = lh.getLastAddConfirmed();
BookKeeperEditLogInputStream in
= new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);
long endTxId = HdfsConstants.INVALID_TXID;
FSEditLogOp op = in.readOp();
while (op != null) {
if (endTxId == HdfsConstants.INVALID_TXID
|| op.getTransactionId() == endTxId+1) {
endTxId = op.getTransactionId();
}
op = in.readOp();
}
return endTxId;
} catch (Exception e) {
throw new IOException("Exception retreiving last tx id for ledger " + l,
e);
}
}
/**
* Get a list of all segments in the journal.
*/
private List<EditLogLedgerMetadata> getLedgerList() throws IOException {
List<EditLogLedgerMetadata> ledgers
= new ArrayList<EditLogLedgerMetadata>();
try {
List<String> ledgerNames = zkc.getChildren(ledgerPath, false);
for (String n : ledgerNames) {
ledgers.add(EditLogLedgerMetadata.read(zkc, ledgerPath + "/" + n));
}
} catch (Exception e) {
throw new IOException("Exception reading ledger list from zk", e);
}
Collections.sort(ledgers, EditLogLedgerMetadata.COMPARATOR);
return ledgers;
}
/**
* Get the znode path for a finalize ledger
*/
String finalizedLedgerZNode(long startTxId, long endTxId) {
return String.format("%s/edits_%018d_%018d",
ledgerPath, startTxId, endTxId);
}
/**
* Get the znode path for the inprogressZNode
*/
String inprogressZNode() {
return ledgerPath + "/inprogress";
}
/**
* Simple watcher to notify when zookeeper has connected
*/
private class ZkConnectionWatcher implements Watcher {
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected.equals(event.getState())) {
zkConnectLatch.countDown();
}
}
}
}

View File

@ -0,0 +1,200 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import java.io.IOException;
import java.util.Comparator;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Utility class for storing the metadata associated
* with a single edit log segment, stored in a single ledger
*/
public class EditLogLedgerMetadata {
static final Log LOG = LogFactory.getLog(EditLogLedgerMetadata.class);
private String zkPath;
private final long ledgerId;
private final int version;
private final long firstTxId;
private long lastTxId;
private boolean inprogress;
public static final Comparator COMPARATOR
= new Comparator<EditLogLedgerMetadata>() {
public int compare(EditLogLedgerMetadata o1,
EditLogLedgerMetadata o2) {
if (o1.firstTxId < o2.firstTxId) {
return -1;
} else if (o1.firstTxId == o2.firstTxId) {
return 0;
} else {
return 1;
}
}
};
EditLogLedgerMetadata(String zkPath, int version,
long ledgerId, long firstTxId) {
this.zkPath = zkPath;
this.ledgerId = ledgerId;
this.version = version;
this.firstTxId = firstTxId;
this.lastTxId = HdfsConstants.INVALID_TXID;
this.inprogress = true;
}
EditLogLedgerMetadata(String zkPath, int version, long ledgerId,
long firstTxId, long lastTxId) {
this.zkPath = zkPath;
this.ledgerId = ledgerId;
this.version = version;
this.firstTxId = firstTxId;
this.lastTxId = lastTxId;
this.inprogress = false;
}
String getZkPath() {
return zkPath;
}
long getFirstTxId() {
return firstTxId;
}
long getLastTxId() {
return lastTxId;
}
long getLedgerId() {
return ledgerId;
}
int getVersion() {
return version;
}
boolean isInProgress() {
return this.inprogress;
}
void finalizeLedger(long newLastTxId) {
assert this.lastTxId == HdfsConstants.INVALID_TXID;
this.lastTxId = newLastTxId;
this.inprogress = false;
}
static EditLogLedgerMetadata read(ZooKeeper zkc, String path)
throws IOException, KeeperException.NoNodeException {
try {
byte[] data = zkc.getData(path, false, null);
String[] parts = new String(data).split(";");
if (parts.length == 3) {
int version = Integer.valueOf(parts[0]);
long ledgerId = Long.valueOf(parts[1]);
long txId = Long.valueOf(parts[2]);
return new EditLogLedgerMetadata(path, version, ledgerId, txId);
} else if (parts.length == 4) {
int version = Integer.valueOf(parts[0]);
long ledgerId = Long.valueOf(parts[1]);
long firstTxId = Long.valueOf(parts[2]);
long lastTxId = Long.valueOf(parts[3]);
return new EditLogLedgerMetadata(path, version, ledgerId,
firstTxId, lastTxId);
} else {
throw new IOException("Invalid ledger entry, "
+ new String(data));
}
} catch(KeeperException.NoNodeException nne) {
throw nne;
} catch(Exception e) {
throw new IOException("Error reading from zookeeper", e);
}
}
void write(ZooKeeper zkc, String path)
throws IOException, KeeperException.NodeExistsException {
this.zkPath = path;
String finalisedData;
if (inprogress) {
finalisedData = String.format("%d;%d;%d",
version, ledgerId, firstTxId);
} else {
finalisedData = String.format("%d;%d;%d;%d",
version, ledgerId, firstTxId, lastTxId);
}
try {
zkc.create(path, finalisedData.getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException nee) {
throw nee;
} catch (Exception e) {
throw new IOException("Error creating ledger znode");
}
}
boolean verify(ZooKeeper zkc, String path) {
try {
EditLogLedgerMetadata other = read(zkc, path);
if (LOG.isTraceEnabled()) {
LOG.trace("Verifying " + this.toString()
+ " against " + other);
}
return other == this;
} catch (Exception e) {
LOG.error("Couldn't verify data in " + path, e);
return false;
}
}
public boolean equals(Object o) {
if (!(o instanceof EditLogLedgerMetadata)) {
return false;
}
EditLogLedgerMetadata ol = (EditLogLedgerMetadata)o;
return ledgerId == ol.ledgerId
&& firstTxId == ol.firstTxId
&& lastTxId == ol.lastTxId
&& version == ol.version;
}
public int hashCode() {
int hash = 1;
hash = hash * 31 + (int)ledgerId;
hash = hash * 31 + (int)firstTxId;
hash = hash * 31 + (int)lastTxId;
hash = hash * 31 + (int)version;
return hash;
}
public String toString() {
return "[LedgerId:"+ledgerId +
", firstTxId:" + firstTxId +
", lastTxId:" + lastTxId +
", version:" + version + "]";
}
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import java.io.IOException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Utility class for storing and reading
* the max seen txid in zookeeper
*/
class MaxTxId {
static final Log LOG = LogFactory.getLog(MaxTxId.class);
private final ZooKeeper zkc;
private final String path;
private Stat currentStat;
MaxTxId(ZooKeeper zkc, String path) {
this.zkc = zkc;
this.path = path;
}
synchronized void store(long maxTxId) throws IOException {
long currentMax = get();
if (currentMax < maxTxId) {
if (LOG.isTraceEnabled()) {
LOG.trace("Setting maxTxId to " + maxTxId);
}
String txidStr = Long.toString(maxTxId);
try {
if (currentStat != null) {
currentStat = zkc.setData(path, txidStr.getBytes("UTF-8"),
currentStat.getVersion());
} else {
zkc.create(path, txidStr.getBytes("UTF-8"),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
throw new IOException("Error writing max tx id", e);
}
}
}
synchronized long get() throws IOException {
try {
currentStat = zkc.exists(path, false);
if (currentStat == null) {
return 0;
} else {
byte[] bytes = zkc.getData(path, false, currentStat);
String txidString = new String(bytes, "UTF-8");
return Long.valueOf(txidString);
}
} catch (Exception e) {
throw new IOException("Error reading the max tx id from zk", e);
}
}
}

View File

@ -0,0 +1,186 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.List;
import java.util.Collections;
import java.util.Comparator;
import java.net.InetAddress;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Distributed lock, using ZooKeeper.
*
* The lock is vulnerable to timing issues. For example, the process could
* encounter a really long GC cycle between acquiring the lock, and writing to
* a ledger. This could have timed out the lock, and another process could have
* acquired the lock and started writing to bookkeeper. Therefore other
* mechanisms are required to ensure correctness (i.e. Fencing).
*/
class WriteLock implements Watcher {
static final Log LOG = LogFactory.getLog(WriteLock.class);
private final ZooKeeper zkc;
private final String lockpath;
private AtomicInteger lockCount = new AtomicInteger(0);
private String myznode = null;
WriteLock(ZooKeeper zkc, String lockpath) throws IOException {
this.lockpath = lockpath;
this.zkc = zkc;
try {
if (zkc.exists(lockpath, false) == null) {
String localString = InetAddress.getLocalHost().toString();
zkc.create(lockpath, localString.getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
throw new IOException("Exception accessing Zookeeper", e);
}
}
void acquire() throws IOException {
while (true) {
if (lockCount.get() == 0) {
try {
synchronized(this) {
if (lockCount.get() > 0) {
lockCount.incrementAndGet();
return;
}
myznode = zkc.create(lockpath + "/lock-", new byte[] {'0'},
Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
if (LOG.isTraceEnabled()) {
LOG.trace("Acquiring lock, trying " + myznode);
}
List<String> nodes = zkc.getChildren(lockpath, false);
Collections.sort(nodes, new Comparator<String>() {
public int compare(String o1,
String o2) {
Integer l1 = Integer.valueOf(o1.replace("lock-", ""));
Integer l2 = Integer.valueOf(o2.replace("lock-", ""));
return l1 - l2;
}
});
if ((lockpath + "/" + nodes.get(0)).equals(myznode)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Lock acquired - " + myznode);
}
lockCount.set(1);
zkc.exists(myznode, this);
return;
} else {
LOG.error("Failed to acquire lock with " + myznode
+ ", " + nodes.get(0) + " already has it");
throw new IOException("Could not acquire lock");
}
}
} catch (KeeperException e) {
throw new IOException("Exception accessing Zookeeper", e);
} catch (InterruptedException ie) {
throw new IOException("Exception accessing Zookeeper", ie);
}
} else {
int ret = lockCount.getAndIncrement();
if (ret == 0) {
lockCount.decrementAndGet();
continue; // try again;
} else {
return;
}
}
}
}
void release() throws IOException {
try {
if (lockCount.decrementAndGet() <= 0) {
if (lockCount.get() < 0) {
LOG.warn("Unbalanced lock handling somewhere, lockCount down to "
+ lockCount.get());
}
synchronized(this) {
if (lockCount.get() <= 0) {
if (LOG.isTraceEnabled()) {
LOG.trace("releasing lock " + myznode);
}
if (myznode != null) {
zkc.delete(myznode, -1);
myznode = null;
}
}
}
}
} catch (Exception e) {
throw new IOException("Exception accessing Zookeeper", e);
}
}
public void checkWriteLock() throws IOException {
if (!haveLock()) {
throw new IOException("Lost writer lock");
}
}
boolean haveLock() throws IOException {
return lockCount.get() > 0;
}
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.Disconnected
|| event.getState() == KeeperState.Expired) {
LOG.warn("Lost zookeeper session, lost lock ");
lockCount.set(0);
} else {
// reapply the watch
synchronized (this) {
LOG.info("Zookeeper event " + event
+ " received, reapplying watch to " + myznode);
if (myznode != null) {
try {
zkc.exists(myznode, this);
} catch (Exception e) {
LOG.warn("Could not set watch on lock, releasing", e);
try {
release();
} catch (IOException ioe) {
LOG.error("Could not release Zk lock", ioe);
}
}
}
}
}
}
}

View File

@ -0,0 +1,395 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import static org.junit.Assert.*;
import java.net.URI;
import java.util.Collections;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.util.LocalBookKeeper;
import java.io.RandomAccessFile;
import java.io.File;
import java.io.FilenameFilter;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.security.SecurityUtil;
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.KeeperException;
import com.google.common.collect.ImmutableList;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class TestBookKeeperJournalManager {
static final Log LOG = LogFactory.getLog(TestBookKeeperJournalManager.class);
private static final long DEFAULT_SEGMENT_SIZE = 1000;
private static final String zkEnsemble = "localhost:2181";
private static Thread bkthread;
protected static Configuration conf = new Configuration();
private ZooKeeper zkc;
private static ZooKeeper connectZooKeeper(String ensemble)
throws IOException, KeeperException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zkc = new ZooKeeper(zkEnsemble, 3600, new Watcher() {
public void process(WatchedEvent event) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
if (!latch.await(3, TimeUnit.SECONDS)) {
throw new IOException("Zookeeper took too long to connect");
}
return zkc;
}
@BeforeClass
public static void setupBookkeeper() throws Exception {
final int numBookies = 5;
bkthread = new Thread() {
public void run() {
try {
String[] args = new String[1];
args[0] = String.valueOf(numBookies);
LOG.info("Starting bk");
LocalBookKeeper.main(args);
} catch (InterruptedException e) {
// go away quietly
} catch (Exception e) {
LOG.error("Error starting local bk", e);
}
}
};
bkthread.start();
if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) {
throw new Exception("Error starting zookeeper/bookkeeper");
}
ZooKeeper zkc = connectZooKeeper(zkEnsemble);
try {
boolean up = false;
for (int i = 0; i < 10; i++) {
try {
List<String> children = zkc.getChildren("/ledgers/available",
false);
if (children.size() == numBookies) {
up = true;
break;
}
} catch (KeeperException e) {
// ignore
}
Thread.sleep(1000);
}
if (!up) {
throw new IOException("Not enough bookies started");
}
} finally {
zkc.close();
}
}
@Before
public void setup() throws Exception {
zkc = connectZooKeeper(zkEnsemble);
}
@After
public void teardown() throws Exception {
zkc.close();
}
@AfterClass
public static void teardownBookkeeper() throws Exception {
if (bkthread != null) {
bkthread.interrupt();
bkthread.join();
}
}
@Test
public void testSimpleWrite() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplewrite"));
long txid = 1;
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(1, 100);
String zkpath = bkjm.finalizedLedgerZNode(1, 100);
assertNotNull(zkc.exists(zkpath, false));
assertNull(zkc.exists(bkjm.inprogressZNode(), false));
}
@Test
public void testNumberOfTransactions() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-txncount"));
long txid = 1;
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(1, 100);
long numTrans = bkjm.getNumberOfTransactions(1, true);
assertEquals(100, numTrans);
}
@Test
public void testNumberOfTransactionsWithGaps() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-gaps"));
long txid = 1;
for (long i = 0; i < 3; i++) {
long start = txid;
EditLogOutputStream out = bkjm.startLogSegment(start);
for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(start, txid-1);
assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false));
}
zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, DEFAULT_SEGMENT_SIZE*2), -1);
long numTrans = bkjm.getNumberOfTransactions(1, true);
assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
try {
numTrans = bkjm.getNumberOfTransactions(DEFAULT_SEGMENT_SIZE+1, true);
fail("Should have thrown corruption exception by this point");
} catch (JournalManager.CorruptionException ce) {
// if we get here, everything is going good
}
numTrans = bkjm.getNumberOfTransactions((DEFAULT_SEGMENT_SIZE*2)+1, true);
assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
}
@Test
public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-inprogressAtEnd"));
long txid = 1;
for (long i = 0; i < 3; i++) {
long start = txid;
EditLogOutputStream out = bkjm.startLogSegment(start);
for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(start, (txid-1));
assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
}
long start = txid;
EditLogOutputStream out = bkjm.startLogSegment(start);
for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE/2; j++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
out.write(op);
}
out.setReadyToFlush();
out.flush();
out.abort();
out.close();
long numTrans = bkjm.getNumberOfTransactions(1, true);
assertEquals((txid-1), numTrans);
}
/**
* Create a bkjm namespace, write a journal from txid 1, close stream.
* Try to create a new journal from txid 1. Should throw an exception.
*/
@Test
public void testWriteRestartFrom1() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-restartFrom1"));
long txid = 1;
long start = txid;
EditLogOutputStream out = bkjm.startLogSegment(txid);
for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(start, (txid-1));
txid = 1;
try {
out = bkjm.startLogSegment(txid);
fail("Shouldn't be able to start another journal from " + txid
+ " when one already exists");
} catch (Exception ioe) {
LOG.info("Caught exception as expected", ioe);
}
// test border case
txid = DEFAULT_SEGMENT_SIZE;
try {
out = bkjm.startLogSegment(txid);
fail("Shouldn't be able to start another journal from " + txid
+ " when one already exists");
} catch (IOException ioe) {
LOG.info("Caught exception as expected", ioe);
}
// open journal continuing from before
txid = DEFAULT_SEGMENT_SIZE + 1;
start = txid;
out = bkjm.startLogSegment(start);
assertNotNull(out);
for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(start, (txid-1));
// open journal arbitarily far in the future
txid = DEFAULT_SEGMENT_SIZE * 4;
out = bkjm.startLogSegment(txid);
assertNotNull(out);
}
@Test
public void testTwoWriters() throws Exception {
long start = 1;
BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
EditLogOutputStream out1 = bkjm1.startLogSegment(start);
try {
EditLogOutputStream out2 = bkjm2.startLogSegment(start);
fail("Shouldn't have been able to open the second writer");
} catch (IOException ioe) {
LOG.info("Caught exception as expected", ioe);
}
}
@Test
public void testSimpleRead() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simpleread"));
long txid = 1;
final long numTransactions = 10000;
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= numTransactions; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(1, numTransactions);
EditLogInputStream in = bkjm.getInputStream(1, true);
try {
assertEquals(numTransactions,
FSEditLogTestUtil.countTransactionsInStream(in));
} finally {
in.close();
}
}
@Test
public void testSimpleRecovery() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplerecovery"));
EditLogOutputStream out = bkjm.startLogSegment(1);
long txid = 1;
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
out.write(op);
}
out.setReadyToFlush();
out.flush();
out.abort();
out.close();
assertNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
assertNotNull(zkc.exists(bkjm.inprogressZNode(), false));
bkjm.recoverUnfinalizedSegments();
assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
assertNull(zkc.exists(bkjm.inprogressZNode(), false));
}
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
/**
* Utilities for testing edit logs
*/
public class FSEditLogTestUtil {
private static OpInstanceCache cache = new OpInstanceCache();
public static FSEditLogOp getNoOpInstance() {
return FSEditLogOp.LogSegmentOp.getInstance(cache,
FSEditLogOpCodes.OP_END_LOG_SEGMENT);
}
public static long countTransactionsInStream(EditLogInputStream in)
throws IOException {
FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in);
return validation.getNumTransactions();
}
}

View File

@ -0,0 +1,62 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#
#
# Bookkeeper Journal Logging Configuration
#
# Format is "<default threshold> (, <appender>)+
# DEFAULT: console appender only
log4j.rootLogger=OFF, CONSOLE
# Example with rolling log file
#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
# Example with rolling log file and tracing
#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE
#
# Log INFO level and above messages to the console
#
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=INFO
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
#
# Add ROLLINGFILE to rootLogger to get log file output
# Log DEBUG level and above messages to a log file
log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
log4j.appender.ROLLINGFILE.Threshold=DEBUG
log4j.appender.ROLLINGFILE.File=hdfs-namenode.log
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
# Max log file size of 10MB
log4j.appender.ROLLINGFILE.MaxFileSize=10MB
# uncomment the next line to limit number of backup files
#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n

View File

@ -31,6 +31,7 @@
<module>hadoop-hdfs</module>
<module>hadoop-hdfs-httpfs</module>
<module>hadoop-hdfs/src/contrib/fuse-dfs</module>
<module>hadoop-hdfs/src/contrib/bkjournal</module>
</modules>
<build>