Merge r:1344840 HDFS-3423. BKJM: NN startup is failing, when tries to recoverUnfinalizedSegments() a bad inProgress_ ZNodes. Contributed by Ivan and Uma.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1344852 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Uma Maheswara Rao G 2012-05-31 18:26:46 +00:00
parent f376251475
commit 136e586272
7 changed files with 349 additions and 51 deletions

View File

@ -129,6 +129,9 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3468. Make BKJM-ZK session timeout configurable. (umamahesh)
HDFS-3423. BKJM: NN startup is failing, when tries to recoverUnfinalizedSegments()
a bad inProgress_ ZNodes. (Ivan Kelly and Uma via umamahesh)
Release 2.0.0-alpha - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -73,6 +73,11 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
this.logVersion = metadata.getVersion();
this.inProgress = metadata.isInProgress();
if (firstBookKeeperEntry < 0
|| firstBookKeeperEntry > lh.getLastAddConfirmed()) {
throw new IOException("Invalid first bk entry to read: "
+ firstBookKeeperEntry + ", LAC: " + lh.getLastAddConfirmed());
}
BufferedInputStream bin = new BufferedInputStream(
new LedgerInputStream(lh, firstBookKeeperEntry));
tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);

View File

@ -461,17 +461,34 @@ public class BookKeeperJournalManager implements JournalManager {
continue;
}
String znode = ledgerPath + "/" + child;
EditLogLedgerMetadata l
= EditLogLedgerMetadata.read(zkc, znode);
long endTxId = recoverLastTxId(l, true);
if (endTxId == HdfsConstants.INVALID_TXID) {
LOG.error("Unrecoverable corruption has occurred in segment "
+ l.toString() + " at path " + znode
+ ". Unable to continue recovery.");
throw new IOException("Unrecoverable corruption,"
+ " please check logs.");
EditLogLedgerMetadata l = EditLogLedgerMetadata.read(zkc, znode);
try {
long endTxId = recoverLastTxId(l, true);
if (endTxId == HdfsConstants.INVALID_TXID) {
LOG.error("Unrecoverable corruption has occurred in segment "
+ l.toString() + " at path " + znode
+ ". Unable to continue recovery.");
throw new IOException("Unrecoverable corruption,"
+ " please check logs.");
}
finalizeLogSegment(l.getFirstTxId(), endTxId);
} catch (SegmentEmptyException see) {
LOG.warn("Inprogress znode " + child
+ " refers to a ledger which is empty. This occurs when the NN"
+ " crashes after opening a segment, but before writing the"
+ " OP_START_LOG_SEGMENT op. It is safe to delete."
+ " MetaData [" + l.toString() + "]");
// If the max seen transaction is the same as what would
// have been the first transaction of the failed ledger,
// decrement it, as that transaction never happened and as
// such, is _not_ the last seen
if (maxTxId.get() == l.getFirstTxId()) {
maxTxId.reset(maxTxId.get() - 1);
}
zkc.delete(znode, -1);
}
finalizeLogSegment(l.getFirstTxId(), endTxId);
}
} catch (KeeperException.NoNodeException nne) {
// nothing to recover, ignore
@ -532,9 +549,9 @@ public class BookKeeperJournalManager implements JournalManager {
* ledger.
*/
private long recoverLastTxId(EditLogLedgerMetadata l, boolean fence)
throws IOException {
throws IOException, SegmentEmptyException {
LedgerHandle lh = null;
try {
LedgerHandle lh = null;
if (fence) {
lh = bkc.openLedger(l.getLedgerId(),
BookKeeper.DigestType.MAC,
@ -544,9 +561,21 @@ public class BookKeeperJournalManager implements JournalManager {
BookKeeper.DigestType.MAC,
digestpw.getBytes());
}
} catch (BKException bke) {
throw new IOException("Exception opening ledger for " + l, bke);
} catch (InterruptedException ie) {
throw new IOException("Interrupted opening ledger for " + l, ie);
}
BookKeeperEditLogInputStream in = null;
try {
long lastAddConfirmed = lh.getLastAddConfirmed();
BookKeeperEditLogInputStream in
= new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);
if (lastAddConfirmed == -1) {
throw new SegmentEmptyException();
}
in = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);
long endTxId = HdfsConstants.INVALID_TXID;
FSEditLogOp op = in.readOp();
@ -558,12 +587,10 @@ public class BookKeeperJournalManager implements JournalManager {
op = in.readOp();
}
return endTxId;
} catch (BKException e) {
throw new IOException("Exception retreiving last tx id for ledger " + l,
e);
} catch (InterruptedException ie) {
throw new IOException("Interrupted while retreiving last tx id "
+ "for ledger " + l, ie);
} finally {
if (in != null) {
in.close();
}
}
}
@ -613,4 +640,7 @@ public class BookKeeperJournalManager implements JournalManager {
}
}
}
private static class SegmentEmptyException extends IOException {
}
}

