HDFS-1973. HA: HDFS clients must handle namenode failover and switch over to the new active namenode. (atm)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1179896 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2011-10-06 23:26:14 +00:00
parent 90727b82e0
commit 8b4f497af8
10 changed files with 301 additions and 25 deletions

View File

@ -9,3 +9,5 @@ HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)
HDFS-1974. Introduce active and standy states to the namenode. (suresh)
HDFS-2407. getServerDefaults and getStats don't check operation category (atm)
HDFS-1973. HA: HDFS clients must handle namenode failover and switch over to the new active namenode. (atm)

View File

@ -80,8 +80,7 @@ public class Hdfs extends AbstractFileSystem {
throw new IOException("Incomplete HDFS URI, no host: " + theUri);
}
InetSocketAddress namenode = NameNode.getAddress(theUri.getAuthority());
this.dfs = new DFSClient(namenode, conf, getStatistics());
this.dfs = new DFSClient(theUri, conf, getStatistics());
}
@Override

View File

@ -1,4 +1,3 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -26,11 +25,11 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
@ -87,6 +86,9 @@ import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@ -96,6 +98,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
/********************************************************
* DFSClient can connect to a Hadoop Filesystem and
@ -210,11 +213,15 @@ public class DFSClient implements java.io.Closeable {
this(NameNode.getAddress(conf), conf);
}
public DFSClient(InetSocketAddress address, Configuration conf) throws IOException {
this(NameNode.getUri(address), conf);
}
/**
* Same as this(nameNodeAddr, conf, null);
* @see #DFSClient(InetSocketAddress, Configuration, org.apache.hadoop.fs.FileSystem.Statistics)
*/
public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf
public DFSClient(URI nameNodeAddr, Configuration conf
) throws IOException {
this(nameNodeAddr, conf, null);
}
@ -223,7 +230,7 @@ public class DFSClient implements java.io.Closeable {
* Same as this(nameNodeAddr, null, conf, stats);
* @see #DFSClient(InetSocketAddress, ClientProtocol, Configuration, org.apache.hadoop.fs.FileSystem.Statistics)
*/
public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
public DFSClient(URI nameNodeAddr, Configuration conf,
FileSystem.Statistics stats)
throws IOException {
this(nameNodeAddr, null, conf, stats);
@ -233,7 +240,7 @@ public class DFSClient implements java.io.Closeable {
* Create a new DFSClient connected to the given nameNodeAddr or rpcNamenode.
* Exactly one of nameNodeAddr or rpcNamenode must be null.
*/
DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode,
DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
Configuration conf, FileSystem.Statistics stats)
throws IOException {
// Copy only the required DFSClient configuration
@ -246,20 +253,45 @@ public class DFSClient implements java.io.Closeable {
// The hdfsTimeout is currently the same as the ipc timeout
this.hdfsTimeout = Client.getTimeout(conf);
this.ugi = UserGroupInformation.getCurrentUser();
final String authority = nameNodeAddr == null? "null":
nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
final String authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
this.leaserenewer = LeaseRenewer.getInstance(authority, ugi, this);
this.clientName = leaserenewer.getClientName(dfsClientConf.taskId);
this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
if (nameNodeAddr != null && rpcNamenode == null) {
this.namenode = DFSUtil.createNamenode(nameNodeAddr, conf);
} else if (nameNodeAddr == null && rpcNamenode != null) {
Class<?> failoverProxyProviderClass = getFailoverProxyProviderClass(authority, conf);
if (nameNodeUri != null && failoverProxyProviderClass != null) {
FailoverProxyProvider failoverProxyProvider = (FailoverProxyProvider)
ReflectionUtils.newInstance(failoverProxyProviderClass, conf);
this.namenode = (ClientProtocol)RetryProxy.create(ClientProtocol.class,
failoverProxyProvider, RetryPolicies.failoverOnNetworkException(1));
} else if (nameNodeUri != null && rpcNamenode == null) {
this.namenode = DFSUtil.createNamenode(NameNode.getAddress(nameNodeUri), conf);
} else if (nameNodeUri == null && rpcNamenode != null) {
//This case is used for testing.
this.namenode = rpcNamenode;
} else {
throw new IllegalArgumentException(
"Expecting exactly one of nameNodeAddr and rpcNamenode being null: "
+ "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode);
+ "nameNodeAddr=" + nameNodeUri + ", rpcNamenode=" + rpcNamenode);
}
}
private Class<?> getFailoverProxyProviderClass(String authority, Configuration conf)
throws IOException {
String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + authority;
try {
return conf.getClass(configKey, null);
} catch (RuntimeException e) {
if (e.getCause() instanceof ClassNotFoundException) {
throw new IOException("Could not load failover proxy provider class "
+ conf.get(configKey) + " which is configured for authority " + authority,
e);
} else {
throw e;
}
}
}

View File

@ -46,6 +46,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
public static final String DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX = "dfs.client.failover.proxy.provider";
public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";

View File

@ -617,15 +617,19 @@ public class DFSUtil {
}
/** Create a {@link NameNode} proxy */
public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
public static ClientProtocol createNamenode(InetSocketAddress nameNodeAddr,
Configuration conf) throws IOException {
return createNamenode(createRPCNamenode(nameNodeAddr, conf,
UserGroupInformation.getCurrentUser()));
return createNamenode(nameNodeAddr, conf, UserGroupInformation.getCurrentUser());
}
/** Create a {@link NameNode} proxy */
static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr,
public static ClientProtocol createNamenode(InetSocketAddress nameNodeAddr,
Configuration conf, UserGroupInformation ugi) throws IOException {
return createNamenode(createRPCNamenode(nameNodeAddr, conf, ugi));
}
/** Create a {@link NameNode} proxy */
public static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr,
Configuration conf, UserGroupInformation ugi)
throws IOException {
return (ClientProtocol)RPC.getProxy(ClientProtocol.class,
@ -634,7 +638,7 @@ public class DFSUtil {
}
/** Create a {@link NameNode} proxy */
static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
public static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
throws IOException {
RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
5, HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);

View File

@ -106,8 +106,7 @@ public class DistributedFileSystem extends FileSystem {
throw new IOException("Incomplete HDFS URI, no host: "+ uri);
}
InetSocketAddress namenode = NameNode.getAddress(uri.getAuthority());
this.dfs = new DFSClient(namenode, conf, statistics);
this.dfs = new DFSClient(uri, conf, statistics);
this.uri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://" + uri.getAuthority());
this.workingDir = getHomeDirectory();
}

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.KerberosInfo;
@ -99,6 +100,7 @@ public interface ClientProtocol extends VersionedProtocol {
* @throws IOException If an I/O error occurred
*/
@Nullable
@Idempotent
public LocatedBlocks getBlockLocations(String src,
long offset,
long length)
@ -249,7 +251,7 @@ public interface ClientProtocol extends VersionedProtocol {
UnresolvedLinkException, IOException;
/**
* The client can give up on a blcok by calling abandonBlock().
* The client can give up on a block by calling abandonBlock().
* The client can then
* either obtain a new block, or complete or abandon the file.
* Any partial writes to the block will be discarded.
@ -721,6 +723,7 @@ public interface ClientProtocol extends VersionedProtocol {
* @throws IOException If an I/O error occurred
*/
@Nullable
@Idempotent
public HdfsFileStatus getFileInfo(String src) throws AccessControlException,
FileNotFoundException, UnresolvedLinkException, IOException;

View File

@ -267,7 +267,7 @@ public class NameNode {
* @param filesystemURI
* @return address of file system
*/
static InetSocketAddress getAddress(URI filesystemURI) {
public static InetSocketAddress getAddress(URI filesystemURI) {
String authority = filesystemURI.getAuthority();
if (authority == null) {
throw new IllegalArgumentException(String.format(

View File

@ -0,0 +1,140 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
/**
* A FailoverProxyProvider implementation which allows one to configure two URIs
* to connect to during fail-over. The first configured address is tried first,
* and on a fail-over event the other address is tried.
*/
public class ConfiguredFailoverProxyProvider implements FailoverProxyProvider,
Configurable {
public static final String CONFIGURED_NAMENODE_ADDRESSES
= "dfs.ha.namenode.addresses";
private static final Log LOG =
LogFactory.getLog(ConfiguredFailoverProxyProvider.class);
private Configuration conf;
private int currentProxyIndex = 0;
private List<AddressRpcProxyPair> proxies = new ArrayList<AddressRpcProxyPair>();
private UserGroupInformation ugi;
@Override
public Class<?> getInterface() {
return ClientProtocol.class;
}
/**
* Lazily initialize the RPC proxy object.
*/
@Override
public synchronized Object getProxy() {
AddressRpcProxyPair current = proxies.get(currentProxyIndex);
if (current.namenode == null) {
try {
current.namenode = DFSUtil.createRPCNamenode(current.address, conf, ugi);
} catch (IOException e) {
LOG.error("Failed to create RPC proxy to NameNode", e);
throw new RuntimeException(e);
}
}
return current.namenode;
}
@Override
public synchronized void performFailover(Object currentProxy) {
currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
}
@Override
public synchronized Configuration getConf() {
return conf;
}
@Override
public synchronized void setConf(Configuration conf) {
this.conf = conf;
try {
ugi = UserGroupInformation.getCurrentUser();
Collection<String> addresses = conf.getTrimmedStringCollection(
CONFIGURED_NAMENODE_ADDRESSES);
if (addresses == null || addresses.size() == 0) {
throw new RuntimeException(this.getClass().getSimpleName() +
" is configured but " + CONFIGURED_NAMENODE_ADDRESSES +
" is not set.");
}
for (String address : addresses) {
proxies.add(new AddressRpcProxyPair(
NameNode.getAddress(new URI(address).getAuthority())));
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (URISyntaxException e) {
throw new RuntimeException("Malformed URI set in " +
CONFIGURED_NAMENODE_ADDRESSES, e);
}
}
/**
* A little pair object to store the address and connected RPC proxy object to
* an NN. Note that {@link AddressRpcProxyPair#namenode} may be null.
*/
private static class AddressRpcProxyPair {
public InetSocketAddress address;
public ClientProtocol namenode;
public AddressRpcProxyPair(InetSocketAddress address) {
this.address = address;
}
}
/**
* Close all the proxy objects which have been opened over the lifetime of
* this proxy provider.
*/
@Override
public synchronized void close() throws IOException {
for (AddressRpcProxyPair proxy : proxies) {
if (proxy.namenode != null) {
RPC.stopProxy(proxy.namenode);
}
}
}
}

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestDFSClientFailover {
private static final Path TEST_FILE = new Path("/tmp/failover-test-file");
private static final int FILE_LENGTH_TO_VERIFY = 100;
private Configuration conf = new Configuration();
private MiniDFSCluster cluster;
@Before
public void setUpCluster() throws IOException {
cluster = new MiniDFSCluster.Builder(conf).numNameNodes(2).build();
cluster.waitActive();
}
@After
public void tearDownCluster() throws IOException {
cluster.shutdown();
}
// TODO(HA): This test should probably be made to fail if a client fails over
// to talk to an NN with a different block pool id. Once failover between
// active/standy in a single block pool is implemented, this test should be
// changed to exercise that.
@Test
public void testDfsClientFailover() throws IOException, URISyntaxException {
final String nameServiceId = "name-service-uri";
InetSocketAddress nnAddr1 = cluster.getNameNode(0).getNameNodeAddress();
InetSocketAddress nnAddr2 = cluster.getNameNode(1).getNameNodeAddress();
ClientProtocol nn1 = DFSUtil.createNamenode(nnAddr1, conf);
ClientProtocol nn2 = DFSUtil.createNamenode(nnAddr2, conf);
DFSClient dfsClient1 = new DFSClient(null, nn1, conf, null);
DFSClient dfsClient2 = new DFSClient(null, nn2, conf, null);
OutputStream out1 = dfsClient1.create(TEST_FILE.toString(), false);
OutputStream out2 = dfsClient2.create(TEST_FILE.toString(), false);
AppendTestUtil.write(out1, 0, FILE_LENGTH_TO_VERIFY);
AppendTestUtil.write(out2, 0, FILE_LENGTH_TO_VERIFY);
out1.close();
out2.close();
String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort();
String address2 = "hdfs://" + nnAddr2.getHostName() + ":" + nnAddr2.getPort();
conf.set(ConfiguredFailoverProxyProvider.CONFIGURED_NAMENODE_ADDRESSES,
address1 + "," + address2);
conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + nameServiceId,
ConfiguredFailoverProxyProvider.class.getName());
FileSystem fs = FileSystem.get(new URI("hdfs://" + nameServiceId), conf);
AppendTestUtil.check(fs, TEST_FILE, FILE_LENGTH_TO_VERIFY);
cluster.getNameNode(0).stop();
AppendTestUtil.check(fs, TEST_FILE, FILE_LENGTH_TO_VERIFY);
fs.close();
}
}