HDFS-3423. BKJM: NN startup is failing, when tries to recoverUnfinalizedSegments() a bad inProgress_ ZNodes. Contributed by Ivan and Uma.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1344840 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8bbacfaa31
commit
fe6dfad79d
|
@ -73,6 +73,11 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
|
|||
this.logVersion = metadata.getVersion();
|
||||
this.inProgress = metadata.isInProgress();
|
||||
|
||||
if (firstBookKeeperEntry < 0
|
||||
|| firstBookKeeperEntry > lh.getLastAddConfirmed()) {
|
||||
throw new IOException("Invalid first bk entry to read: "
|
||||
+ firstBookKeeperEntry + ", LAC: " + lh.getLastAddConfirmed());
|
||||
}
|
||||
BufferedInputStream bin = new BufferedInputStream(
|
||||
new LedgerInputStream(lh, firstBookKeeperEntry));
|
||||
tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
|
||||
|
|
|
@ -461,17 +461,34 @@ public class BookKeeperJournalManager implements JournalManager {
|
|||
continue;
|
||||
}
|
||||
String znode = ledgerPath + "/" + child;
|
||||
EditLogLedgerMetadata l
|
||||
= EditLogLedgerMetadata.read(zkc, znode);
|
||||
long endTxId = recoverLastTxId(l, true);
|
||||
if (endTxId == HdfsConstants.INVALID_TXID) {
|
||||
LOG.error("Unrecoverable corruption has occurred in segment "
|
||||
+ l.toString() + " at path " + znode
|
||||
+ ". Unable to continue recovery.");
|
||||
throw new IOException("Unrecoverable corruption,"
|
||||
+ " please check logs.");
|
||||
EditLogLedgerMetadata l = EditLogLedgerMetadata.read(zkc, znode);
|
||||
try {
|
||||
long endTxId = recoverLastTxId(l, true);
|
||||
if (endTxId == HdfsConstants.INVALID_TXID) {
|
||||
LOG.error("Unrecoverable corruption has occurred in segment "
|
||||
+ l.toString() + " at path " + znode
|
||||
+ ". Unable to continue recovery.");
|
||||
throw new IOException("Unrecoverable corruption,"
|
||||
+ " please check logs.");
|
||||
}
|
||||
finalizeLogSegment(l.getFirstTxId(), endTxId);
|
||||
} catch (SegmentEmptyException see) {
|
||||
LOG.warn("Inprogress znode " + child
|
||||
+ " refers to a ledger which is empty. This occurs when the NN"
|
||||
+ " crashes after opening a segment, but before writing the"
|
||||
+ " OP_START_LOG_SEGMENT op. It is safe to delete."
|
||||
+ " MetaData [" + l.toString() + "]");
|
||||
|
||||
// If the max seen transaction is the same as what would
|
||||
// have been the first transaction of the failed ledger,
|
||||
// decrement it, as that transaction never happened and as
|
||||
// such, is _not_ the last seen
|
||||
if (maxTxId.get() == l.getFirstTxId()) {
|
||||
maxTxId.reset(maxTxId.get() - 1);
|
||||
}
|
||||
|
||||
zkc.delete(znode, -1);
|
||||
}
|
||||
finalizeLogSegment(l.getFirstTxId(), endTxId);
|
||||
}
|
||||
} catch (KeeperException.NoNodeException nne) {
|
||||
// nothing to recover, ignore
|
||||
|
@ -532,9 +549,9 @@ public class BookKeeperJournalManager implements JournalManager {
|
|||
* ledger.
|
||||
*/
|
||||
private long recoverLastTxId(EditLogLedgerMetadata l, boolean fence)
|
||||
throws IOException {
|
||||
throws IOException, SegmentEmptyException {
|
||||
LedgerHandle lh = null;
|
||||
try {
|
||||
LedgerHandle lh = null;
|
||||
if (fence) {
|
||||
lh = bkc.openLedger(l.getLedgerId(),
|
||||
BookKeeper.DigestType.MAC,
|
||||
|
@ -544,9 +561,21 @@ public class BookKeeperJournalManager implements JournalManager {
|
|||
BookKeeper.DigestType.MAC,
|
||||
digestpw.getBytes());
|
||||
}
|
||||
} catch (BKException bke) {
|
||||
throw new IOException("Exception opening ledger for " + l, bke);
|
||||
} catch (InterruptedException ie) {
|
||||
throw new IOException("Interrupted opening ledger for " + l, ie);
|
||||
}
|
||||
|
||||
BookKeeperEditLogInputStream in = null;
|
||||
|
||||
try {
|
||||
long lastAddConfirmed = lh.getLastAddConfirmed();
|
||||
BookKeeperEditLogInputStream in
|
||||
= new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);
|
||||
if (lastAddConfirmed == -1) {
|
||||
throw new SegmentEmptyException();
|
||||
}
|
||||
|
||||
in = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);
|
||||
|
||||
long endTxId = HdfsConstants.INVALID_TXID;
|
||||
FSEditLogOp op = in.readOp();
|
||||
|
@ -558,12 +587,10 @@ public class BookKeeperJournalManager implements JournalManager {
|
|||
op = in.readOp();
|
||||
}
|
||||
return endTxId;
|
||||
} catch (BKException e) {
|
||||
throw new IOException("Exception retreiving last tx id for ledger " + l,
|
||||
e);
|
||||
} catch (InterruptedException ie) {
|
||||
throw new IOException("Interrupted while retreiving last tx id "
|
||||
+ "for ledger " + l, ie);
|
||||
} finally {
|
||||
if (in != null) {
|
||||
in.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -613,4 +640,7 @@ public class BookKeeperJournalManager implements JournalManager {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class SegmentEmptyException extends IOException {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -149,8 +149,8 @@ public class EditLogLedgerMetadata {
|
|||
version, ledgerId, firstTxId, lastTxId);
|
||||
}
|
||||
try {
|
||||
zkc.create(path, finalisedData.getBytes(),
|
||||
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
||||
zkc.create(path, finalisedData.getBytes(), Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.PERSISTENT);
|
||||
} catch (KeeperException.NodeExistsException nee) {
|
||||
throw nee;
|
||||
} catch (KeeperException e) {
|
||||
|
@ -167,7 +167,7 @@ public class EditLogLedgerMetadata {
|
|||
LOG.trace("Verifying " + this.toString()
|
||||
+ " against " + other);
|
||||
}
|
||||
return other == this;
|
||||
return other.equals(this);
|
||||
} catch (KeeperException e) {
|
||||
LOG.error("Couldn't verify data in " + path, e);
|
||||
return false;
|
||||
|
@ -188,12 +188,12 @@ public class EditLogLedgerMetadata {
|
|||
&& version == ol.version;
|
||||
}
|
||||
|
||||
public int hashCode() {
|
||||
public int hashCode() {
|
||||
int hash = 1;
|
||||
hash = hash * 31 + (int)ledgerId;
|
||||
hash = hash * 31 + (int)firstTxId;
|
||||
hash = hash * 31 + (int)lastTxId;
|
||||
hash = hash * 31 + (int)version;
|
||||
hash = hash * 31 + (int) ledgerId;
|
||||
hash = hash * 31 + (int) firstTxId;
|
||||
hash = hash * 31 + (int) lastTxId;
|
||||
hash = hash * 31 + (int) version;
|
||||
return hash;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,14 +18,14 @@
|
|||
package org.apache.hadoop.contrib.bkjournal;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.ZooDefs.Ids;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.apache.zookeeper.ZooDefs.Ids;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
/**
|
||||
* Utility class for storing and reading
|
||||
|
@ -50,20 +50,24 @@ class MaxTxId {
|
|||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Setting maxTxId to " + maxTxId);
|
||||
}
|
||||
String txidStr = Long.toString(maxTxId);
|
||||
try {
|
||||
if (currentStat != null) {
|
||||
currentStat = zkc.setData(path, txidStr.getBytes("UTF-8"),
|
||||
currentStat.getVersion());
|
||||
} else {
|
||||
zkc.create(path, txidStr.getBytes("UTF-8"),
|
||||
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Error writing max tx id", e);
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException("Interrupted while writing max tx id", e);
|
||||
reset(maxTxId);
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void reset(long maxTxId) throws IOException {
|
||||
String txidStr = Long.toString(maxTxId);
|
||||
try {
|
||||
if (currentStat != null) {
|
||||
currentStat = zkc.setData(path, txidStr.getBytes("UTF-8"), currentStat
|
||||
.getVersion());
|
||||
} else {
|
||||
zkc.create(path, txidStr.getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.PERSISTENT);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Error writing max tx id", e);
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException("Interrupted while writing max tx id", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.contrib.bkjournal;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.bookkeeper.client.BookKeeper;
|
||||
import org.apache.bookkeeper.client.LedgerHandle;
|
||||
import org.apache.bookkeeper.conf.ClientConfiguration;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Unit test for the bkjm's streams
|
||||
*/
|
||||
public class TestBookKeeperEditLogStreams {
|
||||
static final Log LOG = LogFactory.getLog(TestBookKeeperEditLogStreams.class);
|
||||
|
||||
private static BKJMUtil bkutil;
|
||||
private final static int numBookies = 3;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupBookkeeper() throws Exception {
|
||||
bkutil = new BKJMUtil(numBookies);
|
||||
bkutil.start();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardownBookkeeper() throws Exception {
|
||||
bkutil.teardown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that bkjm will refuse open a stream on an empty
|
||||
* ledger.
|
||||
*/
|
||||
@Test
|
||||
public void testEmptyInputStream() throws Exception {
|
||||
ZooKeeper zk = BKJMUtil.connectZooKeeper();
|
||||
|
||||
BookKeeper bkc = new BookKeeper(new ClientConfiguration(), zk);
|
||||
try {
|
||||
LedgerHandle lh = bkc.createLedger(BookKeeper.DigestType.CRC32, "foobar"
|
||||
.getBytes());
|
||||
lh.close();
|
||||
|
||||
EditLogLedgerMetadata metadata = new EditLogLedgerMetadata("/foobar",
|
||||
HdfsConstants.LAYOUT_VERSION, lh.getId(), 0x1234);
|
||||
try {
|
||||
new BookKeeperEditLogInputStream(lh, metadata, -1);
|
||||
fail("Shouldn't get this far, should have thrown");
|
||||
} catch (IOException ioe) {
|
||||
assertTrue(ioe.getMessage().contains("Invalid first bk entry to read"));
|
||||
}
|
||||
|
||||
metadata = new EditLogLedgerMetadata("/foobar",
|
||||
HdfsConstants.LAYOUT_VERSION, lh.getId(), 0x1234);
|
||||
try {
|
||||
new BookKeeperEditLogInputStream(lh, metadata, 0);
|
||||
fail("Shouldn't get this far, should have thrown");
|
||||
} catch (IOException ioe) {
|
||||
assertTrue(ioe.getMessage().contains("Invalid first bk entry to read"));
|
||||
}
|
||||
} finally {
|
||||
bkc.close();
|
||||
zk.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -25,6 +25,8 @@ import org.junit.BeforeClass;
|
|||
import org.junit.AfterClass;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
|
||||
|
@ -34,7 +36,9 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
|
|||
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
|
||||
|
||||
import org.apache.bookkeeper.proto.BookieServer;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.apache.zookeeper.ZooDefs.Ids;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -74,7 +78,6 @@ public class TestBookKeeperJournalManager {
|
|||
public void testSimpleWrite() throws Exception {
|
||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
||||
BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"));
|
||||
long txid = 1;
|
||||
EditLogOutputStream out = bkjm.startLogSegment(1);
|
||||
for (long i = 1 ; i <= 100; i++) {
|
||||
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
|
||||
|
@ -94,7 +97,6 @@ public class TestBookKeeperJournalManager {
|
|||
public void testNumberOfTransactions() throws Exception {
|
||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
||||
BKJMUtil.createJournalURI("/hdfsjournal-txncount"));
|
||||
long txid = 1;
|
||||
EditLogOutputStream out = bkjm.startLogSegment(1);
|
||||
for (long i = 1 ; i <= 100; i++) {
|
||||
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
|
||||
|
@ -246,10 +248,12 @@ public class TestBookKeeperJournalManager {
|
|||
|
||||
EditLogOutputStream out1 = bkjm1.startLogSegment(start);
|
||||
try {
|
||||
EditLogOutputStream out2 = bkjm2.startLogSegment(start);
|
||||
bkjm2.startLogSegment(start);
|
||||
fail("Shouldn't have been able to open the second writer");
|
||||
} catch (IOException ioe) {
|
||||
LOG.info("Caught exception as expected", ioe);
|
||||
}finally{
|
||||
out1.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -257,7 +261,6 @@ public class TestBookKeeperJournalManager {
|
|||
public void testSimpleRead() throws Exception {
|
||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
||||
BKJMUtil.createJournalURI("/hdfsjournal-simpleread"));
|
||||
long txid = 1;
|
||||
final long numTransactions = 10000;
|
||||
EditLogOutputStream out = bkjm.startLogSegment(1);
|
||||
for (long i = 1 ; i <= numTransactions; i++) {
|
||||
|
@ -283,7 +286,6 @@ public class TestBookKeeperJournalManager {
|
|||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
|
||||
BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"));
|
||||
EditLogOutputStream out = bkjm.startLogSegment(1);
|
||||
long txid = 1;
|
||||
for (long i = 1 ; i <= 100; i++) {
|
||||
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
|
||||
op.setTransactionId(i);
|
||||
|
@ -449,5 +451,167 @@ public class TestBookKeeperJournalManager {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If a journal manager has an empty inprogress node, ensure that we throw an
|
||||
* error, as this should not be possible, and some third party has corrupted
|
||||
* the zookeeper state
|
||||
*/
|
||||
@Test
|
||||
public void testEmptyInprogressNode() throws Exception {
|
||||
URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogress");
|
||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
|
||||
|
||||
EditLogOutputStream out = bkjm.startLogSegment(1);
|
||||
for (long i = 1; i <= 100; i++) {
|
||||
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
|
||||
op.setTransactionId(i);
|
||||
out.write(op);
|
||||
}
|
||||
out.close();
|
||||
bkjm.finalizeLogSegment(1, 100);
|
||||
|
||||
out = bkjm.startLogSegment(101);
|
||||
out.close();
|
||||
bkjm.close();
|
||||
String inprogressZNode = bkjm.inprogressZNode(101);
|
||||
zkc.setData(inprogressZNode, new byte[0], -1);
|
||||
|
||||
bkjm = new BookKeeperJournalManager(conf, uri);
|
||||
try {
|
||||
bkjm.recoverUnfinalizedSegments();
|
||||
fail("Should have failed. There should be no way of creating"
|
||||
+ " an empty inprogess znode");
|
||||
} catch (IOException e) {
|
||||
// correct behaviour
|
||||
assertTrue("Exception different than expected", e.getMessage().contains(
|
||||
"Invalid ledger entry,"));
|
||||
} finally {
|
||||
bkjm.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If a journal manager has an corrupt inprogress node, ensure that we throw
|
||||
* an error, as this should not be possible, and some third party has
|
||||
* corrupted the zookeeper state
|
||||
*/
|
||||
@Test
|
||||
public void testCorruptInprogressNode() throws Exception {
|
||||
URI uri = BKJMUtil.createJournalURI("/hdfsjournal-corruptInprogress");
|
||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
|
||||
|
||||
EditLogOutputStream out = bkjm.startLogSegment(1);
|
||||
for (long i = 1; i <= 100; i++) {
|
||||
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
|
||||
op.setTransactionId(i);
|
||||
out.write(op);
|
||||
}
|
||||
out.close();
|
||||
bkjm.finalizeLogSegment(1, 100);
|
||||
|
||||
out = bkjm.startLogSegment(101);
|
||||
out.close();
|
||||
bkjm.close();
|
||||
|
||||
String inprogressZNode = bkjm.inprogressZNode(101);
|
||||
zkc.setData(inprogressZNode, "WholeLottaJunk".getBytes(), -1);
|
||||
|
||||
bkjm = new BookKeeperJournalManager(conf, uri);
|
||||
try {
|
||||
bkjm.recoverUnfinalizedSegments();
|
||||
fail("Should have failed. There should be no way of creating"
|
||||
+ " an empty inprogess znode");
|
||||
} catch (IOException e) {
|
||||
// correct behaviour
|
||||
assertTrue("Exception different than expected", e.getMessage().contains(
|
||||
"Invalid ledger entry,"));
|
||||
|
||||
} finally {
|
||||
bkjm.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cases can occur where we create a segment but crash before we even have the
|
||||
* chance to write the START_SEGMENT op. If this occurs we should warn, but
|
||||
* load as normal
|
||||
*/
|
||||
@Test
|
||||
public void testEmptyInprogressLedger() throws Exception {
|
||||
URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogressLedger");
|
||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
|
||||
|
||||
EditLogOutputStream out = bkjm.startLogSegment(1);
|
||||
for (long i = 1; i <= 100; i++) {
|
||||
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
|
||||
op.setTransactionId(i);
|
||||
out.write(op);
|
||||
}
|
||||
out.close();
|
||||
bkjm.finalizeLogSegment(1, 100);
|
||||
|
||||
out = bkjm.startLogSegment(101);
|
||||
out.close();
|
||||
bkjm.close();
|
||||
|
||||
bkjm = new BookKeeperJournalManager(conf, uri);
|
||||
bkjm.recoverUnfinalizedSegments();
|
||||
out = bkjm.startLogSegment(101);
|
||||
for (long i = 1; i <= 100; i++) {
|
||||
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
|
||||
op.setTransactionId(i);
|
||||
out.write(op);
|
||||
}
|
||||
out.close();
|
||||
bkjm.finalizeLogSegment(101, 200);
|
||||
|
||||
bkjm.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that if we fail between finalizing an inprogress and deleting the
|
||||
* corresponding inprogress znode.
|
||||
*/
|
||||
@Test
|
||||
public void testRefinalizeAlreadyFinalizedInprogress() throws Exception {
|
||||
URI uri = BKJMUtil
|
||||
.createJournalURI("/hdfsjournal-refinalizeInprogressLedger");
|
||||
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
|
||||
|
||||
EditLogOutputStream out = bkjm.startLogSegment(1);
|
||||
for (long i = 1; i <= 100; i++) {
|
||||
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
|
||||
op.setTransactionId(i);
|
||||
out.write(op);
|
||||
}
|
||||
out.close();
|
||||
bkjm.close();
|
||||
|
||||
String inprogressZNode = bkjm.inprogressZNode(1);
|
||||
String finalizedZNode = bkjm.finalizedLedgerZNode(1, 100);
|
||||
assertNotNull("inprogress znode doesn't exist", zkc.exists(inprogressZNode,
|
||||
null));
|
||||
assertNull("finalized znode exists", zkc.exists(finalizedZNode, null));
|
||||
|
||||
byte[] inprogressData = zkc.getData(inprogressZNode, false, null);
|
||||
|
||||
// finalize
|
||||
bkjm = new BookKeeperJournalManager(conf, uri);
|
||||
bkjm.recoverUnfinalizedSegments();
|
||||
bkjm.close();
|
||||
|
||||
assertNull("inprogress znode exists", zkc.exists(inprogressZNode, null));
|
||||
assertNotNull("finalized znode doesn't exist", zkc.exists(finalizedZNode,
|
||||
null));
|
||||
|
||||
zkc.create(inprogressZNode, inprogressData, Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.PERSISTENT);
|
||||
|
||||
// should work fine
|
||||
bkjm = new BookKeeperJournalManager(conf, uri);
|
||||
bkjm.recoverUnfinalizedSegments();
|
||||
bkjm.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue