From 75041b99500ad1f8e8d51436c9ddd0d527277a4a Mon Sep 17 00:00:00 2001 From: Uma Maheswara Rao G Date: Sun, 20 May 2012 15:14:53 +0000 Subject: [PATCH] HDFS-2717. BookKeeper Journal output stream doesn't check addComplete rc. Contributed by Ivan Kelly. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1340750 13f79535-47bb-0310-9956-ffa450edef68 --- .../BookKeeperEditLogOutputStream.java | 24 ++ .../bkjournal/BookKeeperJournalManager.java | 50 ++-- .../TestBookKeeperJournalManager.java | 251 ++++++++++++++++-- 3 files changed, 278 insertions(+), 47 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java index ddbe0b62e0a..6432c57c041 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java @@ -33,6 +33,9 @@ import org.apache.hadoop.io.DataOutputBuffer; import java.io.IOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + /** * Output stream for BookKeeper Journal. * Multiple complete edit log entries are packed into a single bookkeeper @@ -44,11 +47,15 @@ */ class BookKeeperEditLogOutputStream extends EditLogOutputStream implements AddCallback { + static final Log LOG = LogFactory.getLog(BookKeeperEditLogOutputStream.class); + private final DataOutputBuffer bufCurrent; private final AtomicInteger outstandingRequests; private final int transmissionThreshold; private final LedgerHandle lh; private CountDownLatch syncLatch; + private final AtomicInteger transmitResult + = new AtomicInteger(BKException.Code.OK); private final WriteLock wl; private final Writer writer; @@ -141,6 +148,11 @@ public void flushAndSync() throws IOException { } catch (InterruptedException ie) { throw new IOException("Interrupted waiting on latch", ie); } + if (transmitResult.get() != BKException.Code.OK) { + throw new IOException("Failed to write to bookkeeper; Error is (" + + transmitResult.get() + ") " + + BKException.getMessage(transmitResult.get())); + } syncLatch = null; // wait for whatever we wait on @@ -154,6 +166,12 @@ public void flushAndSync() throws IOException { private void transmit() throws IOException { wl.checkWriteLock(); + if (!transmitResult.compareAndSet(BKException.Code.OK, + BKException.Code.OK)) { + throw new IOException("Trying to write to an errored stream;" + + " Error code : (" + transmitResult.get() + + ") " + BKException.getMessage(transmitResult.get())); + } if (bufCurrent.getLength() > 0) { byte[] entry = Arrays.copyOf(bufCurrent.getData(), bufCurrent.getLength()); @@ -168,6 +186,12 @@ public void addComplete(int rc, LedgerHandle handle, long entryId, Object ctx) { synchronized(this) { outstandingRequests.decrementAndGet(); + if (!transmitResult.compareAndSet(BKException.Code.OK, rc)) { + LOG.warn("Tried to set transmit result to (" + rc + ") \"" + + BKException.getMessage(rc) + "\"" + + " but is already (" + transmitResult.get() + ") \"" + + BKException.getMessage(transmitResult.get()) + "\""); + } CountDownLatch l = syncLatch; if (l != null) { l.countDown(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java index 047efd51f4b..238f090694a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java @@ -61,7 +61,7 @@ * * * - * dfs.namenode.edits.journalPlugin.bookkeeper + * dfs.namenode.edits.journal-plugin.bookkeeper * org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager * * } @@ -212,15 +212,15 @@ public EditLogOutputStream startLogSegment(long txId) throws IOException { throw new IOException("We've already seen " + txId + ". A new stream cannot be created with it"); } - if (currentLedger != null) { - throw new IOException("Already writing to a ledger, id=" - + currentLedger.getId()); - } try { + if (currentLedger != null) { + // bookkeeper errored on last stream, clean up ledger + currentLedger.close(); + } currentLedger = bkc.createLedger(ensembleSize, quorumSize, BookKeeper.DigestType.MAC, digestpw.getBytes()); - String znodePath = inprogressZNode(); + String znodePath = inprogressZNode(txId); EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath, HdfsConstants.LAYOUT_VERSION, currentLedger.getId(), txId); /* Write the ledger metadata out to the inprogress ledger znode @@ -258,7 +258,7 @@ public EditLogOutputStream startLogSegment(long txId) throws IOException { @Override public void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException { - String inprogressPath = inprogressZNode(); + String inprogressPath = inprogressZNode(firstTxId); try { Stat inprogressStat = zkc.exists(inprogressPath, false); if (inprogressStat == null) { @@ -372,21 +372,33 @@ public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk) @Override public void recoverUnfinalizedSegments() throws IOException { wl.acquire(); - synchronized (this) { try { - EditLogLedgerMetadata l - = EditLogLedgerMetadata.read(zkc, inprogressZNode()); - long endTxId = recoverLastTxId(l); - if (endTxId == HdfsConstants.INVALID_TXID) { - LOG.error("Unrecoverable corruption has occurred in segment " - + l.toString() + " at path " + inprogressZNode() - + ". Unable to continue recovery."); - throw new IOException("Unrecoverable corruption, please check logs."); + List children = zkc.getChildren(ledgerPath, false); + for (String child : children) { + if (!child.startsWith("inprogress_")) { + continue; + } + String znode = ledgerPath + "/" + child; + EditLogLedgerMetadata l + = EditLogLedgerMetadata.read(zkc, znode); + long endTxId = recoverLastTxId(l); + if (endTxId == HdfsConstants.INVALID_TXID) { + LOG.error("Unrecoverable corruption has occurred in segment " + + l.toString() + " at path " + znode + + ". Unable to continue recovery."); + throw new IOException("Unrecoverable corruption," + + " please check logs."); + } + finalizeLogSegment(l.getFirstTxId(), endTxId); } - finalizeLogSegment(l.getFirstTxId(), endTxId); } catch (KeeperException.NoNodeException nne) { // nothing to recover, ignore + } catch (KeeperException ke) { + throw new IOException("Couldn't get list of inprogress segments", ke); + } catch (InterruptedException ie) { + throw new IOException("Interrupted getting list of inprogress segments", + ie); } finally { if (wl.haveLock()) { wl.release(); @@ -495,8 +507,8 @@ String finalizedLedgerZNode(long startTxId, long endTxId) { /** * Get the znode path for the inprogressZNode */ - String inprogressZNode() { - return ledgerPath + "/inprogress"; + String inprogressZNode(long startTxid) { + return ledgerPath + "/inprogress_" + Long.toString(startTxid, 16); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java index 5937fa82958..7dbc95f25d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java @@ -28,6 +28,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.proto.BookieServer; +import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.util.LocalBookKeeper; import java.io.RandomAccessFile; @@ -74,11 +76,15 @@ public class TestBookKeeperJournalManager { private static final long DEFAULT_SEGMENT_SIZE = 1000; private static final String zkEnsemble = "localhost:2181"; + final static private int numBookies = 5; private static Thread bkthread; protected static Configuration conf = new Configuration(); private ZooKeeper zkc; + + static int nextPort = 6000; // next port for additionally created bookies + private static ZooKeeper connectZooKeeper(String ensemble) throws IOException, KeeperException, InterruptedException { final CountDownLatch latch = new CountDownLatch(1); @@ -96,9 +102,72 @@ public void process(WatchedEvent event) { return zkc; } + private static BookieServer newBookie() throws Exception { + int port = nextPort++; + ServerConfiguration bookieConf = new ServerConfiguration(); + bookieConf.setBookiePort(port); + File tmpdir = File.createTempFile("bookie" + Integer.toString(port) + "_", + "test"); + tmpdir.delete(); + tmpdir.mkdir(); + + bookieConf.setZkServers(zkEnsemble); + bookieConf.setJournalDirName(tmpdir.getPath()); + bookieConf.setLedgerDirNames(new String[] { tmpdir.getPath() }); + + BookieServer b = new BookieServer(bookieConf); + b.start(); + for (int i = 0; i < 10 && !b.isRunning(); i++) { + Thread.sleep(10000); + } + if (!b.isRunning()) { + throw new IOException("Bookie would not start"); + } + return b; + } + + /** + * Check that a number of bookies are available + * @param count number of bookies required + * @param timeout number of seconds to wait for bookies to start + * @throws IOException if bookies are not started by the time the timeout hits + */ + private static int checkBookiesUp(int count, int timeout) throws Exception { + ZooKeeper zkc = connectZooKeeper(zkEnsemble); + try { + boolean up = false; + int mostRecentSize = 0; + for (int i = 0; i < timeout; i++) { + try { + List children = zkc.getChildren("/ledgers/available", + false); + mostRecentSize = children.size(); + if (LOG.isDebugEnabled()) { + LOG.debug("Found " + mostRecentSize + " bookies up, " + + "waiting for " + count); + if (LOG.isTraceEnabled()) { + for (String child : children) { + LOG.trace(" server: " + child); + } + } + } + if (mostRecentSize == count) { + up = true; + break; + } + } catch (KeeperException e) { + // ignore + } + Thread.sleep(1000); + } + return mostRecentSize; + } finally { + zkc.close(); + } + } + @BeforeClass public static void setupBookkeeper() throws Exception { - final int numBookies = 5; bkthread = new Thread() { public void run() { try { @@ -118,29 +187,8 @@ public void run() { if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) { throw new Exception("Error starting zookeeper/bookkeeper"); } - - ZooKeeper zkc = connectZooKeeper(zkEnsemble); - try { - boolean up = false; - for (int i = 0; i < 10; i++) { - try { - List children = zkc.getChildren("/ledgers/available", - false); - if (children.size() == numBookies) { - up = true; - break; - } - } catch (KeeperException e) { - // ignore - } - Thread.sleep(1000); - } - if (!up) { - throw new IOException("Not enough bookies started"); - } - } finally { - zkc.close(); - } + assertEquals("Not all bookies started", + numBookies, checkBookiesUp(numBookies, 10)); } @Before @@ -178,7 +226,7 @@ public void testSimpleWrite() throws Exception { String zkpath = bkjm.finalizedLedgerZNode(1, 100); assertNotNull(zkc.exists(zkpath, false)); - assertNull(zkc.exists(bkjm.inprogressZNode(), false)); + assertNull(zkc.exists(bkjm.inprogressZNode(1), false)); } @Test @@ -385,11 +433,158 @@ public void testSimpleRecovery() throws Exception { assertNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false)); - assertNotNull(zkc.exists(bkjm.inprogressZNode(), false)); + assertNotNull(zkc.exists(bkjm.inprogressZNode(1), false)); bkjm.recoverUnfinalizedSegments(); assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false)); - assertNull(zkc.exists(bkjm.inprogressZNode(), false)); + assertNull(zkc.exists(bkjm.inprogressZNode(1), false)); } -} + + /** + * Test that if enough bookies fail to prevent an ensemble, + * writes the bookkeeper will fail. Test that when once again + * an ensemble is available, it can continue to write. + */ + @Test + public void testAllBookieFailure() throws Exception { + BookieServer bookieToFail = newBookie(); + BookieServer replacementBookie = null; + + try { + int ensembleSize = numBookies + 1; + assertEquals("New bookie didn't start", + ensembleSize, checkBookiesUp(ensembleSize, 10)); + + // ensure that the journal manager has to use all bookies, + // so that a failure will fail the journal manager + Configuration conf = new Configuration(); + conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE, + ensembleSize); + conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE, + ensembleSize); + long txid = 1; + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + URI.create("bookkeeper://" + zkEnsemble + + "/hdfsjournal-allbookiefailure")); + EditLogOutputStream out = bkjm.startLogSegment(txid); + + for (long i = 1 ; i <= 3; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(txid++); + out.write(op); + } + out.setReadyToFlush(); + out.flush(); + bookieToFail.shutdown(); + assertEquals("New bookie didn't die", + numBookies, checkBookiesUp(numBookies, 10)); + + try { + for (long i = 1 ; i <= 3; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(txid++); + out.write(op); + } + out.setReadyToFlush(); + out.flush(); + fail("should not get to this stage"); + } catch (IOException ioe) { + LOG.debug("Error writing to bookkeeper", ioe); + assertTrue("Invalid exception message", + ioe.getMessage().contains("Failed to write to bookkeeper")); + } + replacementBookie = newBookie(); + + assertEquals("New bookie didn't start", + numBookies+1, checkBookiesUp(numBookies+1, 10)); + out = bkjm.startLogSegment(txid); + for (long i = 1 ; i <= 3; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(txid++); + out.write(op); + } + + out.setReadyToFlush(); + out.flush(); + + } catch (Exception e) { + LOG.error("Exception in test", e); + throw e; + } finally { + if (replacementBookie != null) { + replacementBookie.shutdown(); + } + bookieToFail.shutdown(); + + if (checkBookiesUp(numBookies, 30) != numBookies) { + LOG.warn("Not all bookies from this test shut down, expect errors"); + } + } + } + + /** + * Test that a BookKeeper JM can continue to work across the + * failure of a bookie. This should be handled transparently + * by bookkeeper. + */ + @Test + public void testOneBookieFailure() throws Exception { + BookieServer bookieToFail = newBookie(); + BookieServer replacementBookie = null; + + try { + int ensembleSize = numBookies + 1; + assertEquals("New bookie didn't start", + ensembleSize, checkBookiesUp(ensembleSize, 10)); + + // ensure that the journal manager has to use all bookies, + // so that a failure will fail the journal manager + Configuration conf = new Configuration(); + conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE, + ensembleSize); + conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE, + ensembleSize); + long txid = 1; + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, + URI.create("bookkeeper://" + zkEnsemble + + "/hdfsjournal-onebookiefailure")); + EditLogOutputStream out = bkjm.startLogSegment(txid); + for (long i = 1 ; i <= 3; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(txid++); + out.write(op); + } + out.setReadyToFlush(); + out.flush(); + + replacementBookie = newBookie(); + assertEquals("replacement bookie didn't start", + ensembleSize+1, checkBookiesUp(ensembleSize+1, 10)); + bookieToFail.shutdown(); + assertEquals("New bookie didn't die", + ensembleSize, checkBookiesUp(ensembleSize, 10)); + + for (long i = 1 ; i <= 3; i++) { + FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); + op.setTransactionId(txid++); + out.write(op); + } + out.setReadyToFlush(); + out.flush(); + } catch (Exception e) { + LOG.error("Exception in test", e); + throw e; + } finally { + if (replacementBookie != null) { + replacementBookie.shutdown(); + } + bookieToFail.shutdown(); + + if (checkBookiesUp(numBookies, 30) != numBookies) { + LOG.warn("Not all bookies from this test shut down, expect errors"); + } + } + } + +} \ No newline at end of file