diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 74daf5f7ddf..95a876583c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -200,6 +200,8 @@ Release 2.0.1-alpha - UNRELEASED HDFS-3522. If a namenode is in safemode, it should throw SafeModeException when getBlockLocations has zero locations. (Brandon Li via szetszwo) + HDFS-3408. BKJM : Namenode format fails, if there is no BK root. (Rakesh R via umamahesh) + BREAKDOWN OF HDFS-3042 SUBTASKS HDFS-2185. HDFS portion of ZK-based FailoverController (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java index 5317a0f66cc..d6f19635f5b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java @@ -28,6 +28,7 @@ import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.util.ZkUtils; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.ZooKeeper; @@ -36,6 +37,7 @@ import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.AsyncCallback.StringCallback; import java.util.Collection; import java.util.Collections; @@ -124,6 +126,12 @@ public class BookKeeperJournalManager implements JournalManager { private static final String BKJM_EDIT_INPROGRESS = "inprogress_"; + public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH + = "dfs.namenode.bookkeeperjournal.zk.availablebookies"; + + public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT + = "/ledgers/available"; + private ZooKeeper zkc; private final Configuration conf; private final BookKeeper bkc; @@ -196,7 +204,7 @@ public class BookKeeperJournalManager implements JournalManager { zkc.create(ledgerPath, new byte[] {'0'}, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } - + prepareBookKeeperEnv(); bkc = new BookKeeper(new ClientConfiguration(), zkc); } catch (KeeperException e) { @@ -210,6 +218,50 @@ public class BookKeeperJournalManager implements JournalManager { maxTxId = new MaxTxId(zkc, maxTxIdPath); } + /** + * Pre-creating bookkeeper metadata path in zookeeper. + */ + private void prepareBookKeeperEnv() throws IOException { + // create bookie available path in zookeeper if it doesn't exists + final String zkAvailablePath = conf.get(BKJM_ZK_LEDGERS_AVAILABLE_PATH, + BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT); + final CountDownLatch zkPathLatch = new CountDownLatch(1); + + StringCallback callback = new StringCallback() { + @Override + public void processResult(int rc, String path, Object ctx, String name) { + if (KeeperException.Code.OK.intValue() == rc + || KeeperException.Code.NODEEXISTS.intValue() == rc) { + LOG.info("Successfully created bookie available path : " + + zkAvailablePath); + zkPathLatch.countDown(); + } else { + KeeperException.Code code = KeeperException.Code.get(rc); + LOG + .error("Error : " + + KeeperException.create(code, path).getMessage() + + ", failed to create bookie available path : " + + zkAvailablePath); + } + } + }; + ZkUtils.createFullPathOptimistic(zkc, zkAvailablePath, new byte[0], + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, callback, null); + + try { + if (!zkPathLatch.await(zkc.getSessionTimeout(), TimeUnit.MILLISECONDS)) { + throw new IOException("Couldn't create bookie available path :" + + zkAvailablePath + ", timed out " + zkc.getSessionTimeout() + + " millis"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException( + "Interrupted when creating the bookie available path : " + + zkAvailablePath, e); + } + } + /** * Start a new log segment in a BookKeeper ledger. * First ensure that we have the write lock for this journal. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java new file mode 100644 index 00000000000..df788a27daf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.contrib.bkjournal; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.util.LocalBookKeeper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZKUtil; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestBookKeeperConfiguration { + private static final Log LOG = LogFactory + .getLog(TestBookKeeperConfiguration.class); + private static final int ZK_SESSION_TIMEOUT = 5000; + private static final String HOSTPORT = "127.0.0.1:2181"; + private static final int CONNECTION_TIMEOUT = 30000; + private static NIOServerCnxnFactory serverFactory; + private static ZooKeeperServer zks; + private static ZooKeeper zkc; + private static int ZooKeeperDefaultPort = 2181; + private static File ZkTmpDir; + private BookKeeperJournalManager bkjm; + private static final String BK_ROOT_PATH = "/ledgers"; + + private static ZooKeeper connectZooKeeper(String ensemble) + throws IOException, KeeperException, InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + + ZooKeeper zkc = new ZooKeeper(HOSTPORT, ZK_SESSION_TIMEOUT, new Watcher() { + public void process(WatchedEvent event) { + if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { + latch.countDown(); + } + } + }); + if (!latch.await(ZK_SESSION_TIMEOUT, TimeUnit.MILLISECONDS)) { + throw new IOException("Zookeeper took too long to connect"); + } + return zkc; + } + + @BeforeClass + public static void setupZooKeeper() throws Exception { + // create a ZooKeeper server(dataDir, dataLogDir, port) + LOG.info("Starting ZK server"); + ZkTmpDir = File.createTempFile("zookeeper", "test"); + ZkTmpDir.delete(); + ZkTmpDir.mkdir(); + + try { + zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort); + serverFactory = new NIOServerCnxnFactory(); + serverFactory.configure(new InetSocketAddress(ZooKeeperDefaultPort), 10); + serverFactory.startup(zks); + } catch (Exception e) { + LOG.error("Exception while instantiating ZooKeeper", e); + } + + boolean b = LocalBookKeeper.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT); + LOG.debug("ZooKeeper server up: " + b); + } + + @Before + public void setup() throws Exception { + zkc = connectZooKeeper(HOSTPORT); + try { + ZKUtil.deleteRecursive(zkc, BK_ROOT_PATH); + } catch (KeeperException.NoNodeException e) { + LOG.debug("Ignoring no node exception on cleanup", e); + } catch (Exception e) { + LOG.error("Exception when deleting bookie root path in zk", e); + } + } + + @After + public void teardown() throws Exception { + if (null != zkc) { + zkc.close(); + } + if (null != bkjm) { + bkjm.close(); + } + } + + @AfterClass + public static void teardownZooKeeper() throws Exception { + if (null != zkc) { + zkc.close(); + } + } + + /** + * Verify the BKJM is creating the bookie available path configured in + * 'dfs.namenode.bookkeeperjournal.zk.availablebookies' + */ + @Test + public void testWithConfiguringBKAvailablePath() throws Exception { + // set Bookie available path in the configuration + String bkAvailablePath + = BookKeeperJournalManager.BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT; + Configuration conf = new Configuration(); + conf.setStrings(BookKeeperJournalManager.BKJM_ZK_LEDGERS_AVAILABLE_PATH, + bkAvailablePath); + Assert.assertNull(bkAvailablePath + " already exists", zkc.exists( + bkAvailablePath, false)); + bkjm = new BookKeeperJournalManager(conf, URI.create("bookkeeper://" + + HOSTPORT + "/hdfsjournal-WithBKPath")); + Assert.assertNotNull("Bookie available path : " + bkAvailablePath + + " doesn't exists", zkc.exists(bkAvailablePath, false)); + } + + /** + * Verify the BKJM is creating the bookie available default path, when there + * is no 'dfs.namenode.bookkeeperjournal.zk.availablebookies' configured + */ + @Test + public void testDefaultBKAvailablePath() throws Exception { + Configuration conf = new Configuration(); + Assert.assertNull(BK_ROOT_PATH + " already exists", zkc.exists( + BK_ROOT_PATH, false)); + new BookKeeperJournalManager(conf, URI.create("bookkeeper://" + HOSTPORT + + "/hdfsjournal-DefaultBKPath")); + Assert.assertNotNull("Bookie available path : " + BK_ROOT_PATH + + " doesn't exists", zkc.exists(BK_ROOT_PATH, false)); + } +}