diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index b06f3f2d772..aa13805c019 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -728,6 +728,8 @@ Branch-2 ( Unreleased changes ) HDFS-3828. Block Scanner rescans blocks too frequently. (Andy Isaacson via eli) + + HDFS-3809. Make BKJM use protobufs for all serialization with ZK.(Ivan Kelly via umamahesh) BREAKDOWN OF HDFS-3042 SUBTASKS diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/dev-support/findbugsExcludeFile.xml new file mode 100644 index 00000000000..45c3a75e1e0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/dev-support/findbugsExcludeFile.xml @@ -0,0 +1,5 @@ + + + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/dev-support/findbugsExcludeFile.xml.orig b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/dev-support/findbugsExcludeFile.xml.orig new file mode 100644 index 00000000000..e69de29bb2d diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml index 33d6019c6b1..fa63e5eb373 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml @@ -89,6 +89,90 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> test + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + ${project.build.directory}/generated-sources/java + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + + false + + + + compile-proto + generate-sources + + run + + + + + PROTO_DIR=src/main/proto + INCLUDE_DIR=../../main/proto + JAVA_DIR=target/generated-sources/java + which cygpath 2> /dev/null + if [ $? = 1 ]; then + IS_WIN=false + else + IS_WIN=true + WIN_PROTO_DIR=`cygpath --windows $PROTO_DIR` + WIN_JAVA_DIR=`cygpath --windows $JAVA_DIR` + WIN_INCLUDE_DIR=`cygpath --windows $INCLUDE_DIR` + fi + mkdir -p $JAVA_DIR 2> /dev/null + for PROTO_FILE in `ls $PROTO_DIR/*.proto 2> /dev/null` + do + if [ "$IS_WIN" = "true" ]; then + protoc -I$WIN_PROTO_DIR -I$WIN_INCLUDE_DIR --java_out=$WIN_JAVA_DIR $PROTO_FILE + else + protoc -I$PROTO_DIR -I$INCLUDE_DIR --java_out=$JAVA_DIR $PROTO_FILE + fi + done + + + + + + + + + + + org.codehaus.mojo + findbugs-maven-plugin + + ${basedir}/dev-support/findbugsExcludeFile.xml + + + + org.apache.rat + apache-rat-plugin + + + dev-support/findbugsExcludeFile.xml + + + + + dist diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java index 2374cd88662..15d0c02daf0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java @@ -70,7 +70,7 @@ class BookKeeperEditLogInputStream extends EditLogInputStream { this.lh = lh; this.firstTxId = metadata.getFirstTxId(); this.lastTxId = metadata.getLastTxId(); - this.logVersion = metadata.getVersion(); + this.logVersion = metadata.getDataLayoutVersion(); this.inProgress = metadata.isInProgress(); if (firstBookKeeperEntry < 0 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java index 380db257035..62dbbd031f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java @@ -50,6 +50,11 @@ import java.net.URI; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.VersionProto; +import com.google.protobuf.TextFormat; +import static com.google.common.base.Charsets.UTF_8; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.google.common.annotations.VisibleForTesting; @@ -143,36 +148,17 @@ public class BookKeeperJournalManager implements JournalManager { private final int quorumSize; private final String digestpw; private final CountDownLatch zkConnectLatch; - + private final NamespaceInfo nsInfo; private LedgerHandle currentLedger = null; - private int bytesToInt(byte[] b) { - assert b.length >= 4; - return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3]; - } - - private byte[] intToBytes(int i) { - return new byte[] { - (byte)(i >> 24), - (byte)(i >> 16), - (byte)(i >> 8), - (byte)(i) }; - } - - BookKeeperJournalManager(Configuration conf, URI uri) throws IOException { - this(conf, uri, null); - // TODO(ivank): update BookKeeperJournalManager to do something - // with the NamespaceInfo. This constructor has been added - // for compatibility with the old tests, and may be removed - // when the tests are updated. - } - /** * Construct a Bookkeeper journal manager. */ public BookKeeperJournalManager(Configuration conf, URI uri, NamespaceInfo nsInfo) throws IOException { this.conf = conf; + this.nsInfo = nsInfo; + String zkConnect = uri.getAuthority().replace(";", ","); String zkPath = uri.getPath(); ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE, @@ -202,10 +188,32 @@ public BookKeeperJournalManager(Configuration conf, URI uri, Stat versionStat = zkc.exists(versionPath, false); if (versionStat != null) { byte[] d = zkc.getData(versionPath, false, versionStat); + VersionProto.Builder builder = VersionProto.newBuilder(); + TextFormat.merge(new String(d, UTF_8), builder); + if (!builder.isInitialized()) { + throw new IOException("Invalid/Incomplete data in znode"); + } + VersionProto vp = builder.build(); + // There's only one version at the moment - assert bytesToInt(d) == BKJM_LAYOUT_VERSION; - } else { - zkc.create(versionPath, intToBytes(BKJM_LAYOUT_VERSION), + assert vp.getLayoutVersion() == BKJM_LAYOUT_VERSION; + + NamespaceInfo readns = PBHelper.convert(vp.getNamespaceInfo()); + + if (nsInfo.getNamespaceID() != readns.getNamespaceID() || + !nsInfo.clusterID.equals(readns.getClusterID()) || + !nsInfo.getBlockPoolID().equals(readns.getBlockPoolID())) { + String err = String.format("Environment mismatch. Running process %s" + +", stored in ZK %s", nsInfo, readns); + LOG.error(err); + throw new IOException(err); + } + } else if (nsInfo.getNamespaceID() > 0) { + VersionProto.Builder builder = VersionProto.newBuilder(); + builder.setNamespaceInfo(PBHelper.convert(nsInfo)) + .setLayoutVersion(BKJM_LAYOUT_VERSION); + byte[] data = TextFormat.printToString(builder.build()).getBytes(UTF_8); + zkc.create(versionPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } @@ -214,11 +222,11 @@ public BookKeeperJournalManager(Configuration conf, URI uri, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } prepareBookKeeperEnv(); - bkc = new BookKeeper(new ClientConfiguration(), - zkc); + bkc = new BookKeeper(new ClientConfiguration(), zkc); } catch (KeeperException e) { throw new IOException("Error initializing zk", e); } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); throw new IOException("Interrupted while initializing bk journal manager", ie); } @@ -322,13 +330,14 @@ public EditLogOutputStream startLogSegment(long txId) throws IOException { } catch (KeeperException ke) { throw new IOException("Error in zookeeper while creating ledger", ke); } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); throw new IOException("Interrupted creating ledger", ie); } try { String znodePath = inprogressZNode(txId); EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath, - HdfsConstants.LAYOUT_VERSION, currentLedger.getId(), txId); + HdfsConstants.LAYOUT_VERSION, currentLedger.getId(), txId); /* Write the ledger metadata out to the inprogress ledger znode * This can fail if for some reason our write lock has * expired (@see WriteLock) and another process has managed to @@ -356,6 +365,7 @@ private void cleanupLedger(LedgerHandle lh) { //log & ignore, an IOException will be thrown soon LOG.error("Error closing ledger", bke); } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); LOG.warn("Interrupted while closing ledger", ie); } } @@ -425,6 +435,7 @@ public void finalizeLogSegment(long firstTxId, long lastTxId) } catch (KeeperException e) { throw new IOException("Error finalising ledger", e); } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); throw new IOException("Error finalising ledger", ie); } } @@ -454,6 +465,7 @@ EditLogInputStream getInputStream(long fromTxId, boolean inProgressOk) } catch (BKException e) { throw new IOException("Could not open ledger for " + fromTxId, e); } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); throw new IOException("Interrupted opening ledger for " + fromTxId, ie); } @@ -567,6 +579,7 @@ public void recoverUnfinalizedSegments() throws IOException { } catch (KeeperException ke) { throw new IOException("Couldn't get list of inprogress segments", ke); } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); throw new IOException("Interrupted getting list of inprogress segments", ie); } @@ -583,6 +596,7 @@ public void purgeLogsOlderThan(long minTxIdToKeep) zkc.delete(l.getZkPath(), stat.getVersion()); bkc.deleteLedger(l.getLedgerId()); } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); LOG.error("Interrupted while purging " + l, ie); } catch (BKException bke) { LOG.error("Couldn't delete ledger from bookkeeper", bke); @@ -601,6 +615,7 @@ public void close() throws IOException { } catch (BKException bke) { throw new IOException("Couldn't close bookkeeper client", bke); } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); throw new IOException("Interrupted while closing journal manager", ie); } } @@ -635,6 +650,7 @@ private long recoverLastTxId(EditLogLedgerMetadata l, boolean fence) } catch (BKException bke) { throw new IOException("Exception opening ledger for " + l, bke); } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); throw new IOException("Interrupted opening ledger for " + l, ie); } @@ -692,6 +708,7 @@ List getLedgerList(boolean inProgressOk) } catch (KeeperException e) { throw new IOException("Exception reading ledger list from zk", e); } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); throw new IOException("Interrupted getting list of ledgers from zk", ie); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java index 910d129bfe1..8477f7c4e56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java @@ -29,6 +29,10 @@ import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; +import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.CurrentInprogressProto; +import com.google.protobuf.TextFormat; +import static com.google.common.base.Charsets.UTF_8; + /** * Distributed write permission lock, using ZooKeeper. Read the version number * and return the current inprogress node path available in CurrentInprogress @@ -42,29 +46,28 @@ */ class CurrentInprogress { - private static final String CONTENT_DELIMITER = ","; - static final Log LOG = LogFactory.getLog(CurrentInprogress.class); private final ZooKeeper zkc; private final String currentInprogressNode; private volatile int versionNumberForPermission = -1; - private static final int CURRENT_INPROGRESS_LAYOUT_VERSION = -1; private final String hostName = InetAddress.getLocalHost().toString(); CurrentInprogress(ZooKeeper zkc, String lockpath) throws IOException { this.currentInprogressNode = lockpath; this.zkc = zkc; try { - Stat isCurrentInprogressNodeExists = zkc.exists(lockpath, false); + Stat isCurrentInprogressNodeExists = zkc.exists(currentInprogressNode, + false); if (isCurrentInprogressNodeExists == null) { try { - zkc.create(lockpath, null, Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + zkc.create(currentInprogressNode, null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); } catch (NodeExistsException e) { // Node might created by other process at the same time. Ignore it. if (LOG.isDebugEnabled()) { - LOG.debug(lockpath + " already created by other process.", e); + LOG.debug(currentInprogressNode + " already created by other process.", + e); } } } @@ -83,10 +86,13 @@ class CurrentInprogress { * @throws IOException */ void update(String path) throws IOException { - String content = CURRENT_INPROGRESS_LAYOUT_VERSION - + CONTENT_DELIMITER + hostName + CONTENT_DELIMITER + path; + CurrentInprogressProto.Builder builder = CurrentInprogressProto.newBuilder(); + builder.setPath(path).setHostname(hostName); + + String content = TextFormat.printToString(builder.build()); + try { - zkc.setData(this.currentInprogressNode, content.getBytes(), + zkc.setData(this.currentInprogressNode, content.getBytes(UTF_8), this.versionNumberForPermission); } catch (KeeperException e) { throw new IOException("Exception when setting the data " @@ -123,23 +129,12 @@ String read() throws IOException { } this.versionNumberForPermission = stat.getVersion(); if (data != null) { - String stringData = new String(data); - LOG.info("Read data[layout version number,hostname,inprogressNode path]" - + "= [" + stringData + "] from CurrentInprogress"); - String[] contents = stringData.split(CONTENT_DELIMITER); - assert contents.length == 3 : "As per the current data format, " - + "CurrentInprogress node data should contain 3 fields. " - + "i.e layout version number,hostname,inprogressNode path"; - String layoutVersion = contents[0]; - if (Long.valueOf(layoutVersion) > CURRENT_INPROGRESS_LAYOUT_VERSION) { - throw new IOException( - "Supported layout version of CurrentInprogress node is : " - + CURRENT_INPROGRESS_LAYOUT_VERSION - + " . Layout version of CurrentInprogress node in ZK is : " - + layoutVersion); + CurrentInprogressProto.Builder builder = CurrentInprogressProto.newBuilder(); + TextFormat.merge(new String(data, UTF_8), builder); + if (!builder.isInitialized()) { + throw new IOException("Invalid/Incomplete data in znode"); } - String inprogressNodePath = contents[2]; - return inprogressNodePath; + return builder.build().getPath(); } else { LOG.info("No data available in CurrentInprogress"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java index 6c75cd1369a..6aa87e7142b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java @@ -29,6 +29,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.EditLogLedgerProto; +import com.google.protobuf.TextFormat; +import static com.google.common.base.Charsets.UTF_8; + /** * Utility class for storing the metadata associated * with a single edit log segment, stored in a single ledger @@ -37,8 +41,8 @@ public class EditLogLedgerMetadata { static final Log LOG = LogFactory.getLog(EditLogLedgerMetadata.class); private String zkPath; + private final int dataLayoutVersion; private final long ledgerId; - private final int version; private final long firstTxId; private long lastTxId; private boolean inprogress; @@ -57,21 +61,22 @@ public int compare(EditLogLedgerMetadata o1, } }; - EditLogLedgerMetadata(String zkPath, int version, + EditLogLedgerMetadata(String zkPath, int dataLayoutVersion, long ledgerId, long firstTxId) { this.zkPath = zkPath; + this.dataLayoutVersion = dataLayoutVersion; this.ledgerId = ledgerId; - this.version = version; this.firstTxId = firstTxId; this.lastTxId = HdfsConstants.INVALID_TXID; this.inprogress = true; } - EditLogLedgerMetadata(String zkPath, int version, long ledgerId, - long firstTxId, long lastTxId) { + EditLogLedgerMetadata(String zkPath, int dataLayoutVersion, + long ledgerId, long firstTxId, + long lastTxId) { this.zkPath = zkPath; + this.dataLayoutVersion = dataLayoutVersion; this.ledgerId = ledgerId; - this.version = version; this.firstTxId = firstTxId; this.lastTxId = lastTxId; this.inprogress = false; @@ -93,14 +98,14 @@ long getLedgerId() { return ledgerId; } - int getVersion() { - return version; - } - boolean isInProgress() { return this.inprogress; } + int getDataLayoutVersion() { + return this.dataLayoutVersion; + } + void finalizeLedger(long newLastTxId) { assert this.lastTxId == HdfsConstants.INVALID_TXID; this.lastTxId = newLastTxId; @@ -111,22 +116,27 @@ static EditLogLedgerMetadata read(ZooKeeper zkc, String path) throws IOException, KeeperException.NoNodeException { try { byte[] data = zkc.getData(path, false, null); - String[] parts = new String(data).split(";"); - if (parts.length == 3) { - int version = Integer.valueOf(parts[0]); - long ledgerId = Long.valueOf(parts[1]); - long txId = Long.valueOf(parts[2]); - return new EditLogLedgerMetadata(path, version, ledgerId, txId); - } else if (parts.length == 4) { - int version = Integer.valueOf(parts[0]); - long ledgerId = Long.valueOf(parts[1]); - long firstTxId = Long.valueOf(parts[2]); - long lastTxId = Long.valueOf(parts[3]); - return new EditLogLedgerMetadata(path, version, ledgerId, - firstTxId, lastTxId); + + EditLogLedgerProto.Builder builder = EditLogLedgerProto.newBuilder(); + if (LOG.isDebugEnabled()) { + LOG.debug("Reading " + path + " data: " + new String(data, UTF_8)); + } + TextFormat.merge(new String(data, UTF_8), builder); + if (!builder.isInitialized()) { + throw new IOException("Invalid/Incomplete data in znode"); + } + EditLogLedgerProto ledger = builder.build(); + + int dataLayoutVersion = ledger.getDataLayoutVersion(); + long ledgerId = ledger.getLedgerId(); + long firstTxId = ledger.getFirstTxId(); + if (ledger.hasLastTxId()) { + long lastTxId = ledger.getLastTxId(); + return new EditLogLedgerMetadata(path, dataLayoutVersion, + ledgerId, firstTxId, lastTxId); } else { - throw new IOException("Invalid ledger entry, " - + new String(data)); + return new EditLogLedgerMetadata(path, dataLayoutVersion, + ledgerId, firstTxId); } } catch(KeeperException.NoNodeException nne) { throw nne; @@ -140,17 +150,17 @@ static EditLogLedgerMetadata read(ZooKeeper zkc, String path) void write(ZooKeeper zkc, String path) throws IOException, KeeperException.NodeExistsException { this.zkPath = path; - String finalisedData; - if (inprogress) { - finalisedData = String.format("%d;%d;%d", - version, ledgerId, firstTxId); - } else { - finalisedData = String.format("%d;%d;%d;%d", - version, ledgerId, firstTxId, lastTxId); + + EditLogLedgerProto.Builder builder = EditLogLedgerProto.newBuilder(); + builder.setDataLayoutVersion(dataLayoutVersion) + .setLedgerId(ledgerId).setFirstTxId(firstTxId); + + if (!inprogress) { + builder.setLastTxId(lastTxId); } try { - zkc.create(path, finalisedData.getBytes(), Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + zkc.create(path, TextFormat.printToString(builder.build()).getBytes(UTF_8), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (KeeperException.NodeExistsException nee) { throw nee; } catch (KeeperException e) { @@ -183,9 +193,9 @@ public boolean equals(Object o) { } EditLogLedgerMetadata ol = (EditLogLedgerMetadata)o; return ledgerId == ol.ledgerId + && dataLayoutVersion == ol.dataLayoutVersion && firstTxId == ol.firstTxId - && lastTxId == ol.lastTxId - && version == ol.version; + && lastTxId == ol.lastTxId; } public int hashCode() { @@ -193,15 +203,15 @@ public int hashCode() { hash = hash * 31 + (int) ledgerId; hash = hash * 31 + (int) firstTxId; hash = hash * 31 + (int) lastTxId; - hash = hash * 31 + (int) version; + hash = hash * 31 + (int) dataLayoutVersion; return hash; } public String toString() { return "[LedgerId:"+ledgerId + ", firstTxId:" + firstTxId + - ", lastTxId:" + lastTxId + - ", version:" + version + "]"; + ", lastTxId:" + lastTxId + + ", dataLayoutVersion:" + dataLayoutVersion + "]"; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java index 0109c33386d..5a2eefa0a6f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java @@ -27,6 +27,10 @@ import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; +import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.MaxTxIdProto; +import com.google.protobuf.TextFormat; +import static com.google.common.base.Charsets.UTF_8; + /** * Utility class for storing and reading * the max seen txid in zookeeper @@ -55,14 +59,16 @@ synchronized void store(long maxTxId) throws IOException { } synchronized void reset(long maxTxId) throws IOException { - String txidStr = Long.toString(maxTxId); try { + MaxTxIdProto.Builder builder = MaxTxIdProto.newBuilder().setTxId(maxTxId); + + byte[] data = TextFormat.printToString(builder.build()).getBytes(UTF_8); if (currentStat != null) { - currentStat = zkc.setData(path, txidStr.getBytes("UTF-8"), currentStat + currentStat = zkc.setData(path, data, currentStat .getVersion()); } else { - zkc.create(path, txidStr.getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + zkc.create(path, data, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); } } catch (KeeperException e) { throw new IOException("Error writing max tx id", e); @@ -77,9 +83,16 @@ synchronized long get() throws IOException { if (currentStat == null) { return 0; } else { + byte[] bytes = zkc.getData(path, false, currentStat); - String txidString = new String(bytes, "UTF-8"); - return Long.valueOf(txidString); + + MaxTxIdProto.Builder builder = MaxTxIdProto.newBuilder(); + TextFormat.merge(new String(bytes, UTF_8), builder); + if (!builder.isInitialized()) { + throw new IOException("Invalid/Incomplete data in znode"); + } + + return builder.build().getTxId(); } } catch (KeeperException e) { throw new IOException("Error reading the max tx id from zk", e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/proto/bkjournal.proto b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/proto/bkjournal.proto new file mode 100644 index 00000000000..b8df35b7add --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/proto/bkjournal.proto @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// This file contains protocol buffers that are used by bkjournal +// mostly for storing data in zookeeper + +option java_package = "org.apache.hadoop.contrib.bkjournal"; +option java_outer_classname = "BKJournalProtos"; +option java_generate_equals_and_hash = true; + +import "hdfs.proto"; + +message VersionProto { + required int32 layoutVersion = 1; + optional NamespaceInfoProto namespaceInfo = 2; +} + +message EditLogLedgerProto { + required int32 dataLayoutVersion = 1; + required int64 ledgerId = 2; + required int64 firstTxId = 3; + optional int64 lastTxId = 4; +} + +message MaxTxIdProto { + required int64 txId = 1; +} + +message CurrentInprogressProto { + required string path = 1; + optional string hostname = 2; +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/proto/bkjournal.proto.orig b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/proto/bkjournal.proto.orig new file mode 100644 index 00000000000..e69de29bb2d diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java index df788a27daf..4ea5074092c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java @@ -23,6 +23,7 @@ import java.net.URI; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.Random; import org.apache.bookkeeper.util.LocalBookKeeper; import org.apache.commons.logging.Log; @@ -42,6 +43,8 @@ import org.junit.BeforeClass; import org.junit.Test; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; + public class TestBookKeeperConfiguration { private static final Log LOG = LogFactory .getLog(TestBookKeeperConfiguration.class); @@ -73,6 +76,11 @@ public void process(WatchedEvent event) { return zkc; } + private NamespaceInfo newNSInfo() { + Random r = new Random(); + return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1); + } + @BeforeClass public static void setupZooKeeper() throws Exception { // create a ZooKeeper server(dataDir, dataLogDir, port) @@ -137,8 +145,10 @@ public void testWithConfiguringBKAvailablePath() throws Exception { bkAvailablePath); Assert.assertNull(bkAvailablePath + " already exists", zkc.exists( bkAvailablePath, false)); - bkjm = new BookKeeperJournalManager(conf, URI.create("bookkeeper://" - + HOSTPORT + "/hdfsjournal-WithBKPath")); + NamespaceInfo nsi = newNSInfo(); + bkjm = new BookKeeperJournalManager(conf, + URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-WithBKPath"), + nsi); Assert.assertNotNull("Bookie available path : " + bkAvailablePath + " doesn't exists", zkc.exists(bkAvailablePath, false)); } @@ -152,8 +162,10 @@ public void testDefaultBKAvailablePath() throws Exception { Configuration conf = new Configuration(); Assert.assertNull(BK_ROOT_PATH + " already exists", zkc.exists( BK_ROOT_PATH, false)); - new BookKeeperJournalManager(conf, URI.create("bookkeeper://" + HOSTPORT - + "/hdfsjournal-DefaultBKPath")); + NamespaceInfo nsi = newNSInfo(); + bkjm = new BookKeeperJournalManager(conf, + URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-DefaultBKPath"), + nsi); Assert.assertNotNull("Bookie available path : " + BK_ROOT_PATH + " doesn't exists", zkc.exists(BK_ROOT_PATH, false)); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java index 9476dea81c8..954f2a54098 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.net.URI; import java.util.List; +import java.util.Random; import org.apache.hadoop.conf.Configuration; @@ -37,6 +38,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil; import org.apache.hadoop.hdfs.server.namenode.JournalManager; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.bookkeeper.proto.BookieServer; import org.apache.zookeeper.CreateMode; @@ -78,10 +80,17 @@ public void teardown() throws Exception { zkc.close(); } + private NamespaceInfo newNSInfo() { + Random r = new Random(); + return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1); + } + @Test public void testSimpleWrite() throws Exception { + NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-simplewrite")); + BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi); + EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1 ; i <= 100; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); @@ -99,8 +108,10 @@ public void testSimpleWrite() throws Exception { @Test public void testNumberOfTransactions() throws Exception { + NamespaceInfo nsi = newNSInfo(); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-txncount")); + BKJMUtil.createJournalURI("/hdfsjournal-txncount"), nsi); EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1 ; i <= 100; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); @@ -116,8 +127,10 @@ public void testNumberOfTransactions() throws Exception { @Test public void testNumberOfTransactionsWithGaps() throws Exception { + NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-gaps")); + BKJMUtil.createJournalURI("/hdfsjournal-gaps"), nsi); + long txid = 1; for (long i = 0; i < 3; i++) { long start = txid; @@ -151,8 +164,10 @@ public void testNumberOfTransactionsWithGaps() throws Exception { @Test public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception { + NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd")); + BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"), nsi); + long txid = 1; for (long i = 0; i < 3; i++) { long start = txid; @@ -190,8 +205,10 @@ public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception { */ @Test public void testWriteRestartFrom1() throws Exception { + NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1")); + BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"), nsi); + long txid = 1; long start = txid; EditLogOutputStream out = bkjm.startLogSegment(txid); @@ -245,11 +262,15 @@ public void testWriteRestartFrom1() throws Exception { @Test public void testTwoWriters() throws Exception { long start = 1; + NamespaceInfo nsi = newNSInfo(); + BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-dualWriter")); + BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi); + BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-dualWriter")); - + BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi); + + EditLogOutputStream out1 = bkjm1.startLogSegment(start); try { bkjm2.startLogSegment(start); @@ -263,8 +284,11 @@ public void testTwoWriters() throws Exception { @Test public void testSimpleRead() throws Exception { + NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-simpleread")); + BKJMUtil.createJournalURI("/hdfsjournal-simpleread"), + nsi); + final long numTransactions = 10000; EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1 ; i <= numTransactions; i++) { @@ -287,8 +311,11 @@ public void testSimpleRead() throws Exception { @Test public void testSimpleRecovery() throws Exception { + NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery")); + BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"), + nsi); + EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1 ; i <= 100; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); @@ -334,8 +361,10 @@ public void testAllBookieFailure() throws Exception { conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE, ensembleSize); long txid = 1; + NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure")); + BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"), + nsi); EditLogOutputStream out = bkjm.startLogSegment(txid); for (long i = 1 ; i <= 3; i++) { @@ -416,8 +445,12 @@ public void testOneBookieFailure() throws Exception { conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE, ensembleSize); long txid = 1; + + NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, - BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure")); + BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"), + nsi); + EditLogOutputStream out = bkjm.startLogSegment(txid); for (long i = 1 ; i <= 3; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); @@ -464,7 +497,9 @@ public void testOneBookieFailure() throws Exception { @Test public void testEmptyInprogressNode() throws Exception { URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogress"); - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri); + NamespaceInfo nsi = newNSInfo(); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri, + nsi); EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1; i <= 100; i++) { @@ -481,7 +516,7 @@ public void testEmptyInprogressNode() throws Exception { String inprogressZNode = bkjm.inprogressZNode(101); zkc.setData(inprogressZNode, new byte[0], -1); - bkjm = new BookKeeperJournalManager(conf, uri); + bkjm = new BookKeeperJournalManager(conf, uri, nsi); try { bkjm.recoverUnfinalizedSegments(); fail("Should have failed. There should be no way of creating" @@ -489,7 +524,7 @@ public void testEmptyInprogressNode() throws Exception { } catch (IOException e) { // correct behaviour assertTrue("Exception different than expected", e.getMessage().contains( - "Invalid ledger entry,")); + "Invalid/Incomplete data in znode")); } finally { bkjm.close(); } @@ -503,7 +538,9 @@ public void testEmptyInprogressNode() throws Exception { @Test public void testCorruptInprogressNode() throws Exception { URI uri = BKJMUtil.createJournalURI("/hdfsjournal-corruptInprogress"); - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri); + NamespaceInfo nsi = newNSInfo(); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri, + nsi); EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1; i <= 100; i++) { @@ -521,7 +558,7 @@ public void testCorruptInprogressNode() throws Exception { String inprogressZNode = bkjm.inprogressZNode(101); zkc.setData(inprogressZNode, "WholeLottaJunk".getBytes(), -1); - bkjm = new BookKeeperJournalManager(conf, uri); + bkjm = new BookKeeperJournalManager(conf, uri, nsi); try { bkjm.recoverUnfinalizedSegments(); fail("Should have failed. There should be no way of creating" @@ -529,8 +566,7 @@ public void testCorruptInprogressNode() throws Exception { } catch (IOException e) { // correct behaviour assertTrue("Exception different than expected", e.getMessage().contains( - "Invalid ledger entry,")); - + "has no field named")); } finally { bkjm.close(); } @@ -544,7 +580,9 @@ public void testCorruptInprogressNode() throws Exception { @Test public void testEmptyInprogressLedger() throws Exception { URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogressLedger"); - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri); + NamespaceInfo nsi = newNSInfo(); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri, + nsi); EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1; i <= 100; i++) { @@ -559,7 +597,7 @@ public void testEmptyInprogressLedger() throws Exception { out.close(); bkjm.close(); - bkjm = new BookKeeperJournalManager(conf, uri); + bkjm = new BookKeeperJournalManager(conf, uri, nsi); bkjm.recoverUnfinalizedSegments(); out = bkjm.startLogSegment(101); for (long i = 1; i <= 100; i++) { @@ -581,7 +619,9 @@ public void testEmptyInprogressLedger() throws Exception { public void testRefinalizeAlreadyFinalizedInprogress() throws Exception { URI uri = BKJMUtil .createJournalURI("/hdfsjournal-refinalizeInprogressLedger"); - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri); + NamespaceInfo nsi = newNSInfo(); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri, + nsi); EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1; i <= 100; i++) { @@ -601,7 +641,7 @@ public void testRefinalizeAlreadyFinalizedInprogress() throws Exception { byte[] inprogressData = zkc.getData(inprogressZNode, false, null); // finalize - bkjm = new BookKeeperJournalManager(conf, uri); + bkjm = new BookKeeperJournalManager(conf, uri, nsi); bkjm.recoverUnfinalizedSegments(); bkjm.close(); @@ -613,7 +653,7 @@ public void testRefinalizeAlreadyFinalizedInprogress() throws Exception { CreateMode.PERSISTENT); // should work fine - bkjm = new BookKeeperJournalManager(conf, uri); + bkjm = new BookKeeperJournalManager(conf, uri, nsi); bkjm.recoverUnfinalizedSegments(); bkjm.close(); } @@ -626,7 +666,10 @@ public void testRefinalizeAlreadyFinalizedInprogress() throws Exception { @Test public void testEditLogFileNotExistsWhenReadingMetadata() throws Exception { URI uri = BKJMUtil.createJournalURI("/hdfsjournal-editlogfile"); - BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri); + NamespaceInfo nsi = newNSInfo(); + BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri, + nsi); + try { // start new inprogress log segment with txid=1 // and write transactions till txid=50