Merge r:1343913 HDFS-3452. BKJM:Switch from standby to active fails and NN gets shut down due to delay in clearing of lock. Contributed by Uma Maheswara Rao G.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1343929 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Parent: 4200fa906e
Commit: 4688044c40
@@ -118,6 +118,9 @@ Release 2.0.1-alpha - UNRELEASED
 
    HDFS-3368. Missing blocks due to bad DataNodes coming up and down. (shv)
 
+   HDFS-3452. BKJM:Switch from standby to active fails and NN gets shut down
+   due to delay in clearing of lock. (umamahesh)
+
 Release 2.0.0-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -56,15 +56,13 @@ class BookKeeperEditLogOutputStream
   private CountDownLatch syncLatch;
   private final AtomicInteger transmitResult
     = new AtomicInteger(BKException.Code.OK);
-  private final WriteLock wl;
   private final Writer writer;
 
   /**
    * Construct an edit log output stream which writes to a ledger.
 
    */
-  protected BookKeeperEditLogOutputStream(Configuration conf,
-                                          LedgerHandle lh, WriteLock wl)
+  protected BookKeeperEditLogOutputStream(Configuration conf, LedgerHandle lh)
       throws IOException {
     super();
 
@@ -72,8 +70,6 @@ protected BookKeeperEditLogOutputStream(Configuration conf,
     outstandingRequests = new AtomicInteger(0);
     syncLatch = null;
     this.lh = lh;
-    this.wl = wl;
-    this.wl.acquire();
     this.writer = new Writer(bufCurrent);
     this.transmissionThreshold
       = conf.getInt(BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE,
@@ -108,7 +104,6 @@ public void abort() throws IOException {
       throw new IOException("BookKeeper error during abort", bke);
     }
 
-    wl.release();
   }
 
   @Override
@@ -118,8 +113,6 @@ public void writeRaw(final byte[] data, int off, int len) throws IOException {
 
   @Override
   public void write(FSEditLogOp op) throws IOException {
-    wl.checkWriteLock();
-
     writer.writeOp(op);
 
     if (bufCurrent.getLength() > transmissionThreshold) {
@@ -129,19 +122,15 @@ public void write(FSEditLogOp op) throws IOException {
 
   @Override
   public void setReadyToFlush() throws IOException {
-    wl.checkWriteLock();
-
     transmit();
 
-    synchronized(this) {
+    synchronized (this) {
       syncLatch = new CountDownLatch(outstandingRequests.get());
     }
   }
 
   @Override
   public void flushAndSync() throws IOException {
-    wl.checkWriteLock();
-
     assert(syncLatch != null);
     try {
       syncLatch.await();
@@ -164,8 +153,6 @@ public void flushAndSync() throws IOException {
    * are never called at the same time.
    */
  private void transmit() throws IOException {
-    wl.checkWriteLock();
-
    if (!transmitResult.compareAndSet(BKException.Code.OK,
                                      BKException.Code.OK)) {
      throw new IOException("Trying to write to an errored stream;"
@@ -117,7 +117,7 @@ public class BookKeeperJournalManager implements JournalManager {
   private final ZooKeeper zkc;
   private final Configuration conf;
   private final BookKeeper bkc;
-  private final WriteLock wl;
+  private final CurrentInprogress ci;
   private final String ledgerPath;
   private final MaxTxId maxTxId;
   private final int ensembleSize;
@@ -155,7 +155,7 @@ public BookKeeperJournalManager(Configuration conf, URI uri)
 
     ledgerPath = zkPath + "/ledgers";
     String maxTxIdPath = zkPath + "/maxtxid";
-    String lockPath = zkPath + "/lock";
+    String currentInprogressNodePath = zkPath + "/CurrentInprogress";
     String versionPath = zkPath + "/version";
     digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW,
                         BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT);
@@ -192,7 +192,7 @@ public BookKeeperJournalManager(Configuration conf, URI uri)
       throw new IOException("Error initializing zk", e);
     }
 
-    wl = new WriteLock(zkc, lockPath);
+    ci = new CurrentInprogress(zkc, currentInprogressNodePath);
     maxTxId = new MaxTxId(zkc, maxTxIdPath);
   }
 
@@ -207,13 +207,16 @@ public BookKeeperJournalManager(Configuration conf, URI uri)
    */
   @Override
   public EditLogOutputStream startLogSegment(long txId) throws IOException {
-    wl.acquire();
-
     if (txId <= maxTxId.get()) {
       throw new IOException("We've already seen " + txId
           + ". A new stream cannot be created with it");
     }
     try {
+      String existingInprogressNode = ci.read();
+      if (null != existingInprogressNode
+          && zkc.exists(existingInprogressNode, false) != null) {
+        throw new IOException("Inprogress node already exists");
+      }
       if (currentLedger != null) {
         // bookkeeper errored on last stream, clean up ledger
         currentLedger.close();
@@ -234,7 +237,8 @@ public EditLogOutputStream startLogSegment(long txId) throws IOException {
       l.write(zkc, znodePath);
 
       maxTxId.store(txId);
-      return new BookKeeperEditLogOutputStream(conf, currentLedger, wl);
+      ci.update(znodePath);
+      return new BookKeeperEditLogOutputStream(conf, currentLedger);
     } catch (Exception e) {
       if (currentLedger != null) {
         try {
@@ -270,7 +274,6 @@ public void finalizeLogSegment(long firstTxId, long lastTxId)
             + " doesn't exist");
       }
 
-      wl.checkWriteLock();
       EditLogLedgerMetadata l
         = EditLogLedgerMetadata.read(zkc, inprogressPath);
 
@@ -307,12 +310,14 @@ public void finalizeLogSegment(long firstTxId, long lastTxId)
       }
       maxTxId.store(lastTxId);
       zkc.delete(inprogressPath, inprogressStat.getVersion());
+      String inprogressPathFromCI = ci.read();
+      if (inprogressPath.equals(inprogressPathFromCI)) {
+        ci.clear();
+      }
     } catch (KeeperException e) {
       throw new IOException("Error finalising ledger", e);
     } catch (InterruptedException ie) {
       throw new IOException("Error finalising ledger", ie);
-    } finally {
-      wl.release();
     }
   }
 
@@ -417,7 +422,6 @@ long getNumberOfTransactions(long fromTxId, boolean inProgressOk)
 
   @Override
   public void recoverUnfinalizedSegments() throws IOException {
-    wl.acquire();
     synchronized (this) {
       try {
         List<String> children = zkc.getChildren(ledgerPath, false);
@@ -445,10 +449,6 @@ public void recoverUnfinalizedSegments() throws IOException {
       } catch (InterruptedException ie) {
         throw new IOException("Interrupted getting list of inprogress segments",
                               ie);
-      } finally {
-        if (wl.haveLock()) {
-          wl.release();
-        }
       }
     }
   }
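The hunks above replace the ZooKeeper write lock with a lighter fencing check around the journal's "current inprogress" record. A condensed sketch of the resulting flow follows; only the read()/update()/clear() calls on CurrentInprogress are taken from the diff, while the surrounding fields, metadata writes and error handling are paraphrased and elided.

  // Sketch only; znodePath, inprogressPath, maxTxId, currentLedger come from elided code.
  EditLogOutputStream startLogSegment(long txId) throws IOException {
    String existing = ci.read();                 // inprogress path left by a previous writer, if any
    if (existing != null && zkc.exists(existing, false) != null) {
      throw new IOException("Inprogress node already exists");
    }
    // ... allocate the ledger and write its inprogress metadata znode ...
    maxTxId.store(txId);
    ci.update(znodePath);                        // advertise this writer's segment
    return new BookKeeperEditLogOutputStream(conf, currentLedger);
  }

  void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException {
    // ... finalize the ledger metadata and delete the inprogress znode ...
    if (inprogressPath.equals(ci.read())) {
      ci.clear();                                // only clear an entry this writer still owns
    }
  }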
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Distributed write permission lock, using ZooKeeper. Read the version number
+ * and return the current inprogress node path available in CurrentInprogress
+ * path. If it exist, caller can treat that some other client already operating
+ * on it. Then caller can take action. If there is no inprogress node exist,
+ * then caller can treat that there is no client operating on it. Later same
+ * caller should update the his newly created inprogress node path. At this
+ * point, if some other activities done on this node, version number might
+ * change, so update will fail. So, this read, update api will ensure that there
+ * is only node can continue further after checking with CurrentInprogress.
+ */
+
+class CurrentInprogress {
+  private static final String CONTENT_DELIMITER = ",";
+
+  static final Log LOG = LogFactory.getLog(CurrentInprogress.class);
+
+  private final ZooKeeper zkc;
+  private final String currentInprogressNode;
+  private volatile int versionNumberForPermission = -1;
+  private static final int CURRENT_INPROGRESS_LAYOUT_VERSION = -1;
+  private final String hostName = InetAddress.getLocalHost().toString();
+
+  CurrentInprogress(ZooKeeper zkc, String lockpath) throws IOException {
+    this.currentInprogressNode = lockpath;
+    this.zkc = zkc;
+    try {
+      Stat isCurrentInprogressNodeExists = zkc.exists(lockpath, false);
+      if (isCurrentInprogressNodeExists == null) {
+        try {
+          zkc.create(lockpath, null, Ids.OPEN_ACL_UNSAFE,
+              CreateMode.PERSISTENT);
+        } catch (NodeExistsException e) {
+          // Node might created by other process at the same time. Ignore it.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(lockpath + " already created by other process.", e);
+          }
+        }
+      }
+    } catch (Exception e) {
+      throw new IOException("Exception accessing Zookeeper", e);
+    }
+  }
+
+  /**
+   * Update the path with prepending version number and hostname
+   *
+   * @param path
+   *          - to be updated in zookeeper
+   * @throws IOException
+   */
+  void update(String path) throws IOException {
+    String content = CURRENT_INPROGRESS_LAYOUT_VERSION
+        + CONTENT_DELIMITER + hostName + CONTENT_DELIMITER + path;
+    try {
+      zkc.setData(this.currentInprogressNode, content.getBytes(),
+          this.versionNumberForPermission);
+    } catch (KeeperException e) {
+      throw new IOException("Exception when setting the data "
+          + "[layout version number,hostname,inprogressNode path]= [" + content
+          + "] to CurrentInprogress. ", e);
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted while setting the data "
+          + "[layout version number,hostname,inprogressNode path]= [" + content
+          + "] to CurrentInprogress", e);
+    }
+    LOG.info("Updated data[layout version number,hostname,inprogressNode path]"
+        + "= [" + content + "] to CurrentInprogress");
+  }
+
+  /**
+   * Read the CurrentInprogress node data from Zookeeper and also get the znode
+   * version number. Return the 3rd field from the data. i.e saved path with
+   * #update api
+   *
+   * @return available inprogress node path. returns null if not available.
+   * @throws IOException
+   */
+  String read() throws IOException {
+    Stat stat = new Stat();
+    byte[] data = null;
+    try {
+      data = zkc.getData(this.currentInprogressNode, false, stat);
+    } catch (KeeperException e) {
+      throw new IOException("Exception while reading the data from "
+          + currentInprogressNode, e);
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted while reading data from "
+          + currentInprogressNode, e);
+    }
+    this.versionNumberForPermission = stat.getVersion();
+    if (data != null) {
+      String stringData = new String(data);
+      LOG.info("Read data[layout version number,hostname,inprogressNode path]"
+          + "= [" + stringData + "] from CurrentInprogress");
+      String[] contents = stringData.split(CONTENT_DELIMITER);
+      assert contents.length == 3 : "As per the current data format, "
+          + "CurrentInprogress node data should contain 3 fields. "
+          + "i.e layout version number,hostname,inprogressNode path";
+      String layoutVersion = contents[0];
+      if (Long.valueOf(layoutVersion) > CURRENT_INPROGRESS_LAYOUT_VERSION) {
+        throw new IOException(
+            "Supported layout version of CurrentInprogress node is : "
+                + CURRENT_INPROGRESS_LAYOUT_VERSION
+                + " . Layout version of CurrentInprogress node in ZK is : "
+                + layoutVersion);
+      }
+      String inprogressNodePath = contents[2];
+      return inprogressNodePath;
+    } else {
+      LOG.info("No data available in CurrentInprogress");
+    }
+    return null;
+  }
+
+  /** Clear the CurrentInprogress node data */
+  void clear() throws IOException {
+    try {
+      zkc.setData(this.currentInprogressNode, null, versionNumberForPermission);
+    } catch (KeeperException e) {
+      throw new IOException(
+          "Exception when setting the data to CurrentInprogress node", e);
+    } catch (InterruptedException e) {
+      throw new IOException(
+          "Interrupted when setting the data to CurrentInprogress node", e);
+    }
+    LOG.info("Cleared the data from CurrentInprogress");
+  }
+
+}
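Taken together, read/update/clear form an optimistic, version-guarded handoff rather than a lock: read() remembers the znode version it saw, and a later update() or clear() only succeeds if that version is still current. A hypothetical caller might use it as sketched below; only the CurrentInprogress constructor and its read()/update()/clear() methods come from the class above, and the paths and recovery step are illustrative assumptions.

  // Hypothetical usage sketch, assuming an already-connected ZooKeeper session `zk`.
  CurrentInprogress ci = new CurrentInprogress(zk, "/bkjm/CurrentInprogress");

  String existing = ci.read();           // also records the znode version it observed
  if (existing != null) {
    // another writer advertised an inprogress segment: recover it or back off
  } else {
    ci.update("/bkjm/ledgers/inprogress_42");
    // update() succeeds only if the version is unchanged since read();
    // a competing writer that slipped in between makes it throw IOException
  }
  // ... write and finalize the segment ...
  ci.clear();                            // wipe the entry, again guarded by the last-read version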
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.List;
-import java.util.Collections;
-import java.util.Comparator;
-
-import java.net.InetAddress;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * Distributed lock, using ZooKeeper.
- *
- * The lock is vulnerable to timing issues. For example, the process could
- * encounter a really long GC cycle between acquiring the lock, and writing to
- * a ledger. This could have timed out the lock, and another process could have
- * acquired the lock and started writing to bookkeeper. Therefore other
- * mechanisms are required to ensure correctness (i.e. Fencing).
- */
-class WriteLock implements Watcher {
-  static final Log LOG = LogFactory.getLog(WriteLock.class);
-
-  private final ZooKeeper zkc;
-  private final String lockpath;
-
-  private AtomicInteger lockCount = new AtomicInteger(0);
-  private String myznode = null;
-
-  WriteLock(ZooKeeper zkc, String lockpath) throws IOException {
-    this.lockpath = lockpath;
-
-    this.zkc = zkc;
-    try {
-      if (zkc.exists(lockpath, false) == null) {
-        String localString = InetAddress.getLocalHost().toString();
-        zkc.create(lockpath, localString.getBytes(),
-                   Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-      }
-    } catch (Exception e) {
-      throw new IOException("Exception accessing Zookeeper", e);
-    }
-  }
-
-  void acquire() throws IOException {
-    while (true) {
-      if (lockCount.get() == 0) {
-        try {
-          synchronized(this) {
-            if (lockCount.get() > 0) {
-              lockCount.incrementAndGet();
-              return;
-            }
-            myznode = zkc.create(lockpath + "/lock-", new byte[] {'0'},
-                                 Ids.OPEN_ACL_UNSAFE,
-                                 CreateMode.EPHEMERAL_SEQUENTIAL);
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Acquiring lock, trying " + myznode);
-            }
-
-            List<String> nodes = zkc.getChildren(lockpath, false);
-            Collections.sort(nodes, new Comparator<String>() {
-                public int compare(String o1,
-                                   String o2) {
-                  Integer l1 = Integer.valueOf(o1.replace("lock-", ""));
-                  Integer l2 = Integer.valueOf(o2.replace("lock-", ""));
-                  return l1 - l2;
-                }
-              });
-            if ((lockpath + "/" + nodes.get(0)).equals(myznode)) {
-              if (LOG.isTraceEnabled()) {
-                LOG.trace("Lock acquired - " + myznode);
-              }
-              lockCount.set(1);
-              zkc.exists(myznode, this);
-              return;
-            } else {
-              LOG.error("Failed to acquire lock with " + myznode
-                        + ", " + nodes.get(0) + " already has it");
-              throw new IOException("Could not acquire lock");
-            }
-          }
-        } catch (KeeperException e) {
-          throw new IOException("Exception accessing Zookeeper", e);
-        } catch (InterruptedException ie) {
-          throw new IOException("Exception accessing Zookeeper", ie);
-        }
-      } else {
-        int ret = lockCount.getAndIncrement();
-        if (ret == 0) {
-          lockCount.decrementAndGet();
-          continue; // try again;
-        } else {
-          return;
-        }
-      }
-    }
-  }
-
-  void release() throws IOException {
-    try {
-      if (lockCount.decrementAndGet() <= 0) {
-        if (lockCount.get() < 0) {
-          LOG.warn("Unbalanced lock handling somewhere, lockCount down to "
-                   + lockCount.get());
-        }
-        synchronized(this) {
-          if (lockCount.get() <= 0) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("releasing lock " + myznode);
-            }
-            if (myznode != null) {
-              zkc.delete(myznode, -1);
-              myznode = null;
-            }
-          }
-        }
-      }
-    } catch (Exception e) {
-      throw new IOException("Exception accessing Zookeeper", e);
-    }
-  }
-
-  public void checkWriteLock() throws IOException {
-    if (!haveLock()) {
-      throw new IOException("Lost writer lock");
-    }
-  }
-
-  boolean haveLock() throws IOException {
-    return lockCount.get() > 0;
-  }
-
-  public void process(WatchedEvent event) {
-    if (event.getState() == KeeperState.Disconnected
-        || event.getState() == KeeperState.Expired) {
-      LOG.warn("Lost zookeeper session, lost lock ");
-      lockCount.set(0);
-    } else {
-      // reapply the watch
-      synchronized (this) {
-        LOG.info("Zookeeper event " + event
-                 + " received, reapplying watch to " + myznode);
-        if (myznode != null) {
-          try {
-            zkc.exists(myznode, this);
-          } catch (Exception e) {
-            LOG.warn("Could not set watch on lock, releasing", e);
-            try {
-              release();
-            } catch (IOException ioe) {
-              LOG.error("Could not release Zk lock", ioe);
-            }
-          }
-        }
-      }
-    }
-  }
-}
@@ -215,8 +215,7 @@ public void testFailoverWithFailingBKCluster() throws Exception {
   }
 
   /**
-   * Test that two namenodes can't become primary at the same
-   * time.
+   * Test that two namenodes can't continue as primary
    */
   @Test
   public void testMultiplePrimariesStarted() throws Exception {
@@ -247,18 +246,14 @@ public void testMultiplePrimariesStarted() throws Exception {
       FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
       fs.mkdirs(p1);
       nn1.getRpcServer().rollEditLog();
-      try {
       cluster.transitionToActive(1);
-        fail("Shouldn't have been able to start two primaries"
-             + " with single shared storage");
-      } catch (ServiceFailedException sfe) {
-        assertTrue("Wrong exception",
-            sfe.getMessage().contains("Failed to start active services"));
-      }
+      fs = cluster.getFileSystem(0); // get the older active server.
+      // This edit log updation on older active should make older active
+      // shutdown.
+      fs.delete(p1, true);
+      verify(mockRuntime1, atLeastOnce()).exit(anyInt());
+      verify(mockRuntime2, times(0)).exit(anyInt());
     } finally {
-      verify(mockRuntime1, times(0)).exit(anyInt());
-      verify(mockRuntime2, atLeastOnce()).exit(anyInt());
-
       if (cluster != null) {
         cluster.shutdown();
       }
@@ -361,6 +361,7 @@ public void testAllBookieFailure() throws Exception {
 
       assertEquals("New bookie didn't start",
                    numBookies+1, bkutil.checkBookiesUp(numBookies+1, 10));
+      bkjm.recoverUnfinalizedSegments();
       out = bkjm.startLogSegment(txid);
       for (long i = 1 ; i <= 3; i++) {
         FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.contrib.bkjournal;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.util.LocalBookKeeper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests that read, update, clear api from CurrentInprogress
+ */
+public class TestCurrentInprogress {
+  private static final Log LOG = LogFactory.getLog(TestCurrentInprogress.class);
+  private static final String CURRENT_NODE_PATH = "/test";
+  private static final String HOSTPORT = "127.0.0.1:2181";
+  private static final int CONNECTION_TIMEOUT = 30000;
+  private static NIOServerCnxnFactory serverFactory;
+  private static ZooKeeperServer zks;
+  private static ZooKeeper zkc;
+  private static int ZooKeeperDefaultPort = 2181;
+  private static File zkTmpDir;
+
+  private static ZooKeeper connectZooKeeper(String ensemble)
+      throws IOException, KeeperException, InterruptedException {
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    ZooKeeper zkc = new ZooKeeper(HOSTPORT, 3600, new Watcher() {
+      public void process(WatchedEvent event) {
+        if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
+          latch.countDown();
+        }
+      }
+    });
+    if (!latch.await(10, TimeUnit.SECONDS)) {
+      throw new IOException("Zookeeper took too long to connect");
+    }
+    return zkc;
+  }
+
+  @BeforeClass
+  public static void setupZooKeeper() throws Exception {
+    LOG.info("Starting ZK server");
+    zkTmpDir = File.createTempFile("zookeeper", "test");
+    zkTmpDir.delete();
+    zkTmpDir.mkdir();
+    try {
+      zks = new ZooKeeperServer(zkTmpDir, zkTmpDir, ZooKeeperDefaultPort);
+      serverFactory = new NIOServerCnxnFactory();
+      serverFactory.configure(new InetSocketAddress(ZooKeeperDefaultPort), 10);
+      serverFactory.startup(zks);
+    } catch (Exception e) {
+      LOG.error("Exception while instantiating ZooKeeper", e);
+    }
+    boolean b = LocalBookKeeper.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT);
+    LOG.debug("ZooKeeper server up: " + b);
+  }
+
+  @AfterClass
+  public static void shutDownServer() {
+    if (null != zks) {
+      zks.shutdown();
+    }
+    zkTmpDir.delete();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    zkc = connectZooKeeper(HOSTPORT);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (null != zkc) {
+      zkc.close();
+    }
+
+  }
+
+  /**
+   * Tests that read should be able to read the data which updated with update
+   * api
+   */
+  @Test
+  public void testReadShouldReturnTheZnodePathAfterUpdate() throws Exception {
+    String data = "inprogressNode";
+    CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
+    ci.update(data);
+    String inprogressNodePath = ci.read();
+    assertEquals("Not returning inprogressZnode", "inprogressNode",
+        inprogressNodePath);
+  }
+
+  /**
+   * Tests that read should return null if we clear the updated data in
+   * CurrentInprogress node
+   */
+  @Test
+  public void testReadShouldReturnNullAfterClear() throws Exception {
+    CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
+    ci.update("myInprogressZnode");
+    ci.read();
+    ci.clear();
+    String inprogressNodePath = ci.read();
+    assertEquals("Expecting null to be return", null, inprogressNodePath);
+  }
+
+  /**
+   * Tests that update should throw IOE, if version number modifies between read
+   * and update
+   */
+  @Test(expected = IOException.class)
+  public void testUpdateShouldFailWithIOEIfVersionNumberChangedAfterRead()
+      throws Exception {
+    CurrentInprogress ci = new CurrentInprogress(zkc, CURRENT_NODE_PATH);
+    ci.update("myInprogressZnode");
+    assertEquals("Not returning myInprogressZnode", "myInprogressZnode", ci
+        .read());
+    // Updating data in-between to change the data to change the version number
+    ci.update("YourInprogressZnode");
+    ci.update("myInprogressZnode");
+  }
+
+}