HDFS-2185. HDFS portion of ZK-based FailoverController. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3042@1308637 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2012-04-03 00:59:02 +00:00
parent 2fd05aa597
commit 789ed57e71
8 changed files with 383 additions and 4 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3042.txt

@@ -0,0 +1,7 @@
Changes for HDFS-3042 branch.
This change list will be merged into the trunk CHANGES.txt when the HDFS-3042
branch is merged.
------------------------------
HDFS-2185. HDFS portion of ZK-based FailoverController (todd)

hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml

@@ -5,6 +5,9 @@
   <Match>
     <Package name="org.apache.hadoop.hdfs.protocol.proto" />
   </Match>
+  <Match>
+    <Package name="org.apache.hadoop.hdfs.server.namenode.ha.proto" />
+  </Match>
   <Match>
     <Bug pattern="EI_EXPOSE_REP" />
   </Match>

hadoop-hdfs-project/hadoop-hdfs/pom.xml

@@ -110,6 +110,33 @@
       <artifactId>ant</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>3.4.2</version>
+      <exclusions>
+        <exclusion>
+          <!-- otherwise seems to drag in junit 3.8.1 via jline -->
+          <groupId>junit</groupId>
+          <artifactId>junit</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jdmk</groupId>
+          <artifactId>jmxtools</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jmx</groupId>
+          <artifactId>jmxri</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>3.4.2</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
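The new ZooKeeper dependency backs the failover controller's leader election; the controller locates its quorum through the configuration key exposed as ZKFailoverController.ZK_QUORUM_KEY (the new test below sets it to a mini ZK cluster's address). A minimal sketch of wiring that key up; the class name and the three-host quorum string are illustrative placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ZKFailoverController;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class ZkfcQuorumConfSketch {
  public static Configuration withQuorum() {
    Configuration conf = new HdfsConfiguration(); // pulls in hdfs-site.xml
    // Placeholder quorum string; any reachable ZK ensemble works.
    conf.set(ZKFailoverController.ZK_QUORUM_KEY,
        "zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181");
    return conf;
  }
}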

hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs

@@ -30,6 +30,7 @@ function print_usage(){
   echo "  namenode -format     format the DFS filesystem"
   echo "  secondarynamenode    run the DFS secondary namenode"
   echo "  namenode             run the DFS namenode"
+  echo "  zkfc                 run the ZK Failover Controller daemon"
   echo "  datanode             run a DFS datanode"
   echo "  dfsadmin             run a DFS admin client"
   echo "  haadmin              run a DFS HA admin client"
@@ -71,6 +72,9 @@ fi
 if [ "$COMMAND" = "namenode" ] ; then
   CLASS='org.apache.hadoop.hdfs.server.namenode.NameNode'
   HADOOP_OPTS="$HADOOP_OPTS $HADOOP_NAMENODE_OPTS"
+elif [ "$COMMAND" = "zkfc" ] ; then
+  CLASS='org.apache.hadoop.hdfs.tools.DFSZKFailoverController'
+  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_ZKFC_OPTS"
 elif [ "$COMMAND" = "secondarynamenode" ] ; then
   CLASS='org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode'
   HADOOP_OPTS="$HADOOP_OPTS $HADOOP_SECONDARYNAMENODE_OPTS"
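With this dispatch in place, one failover controller daemon is meant to run beside each NameNode. A one-time "hdfs zkfc -formatZK" initializes the controller's state in ZooKeeper (the same flag the new test below passes to run()), after which plain "hdfs zkfc" starts the daemon itself; HADOOP_ZKFC_OPTS carries its JVM options, mirroring the other per-daemon *_OPTS variables.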

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java

@@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.tools;

import java.net.InetSocketAddress;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.ha.ZKFailoverController;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.proto.HAZKInfoProtos.ActiveNodeInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;

@InterfaceAudience.Private
public class DFSZKFailoverController extends ZKFailoverController {

  private static final Log LOG =
      LogFactory.getLog(DFSZKFailoverController.class);
  private NNHAServiceTarget localTarget;
  private Configuration localNNConf;

  @Override
  protected HAServiceTarget dataToTarget(byte[] data) {
    ActiveNodeInfo proto;
    try {
      proto = ActiveNodeInfo.parseFrom(data);
    } catch (InvalidProtocolBufferException e) {
      throw new RuntimeException("Invalid data in ZK: " +
          StringUtils.byteToHexString(data));
    }
    NNHAServiceTarget ret = new NNHAServiceTarget(
        getConf(), proto.getNameserviceId(), proto.getNamenodeId());
    InetSocketAddress addressFromProtobuf = new InetSocketAddress(
        proto.getHostname(), proto.getPort());
    if (!addressFromProtobuf.equals(ret.getAddress())) {
      throw new RuntimeException("Mismatched address stored in ZK for " +
          ret + ": Stored protobuf was " + proto + ", address from our own " +
          "configuration for this NameNode was " + ret.getAddress());
    }
    return ret;
  }

  @Override
  protected byte[] targetToData(HAServiceTarget target) {
    InetSocketAddress addr = target.getAddress();

    return ActiveNodeInfo.newBuilder()
        .setHostname(addr.getHostName())
        .setPort(addr.getPort())
        .setNameserviceId(localTarget.getNameServiceId())
        .setNamenodeId(localTarget.getNameNodeId())
        .build()
        .toByteArray();
  }

  @Override
  public void setConf(Configuration conf) {
    // Use HdfsConfiguration here to force hdfs-site.xml to load
    localNNConf = new HdfsConfiguration(conf);

    String nsId = DFSUtil.getNamenodeNameServiceId(conf);

    if (!HAUtil.isHAEnabled(localNNConf, nsId)) {
      throw new HadoopIllegalArgumentException(
          "HA is not enabled for this namenode.");
    }
    String nnId = HAUtil.getNameNodeId(localNNConf, nsId);
    NameNode.initializeGenericKeys(localNNConf, nsId, nnId);

    localTarget = new NNHAServiceTarget(localNNConf, nsId, nnId);

    super.setConf(localNNConf);
    LOG.info("Failover controller configured for NameNode " +
        nsId + "." + nnId);
  }

  @Override
  public HAServiceTarget getLocalTarget() {
    Preconditions.checkState(localTarget != null,
        "setConf() should have already been called");
    return localTarget;
  }

  public static void main(String args[])
      throws Exception {
    System.exit(ToolRunner.run(
        new DFSZKFailoverController(), args));
  }
}
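setConf() binds the controller to the local NameNode, and the inherited Tool entry point runs the election. A minimal sketch of driving it programmatically, mirroring the new test below; the class and method names are illustrative, and the caller must supply an HA-enabled configuration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.tools.DFSZKFailoverController;

public class ZkfcDriverSketch {
  public static void drive(Configuration haEnabledConf) throws Exception {
    DFSZKFailoverController zkfc = new DFSZKFailoverController();
    // Fails fast with HadoopIllegalArgumentException if HA is not enabled.
    zkfc.setConf(haEnabledConf);
    // One-time initialization of the election state in ZooKeeper.
    zkfc.run(new String[]{"-formatZK"});
    // Participates in leader election until interrupted.
    zkfc.run(new String[0]);
  }
}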

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java

@@ -20,11 +20,11 @@ package org.apache.hadoop.hdfs.tools;
 import java.net.InetSocketAddress;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.BadFencingConfigurationException;
 import org.apache.hadoop.ha.HAServiceTarget;
 import org.apache.hadoop.ha.NodeFencer;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.net.NetUtils;
 
@@ -38,11 +38,13 @@ public class NNHAServiceTarget extends HAServiceTarget {
   private final InetSocketAddress addr;
   private NodeFencer fencer;
   private BadFencingConfigurationException fenceConfigError;
+  private final String nnId;
+  private final String nsId;
 
-  public NNHAServiceTarget(HdfsConfiguration conf,
+  public NNHAServiceTarget(Configuration localNNConf,
       String nsId, String nnId) {
     String serviceAddr =
-        DFSUtil.getNamenodeServiceAddr(conf, nsId, nnId);
+        DFSUtil.getNamenodeServiceAddr(localNNConf, nsId, nnId);
     if (serviceAddr == null) {
       throw new IllegalArgumentException(
           "Unable to determine service address for namenode '" + nnId + "'");
@@ -50,10 +52,12 @@ public class NNHAServiceTarget extends HAServiceTarget {
     this.addr = NetUtils.createSocketAddr(serviceAddr,
         NameNode.DEFAULT_PORT);
     try {
-      this.fencer = NodeFencer.create(conf);
+      this.fencer = NodeFencer.create(localNNConf);
     } catch (BadFencingConfigurationException e) {
       this.fenceConfigError = e;
     }
+    this.nnId = nnId;
+    this.nsId = nsId;
   }
 
   /**
@@ -69,6 +73,10 @@ public class NNHAServiceTarget extends HAServiceTarget {
     if (fenceConfigError != null) {
       throw fenceConfigError;
     }
+    if (fencer == null) {
+      throw new BadFencingConfigurationException(
+          "No fencer configured for " + this);
+    }
   }
 
   @Override
@@ -81,4 +89,11 @@ public class NNHAServiceTarget extends HAServiceTarget {
     return "NameNode at " + addr;
   }
+
+  public String getNameServiceId() {
+    return this.nsId;
+  }
+
+  public String getNameNodeId() {
+    return this.nnId;
+  }
 }
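The two new accessors let the controller stamp its nameservice and namenode IDs into the record it publishes to ZooKeeper (see targetToData() in DFSZKFailoverController above). A short sketch of constructing a target directly; the class name and the "ns1"/"nn1" IDs are illustrative and must match the dfs.nameservices / dfs.ha.namenodes.* entries of the loaded configuration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;

public class NNTargetSketch {
  public static NNHAServiceTarget target() {
    Configuration conf = new HdfsConfiguration(); // loads hdfs-site.xml
    // Throws IllegalArgumentException if no service address can be
    // resolved for this nameservice/namenode pair.
    NNHAServiceTarget t = new NNHAServiceTarget(conf, "ns1", "nn1");
    System.out.println(t.getNameServiceId() + "." + t.getNameNodeId()
        + " at " + t.getAddress());
    return t;
  }
}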

hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HAZKInfo.proto

@@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.hdfs.server.namenode.ha.proto";
option java_outer_classname = "HAZKInfoProtos";

message ActiveNodeInfo {
  required string nameserviceId = 1;
  required string namenodeId = 2;
  required string hostname = 3;
  required int32 port = 4;
}
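ActiveNodeInfo is the payload written to the controller's ZooKeeper znode: targetToData() serializes it and dataToTarget() parses it back, then cross-checks the address against local configuration. A round-trip through the generated protobuf API (class name and field values here are illustrative placeholders):

import org.apache.hadoop.hdfs.server.namenode.ha.proto.HAZKInfoProtos.ActiveNodeInfo;
import com.google.protobuf.InvalidProtocolBufferException;

public class ActiveNodeInfoSketch {
  public static void main(String[] args) throws InvalidProtocolBufferException {
    // Serialize, as targetToData() does.
    byte[] data = ActiveNodeInfo.newBuilder()
        .setNameserviceId("ns1")
        .setNamenodeId("nn1")
        .setHostname("nn1.example.com")
        .setPort(8020)
        .build()
        .toByteArray();

    // Parse, as dataToTarget() does before validating the address.
    ActiveNodeInfo parsed = ActiveNodeInfo.parseFrom(data);
    System.out.println(parsed.getHostname() + ":" + parsed.getPort());
  }
}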

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSZKFailoverController.java

@@ -0,0 +1,181 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;

import static org.junit.Assert.*;

import java.io.File;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.NodeFencer;
import org.apache.hadoop.ha.ZKFailoverController;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.TestNodeFencer.AlwaysSucceedFencer;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.tools.DFSZKFailoverController;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
import org.apache.zookeeper.test.ClientBase;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.google.common.base.Supplier;

public class TestDFSZKFailoverController extends ClientBase {
  private Configuration conf;
  private MiniDFSCluster cluster;
  private TestContext ctx;
  private ZKFCThread thr1, thr2;
  private FileSystem fs;

  @Override
  public void setUp() throws Exception {
    // build.test.dir is used by zookeeper
    new File(System.getProperty("build.test.dir", "build")).mkdirs();
    super.setUp();
  }

  @Before
  public void setup() throws Exception {
    conf = new Configuration();
    conf.set(ZKFailoverController.ZK_QUORUM_KEY, hostPort);
    conf.set(NodeFencer.CONF_METHODS_KEY,
        AlwaysSucceedFencer.class.getName());

    MiniDFSNNTopology topology = new MiniDFSNNTopology()
        .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
            .addNN(new MiniDFSNNTopology.NNConf("nn1").setIpcPort(10001))
            .addNN(new MiniDFSNNTopology.NNConf("nn2").setIpcPort(10002)));
    cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(topology)
        .numDataNodes(0)
        .build();
    cluster.waitActive();

    ctx = new TestContext();
    ctx.addThread(thr1 = new ZKFCThread(ctx, 0));
    assertEquals(0, thr1.zkfc.run(new String[]{"-formatZK"}));

    thr1.start();
    waitForHAState(0, HAServiceState.ACTIVE);

    ctx.addThread(thr2 = new ZKFCThread(ctx, 1));
    thr2.start();

    fs = HATestUtil.configureFailoverFs(cluster, conf);
  }

  @After
  public void shutdown() throws Exception {
    cluster.shutdown();

    if (thr1 != null) {
      thr1.interrupt();
    }
    if (thr2 != null) {
      thr2.interrupt();
    }
    if (ctx != null) {
      ctx.stop();
    }
  }

  /**
   * Test that automatic failover is triggered by shutting the
   * active NN down.
   */
  @Test(timeout=30000)
  public void testFailoverAndBackOnNNShutdown() throws Exception {
    Path p1 = new Path("/dir1");
    Path p2 = new Path("/dir2");

    // Write some data on the first NN
    fs.mkdirs(p1);
    // Shut it down, causing automatic failover
    cluster.shutdownNameNode(0);
    // Data should still exist. Write some on the new NN
    assertTrue(fs.exists(p1));
    fs.mkdirs(p2);
    assertEquals(AlwaysSucceedFencer.getLastFencedService().getAddress(),
        thr1.zkfc.getLocalTarget().getAddress());

    // Start the first node back up
    cluster.restartNameNode(0);
    // This should have no effect -- the new node should be STANDBY.
    waitForHAState(0, HAServiceState.STANDBY);
    assertTrue(fs.exists(p1));
    assertTrue(fs.exists(p2));

    // Shut down the second node, which should failback to the first
    cluster.shutdownNameNode(1);
    waitForHAState(0, HAServiceState.ACTIVE);
    // The first node should see what was written on the second while the
    // first was down.
    assertTrue(fs.exists(p1));
    assertTrue(fs.exists(p2));
    assertEquals(AlwaysSucceedFencer.getLastFencedService().getAddress(),
        thr2.zkfc.getLocalTarget().getAddress());
  }

  private void waitForHAState(int nnidx, final HAServiceState state)
      throws TimeoutException, InterruptedException {
    final NameNode nn = cluster.getNameNode(nnidx);
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        try {
          return nn.getRpcServer().getServiceStatus().getState() == state;
        } catch (Exception e) {
          e.printStackTrace();
          return false;
        }
      }
    }, 50, 5000);
  }

  /**
   * Test-thread which runs a ZK Failover Controller corresponding
   * to a given NameNode in the minicluster.
   */
  private class ZKFCThread extends TestingThread {
    private final DFSZKFailoverController zkfc;

    public ZKFCThread(TestContext ctx, int idx) {
      super(ctx);
      this.zkfc = new DFSZKFailoverController();
      zkfc.setConf(cluster.getConfiguration(idx));
    }

    @Override
    public void doWork() throws Exception {
      try {
        assertEquals(0, zkfc.run(new String[0]));
      } catch (InterruptedException ie) {
        // Interrupted by main thread, that's OK.
      }
    }
  }
}