From 8b4f497af85b49519da2e05e8269db6c4e9d621f Mon Sep 17 00:00:00 2001 From: Aaron Myers Date: Thu, 6 Oct 2011 23:26:14 +0000 Subject: [PATCH] HDFS-1973. HA: HDFS clients must handle namenode failover and switch over to the new active namenode. (atm) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1179896 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-hdfs/CHANGES.HDFS-1623.txt | 2 + .../main/java/org/apache/hadoop/fs/Hdfs.java | 3 +- .../org/apache/hadoop/hdfs/DFSClient.java | 58 ++++++-- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 1 + .../java/org/apache/hadoop/hdfs/DFSUtil.java | 16 +- .../hadoop/hdfs/DistributedFileSystem.java | 3 +- .../hadoop/hdfs/protocol/ClientProtocol.java | 5 +- .../hadoop/hdfs/server/namenode/NameNode.java | 2 +- .../ha/ConfiguredFailoverProxyProvider.java | 140 ++++++++++++++++++ .../hadoop/hdfs/TestDFSClientFailover.java | 96 ++++++++++++ 10 files changed, 301 insertions(+), 25 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt index 21d9b7d9db7..4a847593381 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt @@ -9,3 +9,5 @@ HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd) HDFS-1974. Introduce active and standy states to the namenode. (suresh) HDFS-2407. getServerDefaults and getStats don't check operation category (atm) + +HDFS-1973. HA: HDFS clients must handle namenode failover and switch over to the new active namenode. (atm) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java index 7772ad97928..5232ea9c9c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java @@ -80,8 +80,7 @@ public class Hdfs extends AbstractFileSystem { throw new IOException("Incomplete HDFS URI, no host: " + theUri); } - InetSocketAddress namenode = NameNode.getAddress(theUri.getAuthority()); - this.dfs = new DFSClient(namenode, conf, getStatistics()); + this.dfs = new DFSClient(theUri, conf, getStatistics()); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 41fc6510743..6b306df7810 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -1,4 +1,3 @@ - /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -26,11 +25,11 @@ import java.io.IOException; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.URI; import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; - import javax.net.SocketFactory; import org.apache.commons.logging.Log; @@ -87,6 +86,9 @@ import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; @@ -96,6 +98,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.ReflectionUtils; /******************************************************** * DFSClient can connect to a Hadoop Filesystem and @@ -199,7 +202,7 @@ public class DFSClient implements java.io.Closeable { */ private final Map filesBeingWritten = new HashMap(); - + /** * Same as this(NameNode.getAddress(conf), conf); * @see #DFSClient(InetSocketAddress, Configuration) @@ -209,12 +212,16 @@ public class DFSClient implements java.io.Closeable { public DFSClient(Configuration conf) throws IOException { this(NameNode.getAddress(conf), conf); } + + public DFSClient(InetSocketAddress address, Configuration conf) throws IOException { + this(NameNode.getUri(address), conf); + } /** * Same as this(nameNodeAddr, conf, null); * @see #DFSClient(InetSocketAddress, Configuration, org.apache.hadoop.fs.FileSystem.Statistics) */ - public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf + public DFSClient(URI nameNodeAddr, Configuration conf ) throws IOException { this(nameNodeAddr, conf, null); } @@ -223,17 +230,17 @@ public class DFSClient implements java.io.Closeable { * Same as this(nameNodeAddr, null, conf, stats); * @see #DFSClient(InetSocketAddress, ClientProtocol, Configuration, org.apache.hadoop.fs.FileSystem.Statistics) */ - public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf, + public DFSClient(URI nameNodeAddr, Configuration conf, FileSystem.Statistics stats) throws IOException { this(nameNodeAddr, null, conf, stats); } - + /** * Create a new DFSClient connected to the given nameNodeAddr or rpcNamenode. * Exactly one of nameNodeAddr or rpcNamenode must be null. */ - DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode, + DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats) throws IOException { // Copy only the required DFSClient configuration @@ -246,20 +253,45 @@ public class DFSClient implements java.io.Closeable { // The hdfsTimeout is currently the same as the ipc timeout this.hdfsTimeout = Client.getTimeout(conf); this.ugi = UserGroupInformation.getCurrentUser(); - final String authority = nameNodeAddr == null? "null": - nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort(); + + final String authority = nameNodeUri == null? "null": nameNodeUri.getAuthority(); this.leaserenewer = LeaseRenewer.getInstance(authority, ugi, this); this.clientName = leaserenewer.getClientName(dfsClientConf.taskId); + this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity); - if (nameNodeAddr != null && rpcNamenode == null) { - this.namenode = DFSUtil.createNamenode(nameNodeAddr, conf); - } else if (nameNodeAddr == null && rpcNamenode != null) { + + Class failoverProxyProviderClass = getFailoverProxyProviderClass(authority, conf); + + if (nameNodeUri != null && failoverProxyProviderClass != null) { + FailoverProxyProvider failoverProxyProvider = (FailoverProxyProvider) + ReflectionUtils.newInstance(failoverProxyProviderClass, conf); + this.namenode = (ClientProtocol)RetryProxy.create(ClientProtocol.class, + failoverProxyProvider, RetryPolicies.failoverOnNetworkException(1)); + } else if (nameNodeUri != null && rpcNamenode == null) { + this.namenode = DFSUtil.createNamenode(NameNode.getAddress(nameNodeUri), conf); + } else if (nameNodeUri == null && rpcNamenode != null) { //This case is used for testing. this.namenode = rpcNamenode; } else { throw new IllegalArgumentException( "Expecting exactly one of nameNodeAddr and rpcNamenode being null: " - + "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode); + + "nameNodeAddr=" + nameNodeUri + ", rpcNamenode=" + rpcNamenode); + } + } + + private Class getFailoverProxyProviderClass(String authority, Configuration conf) + throws IOException { + String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + authority; + try { + return conf.getClass(configKey, null); + } catch (RuntimeException e) { + if (e.getCause() instanceof ClassNotFoundException) { + throw new IOException("Could not load failover proxy provider class " + + conf.get(configKey) + " which is configured for authority " + authority, + e); + } else { + throw e; + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index f92064239c0..fdf38ed7066 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -46,6 +46,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT"; public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity"; public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16; + public static final String DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX = "dfs.client.failover.proxy.provider"; public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address"; public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index 6ae4a13952a..8a1baf2b68b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -617,15 +617,19 @@ public class DFSUtil { } /** Create a {@link NameNode} proxy */ - public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr, + public static ClientProtocol createNamenode(InetSocketAddress nameNodeAddr, Configuration conf) throws IOException { - return createNamenode(createRPCNamenode(nameNodeAddr, conf, - UserGroupInformation.getCurrentUser())); - + return createNamenode(nameNodeAddr, conf, UserGroupInformation.getCurrentUser()); + } + + /** Create a {@link NameNode} proxy */ + public static ClientProtocol createNamenode(InetSocketAddress nameNodeAddr, + Configuration conf, UserGroupInformation ugi) throws IOException { + return createNamenode(createRPCNamenode(nameNodeAddr, conf, ugi)); } /** Create a {@link NameNode} proxy */ - static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr, + public static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr, Configuration conf, UserGroupInformation ugi) throws IOException { return (ClientProtocol)RPC.getProxy(ClientProtocol.class, @@ -634,7 +638,7 @@ public class DFSUtil { } /** Create a {@link NameNode} proxy */ - static ClientProtocol createNamenode(ClientProtocol rpcNamenode) + public static ClientProtocol createNamenode(ClientProtocol rpcNamenode) throws IOException { RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( 5, HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 4d12efe5fcf..52343c3834b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -106,8 +106,7 @@ public class DistributedFileSystem extends FileSystem { throw new IOException("Incomplete HDFS URI, no host: "+ uri); } - InetSocketAddress namenode = NameNode.getAddress(uri.getAuthority()); - this.dfs = new DFSClient(namenode, conf, statistics); + this.dfs = new DFSClient(uri, conf, statistics); this.uri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://" + uri.getAuthority()); this.workingDir = getHomeDirectory(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index e69a2727b45..262c1e3e04d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.retry.Idempotent; import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.KerberosInfo; @@ -99,6 +100,7 @@ public interface ClientProtocol extends VersionedProtocol { * @throws IOException If an I/O error occurred */ @Nullable + @Idempotent public LocatedBlocks getBlockLocations(String src, long offset, long length) @@ -249,7 +251,7 @@ public interface ClientProtocol extends VersionedProtocol { UnresolvedLinkException, IOException; /** - * The client can give up on a blcok by calling abandonBlock(). + * The client can give up on a block by calling abandonBlock(). * The client can then * either obtain a new block, or complete or abandon the file. * Any partial writes to the block will be discarded. @@ -721,6 +723,7 @@ public interface ClientProtocol extends VersionedProtocol { * @throws IOException If an I/O error occurred */ @Nullable + @Idempotent public HdfsFileStatus getFileInfo(String src) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 0efa268e313..2b7c765ef4c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -267,7 +267,7 @@ public class NameNode { * @param filesystemURI * @return address of file system */ - static InetSocketAddress getAddress(URI filesystemURI) { + public static InetSocketAddress getAddress(URI filesystemURI) { String authority = filesystemURI.getAuthority(); if (authority == null) { throw new IllegalArgumentException(String.format( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java new file mode 100644 index 00000000000..987f345ae7c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * A FailoverProxyProvider implementation which allows one to configure two URIs + * to connect to during fail-over. The first configured address is tried first, + * and on a fail-over event the other address is tried. + */ +public class ConfiguredFailoverProxyProvider implements FailoverProxyProvider, + Configurable { + + public static final String CONFIGURED_NAMENODE_ADDRESSES + = "dfs.ha.namenode.addresses"; + + private static final Log LOG = + LogFactory.getLog(ConfiguredFailoverProxyProvider.class); + + private Configuration conf; + private int currentProxyIndex = 0; + private List proxies = new ArrayList(); + private UserGroupInformation ugi; + + @Override + public Class getInterface() { + return ClientProtocol.class; + } + + /** + * Lazily initialize the RPC proxy object. + */ + @Override + public synchronized Object getProxy() { + AddressRpcProxyPair current = proxies.get(currentProxyIndex); + if (current.namenode == null) { + try { + current.namenode = DFSUtil.createRPCNamenode(current.address, conf, ugi); + } catch (IOException e) { + LOG.error("Failed to create RPC proxy to NameNode", e); + throw new RuntimeException(e); + } + } + return current.namenode; + } + + @Override + public synchronized void performFailover(Object currentProxy) { + currentProxyIndex = (currentProxyIndex + 1) % proxies.size(); + } + + @Override + public synchronized Configuration getConf() { + return conf; + } + + @Override + public synchronized void setConf(Configuration conf) { + this.conf = conf; + try { + ugi = UserGroupInformation.getCurrentUser(); + + Collection addresses = conf.getTrimmedStringCollection( + CONFIGURED_NAMENODE_ADDRESSES); + if (addresses == null || addresses.size() == 0) { + throw new RuntimeException(this.getClass().getSimpleName() + + " is configured but " + CONFIGURED_NAMENODE_ADDRESSES + + " is not set."); + } + for (String address : addresses) { + proxies.add(new AddressRpcProxyPair( + NameNode.getAddress(new URI(address).getAuthority()))); + } + } catch (IOException e) { + throw new RuntimeException(e); + } catch (URISyntaxException e) { + throw new RuntimeException("Malformed URI set in " + + CONFIGURED_NAMENODE_ADDRESSES, e); + } + } + + /** + * A little pair object to store the address and connected RPC proxy object to + * an NN. Note that {@link AddressRpcProxyPair#namenode} may be null. + */ + private static class AddressRpcProxyPair { + public InetSocketAddress address; + public ClientProtocol namenode; + + public AddressRpcProxyPair(InetSocketAddress address) { + this.address = address; + } + } + + /** + * Close all the proxy objects which have been opened over the lifetime of + * this proxy provider. + */ + @Override + public synchronized void close() throws IOException { + for (AddressRpcProxyPair proxy : proxies) { + if (proxy.namenode != null) { + RPC.stopProxy(proxy.namenode); + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java new file mode 100644 index 00000000000..5ac38c6a8fa --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestDFSClientFailover { + + private static final Path TEST_FILE = new Path("/tmp/failover-test-file"); + private static final int FILE_LENGTH_TO_VERIFY = 100; + + private Configuration conf = new Configuration(); + private MiniDFSCluster cluster; + + @Before + public void setUpCluster() throws IOException { + cluster = new MiniDFSCluster.Builder(conf).numNameNodes(2).build(); + cluster.waitActive(); + } + + @After + public void tearDownCluster() throws IOException { + cluster.shutdown(); + } + + // TODO(HA): This test should probably be made to fail if a client fails over + // to talk to an NN with a different block pool id. Once failover between + // active/standy in a single block pool is implemented, this test should be + // changed to exercise that. + @Test + public void testDfsClientFailover() throws IOException, URISyntaxException { + final String nameServiceId = "name-service-uri"; + InetSocketAddress nnAddr1 = cluster.getNameNode(0).getNameNodeAddress(); + InetSocketAddress nnAddr2 = cluster.getNameNode(1).getNameNodeAddress(); + + ClientProtocol nn1 = DFSUtil.createNamenode(nnAddr1, conf); + ClientProtocol nn2 = DFSUtil.createNamenode(nnAddr2, conf); + + DFSClient dfsClient1 = new DFSClient(null, nn1, conf, null); + DFSClient dfsClient2 = new DFSClient(null, nn2, conf, null); + + OutputStream out1 = dfsClient1.create(TEST_FILE.toString(), false); + OutputStream out2 = dfsClient2.create(TEST_FILE.toString(), false); + AppendTestUtil.write(out1, 0, FILE_LENGTH_TO_VERIFY); + AppendTestUtil.write(out2, 0, FILE_LENGTH_TO_VERIFY); + out1.close(); + out2.close(); + + String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort(); + String address2 = "hdfs://" + nnAddr2.getHostName() + ":" + nnAddr2.getPort(); + conf.set(ConfiguredFailoverProxyProvider.CONFIGURED_NAMENODE_ADDRESSES, + address1 + "," + address2); + + conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + nameServiceId, + ConfiguredFailoverProxyProvider.class.getName()); + + FileSystem fs = FileSystem.get(new URI("hdfs://" + nameServiceId), conf); + + AppendTestUtil.check(fs, TEST_FILE, FILE_LENGTH_TO_VERIFY); + cluster.getNameNode(0).stop(); + AppendTestUtil.check(fs, TEST_FILE, FILE_LENGTH_TO_VERIFY); + + fs.close(); + } + +} \ No newline at end of file