From 4688044c404a40f3a30da5c61c02b6e1ee94aee5 Mon Sep 17 00:00:00 2001 From: Uma Maheswara Rao G Date: Tue, 29 May 2012 19:05:40 +0000 Subject: [PATCH] Merge r:1343913 HDFS-3452. BKJM:Switch from standby to active fails and NN gets shut down due to delay in clearing of lock. Contributed by Uma Maheswara Rao G. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1343929 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../BookKeeperEditLogOutputStream.java | 17 +- .../bkjournal/BookKeeperJournalManager.java | 30 +-- .../contrib/bkjournal/CurrentInprogress.java | 161 +++++++++++++++ .../hadoop/contrib/bkjournal/WriteLock.java | 186 ------------------ .../TestBookKeeperAsHASharedDir.java | 23 +-- .../TestBookKeeperJournalManager.java | 1 + .../bkjournal/TestCurrentInprogress.java | 157 +++++++++++++++ 8 files changed, 348 insertions(+), 230 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java delete mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/WriteLock.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 7f54b11017d..b1740177a61 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -118,6 +118,9 @@ Release 2.0.1-alpha - UNRELEASED HDFS-3368. Missing blocks due to bad DataNodes coming up and down. (shv) + HDFS-3452. BKJM:Switch from standby to active fails and NN gets shut down + due to delay in clearing of lock. (umamahesh) + Release 2.0.0-alpha - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java index 6432c57c041..6267871aae0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java @@ -56,15 +56,13 @@ class BookKeeperEditLogOutputStream private CountDownLatch syncLatch; private final AtomicInteger transmitResult = new AtomicInteger(BKException.Code.OK); - private final WriteLock wl; private final Writer writer; /** * Construct an edit log output stream which writes to a ledger. */ - protected BookKeeperEditLogOutputStream(Configuration conf, - LedgerHandle lh, WriteLock wl) + protected BookKeeperEditLogOutputStream(Configuration conf, LedgerHandle lh) throws IOException { super(); @@ -72,8 +70,6 @@ class BookKeeperEditLogOutputStream outstandingRequests = new AtomicInteger(0); syncLatch = null; this.lh = lh; - this.wl = wl; - this.wl.acquire(); this.writer = new Writer(bufCurrent); this.transmissionThreshold = conf.getInt(BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE, @@ -108,7 +104,6 @@ class BookKeeperEditLogOutputStream throw new IOException("BookKeeper error during abort", bke); } - wl.release(); } @Override @@ -118,8 +113,6 @@ class BookKeeperEditLogOutputStream @Override public void write(FSEditLogOp op) throws IOException { - wl.checkWriteLock(); - writer.writeOp(op); if (bufCurrent.getLength() > transmissionThreshold) { @@ -129,19 +122,15 @@ class BookKeeperEditLogOutputStream @Override public void setReadyToFlush() throws IOException { - wl.checkWriteLock(); - transmit(); - synchronized(this) { + synchronized (this) { syncLatch = new CountDownLatch(outstandingRequests.get()); } } @Override public void flushAndSync() throws IOException { - wl.checkWriteLock(); - assert(syncLatch != null); try { syncLatch.await(); @@ -164,8 +153,6 @@ class BookKeeperEditLogOutputStream * are never called at the same time. */ private void transmit() throws IOException { - wl.checkWriteLock(); - if (!transmitResult.compareAndSet(BKException.Code.OK, BKException.Code.OK)) { throw new IOException("Trying to write to an errored stream;" diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java index ce090799b93..f47e9f3681e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java @@ -117,7 +117,7 @@ public class BookKeeperJournalManager implements JournalManager { private final ZooKeeper zkc; private final Configuration conf; private final BookKeeper bkc; - private final WriteLock wl; + private final CurrentInprogress ci; private final String ledgerPath; private final MaxTxId maxTxId; private final int ensembleSize; @@ -155,7 +155,7 @@ public class BookKeeperJournalManager implements JournalManager { ledgerPath = zkPath + "/ledgers"; String maxTxIdPath = zkPath + "/maxtxid"; - String lockPath = zkPath + "/lock"; + String currentInprogressNodePath = zkPath + "/CurrentInprogress"; String versionPath = zkPath + "/version"; digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW, BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT); @@ -192,7 +192,7 @@ public class BookKeeperJournalManager implements JournalManager { throw new IOException("Error initializing zk", e); } - wl = new WriteLock(zkc, lockPath); + ci = new CurrentInprogress(zkc, currentInprogressNodePath); maxTxId = new MaxTxId(zkc, maxTxIdPath); } @@ -207,13 +207,16 @@ public class BookKeeperJournalManager implements JournalManager { */ @Override public EditLogOutputStream startLogSegment(long txId) throws IOException { - wl.acquire(); - if (txId <= maxTxId.get()) { throw new IOException("We've already seen " + txId + ". A new stream cannot be created with it"); } try { + String existingInprogressNode = ci.read(); + if (null != existingInprogressNode + && zkc.exists(existingInprogressNode, false) != null) { + throw new IOException("Inprogress node already exists"); + } if (currentLedger != null) { // bookkeeper errored on last stream, clean up ledger currentLedger.close(); @@ -234,7 +237,8 @@ public class BookKeeperJournalManager implements JournalManager { l.write(zkc, znodePath); maxTxId.store(txId); - return new BookKeeperEditLogOutputStream(conf, currentLedger, wl); + ci.update(znodePath); + return new BookKeeperEditLogOutputStream(conf, currentLedger); } catch (Exception e) { if (currentLedger != null) { try { @@ -270,7 +274,6 @@ public class BookKeeperJournalManager implements JournalManager { + " doesn't exist"); } - wl.checkWriteLock(); EditLogLedgerMetadata l = EditLogLedgerMetadata.read(zkc, inprogressPath); @@ -307,13 +310,15 @@ public class BookKeeperJournalManager implements JournalManager { } maxTxId.store(lastTxId); zkc.delete(inprogressPath, inprogressStat.getVersion()); + String inprogressPathFromCI = ci.read(); + if (inprogressPath.equals(inprogressPathFromCI)) { + ci.clear(); + } } catch (KeeperException e) { throw new IOException("Error finalising ledger", e); } catch (InterruptedException ie) { throw new IOException("Error finalising ledger", ie); - } finally { - wl.release(); - } + } } EditLogInputStream getInputStream(long fromTxId, boolean inProgressOk) @@ -417,7 +422,6 @@ public class BookKeeperJournalManager implements JournalManager { @Override public void recoverUnfinalizedSegments() throws IOException { - wl.acquire(); synchronized (this) { try { List children = zkc.getChildren(ledgerPath, false); @@ -445,10 +449,6 @@ public class BookKeeperJournalManager implements JournalManager { } catch (InterruptedException ie) { throw new IOException("Interrupted getting list of inprogress segments", ie); - } finally { - if (wl.haveLock()) { - wl.release(); - } } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java new file mode 100644 index 00000000000..995f32b34fe --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.contrib.bkjournal; + +import java.io.IOException; +import java.net.InetAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.Stat; + +/** + * Distributed write permission lock, using ZooKeeper. Read the version number + * and return the current inprogress node path available in CurrentInprogress + * path. If it exist, caller can treat that some other client already operating + * on it. Then caller can take action. If there is no inprogress node exist, + * then caller can treat that there is no client operating on it. Later same + * caller should update the his newly created inprogress node path. At this + * point, if some other activities done on this node, version number might + * change, so update will fail. So, this read, update api will ensure that there + * is only node can continue further after checking with CurrentInprogress. + */ + +class CurrentInprogress { + private static final String CONTENT_DELIMITER = ","; + + static final Log LOG = LogFactory.getLog(CurrentInprogress.class); + + private final ZooKeeper zkc; + private final String currentInprogressNode; + private volatile int versionNumberForPermission = -1; + private static final int CURRENT_INPROGRESS_LAYOUT_VERSION = -1; + private final String hostName = InetAddress.getLocalHost().toString(); + + CurrentInprogress(ZooKeeper zkc, String lockpath) throws IOException { + this.currentInprogressNode = lockpath; + this.zkc = zkc; + try { + Stat isCurrentInprogressNodeExists = zkc.exists(lockpath, false); + if (isCurrentInprogressNodeExists == null) { + try { + zkc.create(lockpath, null, Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } catch (NodeExistsException e) { + // Node might created by other process at the same time. Ignore it. + if (LOG.isDebugEnabled()) { + LOG.debug(lockpath + " already created by other process.", e); + } + } + } + } catch (Exception e) { + throw new IOException("Exception accessing Zookeeper", e); + } + } + + /** + * Update the path with prepending version number and hostname + * + * @param path + * - to be updated in zookeeper + * @throws IOException + */ + void update(String path) throws IOException { + String content = CURRENT_INPROGRESS_LAYOUT_VERSION + + CONTENT_DELIMITER + hostName + CONTENT_DELIMITER + path; + try { + zkc.setData(this.currentInprogressNode, content.getBytes(), + this.versionNumberForPermission); + } catch (KeeperException e) { + throw new IOException("Exception when setting the data " + + "[layout version number,hostname,inprogressNode path]= [" + content + + "] to CurrentInprogress. ", e); + } catch (InterruptedException e) { + throw new IOException("Interrupted while setting the data " + + "[layout version number,hostname,inprogressNode path]= [" + content + + "] to CurrentInprogress", e); + } + LOG.info("Updated data[layout version number,hostname,inprogressNode path]" + + "= [" + content + "] to CurrentInprogress"); + } + + /** + * Read the CurrentInprogress node data from Zookeeper and also get the znode + * version number. Return the 3rd field from the data. i.e saved path with + * #update api + * + * @return available inprogress node path. returns null if not available. + * @throws IOException + */ + String read() throws IOException { + Stat stat = new Stat(); + byte[] data = null; + try { + data = zkc.getData(this.currentInprogressNode, false, stat); + } catch (KeeperException e) { + throw new IOException("Exception while reading the data from " + + currentInprogressNode, e); + } catch (InterruptedException e) { + throw new IOException("Interrupted while reading data from " + + currentInprogressNode, e); + } + this.versionNumberForPermission = stat.getVersion(); + if (data != null) { + String stringData = new String(data); + LOG.info("Read data[layout version number,hostname,inprogressNode path]" + + "= [" + stringData + "] from CurrentInprogress"); + String[] contents = stringData.split(CONTENT_DELIMITER); + assert contents.length == 3 : "As per the current data format, " + + "CurrentInprogress node data should contain 3 fields. " + + "i.e layout version number,hostname,inprogressNode path"; + String layoutVersion = contents[0]; + if (Long.valueOf(layoutVersion) > CURRENT_INPROGRESS_LAYOUT_VERSION) { + throw new IOException( + "Supported layout version of CurrentInprogress node is : " + + CURRENT_INPROGRESS_LAYOUT_VERSION + + " . Layout version of CurrentInprogress node in ZK is : " + + layoutVersion); + } + String inprogressNodePath = contents[2]; + return inprogressNodePath; + } else { + LOG.info("No data available in CurrentInprogress"); + } + return null; + } + + /** Clear the CurrentInprogress node data */ + void clear() throws IOException { + try { + zkc.setData(this.currentInprogressNode, null, versionNumberForPermission); + } catch (KeeperException e) { + throw new IOException( + "Exception when setting the data to CurrentInprogress node", e); + } catch (InterruptedException e) { + throw new IOException( + "Interrupted when setting the data to CurrentInprogress node", e); + } + LOG.info("Cleared the data from CurrentInprogress"); + } + +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/WriteLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/WriteLock.java deleted file mode 100644 index 67743b2228c..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/WriteLock.java +++ /dev/null @@ -1,186 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.contrib.bkjournal; - -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Watcher.Event.KeeperState; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.ZooDefs.Ids; - -import java.util.concurrent.atomic.AtomicInteger; -import java.util.List; -import java.util.Collections; -import java.util.Comparator; - -import java.net.InetAddress; -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * Distributed lock, using ZooKeeper. - * - * The lock is vulnerable to timing issues. For example, the process could - * encounter a really long GC cycle between acquiring the lock, and writing to - * a ledger. This could have timed out the lock, and another process could have - * acquired the lock and started writing to bookkeeper. Therefore other - * mechanisms are required to ensure correctness (i.e. Fencing). - */ -class WriteLock implements Watcher { - static final Log LOG = LogFactory.getLog(WriteLock.class); - - private final ZooKeeper zkc; - private final String lockpath; - - private AtomicInteger lockCount = new AtomicInteger(0); - private String myznode = null; - - WriteLock(ZooKeeper zkc, String lockpath) throws IOException { - this.lockpath = lockpath; - - this.zkc = zkc; - try { - if (zkc.exists(lockpath, false) == null) { - String localString = InetAddress.getLocalHost().toString(); - zkc.create(lockpath, localString.getBytes(), - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - } catch (Exception e) { - throw new IOException("Exception accessing Zookeeper", e); - } - } - - void acquire() throws IOException { - while (true) { - if (lockCount.get() == 0) { - try { - synchronized(this) { - if (lockCount.get() > 0) { - lockCount.incrementAndGet(); - return; - } - myznode = zkc.create(lockpath + "/lock-", new byte[] {'0'}, - Ids.OPEN_ACL_UNSAFE, - CreateMode.EPHEMERAL_SEQUENTIAL); - if (LOG.isTraceEnabled()) { - LOG.trace("Acquiring lock, trying " + myznode); - } - - List nodes = zkc.getChildren(lockpath, false); - Collections.sort(nodes, new Comparator() { - public int compare(String o1, - String o2) { - Integer l1 = Integer.valueOf(o1.replace("lock-", "")); - Integer l2 = Integer.valueOf(o2.replace("lock-", "")); - return l1 - l2; - } - }); - if ((lockpath + "/" + nodes.get(0)).equals(myznode)) { - if (LOG.isTraceEnabled()) { - LOG.trace("Lock acquired - " + myznode); - } - lockCount.set(1); - zkc.exists(myznode, this); - return; - } else { - LOG.error("Failed to acquire lock with " + myznode - + ", " + nodes.get(0) + " already has it"); - throw new IOException("Could not acquire lock"); - } - } - } catch (KeeperException e) { - throw new IOException("Exception accessing Zookeeper", e); - } catch (InterruptedException ie) { - throw new IOException("Exception accessing Zookeeper", ie); - } - } else { - int ret = lockCount.getAndIncrement(); - if (ret == 0) { - lockCount.decrementAndGet(); - continue; // try again; - } else { - return; - } - } - } - } - - void release() throws IOException { - try { - if (lockCount.decrementAndGet() <= 0) { - if (lockCount.get() < 0) { - LOG.warn("Unbalanced lock handling somewhere, lockCount down to " - + lockCount.get()); - } - synchronized(this) { - if (lockCount.get() <= 0) { - if (LOG.isTraceEnabled()) { - LOG.trace("releasing lock " + myznode); - } - if (myznode != null) { - zkc.delete(myznode, -1); - myznode = null; - } - } - } - } - } catch (Exception e) { - throw new IOException("Exception accessing Zookeeper", e); - } - } - - public void checkWriteLock() throws IOException { - if (!haveLock()) { - throw new IOException("Lost writer lock"); - } - } - - boolean haveLock() throws IOException { - return lockCount.get() > 0; - } - - public void process(WatchedEvent event) { - if (event.getState() == KeeperState.Disconnected - || event.getState() == KeeperState.Expired) { - LOG.warn("Lost zookeeper session, lost lock "); - lockCount.set(0); - } else { - // reapply the watch - synchronized (this) { - LOG.info("Zookeeper event " + event - + " received, reapplying watch to " + myznode); - if (myznode != null) { - try { - zkc.exists(myznode, this); - } catch (Exception e) { - LOG.warn("Could not set watch on lock, releasing", e); - try { - release(); - } catch (IOException ioe) { - LOG.error("Could not release Zk lock", ioe); - } - } - } - } - } - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java index c313cd12296..7f97a6da646 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java @@ -215,8 +215,7 @@ public class TestBookKeeperAsHASharedDir { } /** - * Test that two namenodes can't become primary at the same - * time. + * Test that two namenodes can't continue as primary */ @Test public void testMultiplePrimariesStarted() throws Exception { @@ -247,21 +246,17 @@ public class TestBookKeeperAsHASharedDir { FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf); fs.mkdirs(p1); nn1.getRpcServer().rollEditLog(); - try { - cluster.transitionToActive(1); - fail("Shouldn't have been able to start two primaries" - + " with single shared storage"); - } catch (ServiceFailedException sfe) { - assertTrue("Wrong exception", - sfe.getMessage().contains("Failed to start active services")); - } + cluster.transitionToActive(1); + fs = cluster.getFileSystem(0); // get the older active server. + // This edit log updation on older active should make older active + // shutdown. + fs.delete(p1, true); + verify(mockRuntime1, atLeastOnce()).exit(anyInt()); + verify(mockRuntime2, times(0)).exit(anyInt()); } finally { - verify(mockRuntime1, times(0)).exit(anyInt()); - verify(mockRuntime2, atLeastOnce()).exit(anyInt()); - if (cluster != null) { cluster.shutdown(); } } } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java index ddd2c468be5..ab96275414b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java @@ -361,6 +361,7 @@ public class TestBookKeeperJournalManager { assertEquals("New bookie didn't start", numBookies+1, bkutil.checkBookiesUp(numBookies+1, 10)); + bkjm.recoverUnfinalizedSegments(); out = bkjm.startLogSegment(txid); for (long i = 1 ; i <= 3; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java new file mode 100644 index 00000000000..00497b7798f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestCurrentInprogress.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.contrib.bkjournal; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.util.LocalBookKeeper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests that read, update, clear api from CurrentInprogress + */ +public class TestCurrentInprogress { + private static final Log LOG = LogFactory.getLog(TestCurrentInprogress.class); + private static final String CURRENT_NODE_PATH = "/test"; + private static final String HOSTPORT = "127.0.0.1:2181"; + private static final int CONNECTION_TIMEOUT = 30000; + private static NIOServerCnxnFactory serverFactory; + private static ZooKeeperServer zks; + private static ZooKeeper zkc; + private static int ZooKeeperDefaultPort = 2181; + private static File zkTmpDir; + + private static ZooKeeper connectZooKeeper(String ensemble) + throws IOException, KeeperException, InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + + ZooKeeper zkc = new ZooKeeper(HOSTPORT, 3600, new Watcher() { + public void process(WatchedEvent event) { + if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { + latch.countDown(); + } + } + }); + if (!latch.await(10, TimeUnit.SECONDS)) { + throw new IOException("Zookeeper took too long to connect"); + } + return zkc; + } + + @BeforeClass + public static void setupZooKeeper() throws Exception { + LOG.info("Starting ZK server"); + zkTmpDir = File.createTempFile("zookeeper", "test"); + zkTmpDir.delete(); + zkTmpDir.mkdir(); + try { + zks = new ZooKeeperServer(zkTmpDir, zkTmpDir, ZooKeeperDefaultPort); + serverFactory = new NIOServerCnxnFactory(); + serverFactory.configure(new InetSocketAddress(ZooKeeperDefaultPort), 10); + serverFactory.startup(zks); + } catch (Exception e) { + LOG.error("Exception while instantiating ZooKeeper", e); + } + boolean b = LocalBookKeeper.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT); + LOG.debug("ZooKeeper server up: " + b); + } + + @AfterClass + public static void shutDownServer() { + if (null != zks) { + zks.shutdown(); + } + zkTmpDir.delete(); + } + + @Before + public void setup() throws Exception { + zkc = connectZooKeeper(HOSTPORT); + } + + @After + public void teardown() throws Exception { + if (null != zkc) { + zkc.close(); + } + + } + + /** + * Tests that read should be able to read the data which updated with update + * api + */ + @Test + public void testReadShouldReturnTheZnodePathAfterUpdate() throws Exception { + String data = "inprogressNode"; + CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH); + ci.update(data); + String inprogressNodePath = ci.read(); + assertEquals("Not returning inprogressZnode", "inprogressNode", + inprogressNodePath); + } + + /** + * Tests that read should return null if we clear the updated data in + * CurrentInprogress node + */ + @Test + public void testReadShouldReturnNullAfterClear() throws Exception { + CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH); + ci.update("myInprogressZnode"); + ci.read(); + ci.clear(); + String inprogressNodePath = ci.read(); + assertEquals("Expecting null to be return", null, inprogressNodePath); + } + + /** + * Tests that update should throw IOE, if version number modifies between read + * and update + */ + @Test(expected = IOException.class) + public void testUpdateShouldFailWithIOEIfVersionNumberChangedAfterRead() + throws Exception { + CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH); + ci.update("myInprogressZnode"); + assertEquals("Not returning myInprogressZnode", "myInprogressZnode", ci + .read()); + // Updating data in-between to change the data to change the version number + ci.update("YourInprogressZnode"); + ci.update("myInprogressZnode"); + } + +} \ No newline at end of file