diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
index 3f94abfbb2a..28e88501d0a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.io.retry;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
 import java.util.Collections;
 import java.util.Map;
 
@@ -29,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
 import org.apache.hadoop.util.ThreadUtil;
 import org.apache.hadoop.ipc.Client.ConnectionId;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcInvocationHandler;
 
 class RetryInvocationHandler implements RpcInvocationHandler {
@@ -163,9 +163,7 @@ class RetryInvocationHandler implements RpcInvocationHandler {
 
   @Override //RpcInvocationHandler
   public ConnectionId getConnectionId() {
-    RpcInvocationHandler inv = (RpcInvocationHandler) Proxy
-        .getInvocationHandler(currentProxy);
-    return inv.getConnectionId();
+    return RPC.getConnectionIdForProxy(currentProxy);
   }
 }
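
A note on the hunk above: Proxy.getInvocationHandler() only accepts true dynamic proxies and throws IllegalArgumentException for anything else, so the removed lines break as soon as currentProxy can be a protocol translator (the wrapper type introduced below). Delegating to RPC.getConnectionIdForProxy() centralizes the unwrapping. A self-contained sketch of the two cases, using only JDK types; the Echo interface is invented for illustration:

    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;

    public class ProxyUnwrapDemo {
      interface Echo { String echo(String msg); }

      public static void main(String[] args) {
        // A true dynamic proxy: getInvocationHandler() succeeds.
        Echo proxy = (Echo) Proxy.newProxyInstance(
            Echo.class.getClassLoader(), new Class<?>[] { Echo.class },
            new InvocationHandler() {
              @Override
              public Object invoke(Object p, Method m, Object[] a) {
                return a[0];
              }
            });
        System.out.println(Proxy.getInvocationHandler(proxy).getClass());

        // A plain wrapper object, which is what a protocol translator is:
        // the code removed above would have thrown here.
        try {
          Proxy.getInvocationHandler(new Object());
        } catch (IllegalArgumentException e) {
          System.out.println("not a proxy instance: " + e);
        }
      }
    }
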
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolTranslator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolTranslator.java
new file mode 100644
index 00000000000..5bf9dbaed17
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolTranslator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * An interface implemented by client-side protocol translators to get the
+ * underlying proxy object the translator is operating on.
+ */
+@InterfaceAudience.Private
+public interface ProtocolTranslator {
+
+  /**
+   * Return the proxy object underlying this protocol translator.
+   * @return the proxy object underlying this protocol translator.
+   */
+  public Object getUnderlyingProxyObject();
+
+}
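
To make the contract concrete, here is a minimal sketch of a conforming translator; FooProtocolPB and FooProtocolTranslatorPB are hypothetical stand-ins, but the shape matches the ClientNamenodeProtocolTranslatorPB change later in this patch:

    import org.apache.hadoop.ipc.ProtocolTranslator;

    // Hypothetical PB-generated RPC interface, for illustration only.
    interface FooProtocolPB {}

    class FooProtocolTranslatorPB implements ProtocolTranslator {
      // The dynamic proxy created by RPC.getProxy(); the translator adapts
      // its PB types to the client-facing protocol interface.
      private final FooProtocolPB rpcProxy;

      FooProtocolTranslatorPB(FooProtocolPB rpcProxy) {
        this.rpcProxy = rpcProxy;
      }

      @Override
      public Object getUnderlyingProxyObject() {
        // Hand back the raw proxy, never the translator itself, so that
        // RPC.getConnectionIdForProxy() can reach the invocation handler.
        return rpcProxy;
      }
    }
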
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index 4f85e905cd3..069841b1c9b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -40,6 +40,7 @@ import javax.net.SocketFactory;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService;
 import org.apache.hadoop.net.NetUtils;
@@ -530,9 +531,24 @@ public class RPC {
    * Returns the server address for a given proxy.
    */
   public static InetSocketAddress getServerAddress(Object proxy) {
+    return getConnectionIdForProxy(proxy).getAddress();
+  }
+
+  /**
+   * Return the connection ID of the given object. If the provided object is in
+   * fact a protocol translator, we'll get the connection ID of the underlying
+   * proxy object.
+   *
+   * @param proxy the proxy object to get the connection ID of.
+   * @return the connection ID for the provided proxy object.
+   */
+  public static ConnectionId getConnectionIdForProxy(Object proxy) {
+    if (proxy instanceof ProtocolTranslator) {
+      proxy = ((ProtocolTranslator)proxy).getUnderlyingProxyObject();
+    }
     RpcInvocationHandler inv = (RpcInvocationHandler) Proxy
         .getInvocationHandler(proxy);
-    return inv.getConnectionId().getAddress();
+    return inv.getConnectionId();
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
index 4905a289af5..52b87db187d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
@@ -164,3 +164,5 @@ HDFS-2863. Failures observed if dfs.edits.dir and shared.edits.dir have same dir
 HDFS-2874. Edit log should log to shared dirs before local dirs. (todd)
 
 HDFS-2890. DFSUtil#getSuffixIDs should skip unset configurations. (atm)
+
+HDFS-2792. Make fsck work. (atm)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 0ae198a25f6..9fd24381768 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -71,7 +71,7 @@ import com.google.protobuf.BlockingService;
 @InterfaceAudience.Private
 public class DFSUtil {
-  private static final Log LOG = LogFactory.getLog(DFSUtil.class.getName());
+  public static final Log LOG = LogFactory.getLog(DFSUtil.class.getName());
 
   private DFSUtil() { /* Hidden constructor */ }
 
   private static final ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 5860d3a13af..f38467e07b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolSignature;
@@ -138,7 +139,7 @@ import com.google.protobuf.ServiceException;
 @InterfaceAudience.Private
 @InterfaceStability.Stable
 public class ClientNamenodeProtocolTranslatorPB implements
-    ClientProtocol, Closeable {
+    ClientProtocol, Closeable, ProtocolTranslator {
   final private ClientNamenodeProtocolPB rpcProxy;
 
   private static ClientNamenodeProtocolPB createNamenode(
@@ -874,4 +875,9 @@ public class ClientNamenodeProtocolTranslatorPB implements
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  @Override
+  public Object getUnderlyingProxyObject() {
+    return rpcProxy;
+  }
 }
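
With the interface implemented by the translator, callers no longer need to know which shape of proxy they hold. A hedged usage sketch; the class and method are invented, while the two RPC calls are the ones touched by this patch:

    import java.net.InetSocketAddress;

    import org.apache.hadoop.ipc.Client.ConnectionId;
    import org.apache.hadoop.ipc.RPC;

    public class ConnectionIdLookup {
      // Works whether 'proxy' is the raw dynamic proxy from RPC.getProxy()
      // or a ProtocolTranslator wrapping one; getConnectionIdForProxy()
      // unwraps translators before consulting the invocation handler.
      static InetSocketAddress addressOf(Object proxy) {
        ConnectionId id = RPC.getConnectionIdForProxy(proxy);
        return id.getAddress();
      }
    }
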
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
index bc98995af30..1a99fcb62ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
@@ -32,11 +32,13 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.Krb5AndCertsSslSocketConnector;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -204,8 +206,9 @@ public class DFSck extends Configured implements Tool {
    * Derive the namenode http address from the current file system,
    * either default or as set by "-fs" in the generic options.
    * @return Returns http address or null if failure.
+   * @throws IOException if we can't determine the active NN address
    */
-  private String getCurrentNamenodeAddress() {
+  private String getCurrentNamenodeAddress() throws IOException {
     //String nnAddress = null;
     Configuration conf = getConf();
 
@@ -222,16 +225,21 @@ public class DFSck extends Configured implements Tool {
       System.err.println("FileSystem is " + fs.getUri());
       return null;
     }
 
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
-
-    // Derive the nameservice ID from the filesystem URI.
-    // The URI may have been provided by a human, and the server name may be
-    // aliased, so compare InetSocketAddresses instead of URI strings, and
-    // test against both possible variants of RPC address.
-    InetSocketAddress namenode =
-      NameNode.getAddress(dfs.getUri().getAuthority());
-    return DFSUtil.getInfoServer(namenode, conf, true);
+    // force client address resolution.
+    fs.exists(new Path("/"));
+
+    // Derive the nameservice ID from the filesystem connection. The URI may
+    // have been provided by a human, the server name may be aliased, or there
+    // may be multiple possible actual addresses (e.g. in an HA setup) so
+    // compare InetSocketAddresses instead of URI strings, and test against both
+    // possible configurations of RPC address (DFS_NAMENODE_RPC_ADDRESS_KEY and
+    // DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY).
+    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DFSClient dfsClient = dfs.getClient();
+    InetSocketAddress addr = RPC.getServerAddress(dfsClient.getNamenode());
+
+    return DFSUtil.getInfoServer(addr, conf, true);
   }
 
   private int doWork(final String[] args) throws IOException {
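
The "force client address resolution" call matters because, behind a failover proxy provider, the client does not bind to a concrete NameNode until the first RPC goes out; issuing a cheap exists() first means RPC.getServerAddress() reports the active NN rather than an unresolved logical URI. The derivation as a standalone sketch; the class and method names are invented, the calls mirror the patched getCurrentNamenodeAddress():

    import java.io.IOException;
    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSClient;
    import org.apache.hadoop.hdfs.DFSUtil;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.ipc.RPC;

    public class ActiveNnInfoAddress {
      static String find(Configuration conf) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        fs.exists(new Path("/"));  // force resolution of the active NN
        DFSClient client = ((DistributedFileSystem) fs).getClient();
        InetSocketAddress rpcAddr = RPC.getServerAddress(client.getNamenode());
        // Map the RPC address to the matching HTTP info server address.
        return DFSUtil.getInfoServer(rpcAddr, conf, true);
      }
    }
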
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index 5439d15b814..572b97dc07f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -175,6 +175,7 @@ public abstract class HATestUtil {
         nameNodeId1 + "," + nameNodeId2);
     conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + logicalName,
         ConfiguredFailoverProxyProvider.class.getName());
+    conf.set("fs.defaultFS", "hdfs://" + logicalName);
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAFsck.java
new file mode 100644
index 00000000000..10218f218ec
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAFsck.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.tools.DFSck;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Level;
+import org.junit.Test;
+
+public class TestHAFsck {
+
+  static {
+    ((Log4JLogger)LogFactory.getLog(DFSUtil.class)).getLogger().setLevel(Level.ALL);
+  }
+
+  /**
+   * Test that fsck still works with HA enabled.
+   */
+  @Test
+  public void testHaFsck() throws Exception {
+    Configuration conf = new Configuration();
+
+    // need some HTTP ports
+    MiniDFSNNTopology topology = new MiniDFSNNTopology()
+      .addNameservice(new MiniDFSNNTopology.NSConf("ha-nn-uri-0")
+        .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001))
+        .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002)));
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .nnTopology(topology)
+      .numDataNodes(0)
+      .build();
+    FileSystem fs = null;
+    try {
+      cluster.waitActive();
+
+      cluster.transitionToActive(0);
+
+      // Make sure conf has the relevant HA configs.
+      HATestUtil.setFailoverConfigurations(cluster, conf, "ha-nn-uri-0", 0);
+
+      fs = HATestUtil.configureFailoverFs(cluster, conf);
+      fs.mkdirs(new Path("/test1"));
+      fs.mkdirs(new Path("/test2"));
+
+      runFsck(conf);
+
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+
+      runFsck(conf);
+    } finally {
+      if (fs != null) {
+        fs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  static void runFsck(Configuration conf) throws Exception {
+    ByteArrayOutputStream bStream = new ByteArrayOutputStream();
+    PrintStream out = new PrintStream(bStream, true);
+    int errCode = ToolRunner.run(new DFSck(conf, out),
+        new String[]{"/", "-files"});
+    String result = bStream.toString();
+    System.out.println("output from fsck:\n" + result);
+    assertEquals(0, errCode);
+    assertTrue(result.contains("/test1"));
+    assertTrue(result.contains("/test2"));
+  }
+}
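
For reference, the client-side configuration that HATestUtil.setFailoverConfigurations() (including the new fs.defaultFS line) produces for this test looks roughly like the sketch below; exact key names vary by branch, and the hosts and ports are invented:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;

    public class HaClientConfSketch {
      static Configuration build() {
        Configuration conf = new Configuration();
        // Logical (non-host) default FS, as now set by setFailoverConfigurations().
        conf.set("fs.defaultFS", "hdfs://ha-nn-uri-0");
        // The NN IDs in the nameservice, and where each one listens.
        conf.set("dfs.ha.namenodes.ha-nn-uri-0", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.ha-nn-uri-0.nn1", "host1:8020");
        conf.set("dfs.namenode.rpc-address.ha-nn-uri-0.nn2", "host2:8020");
        // Failover proxy provider keyed by the logical URI's authority.
        conf.set("dfs.client.failover.proxy.provider.ha-nn-uri-0",
            ConfiguredFailoverProxyProvider.class.getName());
        return conf;
      }
    }
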