From fe6dfad79dd53d0082405906b50204680e3957a8 Mon Sep 17 00:00:00 2001
From: Uma Maheswara Rao G
Date: Thu, 31 May 2012 18:09:04 +0000
Subject: [PATCH] HDFS-3423. BKJM: NN startup fails when it tries to
 recoverUnfinalizedSegments() on a bad inProgress_ ZNode. Contributed by Ivan
 and Uma.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1344840 13f79535-47bb-0310-9956-ffa450edef68
---
 .../BookKeeperEditLogInputStream.java         |   5 +
 .../bkjournal/BookKeeperJournalManager.java   |  70 +++++--
 .../bkjournal/EditLogLedgerMetadata.java      |  16 +-
 .../hadoop/contrib/bkjournal/MaxTxId.java     |  40 ++--
 .../TestBookKeeperEditLogStreams.java         |  92 +++++++++
 .../TestBookKeeperJournalManager.java         | 174 +++++++++++++++++-
 6 files changed, 346 insertions(+), 51 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
index 54e45059bb4..2374cd88662 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
@@ -73,6 +73,11 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
     this.logVersion = metadata.getVersion();
     this.inProgress = metadata.isInProgress();
 
+    if (firstBookKeeperEntry < 0
+        || firstBookKeeperEntry > lh.getLastAddConfirmed()) {
+      throw new IOException("Invalid first bk entry to read: "
+          + firstBookKeeperEntry + ", LAC: " + lh.getLastAddConfirmed());
+    }
     BufferedInputStream bin = new BufferedInputStream(
         new LedgerInputStream(lh, firstBookKeeperEntry));
     tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
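The guard added above relies on a BookKeeper invariant: a ledger to which no entry has ever been added reports a LastAddConfirmed (LAC) of -1, so any requested first entry (necessarily >= 0) falls outside the readable range [0, LAC], and the stream now fails fast instead of wrapping an unreadable ledger. A minimal standalone sketch of that invariant follows; the class name and ZooKeeper connect string are illustrative assumptions, not part of this patch.

    import org.apache.bookkeeper.client.BookKeeper;
    import org.apache.bookkeeper.client.LedgerHandle;

    public class EmptyLedgerLacSketch {
      public static void main(String[] args) throws Exception {
        // Assumption: a BookKeeper ensemble registered in ZooKeeper at
        // localhost:2181; substitute your own connect string.
        BookKeeper bkc = new BookKeeper("localhost:2181");
        LedgerHandle lh = bkc.createLedger(BookKeeper.DigestType.MAC,
            "password".getBytes());
        // No entry was ever added, so LAC is -1; every candidate
        // firstBookKeeperEntry (which must be >= 0) trips the new range check.
        System.out.println("LAC of empty ledger: " + lh.getLastAddConfirmed());
        lh.close();
        bkc.close();
      }
    }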
Unable to continue recovery."); + throw new IOException("Unrecoverable corruption," + + " please check logs."); + } + finalizeLogSegment(l.getFirstTxId(), endTxId); + } catch (SegmentEmptyException see) { + LOG.warn("Inprogress znode " + child + + " refers to a ledger which is empty. This occurs when the NN" + + " crashes after opening a segment, but before writing the" + + " OP_START_LOG_SEGMENT op. It is safe to delete." + + " MetaData [" + l.toString() + "]"); + + // If the max seen transaction is the same as what would + // have been the first transaction of the failed ledger, + // decrement it, as that transaction never happened and as + // such, is _not_ the last seen + if (maxTxId.get() == l.getFirstTxId()) { + maxTxId.reset(maxTxId.get() - 1); + } + + zkc.delete(znode, -1); } - finalizeLogSegment(l.getFirstTxId(), endTxId); } } catch (KeeperException.NoNodeException nne) { // nothing to recover, ignore @@ -532,9 +549,9 @@ public class BookKeeperJournalManager implements JournalManager { * ledger. */ private long recoverLastTxId(EditLogLedgerMetadata l, boolean fence) - throws IOException { + throws IOException, SegmentEmptyException { + LedgerHandle lh = null; try { - LedgerHandle lh = null; if (fence) { lh = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC, @@ -544,9 +561,21 @@ public class BookKeeperJournalManager implements JournalManager { BookKeeper.DigestType.MAC, digestpw.getBytes()); } + } catch (BKException bke) { + throw new IOException("Exception opening ledger for " + l, bke); + } catch (InterruptedException ie) { + throw new IOException("Interrupted opening ledger for " + l, ie); + } + + BookKeeperEditLogInputStream in = null; + + try { long lastAddConfirmed = lh.getLastAddConfirmed(); - BookKeeperEditLogInputStream in - = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed); + if (lastAddConfirmed == -1) { + throw new SegmentEmptyException(); + } + + in = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed); long endTxId = HdfsConstants.INVALID_TXID; FSEditLogOp op = in.readOp(); @@ -558,12 +587,10 @@ public class BookKeeperJournalManager implements JournalManager { op = in.readOp(); } return endTxId; - } catch (BKException e) { - throw new IOException("Exception retreiving last tx id for ledger " + l, - e); - } catch (InterruptedException ie) { - throw new IOException("Interrupted while retreiving last tx id " - + "for ledger " + l, ie); + } finally { + if (in != null) { + in.close(); + } } } @@ -613,4 +640,7 @@ public class BookKeeperJournalManager implements JournalManager { } } } + + private static class SegmentEmptyException extends IOException { + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java index 5654a11f1af..6c75cd1369a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java @@ -149,8 +149,8 @@ public class EditLogLedgerMetadata { version, ledgerId, firstTxId, lastTxId); } try { - zkc.create(path, finalisedData.getBytes(), - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zkc.create(path, finalisedData.getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); } catch 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java
index 5654a11f1af..6c75cd1369a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java
@@ -149,8 +149,8 @@ public class EditLogLedgerMetadata {
           version, ledgerId, firstTxId, lastTxId);
     }
     try {
-      zkc.create(path, finalisedData.getBytes(),
-                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      zkc.create(path, finalisedData.getBytes(), Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT);
     } catch (KeeperException.NodeExistsException nee) {
       throw nee;
     } catch (KeeperException e) {
@@ -167,7 +167,7 @@ public class EditLogLedgerMetadata {
         LOG.trace("Verifying " + this.toString()
             + " against " + other);
       }
-      return other == this;
+      return other.equals(this);
     } catch (KeeperException e) {
       LOG.error("Couldn't verify data in " + path, e);
       return false;
@@ -188,12 +188,12 @@ public class EditLogLedgerMetadata {
         && version == ol.version;
   }
 
-  public int hashCode() {
+  public int hashCode() {
     int hash = 1;
-    hash = hash * 31 + (int)ledgerId;
-    hash = hash * 31 + (int)firstTxId;
-    hash = hash * 31 + (int)lastTxId;
-    hash = hash * 31 + (int)version;
+    hash = hash * 31 + (int) ledgerId;
+    hash = hash * 31 + (int) firstTxId;
+    hash = hash * 31 + (int) lastTxId;
+    hash = hash * 31 + (int) version;
     return hash;
   }
 
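The verify() change above swaps reference equality for value equality. The metadata read back from ZooKeeper is always a fresh instance, so "other == this" was false for every successful round trip and verification could never pass. A small sketch of the distinction, reusing the four-argument constructor visible in the new test (the argument values are arbitrary, and the snippet assumes it runs in the same package):

    EditLogLedgerMetadata a = new EditLogLedgerMetadata("/ledgers/1", -40, 1L, 1L);
    EditLogLedgerMetadata b = new EditLogLedgerMetadata("/ledgers/1", -40, 1L, 1L);
    System.out.println(a == b);      // false: distinct instances
    System.out.println(a.equals(b)); // true: same path, version, id and txid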
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java
index 46ef3eb4ac8..0109c33386d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java
@@ -18,14 +18,14 @@ package org.apache.hadoop.contrib.bkjournal;
 
 import java.io.IOException;
 
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
 
 /**
  * Utility class for storing and reading
@@ -50,20 +50,24 @@ class MaxTxId {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Setting maxTxId to " + maxTxId);
       }
-      String txidStr = Long.toString(maxTxId);
-      try {
-        if (currentStat != null) {
-          currentStat = zkc.setData(path, txidStr.getBytes("UTF-8"),
-              currentStat.getVersion());
-        } else {
-          zkc.create(path, txidStr.getBytes("UTF-8"),
-              Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        }
-      } catch (KeeperException e) {
-        throw new IOException("Error writing max tx id", e);
-      } catch (InterruptedException e) {
-        throw new IOException("Interrupted while writing max tx id", e);
+      reset(maxTxId);
+    }
+  }
+
+  synchronized void reset(long maxTxId) throws IOException {
+    String txidStr = Long.toString(maxTxId);
+    try {
+      if (currentStat != null) {
+        currentStat = zkc.setData(path, txidStr.getBytes("UTF-8"), currentStat
+            .getVersion());
+      } else {
+        zkc.create(path, txidStr.getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT);
       }
+    } catch (KeeperException e) {
+      throw new IOException("Error writing max tx id", e);
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted while writing max tx id", e);
     }
   }
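The refactoring in MaxTxId pulls the ZooKeeper write out of store() into a new reset() so that recovery can move the recorded maximum backwards after discarding an empty segment. Assuming store() keeps its existing only-advance guard (that guard sits outside this hunk) and a constructor taking the ZooKeeper handle and znode path (also not shown here), the intended division of labour is roughly:

    MaxTxId maxTxId = new MaxTxId(zkc, "/ledgers/maxtxid"); // illustrative args
    maxTxId.store(101); // normal path: only ever advances the stored value
    maxTxId.reset(100); // recovery path: unconditional write, may go backwards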
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java
new file mode 100644
index 00000000000..3710676050d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperEditLogStreams.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Unit tests for bkjm's streams.
+ */
+public class TestBookKeeperEditLogStreams {
+  static final Log LOG = LogFactory.getLog(TestBookKeeperEditLogStreams.class);
+
+  private static BKJMUtil bkutil;
+  private final static int numBookies = 3;
+
+  @BeforeClass
+  public static void setupBookkeeper() throws Exception {
+    bkutil = new BKJMUtil(numBookies);
+    bkutil.start();
+  }
+
+  @AfterClass
+  public static void teardownBookkeeper() throws Exception {
+    bkutil.teardown();
+  }
+
+  /**
+   * Test that bkjm will refuse to open a stream on an empty
+   * ledger.
+   */
+  @Test
+  public void testEmptyInputStream() throws Exception {
+    ZooKeeper zk = BKJMUtil.connectZooKeeper();
+
+    BookKeeper bkc = new BookKeeper(new ClientConfiguration(), zk);
+    try {
+      LedgerHandle lh = bkc.createLedger(BookKeeper.DigestType.CRC32, "foobar"
+          .getBytes());
+      lh.close();
+
+      EditLogLedgerMetadata metadata = new EditLogLedgerMetadata("/foobar",
+          HdfsConstants.LAYOUT_VERSION, lh.getId(), 0x1234);
+      try {
+        new BookKeeperEditLogInputStream(lh, metadata, -1);
+        fail("Shouldn't get this far, should have thrown");
+      } catch (IOException ioe) {
+        assertTrue(ioe.getMessage().contains("Invalid first bk entry to read"));
+      }
+
+      metadata = new EditLogLedgerMetadata("/foobar",
+          HdfsConstants.LAYOUT_VERSION, lh.getId(), 0x1234);
+      try {
+        new BookKeeperEditLogInputStream(lh, metadata, 0);
+        fail("Shouldn't get this far, should have thrown");
+      } catch (IOException ioe) {
+        assertTrue(ioe.getMessage().contains("Invalid first bk entry to read"));
+      }
+    } finally {
+      bkc.close();
+      zk.close();
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
index ab96275414b..c05a3bdfb1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
@@ -25,6 +25,8 @@ import org.junit.BeforeClass;
 import org.junit.AfterClass;
 
 import java.io.IOException;
+import java.net.URI;
+
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
@@ -34,7 +36,9 @@
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.JournalManager;
 
 import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -74,7 +78,6 @@ public class TestBookKeeperJournalManager {
   public void testSimpleWrite() throws Exception {
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
         BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"));
-    long txid = 1;
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1 ; i <= 100; i++) {
       FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -94,7 +97,6 @@ public class TestBookKeeperJournalManager {
   public void testNumberOfTransactions() throws Exception {
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
         BKJMUtil.createJournalURI("/hdfsjournal-txncount"));
-    long txid = 1;
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1 ; i <= 100; i++) {
       FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -246,10 +248,12 @@ public class TestBookKeeperJournalManager {
 
     EditLogOutputStream out1 = bkjm1.startLogSegment(start);
     try {
-      EditLogOutputStream out2 = bkjm2.startLogSegment(start);
+      bkjm2.startLogSegment(start);
       fail("Shouldn't have been able to open the second writer");
     } catch (IOException ioe) {
       LOG.info("Caught exception as expected", ioe);
+    } finally {
+      out1.close();
     }
   }
 
@@ -257,7 +261,6 @@ public class TestBookKeeperJournalManager {
   public void testSimpleRead() throws Exception {
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
         BKJMUtil.createJournalURI("/hdfsjournal-simpleread"));
-    long txid = 1;
     final long numTransactions = 10000;
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1 ; i <= numTransactions; i++) {
@@ -283,7 +286,6 @@ public class TestBookKeeperJournalManager {
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
         BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"));
     EditLogOutputStream out = bkjm.startLogSegment(1);
-    long txid = 1;
     for (long i = 1 ; i <= 100; i++) {
       FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
       op.setTransactionId(i);
@@ -449,5 +451,167 @@ public class TestBookKeeperJournalManager {
       }
     }
   }
+
+  /**
+   * If a journal manager has an empty inprogress node, ensure that we throw
+   * an error, as this should not be possible, and some third party has
+   * corrupted the zookeeper state.
+   */
+  @Test
+  public void testEmptyInprogressNode() throws Exception {
+    URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogress");
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+
+    EditLogOutputStream out = bkjm.startLogSegment(1);
+    for (long i = 1; i <= 100; i++) {
+      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+      op.setTransactionId(i);
+      out.write(op);
+    }
+    out.close();
+    bkjm.finalizeLogSegment(1, 100);
+
+    out = bkjm.startLogSegment(101);
+    out.close();
+    bkjm.close();
+    String inprogressZNode = bkjm.inprogressZNode(101);
+    zkc.setData(inprogressZNode, new byte[0], -1);
+
+    bkjm = new BookKeeperJournalManager(conf, uri);
+    try {
+      bkjm.recoverUnfinalizedSegments();
+      fail("Should have failed. There should be no way of creating"
+          + " an empty inprogress znode");
+    } catch (IOException e) {
+      // correct behaviour
+      assertTrue("Exception different from expected", e.getMessage().contains(
+          "Invalid ledger entry,"));
+    } finally {
+      bkjm.close();
+    }
+  }
+
+  /**
+   * If a journal manager has a corrupt inprogress node, ensure that we throw
+   * an error, as this should not be possible, and some third party has
+   * corrupted the zookeeper state.
+   */
+  @Test
+  public void testCorruptInprogressNode() throws Exception {
+    URI uri = BKJMUtil.createJournalURI("/hdfsjournal-corruptInprogress");
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+
+    EditLogOutputStream out = bkjm.startLogSegment(1);
+    for (long i = 1; i <= 100; i++) {
+      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+      op.setTransactionId(i);
+      out.write(op);
+    }
+    out.close();
+    bkjm.finalizeLogSegment(1, 100);
+
+    out = bkjm.startLogSegment(101);
+    out.close();
+    bkjm.close();
+
+    String inprogressZNode = bkjm.inprogressZNode(101);
+    zkc.setData(inprogressZNode, "WholeLottaJunk".getBytes(), -1);
+
+    bkjm = new BookKeeperJournalManager(conf, uri);
+    try {
+      bkjm.recoverUnfinalizedSegments();
+      fail("Should have failed. There should be no way of creating"
+          + " a corrupt inprogress znode");
+    } catch (IOException e) {
+      // correct behaviour
+      assertTrue("Exception different from expected", e.getMessage().contains(
+          "Invalid ledger entry,"));
+
+    } finally {
+      bkjm.close();
+    }
+  }
+
+  /**
+   * Cases can occur where we create a segment but crash before we even have
+   * the chance to write the START_SEGMENT op. If this occurs we should warn,
+   * but load as normal.
+   */
+  @Test
+  public void testEmptyInprogressLedger() throws Exception {
+    URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogressLedger");
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+
+    EditLogOutputStream out = bkjm.startLogSegment(1);
+    for (long i = 1; i <= 100; i++) {
+      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+      op.setTransactionId(i);
+      out.write(op);
+    }
+    out.close();
+    bkjm.finalizeLogSegment(1, 100);
+
+    out = bkjm.startLogSegment(101);
+    out.close();
+    bkjm.close();
+
+    bkjm = new BookKeeperJournalManager(conf, uri);
+    bkjm.recoverUnfinalizedSegments();
+    out = bkjm.startLogSegment(101);
+    for (long i = 1; i <= 100; i++) {
+      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+      op.setTransactionId(i);
+      out.write(op);
+    }
+    out.close();
+    bkjm.finalizeLogSegment(101, 200);
+
+    bkjm.close();
+  }
+
+  /**
+   * Test the case where we fail after finalizing an inprogress segment but
+   * before deleting the corresponding inprogress znode.
+   */
+  @Test
+  public void testRefinalizeAlreadyFinalizedInprogress() throws Exception {
+    URI uri = BKJMUtil
+        .createJournalURI("/hdfsjournal-refinalizeInprogressLedger");
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+
+    EditLogOutputStream out = bkjm.startLogSegment(1);
+    for (long i = 1; i <= 100; i++) {
+      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+      op.setTransactionId(i);
+      out.write(op);
+    }
+    out.close();
+    bkjm.close();
+
+    String inprogressZNode = bkjm.inprogressZNode(1);
+    String finalizedZNode = bkjm.finalizedLedgerZNode(1, 100);
+    assertNotNull("inprogress znode doesn't exist", zkc.exists(inprogressZNode,
+        null));
+    assertNull("finalized znode exists", zkc.exists(finalizedZNode, null));
+
+    byte[] inprogressData = zkc.getData(inprogressZNode, false, null);
+
+    // finalize
+    bkjm = new BookKeeperJournalManager(conf, uri);
+    bkjm.recoverUnfinalizedSegments();
+    bkjm.close();
+
+    assertNull("inprogress znode exists", zkc.exists(inprogressZNode, null));
+    assertNotNull("finalized znode doesn't exist", zkc.exists(finalizedZNode,
+        null));
+
+    zkc.create(inprogressZNode, inprogressData, Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    // should work fine
+    bkjm = new BookKeeperJournalManager(conf, uri);
+    bkjm.recoverUnfinalizedSegments();
+    bkjm.close();
+  }
 }