View File

@ -149,8 +149,8 @@ public class EditLogLedgerMetadata {
version, ledgerId, firstTxId, lastTxId);
}
try {
zkc.create(path, finalisedData.getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zkc.create(path, finalisedData.getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException nee) {
throw nee;
} catch (KeeperException e) {
@ -167,7 +167,7 @@ public class EditLogLedgerMetadata {
LOG.trace("Verifying " + this.toString()
+ " against " + other);
}
return other == this;
return other.equals(this);
} catch (KeeperException e) {
LOG.error("Couldn't verify data in " + path, e);
return false;
@ -188,12 +188,12 @@ public class EditLogLedgerMetadata {
&& version == ol.version;
}
public int hashCode() {
public int hashCode() {
int hash = 1;
hash = hash * 31 + (int)ledgerId;
hash = hash * 31 + (int)firstTxId;
hash = hash * 31 + (int)lastTxId;
hash = hash * 31 + (int)version;
hash = hash * 31 + (int) ledgerId;
hash = hash * 31 + (int) firstTxId;
hash = hash * 31 + (int) lastTxId;
hash = hash * 31 + (int) version;
return hash;
}

View File

@ -18,14 +18,14 @@
package org.apache.hadoop.contrib.bkjournal;
import java.io.IOException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
/**
* Utility class for storing and reading
@ -50,20 +50,24 @@ class MaxTxId {
if (LOG.isTraceEnabled()) {
LOG.trace("Setting maxTxId to " + maxTxId);
}
String txidStr = Long.toString(maxTxId);
try {
if (currentStat != null) {
currentStat = zkc.setData(path, txidStr.getBytes("UTF-8"),
currentStat.getVersion());
} else {
zkc.create(path, txidStr.getBytes("UTF-8"),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
throw new IOException("Error writing max tx id", e);
} catch (InterruptedException e) {
throw new IOException("Interrupted while writing max tx id", e);
reset(maxTxId);
}
}
synchronized void reset(long maxTxId) throws IOException {
String txidStr = Long.toString(maxTxId);
try {
if (currentStat != null) {
currentStat = zkc.setData(path, txidStr.getBytes("UTF-8"), currentStat
.getVersion());
} else {
zkc.create(path, txidStr.getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
throw new IOException("Error writing max tx id", e);
} catch (InterruptedException e) {
throw new IOException("Interrupted while writing max tx id", e);
}
}

View File

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.bkjournal;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.zookeeper.ZooKeeper;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Unit test for the bkjm's streams
*/
public class TestBookKeeperEditLogStreams {
static final Log LOG = LogFactory.getLog(TestBookKeeperEditLogStreams.class);
private static BKJMUtil bkutil;
private final static int numBookies = 3;
@BeforeClass
public static void setupBookkeeper() throws Exception {
bkutil = new BKJMUtil(numBookies);
bkutil.start();
}
@AfterClass
public static void teardownBookkeeper() throws Exception {
bkutil.teardown();
}
/**
* Test that bkjm will refuse open a stream on an empty
* ledger.
*/
@Test
public void testEmptyInputStream() throws Exception {
ZooKeeper zk = BKJMUtil.connectZooKeeper();
BookKeeper bkc = new BookKeeper(new ClientConfiguration(), zk);
try {
LedgerHandle lh = bkc.createLedger(BookKeeper.DigestType.CRC32, "foobar"
.getBytes());
lh.close();
EditLogLedgerMetadata metadata = new EditLogLedgerMetadata("/foobar",
HdfsConstants.LAYOUT_VERSION, lh.getId(), 0x1234);
try {
new BookKeeperEditLogInputStream(lh, metadata, -1);
fail("Shouldn't get this far, should have thrown");
} catch (IOException ioe) {
assertTrue(ioe.getMessage().contains("Invalid first bk entry to read"));
}
metadata = new EditLogLedgerMetadata("/foobar",
HdfsConstants.LAYOUT_VERSION, lh.getId(), 0x1234);
try {
new BookKeeperEditLogInputStream(lh, metadata, 0);
fail("Shouldn't get this far, should have thrown");
} catch (IOException ioe) {
assertTrue(ioe.getMessage().contains("Invalid first bk entry to read"));
}
} finally {
bkc.close();
zk.close();
}
}
}

View File

@ -25,6 +25,8 @@ import org.junit.BeforeClass;
import org.junit.AfterClass;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
@ -34,7 +36,9 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -74,7 +78,6 @@ public class TestBookKeeperJournalManager {
public void testSimpleWrite() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"));
long txid = 1;
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@ -94,7 +97,6 @@ public class TestBookKeeperJournalManager {
public void testNumberOfTransactions() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-txncount"));
long txid = 1;
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@ -246,10 +248,12 @@ public class TestBookKeeperJournalManager {
EditLogOutputStream out1 = bkjm1.startLogSegment(start);
try {
EditLogOutputStream out2 = bkjm2.startLogSegment(start);
bkjm2.startLogSegment(start);
fail("Shouldn't have been able to open the second writer");
} catch (IOException ioe) {
LOG.info("Caught exception as expected", ioe);
}finally{
out1.close();
}
}
@ -257,7 +261,6 @@ public class TestBookKeeperJournalManager {
public void testSimpleRead() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-simpleread"));
long txid = 1;
final long numTransactions = 10000;
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= numTransactions; i++) {
@ -283,7 +286,6 @@ public class TestBookKeeperJournalManager {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"));
EditLogOutputStream out = bkjm.startLogSegment(1);
long txid = 1;
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
@ -449,5 +451,167 @@ public class TestBookKeeperJournalManager {
}
}
}
/**
* If a journal manager has an empty inprogress node, ensure that we throw an
* error, as this should not be possible, and some third party has corrupted
* the zookeeper state
*/
@Test
public void testEmptyInprogressNode() throws Exception {
URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogress");
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(1, 100);
out = bkjm.startLogSegment(101);
out.close();
bkjm.close();
String inprogressZNode = bkjm.inprogressZNode(101);
zkc.setData(inprogressZNode, new byte[0], -1);
bkjm = new BookKeeperJournalManager(conf, uri);
try {
bkjm.recoverUnfinalizedSegments();
fail("Should have failed. There should be no way of creating"
+ " an empty inprogess znode");
} catch (IOException e) {
// correct behaviour
assertTrue("Exception different than expected", e.getMessage().contains(
"Invalid ledger entry,"));
} finally {
bkjm.close();
}
}
/**
* If a journal manager has an corrupt inprogress node, ensure that we throw
* an error, as this should not be possible, and some third party has
* corrupted the zookeeper state
*/
@Test
public void testCorruptInprogressNode() throws Exception {
URI uri = BKJMUtil.createJournalURI("/hdfsjournal-corruptInprogress");
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(1, 100);
out = bkjm.startLogSegment(101);
out.close();
bkjm.close();
String inprogressZNode = bkjm.inprogressZNode(101);
zkc.setData(inprogressZNode, "WholeLottaJunk".getBytes(), -1);
bkjm = new BookKeeperJournalManager(conf, uri);
try {
bkjm.recoverUnfinalizedSegments();
fail("Should have failed. There should be no way of creating"
+ " an empty inprogess znode");
} catch (IOException e) {
// correct behaviour
assertTrue("Exception different than expected", e.getMessage().contains(
"Invalid ledger entry,"));
} finally {
bkjm.close();
}
}
/**
* Cases can occur where we create a segment but crash before we even have the
* chance to write the START_SEGMENT op. If this occurs we should warn, but
* load as normal
*/
@Test
public void testEmptyInprogressLedger() throws Exception {
URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogressLedger");
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(1, 100);
out = bkjm.startLogSegment(101);
out.close();
bkjm.close();
bkjm = new BookKeeperJournalManager(conf, uri);
bkjm.recoverUnfinalizedSegments();
out = bkjm.startLogSegment(101);
for (long i = 1; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
out.write(op);
}
out.close();
bkjm.finalizeLogSegment(101, 200);
bkjm.close();
}
/**
* Test that if we fail between finalizing an inprogress and deleting the
* corresponding inprogress znode.
*/
@Test
public void testRefinalizeAlreadyFinalizedInprogress() throws Exception {
URI uri = BKJMUtil
.createJournalURI("/hdfsjournal-refinalizeInprogressLedger");
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
out.write(op);
}
out.close();
bkjm.close();
String inprogressZNode = bkjm.inprogressZNode(1);
String finalizedZNode = bkjm.finalizedLedgerZNode(1, 100);
assertNotNull("inprogress znode doesn't exist", zkc.exists(inprogressZNode,
null));
assertNull("finalized znode exists", zkc.exists(finalizedZNode, null));
byte[] inprogressData = zkc.getData(inprogressZNode, false, null);
// finalize
bkjm = new BookKeeperJournalManager(conf, uri);
bkjm.recoverUnfinalizedSegments();
bkjm.close();
assertNull("inprogress znode exists", zkc.exists(inprogressZNode, null));
assertNotNull("finalized znode doesn't exist", zkc.exists(finalizedZNode,
null));
zkc.create(inprogressZNode, inprogressData, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
// should work fine
bkjm = new BookKeeperJournalManager(conf, uri);
bkjm.recoverUnfinalizedSegments();
bkjm.close();
}
}