From 1f2f4e105b90e88a5c3cb101810293f4fe228362 Mon Sep 17 00:00:00 2001 From: Jitendra Nath Pandey Date: Tue, 13 Dec 2011 23:27:13 +0000 Subject: [PATCH] HDFS-234. Integration with BookKeeper logging system. Contributed by Ivan Kelly. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1213983 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../src/contrib/bkjournal/README.txt | 60 +++ .../hadoop-hdfs/src/contrib/bkjournal/pom.xml | 67 +++ .../BookKeeperEditLogInputStream.java | 221 ++++++++ .../BookKeeperEditLogOutputStream.java | 177 ++++++ .../bkjournal/BookKeeperJournalManager.java | 508 ++++++++++++++++++ .../bkjournal/EditLogLedgerMetadata.java | 200 +++++++ .../hadoop/contrib/bkjournal/MaxTxId.java | 81 +++ .../hadoop/contrib/bkjournal/WriteLock.java | 186 +++++++ .../TestBookKeeperJournalManager.java | 395 ++++++++++++++ .../server/namenode/FSEditLogTestUtil.java | 35 ++ .../src/test/resources/log4j.properties | 62 +++ hadoop-hdfs-project/pom.xml | 1 + 13 files changed, 1996 insertions(+) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/WriteLock.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 583ba1f8dd9..989c962367d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -34,6 +34,9 @@ Trunk (unreleased changes) HDFS-2666. Fix TestBackupNode failure. (suresh) + HDFS-234. Integration with BookKeeper logging system. (Ivan Kelly + via jitendra) + HDFS-2663. Optional protobuf parameters are not handled correctly. (suresh) IMPROVEMENTS diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt new file mode 100644 index 00000000000..0474c3f6e38 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt @@ -0,0 +1,60 @@ +This module provides a BookKeeper backend for HFDS Namenode write +ahead logging. + +BookKeeper is a highly available distributed write ahead logging +system. 
For more details, see + + http://zookeeper.apache.org/bookkeeper + +------------------------------------------------------------------------------- +How do I build? + + To generate the distribution packages for BK journal, do the + following. + + $ mvn clean install -Pdist -Dtar + + This will generate a tarball, + target/hadoop-hdfs-bkjournal-.tar.gz + +------------------------------------------------------------------------------- +How do I use the BookKeeper Journal? + + To run a HDFS namenode using BookKeeper as a backend, extract the + distribution package on top of hdfs + + cd hadoop-hdfs-/ + tar --strip-components 1 -zxvf path/to/hadoop-hdfs-bkjournal-.tar.gz + + Then, in hdfs-site.xml, set the following properties. + + + dfs.namenode.edits.dir + bookkeeper://localhost:2181/bkjournal,file:///path/for/edits + + + + dfs.namenode.edits.journal-plugin.bookkeeper + org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager + + + In this example, the namenode is configured to use 2 write ahead + logging devices. One writes to BookKeeper and the other to a local + file system. At the moment is is not possible to only write to + BookKeeper, as the resource checker explicitly checked for local + disks currently. + + The given example, configures the namenode to look for the journal + metadata at the path /bkjournal on the a standalone zookeeper ensemble + at localhost:2181. To configure a multiple host zookeeper ensemble, + separate the hosts with semicolons. For example, if you have 3 + zookeeper servers, zk1, zk2 & zk3, each listening on port 2181, you + would specify this with + + bookkeeper://zk1:2181;zk2:2181;zk3:2181/bkjournal + + The final part /bkjournal specifies the znode in zookeeper where + ledger metadata will be store. Administrators can set this to anything + they wish. + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml new file mode 100644 index 00000000000..a0bafcffbb0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml @@ -0,0 +1,67 @@ + + + + 4.0.0 + + org.apache.hadoop + hadoop-project-dist + 0.24.0-SNAPSHOT + ../../../../hadoop-project-dist + + + org.apache.hadoop.contrib + hadoop-hdfs-bkjournal + 0.24.0-SNAPSHOT + Apache Hadoop HDFS BookKeeper Journal + Apache Hadoop HDFS BookKeeper Journal + jar + + + hdfs + + + + + org.apache.hadoop + hadoop-annotations + provided + + + org.apache.hadoop + hadoop-common + 0.24.0-SNAPSHOT + provided + + + org.apache.hadoop + hadoop-hdfs + 0.24.0-SNAPSHOT + provided + + + org.apache.hadoop + hadoop-hdfs + 0.24.0-SNAPSHOT + test-jar + test + + + org.apache.bookkeeper + bookkeeper-server + 4.0.0 + compile + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java new file mode 100644 index 00000000000..707182ec5cc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.contrib.bkjournal; + +import java.io.BufferedInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Enumeration; + +import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.LedgerEntry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Input stream which reads from a BookKeeper ledger. + */ +class BookKeeperEditLogInputStream extends EditLogInputStream { + static final Log LOG = LogFactory.getLog(BookKeeperEditLogInputStream.class); + + private final long firstTxId; + private final long lastTxId; + private final int logVersion; + private final LedgerHandle lh; + + private final FSEditLogOp.Reader reader; + private final FSEditLogLoader.PositionTrackingInputStream tracker; + + /** + * Construct BookKeeper edit log input stream. + * Starts reading from the first entry of the ledger. + */ + BookKeeperEditLogInputStream(final LedgerHandle lh, + final EditLogLedgerMetadata metadata) + throws IOException { + this(lh, metadata, 0); + } + + /** + * Construct BookKeeper edit log input stream. + * Starts reading from firstBookKeeperEntry. This allows the stream + * to take a shortcut during recovery, as it doesn't have to read + * every edit log transaction to find out what the last one is. 
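+ * For example, recovery can open the stream at the ledger's
+ * last-add-confirmed entry and only read the operations packed into
+ * that final bookkeeper entry.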
+ */ + BookKeeperEditLogInputStream(LedgerHandle lh, EditLogLedgerMetadata metadata, + long firstBookKeeperEntry) + throws IOException { + this.lh = lh; + this.firstTxId = metadata.getFirstTxId(); + this.lastTxId = metadata.getLastTxId(); + this.logVersion = metadata.getVersion(); + + BufferedInputStream bin = new BufferedInputStream( + new LedgerInputStream(lh, firstBookKeeperEntry)); + tracker = new FSEditLogLoader.PositionTrackingInputStream(bin); + DataInputStream in = new DataInputStream(tracker); + + reader = new FSEditLogOp.Reader(in, logVersion); + } + + @Override + public long getFirstTxId() throws IOException { + return firstTxId; + } + + @Override + public long getLastTxId() throws IOException { + return lastTxId; + } + + @Override + public int getVersion() throws IOException { + return logVersion; + } + + @Override + public FSEditLogOp readOp() throws IOException { + return reader.readOp(); + } + + @Override + public void close() throws IOException { + try { + lh.close(); + } catch (Exception e) { + throw new IOException("Exception closing ledger", e); + } + } + + @Override + public long getPosition() { + return tracker.getPos(); + } + + @Override + public long length() throws IOException { + return lh.getLength(); + } + + @Override + public String getName() { + return String.format("BookKeeper[%s,first=%d,last=%d]", + lh.toString(), firstTxId, lastTxId); + } + + @Override + public JournalType getType() { + assert (false); + return null; + } + + /** + * Input stream implementation which can be used by + * FSEditLogOp.Reader + */ + private static class LedgerInputStream extends InputStream { + private long readEntries; + private InputStream entryStream = null; + private final LedgerHandle lh; + private final long maxEntry; + + /** + * Construct ledger input stream + * @param lh the ledger handle to read from + * @param firstBookKeeperEntry ledger entry to start reading from + */ + LedgerInputStream(LedgerHandle lh, long firstBookKeeperEntry) + throws IOException { + this.lh = lh; + readEntries = firstBookKeeperEntry; + try { + maxEntry = lh.getLastAddConfirmed(); + } catch (Exception e) { + throw new IOException("Error reading last entry id", e); + } + } + + /** + * Get input stream representing next entry in the + * ledger. 
+ * @return input stream, or null if no more entries + */ + private InputStream nextStream() throws IOException { + try { + if (readEntries > maxEntry) { + return null; + } + Enumeration entries + = lh.readEntries(readEntries, readEntries); + readEntries++; + if (entries.hasMoreElements()) { + LedgerEntry e = entries.nextElement(); + assert !entries.hasMoreElements(); + return e.getEntryInputStream(); + } + } catch (Exception e) { + throw new IOException("Error reading entries from bookkeeper", e); + } + return null; + } + + @Override + public int read() throws IOException { + byte[] b = new byte[1]; + if (read(b, 0, 1) != 1) { + return -1; + } else { + return b[0]; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + try { + int read = 0; + if (entryStream == null) { + entryStream = nextStream(); + if (entryStream == null) { + return read; + } + } + + while (read < len) { + int thisread = entryStream.read(b, off+read, (len-read)); + if (thisread == -1) { + entryStream = nextStream(); + if (entryStream == null) { + return read; + } + } else { + read += thisread; + } + } + return read; + } catch (IOException e) { + throw e; + } + + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java new file mode 100644 index 00000000000..ddbe0b62e0a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.contrib.bkjournal; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.CountDownLatch; + +import java.util.Arrays; + +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.AsyncCallback.AddCallback; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer; + +import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; +import org.apache.hadoop.io.DataOutputBuffer; +import java.io.IOException; + +/** + * Output stream for BookKeeper Journal. + * Multiple complete edit log entries are packed into a single bookkeeper + * entry before sending it over the network. The fact that the edit log entries + * are complete in the bookkeeper entries means that each bookkeeper log entry + *can be read as a complete edit log. 
This is useful for recover, as we don't + * need to read through the entire edit log segment to get the last written + * entry. + */ +class BookKeeperEditLogOutputStream + extends EditLogOutputStream implements AddCallback { + private final DataOutputBuffer bufCurrent; + private final AtomicInteger outstandingRequests; + private final int transmissionThreshold; + private final LedgerHandle lh; + private CountDownLatch syncLatch; + private final WriteLock wl; + private final Writer writer; + + /** + * Construct an edit log output stream which writes to a ledger. + + */ + protected BookKeeperEditLogOutputStream(Configuration conf, + LedgerHandle lh, WriteLock wl) + throws IOException { + super(); + + bufCurrent = new DataOutputBuffer(); + outstandingRequests = new AtomicInteger(0); + syncLatch = null; + this.lh = lh; + this.wl = wl; + this.wl.acquire(); + this.writer = new Writer(bufCurrent); + this.transmissionThreshold + = conf.getInt(BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE, + BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE_DEFAULT); + } + + @Override + public void create() throws IOException { + // noop + } + + @Override + public void close() throws IOException { + setReadyToFlush(); + flushAndSync(); + try { + lh.close(); + } catch (InterruptedException ie) { + throw new IOException("Interrupted waiting on close", ie); + } catch (BKException bke) { + throw new IOException("BookKeeper error during close", bke); + } + } + + @Override + public void abort() throws IOException { + try { + lh.close(); + } catch (InterruptedException ie) { + throw new IOException("Interrupted waiting on close", ie); + } catch (BKException bke) { + throw new IOException("BookKeeper error during abort", bke); + } + + wl.release(); + } + + @Override + public void writeRaw(final byte[] data, int off, int len) throws IOException { + throw new IOException("Not supported for BK"); + } + + @Override + public void write(FSEditLogOp op) throws IOException { + wl.checkWriteLock(); + + writer.writeOp(op); + + if (bufCurrent.getLength() > transmissionThreshold) { + transmit(); + } + } + + @Override + public void setReadyToFlush() throws IOException { + wl.checkWriteLock(); + + transmit(); + + synchronized(this) { + syncLatch = new CountDownLatch(outstandingRequests.get()); + } + } + + @Override + public void flushAndSync() throws IOException { + wl.checkWriteLock(); + + assert(syncLatch != null); + try { + syncLatch.await(); + } catch (InterruptedException ie) { + throw new IOException("Interrupted waiting on latch", ie); + } + + syncLatch = null; + // wait for whatever we wait on + } + + /** + * Transmit the current buffer to bookkeeper. + * Synchronised at the FSEditLog level. #write() and #setReadyToFlush() + * are never called at the same time. 
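+ * Each call submits the buffered edits as a single asynchronous
+ * bookkeeper entry and increments the outstanding request count;
+ * the addComplete callback decrements it and counts down any
+ * pending sync latch.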
+ */ + private void transmit() throws IOException { + wl.checkWriteLock(); + + if (bufCurrent.getLength() > 0) { + byte[] entry = Arrays.copyOf(bufCurrent.getData(), + bufCurrent.getLength()); + lh.asyncAddEntry(entry, this, null); + bufCurrent.reset(); + outstandingRequests.incrementAndGet(); + } + } + + @Override + public void addComplete(int rc, LedgerHandle handle, + long entryId, Object ctx) { + synchronized(this) { + outstandingRequests.decrementAndGet(); + CountDownLatch l = syncLatch; + if (l != null) { + l.countDown(); + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java new file mode 100644 index 00000000000..7fa90269ecd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java @@ -0,0 +1,508 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.contrib.bkjournal; + +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.namenode.JournalManager; +import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; +import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; +import org.apache.hadoop.conf.Configuration; + +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerHandle; + +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs.Ids; + +import java.util.Collections; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.io.IOException; + +import java.net.URI; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * BookKeeper Journal Manager + * + * To use, add the following to hdfs-site.xml. + *
+ * {@code
+ * <property>
+ *   <name>dfs.namenode.edits.dir</name>
+ *   <value>bookkeeper://zk1:2181;zk2:2181;zk3:2181/hdfsjournal</value>
+ * </property>
+ *
+ * <property>
+ *   <name>dfs.namenode.edits.journalPlugin.bookkeeper</name>
+ *   <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
+ * </property>
+ * }
+ *
+ * The URI format for bookkeeper is bookkeeper://[zkEnsemble]/[rootZnode]
+ * [zkEnsemble] is a list of semicolon-separated zookeeper host:port
+ * pairs. In the example above there are 3 servers in the ensemble,
+ * zk1, zk2 & zk3, each one listening on port 2181.
+ *
+ * [rootZnode] is the path of the zookeeper znode under which the editlog
+ * information will be stored.
+ *
+ * Other configuration options are:
+ * <ul>
+ * <li>dfs.namenode.bookkeeperjournal.output-buffer-size
+ * Number of bytes a bookkeeper journal stream will buffer before
+ * forcing a flush. Default is 1024.</li>
+ *
+ * <li>dfs.namenode.bookkeeperjournal.ensemble-size
+ * Number of bookkeeper servers in edit log ledger ensembles. This
+ * is the number of bookkeeper servers which need to be available
+ * for the ledger to be writable. Default is 3.</li>
+ *
+ * <li>dfs.namenode.bookkeeperjournal.quorum-size
+ * Number of bookkeeper servers in the write quorum. This is the
+ * number of bookkeeper servers which must have acknowledged the
+ * write of an entry before it is considered written.
+ * Default is 2.</li>
+ *
+ * <li>dfs.namenode.bookkeeperjournal.digestPw
+ * Password to use when creating ledgers.</li>
+ * </ul>
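+ *
+ * A minimal usage sketch (hypothetical znode path and transaction ids;
+ * in normal operation the namenode drives the journal manager through
+ * the journal-plugin configuration rather than direct calls):
+ * {@code
+ * Configuration conf = new Configuration();
+ * BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+ *     URI.create("bookkeeper://zk1:2181/bkjournal"));
+ * EditLogOutputStream out = bkjm.startLogSegment(1);
+ * // ... write FSEditLogOp instances to out ...
+ * out.close();
+ * bkjm.finalizeLogSegment(1, 100);
+ * }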
+ */ +public class BookKeeperJournalManager implements JournalManager { + static final Log LOG = LogFactory.getLog(BookKeeperJournalManager.class); + + public static final String BKJM_OUTPUT_BUFFER_SIZE + = "dfs.namenode.bookkeeperjournal.output-buffer-size"; + public static final int BKJM_OUTPUT_BUFFER_SIZE_DEFAULT = 1024; + + public static final String BKJM_BOOKKEEPER_ENSEMBLE_SIZE + = "dfs.namenode.bookkeeperjournal.ensemble-size"; + public static final int BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT = 3; + + public static final String BKJM_BOOKKEEPER_QUORUM_SIZE + = "dfs.namenode.bookkeeperjournal.quorum-size"; + public static final int BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT = 2; + + public static final String BKJM_BOOKKEEPER_DIGEST_PW + = "dfs.namenode.bookkeeperjournal.digestPw"; + public static final String BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT = ""; + + private static final int BKJM_LAYOUT_VERSION = -1; + + private final ZooKeeper zkc; + private final Configuration conf; + private final BookKeeper bkc; + private final WriteLock wl; + private final String ledgerPath; + private final MaxTxId maxTxId; + private final int ensembleSize; + private final int quorumSize; + private final String digestpw; + private final CountDownLatch zkConnectLatch; + + private LedgerHandle currentLedger = null; + + private int bytesToInt(byte[] b) { + assert b.length >= 4; + return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3]; + } + + private byte[] intToBytes(int i) { + return new byte[] { + (byte)(i >> 24), + (byte)(i >> 16), + (byte)(i >> 8), + (byte)(i) }; + } + + /** + * Construct a Bookkeeper journal manager. + */ + public BookKeeperJournalManager(Configuration conf, URI uri) + throws IOException { + this.conf = conf; + String zkConnect = uri.getAuthority().replace(";", ","); + String zkPath = uri.getPath(); + ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE, + BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT); + quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE, + BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT); + + ledgerPath = zkPath + "/ledgers"; + String maxTxIdPath = zkPath + "/maxtxid"; + String lockPath = zkPath + "/lock"; + String versionPath = zkPath + "/version"; + digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW, + BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT); + + try { + zkConnectLatch = new CountDownLatch(1); + zkc = new ZooKeeper(zkConnect, 3000, new ZkConnectionWatcher()); + if (!zkConnectLatch.await(6000, TimeUnit.MILLISECONDS)) { + throw new IOException("Error connecting to zookeeper"); + } + if (zkc.exists(zkPath, false) == null) { + zkc.create(zkPath, new byte[] {'0'}, + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + Stat versionStat = zkc.exists(versionPath, false); + if (versionStat != null) { + byte[] d = zkc.getData(versionPath, false, versionStat); + // There's only one version at the moment + assert bytesToInt(d) == BKJM_LAYOUT_VERSION; + } else { + zkc.create(versionPath, intToBytes(BKJM_LAYOUT_VERSION), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + if (zkc.exists(ledgerPath, false) == null) { + zkc.create(ledgerPath, new byte[] {'0'}, + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + bkc = new BookKeeper(new ClientConfiguration(), + zkc); + } catch (Exception e) { + throw new IOException("Error initializing zk", e); + } + + wl = new WriteLock(zkc, lockPath); + maxTxId = new MaxTxId(zkc, maxTxIdPath); + } + + /** + * Start a new log segment in a BookKeeper ledger. + * First ensure that we have the write lock for this journal. + * Then create a ledger and stream based on that ledger. 
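+ * The ledger is created with the configured ensemble size, quorum size
+ * and digest password.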
+ * The ledger id is written to the inprogress znode, so that in the + * case of a crash, a recovery process can find the ledger we were writing + * to when we crashed. + * @param txId First transaction id to be written to the stream + */ + @Override + public EditLogOutputStream startLogSegment(long txId) throws IOException { + wl.acquire(); + + if (txId <= maxTxId.get()) { + throw new IOException("We've already seen " + txId + + ". A new stream cannot be created with it"); + } + if (currentLedger != null) { + throw new IOException("Already writing to a ledger, id=" + + currentLedger.getId()); + } + try { + currentLedger = bkc.createLedger(ensembleSize, quorumSize, + BookKeeper.DigestType.MAC, + digestpw.getBytes()); + String znodePath = inprogressZNode(); + EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath, + HdfsConstants.LAYOUT_VERSION, currentLedger.getId(), txId); + /* Write the ledger metadata out to the inprogress ledger znode + * This can fail if for some reason our write lock has + * expired (@see WriteLock) and another process has managed to + * create the inprogress znode. + * In this case, throw an exception. We don't want to continue + * as this would lead to a split brain situation. + */ + l.write(zkc, znodePath); + + return new BookKeeperEditLogOutputStream(conf, currentLedger, wl); + } catch (Exception e) { + if (currentLedger != null) { + try { + currentLedger.close(); + } catch (Exception e2) { + //log & ignore, an IOException will be thrown soon + LOG.error("Error closing ledger", e2); + } + } + throw new IOException("Error creating ledger", e); + } + } + + /** + * Finalize a log segment. If the journal manager is currently + * writing to a ledger, ensure that this is the ledger of the log segment + * being finalized. + * + * Otherwise this is the recovery case. In the recovery case, ensure that + * the firstTxId of the ledger matches firstTxId for the segment we are + * trying to finalize. + */ + @Override + public void finalizeLogSegment(long firstTxId, long lastTxId) + throws IOException { + String inprogressPath = inprogressZNode(); + try { + Stat inprogressStat = zkc.exists(inprogressPath, false); + if (inprogressStat == null) { + throw new IOException("Inprogress znode " + inprogressPath + + " doesn't exist"); + } + + wl.checkWriteLock(); + EditLogLedgerMetadata l + = EditLogLedgerMetadata.read(zkc, inprogressPath); + + if (currentLedger != null) { // normal, non-recovery case + if (l.getLedgerId() == currentLedger.getId()) { + try { + currentLedger.close(); + } catch (BKException bke) { + LOG.error("Error closing current ledger", bke); + } + currentLedger = null; + } else { + throw new IOException( + "Active ledger has different ID to inprogress. 
" + + l.getLedgerId() + " found, " + + currentLedger.getId() + " expected"); + } + } + + if (l.getFirstTxId() != firstTxId) { + throw new IOException("Transaction id not as expected, " + + l.getFirstTxId() + " found, " + firstTxId + " expected"); + } + + l.finalizeLedger(lastTxId); + String finalisedPath = finalizedLedgerZNode(firstTxId, lastTxId); + try { + l.write(zkc, finalisedPath); + } catch (KeeperException.NodeExistsException nee) { + if (!l.verify(zkc, finalisedPath)) { + throw new IOException("Node " + finalisedPath + " already exists" + + " but data doesn't match"); + } + } + maxTxId.store(lastTxId); + zkc.delete(inprogressPath, inprogressStat.getVersion()); + } catch (KeeperException e) { + throw new IOException("Error finalising ledger", e); + } catch (InterruptedException ie) { + throw new IOException("Error finalising ledger", ie); + } finally { + wl.release(); + } + } + + @Override + public EditLogInputStream getInputStream(long fromTxnId) throws IOException { + for (EditLogLedgerMetadata l : getLedgerList()) { + if (l.getFirstTxId() == fromTxnId) { + try { + LedgerHandle h = bkc.openLedger(l.getLedgerId(), + BookKeeper.DigestType.MAC, + digestpw.getBytes()); + return new BookKeeperEditLogInputStream(h, l); + } catch (Exception e) { + throw new IOException("Could not open ledger for " + fromTxnId, e); + } + } + } + throw new IOException("No ledger for fromTxnId " + fromTxnId + " found."); + } + + @Override + public long getNumberOfTransactions(long fromTxnId) throws IOException { + long count = 0; + long expectedStart = 0; + for (EditLogLedgerMetadata l : getLedgerList()) { + if (l.isInProgress()) { + long endTxId = recoverLastTxId(l); + if (endTxId == HdfsConstants.INVALID_TXID) { + break; + } + count += (endTxId - l.getFirstTxId()) + 1; + break; + } + + if (l.getFirstTxId() < fromTxnId) { + continue; + } else if (l.getFirstTxId() == fromTxnId) { + count = (l.getLastTxId() - l.getFirstTxId()) + 1; + expectedStart = l.getLastTxId() + 1; + } else { + if (expectedStart != l.getFirstTxId()) { + if (count == 0) { + throw new CorruptionException("StartTxId " + l.getFirstTxId() + + " is not as expected " + expectedStart + + ". Gap in transaction log?"); + } else { + break; + } + } + count += (l.getLastTxId() - l.getFirstTxId()) + 1; + expectedStart = l.getLastTxId() + 1; + } + } + return count; + } + + @Override + public void recoverUnfinalizedSegments() throws IOException { + wl.acquire(); + + synchronized (this) { + try { + EditLogLedgerMetadata l + = EditLogLedgerMetadata.read(zkc, inprogressZNode()); + long endTxId = recoverLastTxId(l); + if (endTxId == HdfsConstants.INVALID_TXID) { + LOG.error("Unrecoverable corruption has occurred in segment " + + l.toString() + " at path " + inprogressZNode() + + ". 
Unable to continue recovery."); + throw new IOException("Unrecoverable corruption, please check logs."); + } + finalizeLogSegment(l.getFirstTxId(), endTxId); + } catch (KeeperException.NoNodeException nne) { + // nothing to recover, ignore + } finally { + if (wl.haveLock()) { + wl.release(); + } + } + } + } + + @Override + public void purgeLogsOlderThan(long minTxIdToKeep) + throws IOException { + for (EditLogLedgerMetadata l : getLedgerList()) { + if (!l.isInProgress() + && l.getLastTxId() < minTxIdToKeep) { + try { + Stat stat = zkc.exists(l.getZkPath(), false); + zkc.delete(l.getZkPath(), stat.getVersion()); + bkc.deleteLedger(l.getLedgerId()); + } catch (InterruptedException ie) { + LOG.error("Interrupted while purging " + l, ie); + } catch (BKException bke) { + LOG.error("Couldn't delete ledger from bookkeeper", bke); + } catch (KeeperException ke) { + LOG.error("Error deleting ledger entry in zookeeper", ke); + } + } + } + } + + @Override + public void close() throws IOException { + try { + bkc.close(); + zkc.close(); + } catch (Exception e) { + throw new IOException("Couldn't close zookeeper client", e); + } + } + + /** + * Set the amount of memory that this stream should use to buffer edits. + * Setting this will only affect future output stream. Streams + * which have currently be created won't be affected. + */ + @Override + public void setOutputBufferCapacity(int size) { + conf.getInt(BKJM_OUTPUT_BUFFER_SIZE, size); + } + + /** + * Find the id of the last edit log transaction writen to a edit log + * ledger. + */ + private long recoverLastTxId(EditLogLedgerMetadata l) throws IOException { + try { + LedgerHandle lh = bkc.openLedger(l.getLedgerId(), + BookKeeper.DigestType.MAC, + digestpw.getBytes()); + long lastAddConfirmed = lh.getLastAddConfirmed(); + BookKeeperEditLogInputStream in + = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed); + + long endTxId = HdfsConstants.INVALID_TXID; + FSEditLogOp op = in.readOp(); + while (op != null) { + if (endTxId == HdfsConstants.INVALID_TXID + || op.getTransactionId() == endTxId+1) { + endTxId = op.getTransactionId(); + } + op = in.readOp(); + } + return endTxId; + } catch (Exception e) { + throw new IOException("Exception retreiving last tx id for ledger " + l, + e); + } + } + + /** + * Get a list of all segments in the journal. 
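+ * The list is read from the ledger metadata znodes in zookeeper and
+ * sorted by the first transaction id of each segment.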
+ */ + private List getLedgerList() throws IOException { + List ledgers + = new ArrayList(); + try { + List ledgerNames = zkc.getChildren(ledgerPath, false); + for (String n : ledgerNames) { + ledgers.add(EditLogLedgerMetadata.read(zkc, ledgerPath + "/" + n)); + } + } catch (Exception e) { + throw new IOException("Exception reading ledger list from zk", e); + } + + Collections.sort(ledgers, EditLogLedgerMetadata.COMPARATOR); + return ledgers; + } + + /** + * Get the znode path for a finalize ledger + */ + String finalizedLedgerZNode(long startTxId, long endTxId) { + return String.format("%s/edits_%018d_%018d", + ledgerPath, startTxId, endTxId); + } + + /** + * Get the znode path for the inprogressZNode + */ + String inprogressZNode() { + return ledgerPath + "/inprogress"; + } + + /** + * Simple watcher to notify when zookeeper has connected + */ + private class ZkConnectionWatcher implements Watcher { + public void process(WatchedEvent event) { + if (Event.KeeperState.SyncConnected.equals(event.getState())) { + zkConnectLatch.countDown(); + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java new file mode 100644 index 00000000000..9ae5cdd93f7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.contrib.bkjournal; + +import java.io.IOException; +import java.util.Comparator; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.KeeperException; + +import org.apache.hadoop.hdfs.protocol.HdfsConstants; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Utility class for storing the metadata associated + * with a single edit log segment, stored in a single ledger + */ +public class EditLogLedgerMetadata { + static final Log LOG = LogFactory.getLog(EditLogLedgerMetadata.class); + + private String zkPath; + private final long ledgerId; + private final int version; + private final long firstTxId; + private long lastTxId; + private boolean inprogress; + + public static final Comparator COMPARATOR + = new Comparator() { + public int compare(EditLogLedgerMetadata o1, + EditLogLedgerMetadata o2) { + if (o1.firstTxId < o2.firstTxId) { + return -1; + } else if (o1.firstTxId == o2.firstTxId) { + return 0; + } else { + return 1; + } + } + }; + + EditLogLedgerMetadata(String zkPath, int version, + long ledgerId, long firstTxId) { + this.zkPath = zkPath; + this.ledgerId = ledgerId; + this.version = version; + this.firstTxId = firstTxId; + this.lastTxId = HdfsConstants.INVALID_TXID; + this.inprogress = true; + } + + EditLogLedgerMetadata(String zkPath, int version, long ledgerId, + long firstTxId, long lastTxId) { + this.zkPath = zkPath; + this.ledgerId = ledgerId; + this.version = version; + this.firstTxId = firstTxId; + this.lastTxId = lastTxId; + this.inprogress = false; + } + + String getZkPath() { + return zkPath; + } + + long getFirstTxId() { + return firstTxId; + } + + long getLastTxId() { + return lastTxId; + } + + long getLedgerId() { + return ledgerId; + } + + int getVersion() { + return version; + } + + boolean isInProgress() { + return this.inprogress; + } + + void finalizeLedger(long newLastTxId) { + assert this.lastTxId == HdfsConstants.INVALID_TXID; + this.lastTxId = newLastTxId; + this.inprogress = false; + } + + static EditLogLedgerMetadata read(ZooKeeper zkc, String path) + throws IOException, KeeperException.NoNodeException { + try { + byte[] data = zkc.getData(path, false, null); + String[] parts = new String(data).split(";"); + if (parts.length == 3) { + int version = Integer.valueOf(parts[0]); + long ledgerId = Long.valueOf(parts[1]); + long txId = Long.valueOf(parts[2]); + return new EditLogLedgerMetadata(path, version, ledgerId, txId); + } else if (parts.length == 4) { + int version = Integer.valueOf(parts[0]); + long ledgerId = Long.valueOf(parts[1]); + long firstTxId = Long.valueOf(parts[2]); + long lastTxId = Long.valueOf(parts[3]); + return new EditLogLedgerMetadata(path, version, ledgerId, + firstTxId, lastTxId); + } else { + throw new IOException("Invalid ledger entry, " + + new String(data)); + } + } catch(KeeperException.NoNodeException nne) { + throw nne; + } catch(Exception e) { + throw new IOException("Error reading from zookeeper", e); + } + } + + void write(ZooKeeper zkc, String path) + throws IOException, KeeperException.NodeExistsException { + this.zkPath = path; + String finalisedData; + if (inprogress) { + finalisedData = String.format("%d;%d;%d", + version, ledgerId, firstTxId); + } else { + finalisedData = String.format("%d;%d;%d;%d", + version, ledgerId, firstTxId, lastTxId); + } + try { + zkc.create(path, finalisedData.getBytes(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + 
} catch (KeeperException.NodeExistsException nee) { + throw nee; + } catch (Exception e) { + throw new IOException("Error creating ledger znode"); + } + } + + boolean verify(ZooKeeper zkc, String path) { + try { + EditLogLedgerMetadata other = read(zkc, path); + if (LOG.isTraceEnabled()) { + LOG.trace("Verifying " + this.toString() + + " against " + other); + } + return other == this; + } catch (Exception e) { + LOG.error("Couldn't verify data in " + path, e); + return false; + } + } + + public boolean equals(Object o) { + if (!(o instanceof EditLogLedgerMetadata)) { + return false; + } + EditLogLedgerMetadata ol = (EditLogLedgerMetadata)o; + return ledgerId == ol.ledgerId + && firstTxId == ol.firstTxId + && lastTxId == ol.lastTxId + && version == ol.version; + } + + public int hashCode() { + int hash = 1; + hash = hash * 31 + (int)ledgerId; + hash = hash * 31 + (int)firstTxId; + hash = hash * 31 + (int)lastTxId; + hash = hash * 31 + (int)version; + return hash; + } + + public String toString() { + return "[LedgerId:"+ledgerId + + ", firstTxId:" + firstTxId + + ", lastTxId:" + lastTxId + + ", version:" + version + "]"; + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java new file mode 100644 index 00000000000..f2724096832 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.contrib.bkjournal; + +import java.io.IOException; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.Stat; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Utility class for storing and reading + * the max seen txid in zookeeper + */ +class MaxTxId { + static final Log LOG = LogFactory.getLog(MaxTxId.class); + + private final ZooKeeper zkc; + private final String path; + + private Stat currentStat; + + MaxTxId(ZooKeeper zkc, String path) { + this.zkc = zkc; + this.path = path; + } + + synchronized void store(long maxTxId) throws IOException { + long currentMax = get(); + if (currentMax < maxTxId) { + if (LOG.isTraceEnabled()) { + LOG.trace("Setting maxTxId to " + maxTxId); + } + String txidStr = Long.toString(maxTxId); + try { + if (currentStat != null) { + currentStat = zkc.setData(path, txidStr.getBytes("UTF-8"), + currentStat.getVersion()); + } else { + zkc.create(path, txidStr.getBytes("UTF-8"), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } catch (Exception e) { + throw new IOException("Error writing max tx id", e); + } + } + } + + synchronized long get() throws IOException { + try { + currentStat = zkc.exists(path, false); + if (currentStat == null) { + return 0; + } else { + byte[] bytes = zkc.getData(path, false, currentStat); + String txidString = new String(bytes, "UTF-8"); + return Long.valueOf(txidString); + } + } catch (Exception e) { + throw new IOException("Error reading the max tx id from zk", e); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/WriteLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/WriteLock.java new file mode 100644 index 00000000000..67743b2228c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/WriteLock.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.contrib.bkjournal; + +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs.Ids; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.List; +import java.util.Collections; +import java.util.Comparator; + +import java.net.InetAddress; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Distributed lock, using ZooKeeper. + * + * The lock is vulnerable to timing issues. For example, the process could + * encounter a really long GC cycle between acquiring the lock, and writing to + * a ledger. This could have timed out the lock, and another process could have + * acquired the lock and started writing to bookkeeper. Therefore other + * mechanisms are required to ensure correctness (i.e. Fencing). + */ +class WriteLock implements Watcher { + static final Log LOG = LogFactory.getLog(WriteLock.class); + + private final ZooKeeper zkc; + private final String lockpath; + + private AtomicInteger lockCount = new AtomicInteger(0); + private String myznode = null; + + WriteLock(ZooKeeper zkc, String lockpath) throws IOException { + this.lockpath = lockpath; + + this.zkc = zkc; + try { + if (zkc.exists(lockpath, false) == null) { + String localString = InetAddress.getLocalHost().toString(); + zkc.create(lockpath, localString.getBytes(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } catch (Exception e) { + throw new IOException("Exception accessing Zookeeper", e); + } + } + + void acquire() throws IOException { + while (true) { + if (lockCount.get() == 0) { + try { + synchronized(this) { + if (lockCount.get() > 0) { + lockCount.incrementAndGet(); + return; + } + myznode = zkc.create(lockpath + "/lock-", new byte[] {'0'}, + Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL_SEQUENTIAL); + if (LOG.isTraceEnabled()) { + LOG.trace("Acquiring lock, trying " + myznode); + } + + List nodes = zkc.getChildren(lockpath, false); + Collections.sort(nodes, new Comparator() { + public int compare(String o1, + String o2) { + Integer l1 = Integer.valueOf(o1.replace("lock-", "")); + Integer l2 = Integer.valueOf(o2.replace("lock-", "")); + return l1 - l2; + } + }); + if ((lockpath + "/" + nodes.get(0)).equals(myznode)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Lock acquired - " + myznode); + } + lockCount.set(1); + zkc.exists(myznode, this); + return; + } else { + LOG.error("Failed to acquire lock with " + myznode + + ", " + nodes.get(0) + " already has it"); + throw new IOException("Could not acquire lock"); + } + } + } catch (KeeperException e) { + throw new IOException("Exception accessing Zookeeper", e); + } catch (InterruptedException ie) { + throw new IOException("Exception accessing Zookeeper", ie); + } + } else { + int ret = lockCount.getAndIncrement(); + if (ret == 0) { + lockCount.decrementAndGet(); + continue; // try again; + } else { + return; + } + } + } + } + + void release() throws IOException { + try { + if (lockCount.decrementAndGet() <= 0) { + if (lockCount.get() < 0) { + LOG.warn("Unbalanced lock handling somewhere, lockCount down to " + + lockCount.get()); + } + synchronized(this) { + if (lockCount.get() <= 0) { + if (LOG.isTraceEnabled()) { + LOG.trace("releasing lock " + myznode); + } + if (myznode != null) { + zkc.delete(myznode, 
-1); + myznode = null; + } + } + } + } + } catch (Exception e) { + throw new IOException("Exception accessing Zookeeper", e); + } + } + + public void checkWriteLock() throws IOException { + if (!haveLock()) { + throw new IOException("Lost writer lock"); + } + } + + boolean haveLock() throws IOException { + return lockCount.get() > 0; + } + + public void process(WatchedEvent event) { + if (event.getState() == KeeperState.Disconnected + || event.getState() == KeeperState.Expired) { + LOG.warn("Lost zookeeper session, lost lock "); + lockCount.set(0); + } else { + // reapply the watch + synchronized (this) { + LOG.info("Zookeeper event " + event + + " received, reapplying watch to " + myznode); + if (myznode != null) { + try { + zkc.exists(myznode, this); + } catch (Exception e) { + LOG.warn("Could not set watch on lock, releasing", e); + try { + release(); + } catch (IOException ioe) { + LOG.error("Could not release Zk lock", ioe); + } + } + } + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java new file mode 100644 index 00000000000..b949bc200ea --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java @@ -0,0 +1,395 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.contrib.bkjournal; + +import static org.junit.Assert.*; + +import java.net.URI; +import java.util.Collections; +import java.util.Arrays; +import java.util.List; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.util.LocalBookKeeper; + +import java.io.RandomAccessFile; +import java.io.File; +import java.io.FilenameFilter; +import java.io.BufferedInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.security.SecurityUtil; +import org.junit.Test; +import org.junit.Before; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.AfterClass; + +import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; +import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits; +import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; +import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil; +import org.apache.hadoop.hdfs.server.namenode.JournalManager; + +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.KeeperException; + +import com.google.common.collect.ImmutableList; + +import java.util.zip.CheckedInputStream; +import java.util.zip.Checksum; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class TestBookKeeperJournalManager { + static final Log LOG = LogFactory.getLog(TestBookKeeperJournalManager.class); + + private static final long DEFAULT_SEGMENT_SIZE = 1000; + private static final String zkEnsemble = "localhost:2181"; + + private static Thread bkthread; + protected static Configuration conf = new Configuration(); + private ZooKeeper zkc; + + private static ZooKeeper connectZooKeeper(String ensemble) + throws IOException, KeeperException, InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + + ZooKeeper zkc = new ZooKeeper(zkEnsemble, 3600, new Watcher() { + public void process(WatchedEvent event) { + if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { + latch.countDown(); + } + } + }); + if (!latch.await(3, TimeUnit.SECONDS)) { + throw new IOException("Zookeeper took too long to connect"); + } + return zkc; + } + + @BeforeClass + public static void setupBookkeeper() throws Exception { + final int numBookies = 5; + bkthread = new Thread() { + public void run() { + try { + String[] args = new String[1]; + args[0] = String.valueOf(numBookies); + LOG.info("Starting bk"); + LocalBookKeeper.main(args); + } catch (InterruptedException e) { + // go away quietly + } catch (Exception e) { + LOG.error("Error starting local bk", e); + } + } + }; + bkthread.start(); + + if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) { + throw new Exception("Error starting zookeeper/bookkeeper"); + } + + ZooKeeper zkc = connectZooKeeper(zkEnsemble); + try { + boolean up = false; + for (int i = 0; i < 10; i++) { + try { + List children = zkc.getChildren("/ledgers/available", + false); + if (children.size() == numBookies) { + 
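+ // all bookies have registered under /ledgers/available, so the cluster is ready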
up = true; + break; + } + } catch (KeeperException e) { + // ignore + } + Thread.sleep(1000); + } + if (!up) { + throw new IOException("Not enough bookies started"); + } + } finally { + zkc.close(); + } + } + + @Before + public void setup() throws Exception { + zkc = connectZooKeeper(zkEnsemble); + } + + @After + public void teardown() throws Exception { + zkc.close(); + } + + @AfterClass + public static void teardownBookkeeper() throws Exception { + if (bkthread != null) { + bkthread.interrupt(); + bkthread.join(); + } + } + + @Test + public void testSimpleWrite() throws Exception { + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplewrite")); + long txid = 1; + EditLogOutputStream out = bkjm.startLogSegment(1); + for (long i = 1 ; i <= 100; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(i); + out.write(op); + } + out.close(); + bkjm.finalizeLogSegment(1, 100); + + String zkpath = bkjm.finalizedLedgerZNode(1, 100); + + assertNotNull(zkc.exists(zkpath, false)); + assertNull(zkc.exists(bkjm.inprogressZNode(), false)); + } + + @Test + public void testNumberOfTransactions() throws Exception { + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-txncount")); + long txid = 1; + EditLogOutputStream out = bkjm.startLogSegment(1); + for (long i = 1 ; i <= 100; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(i); + out.write(op); + } + out.close(); + bkjm.finalizeLogSegment(1, 100); + + long numTrans = bkjm.getNumberOfTransactions(1); + assertEquals(100, numTrans); + } + + @Test + public void testNumberOfTransactionsWithGaps() throws Exception { + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-gaps")); + long txid = 1; + for (long i = 0; i < 3; i++) { + long start = txid; + EditLogOutputStream out = bkjm.startLogSegment(start); + for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(txid++); + out.write(op); + } + out.close(); + bkjm.finalizeLogSegment(start, txid-1); + assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false)); + } + zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, DEFAULT_SEGMENT_SIZE*2), -1); + + long numTrans = bkjm.getNumberOfTransactions(1); + assertEquals(DEFAULT_SEGMENT_SIZE, numTrans); + + try { + numTrans = bkjm.getNumberOfTransactions(DEFAULT_SEGMENT_SIZE+1); + fail("Should have thrown corruption exception by this point"); + } catch (JournalManager.CorruptionException ce) { + // if we get here, everything is going good + } + + numTrans = bkjm.getNumberOfTransactions((DEFAULT_SEGMENT_SIZE*2)+1); + assertEquals(DEFAULT_SEGMENT_SIZE, numTrans); + } + + @Test + public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception { + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-inprogressAtEnd")); + long txid = 1; + for (long i = 0; i < 3; i++) { + long start = txid; + EditLogOutputStream out = bkjm.startLogSegment(start); + for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(txid++); + out.write(op); + } + + out.close(); + bkjm.finalizeLogSegment(start, (txid-1)); + 
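+ // each finalized segment should have a corresponding finalized znode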
assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false)); + } + long start = txid; + EditLogOutputStream out = bkjm.startLogSegment(start); + for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE/2; j++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(txid++); + out.write(op); + } + out.setReadyToFlush(); + out.flush(); + out.abort(); + out.close(); + + long numTrans = bkjm.getNumberOfTransactions(1); + assertEquals((txid-1), numTrans); + } + + /** + * Create a bkjm namespace, write a journal from txid 1, close stream. + * Try to create a new journal from txid 1. Should throw an exception. + */ + @Test + public void testWriteRestartFrom1() throws Exception { + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-restartFrom1")); + long txid = 1; + long start = txid; + EditLogOutputStream out = bkjm.startLogSegment(txid); + for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(txid++); + out.write(op); + } + out.close(); + bkjm.finalizeLogSegment(start, (txid-1)); + + txid = 1; + try { + out = bkjm.startLogSegment(txid); + fail("Shouldn't be able to start another journal from " + txid + + " when one already exists"); + } catch (Exception ioe) { + LOG.info("Caught exception as expected", ioe); + } + + // test border case + txid = DEFAULT_SEGMENT_SIZE; + try { + out = bkjm.startLogSegment(txid); + fail("Shouldn't be able to start another journal from " + txid + + " when one already exists"); + } catch (IOException ioe) { + LOG.info("Caught exception as expected", ioe); + } + + // open journal continuing from before + txid = DEFAULT_SEGMENT_SIZE + 1; + start = txid; + out = bkjm.startLogSegment(start); + assertNotNull(out); + + for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(txid++); + out.write(op); + } + out.close(); + bkjm.finalizeLogSegment(start, (txid-1)); + + // open journal arbitarily far in the future + txid = DEFAULT_SEGMENT_SIZE * 4; + out = bkjm.startLogSegment(txid); + assertNotNull(out); + } + + @Test + public void testTwoWriters() throws Exception { + long start = 1; + BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf, + URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter")); + BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf, + URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter")); + + EditLogOutputStream out1 = bkjm1.startLogSegment(start); + try { + EditLogOutputStream out2 = bkjm2.startLogSegment(start); + fail("Shouldn't have been able to open the second writer"); + } catch (IOException ioe) { + LOG.info("Caught exception as expected", ioe); + } + } + + @Test + public void testSimpleRead() throws Exception { + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simpleread")); + long txid = 1; + final long numTransactions = 10000; + EditLogOutputStream out = bkjm.startLogSegment(1); + for (long i = 1 ; i <= numTransactions; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(i); + out.write(op); + } + out.close(); + bkjm.finalizeLogSegment(1, numTransactions); + + + EditLogInputStream in = bkjm.getInputStream(1); + try { + assertEquals(numTransactions, + FSEditLogTestUtil.countTransactionsInStream(in)); + } finally { + in.close(); + } + } 
+ + @Test + public void testSimpleRecovery() throws Exception { + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplerecovery")); + EditLogOutputStream out = bkjm.startLogSegment(1); + long txid = 1; + for (long i = 1 ; i <= 100; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(i); + out.write(op); + } + out.setReadyToFlush(); + out.flush(); + + out.abort(); + out.close(); + + + assertNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false)); + assertNotNull(zkc.exists(bkjm.inprogressZNode(), false)); + + bkjm.recoverUnfinalizedSegments(); + + assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false)); + assertNull(zkc.exists(bkjm.inprogressZNode(), false)); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java new file mode 100644 index 00000000000..6557b96e18a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.IOException; + +/** + * Utilities for testing edit logs + */ +public class FSEditLogTestUtil { + public static FSEditLogOp getNoOpInstance() { + return FSEditLogOp.LogSegmentOp.getInstance(FSEditLogOpCodes.OP_END_LOG_SEGMENT); + } + + public static long countTransactionsInStream(EditLogInputStream in) + throws IOException { + FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in); + return validation.getNumTransactions(); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties new file mode 100644 index 00000000000..8a6b2174144 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties @@ -0,0 +1,62 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+#
+# Bookkeeper Journal Logging Configuration
+#
+
+# Format is "<default threshold> (, <appender>)+
+
+# DEFAULT: console appender only
+log4j.rootLogger=OFF, CONSOLE
+
+# Example with rolling log file
+#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
+
+# Example with rolling log file and tracing
+#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE
+
+#
+# Log INFO level and above messages to the console
+#
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.Threshold=INFO
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+#
+# Add ROLLINGFILE to rootLogger to get log file output
+# Log DEBUG level and above messages to a log file
+log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.ROLLINGFILE.Threshold=DEBUG
+log4j.appender.ROLLINGFILE.File=hdfs-namenode.log
+log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+# Max log file size of 10MB
+log4j.appender.ROLLINGFILE.MaxFileSize=10MB
+# uncomment the next line to limit number of backup files
+#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
+
+log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
+
+
diff --git a/hadoop-hdfs-project/pom.xml b/hadoop-hdfs-project/pom.xml
index 824edc3210c..299d6f86348 100644
--- a/hadoop-hdfs-project/pom.xml
+++ b/hadoop-hdfs-project/pom.xml
@@ -30,6 +30,7 @@
     <module>hadoop-hdfs</module>
     <module>hadoop-hdfs-httpfs</module>
+    <module>hadoop-hdfs/src/contrib/bkjournal</module>
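
For reviewers who want to exercise the new journal manager outside of JUnit, the
round trip driven by testSimpleWrite and testSimpleRead above can be sketched
roughly as follows. This is only an illustration, not part of the patch: the
class name BKJournalRoundTrip and the znode path /bkjournal-example are invented
for the example, it assumes a ZooKeeper/BookKeeper ensemble is already listening
on localhost:2181 (the tests start one via LocalBookKeeper), and it leans on the
test-only FSEditLogTestUtil helper added above, since a running namenode supplies
its own FSEditLogOp instances through FSEditLog.

  // Illustrative sketch only; mirrors the calls made by the tests in this patch.
  import java.net.URI;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager;
  import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
  import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
  import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
  import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;

  public class BKJournalRoundTrip {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Assumed ensemble and znode path; both are placeholders for this sketch.
      BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
          URI.create("bookkeeper://localhost:2181/bkjournal-example"));

      // Write transactions 1..100 into one segment and finalize it.
      EditLogOutputStream out = bkjm.startLogSegment(1);
      for (long txid = 1; txid <= 100; txid++) {
        FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
        op.setTransactionId(txid);
        out.write(op);
      }
      out.setReadyToFlush();
      out.flush();
      out.close();
      bkjm.finalizeLogSegment(1, 100);

      // Read the finalized segment back, starting from transaction 1.
      EditLogInputStream in = bkjm.getInputStream(1);
      try {
        long count = FSEditLogTestUtil.countTransactionsInStream(in);
        System.out.println("replayed " + count + " transactions");
      } finally {
        in.close();
      }
    }
  }

The tests above assert the same state transitions through finalizedLedgerZNode()
and inprogressZNode(); the sketch only shows the order of calls a JournalManager
client is expected to follow.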