HDFS-2792. Make fsck work. Contributed by Aaron T. Myers.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1240449 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3cc71933e9
commit
ec6961b39c
|
@ -20,7 +20,6 @@ package org.apache.hadoop.io.retry;
|
|||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -29,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
||||
import org.apache.hadoop.util.ThreadUtil;
|
||||
import org.apache.hadoop.ipc.Client.ConnectionId;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RpcInvocationHandler;
|
||||
|
||||
class RetryInvocationHandler implements RpcInvocationHandler {
|
||||
|
@ -163,9 +163,7 @@ class RetryInvocationHandler implements RpcInvocationHandler {
|
|||
|
||||
@Override //RpcInvocationHandler
|
||||
public ConnectionId getConnectionId() {
|
||||
RpcInvocationHandler inv = (RpcInvocationHandler) Proxy
|
||||
.getInvocationHandler(currentProxy);
|
||||
return inv.getConnectionId();
|
||||
return RPC.getConnectionIdForProxy(currentProxy);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ipc;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* An interface implemented by client-side protocol translators to get the
|
||||
* underlying proxy object the translator is operating on.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface ProtocolTranslator {
|
||||
|
||||
/**
|
||||
* Return the proxy object underlying this protocol translator.
|
||||
* @return the proxy object underlying this protocol translator.
|
||||
*/
|
||||
public Object getUnderlyingProxyObject();
|
||||
|
||||
}
|
|
@ -40,6 +40,7 @@ import javax.net.SocketFactory;
|
|||
import org.apache.commons.logging.*;
|
||||
|
||||
import org.apache.hadoop.io.*;
|
||||
import org.apache.hadoop.ipc.Client.ConnectionId;
|
||||
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
|
||||
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
|
@ -530,9 +531,24 @@ public class RPC {
|
|||
* Returns the server address for a given proxy.
|
||||
*/
|
||||
public static InetSocketAddress getServerAddress(Object proxy) {
|
||||
return getConnectionIdForProxy(proxy).getAddress();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the connection ID of the given object. If the provided object is in
|
||||
* fact a protocol translator, we'll get the connection ID of the underlying
|
||||
* proxy object.
|
||||
*
|
||||
* @param proxy the proxy object to get the connection ID of.
|
||||
* @return the connection ID for the provided proxy object.
|
||||
*/
|
||||
public static ConnectionId getConnectionIdForProxy(Object proxy) {
|
||||
if (proxy instanceof ProtocolTranslator) {
|
||||
proxy = ((ProtocolTranslator)proxy).getUnderlyingProxyObject();
|
||||
}
|
||||
RpcInvocationHandler inv = (RpcInvocationHandler) Proxy
|
||||
.getInvocationHandler(proxy);
|
||||
return inv.getConnectionId().getAddress();
|
||||
return inv.getConnectionId();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -164,3 +164,5 @@ HDFS-2863. Failures observed if dfs.edits.dir and shared.edits.dir have same dir
|
|||
HDFS-2874. Edit log should log to shared dirs before local dirs. (todd)
|
||||
|
||||
HDFS-2890. DFSUtil#getSuffixIDs should skip unset configurations. (atm)
|
||||
|
||||
HDFS-2792. Make fsck work. (atm)
|
||||
|
|
|
@ -71,7 +71,7 @@ import com.google.protobuf.BlockingService;
|
|||
|
||||
@InterfaceAudience.Private
|
||||
public class DFSUtil {
|
||||
private static final Log LOG = LogFactory.getLog(DFSUtil.class.getName());
|
||||
public static final Log LOG = LogFactory.getLog(DFSUtil.class.getName());
|
||||
|
||||
private DFSUtil() { /* Hidden constructor */ }
|
||||
private static final ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() {
|
||||
|
|
|
@ -63,6 +63,7 @@ import org.apache.hadoop.io.Text;
|
|||
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.io.retry.RetryProxy;
|
||||
import org.apache.hadoop.ipc.ProtocolTranslator;
|
||||
import org.apache.hadoop.ipc.ProtobufHelper;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.ProtocolSignature;
|
||||
|
@ -138,7 +139,7 @@ import com.google.protobuf.ServiceException;
|
|||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Stable
|
||||
public class ClientNamenodeProtocolTranslatorPB implements
|
||||
ClientProtocol, Closeable {
|
||||
ClientProtocol, Closeable, ProtocolTranslator {
|
||||
final private ClientNamenodeProtocolPB rpcProxy;
|
||||
|
||||
private static ClientNamenodeProtocolPB createNamenode(
|
||||
|
@ -874,4 +875,9 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
|||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getUnderlyingProxyObject() {
|
||||
return rpcProxy;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,11 +32,13 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.security.Krb5AndCertsSslSocketConnector;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
@ -204,8 +206,9 @@ public class DFSck extends Configured implements Tool {
|
|||
* Derive the namenode http address from the current file system,
|
||||
* either default or as set by "-fs" in the generic options.
|
||||
* @return Returns http address or null if failure.
|
||||
* @throws IOException if we can't determine the active NN address
|
||||
*/
|
||||
private String getCurrentNamenodeAddress() {
|
||||
private String getCurrentNamenodeAddress() throws IOException {
|
||||
//String nnAddress = null;
|
||||
Configuration conf = getConf();
|
||||
|
||||
|
@ -222,16 +225,21 @@ public class DFSck extends Configured implements Tool {
|
|||
System.err.println("FileSystem is " + fs.getUri());
|
||||
return null;
|
||||
}
|
||||
DistributedFileSystem dfs = (DistributedFileSystem) fs;
|
||||
|
||||
// Derive the nameservice ID from the filesystem URI.
|
||||
// The URI may have been provided by a human, and the server name may be
|
||||
// aliased, so compare InetSocketAddresses instead of URI strings, and
|
||||
// test against both possible variants of RPC address.
|
||||
InetSocketAddress namenode =
|
||||
NameNode.getAddress(dfs.getUri().getAuthority());
|
||||
|
||||
return DFSUtil.getInfoServer(namenode, conf, true);
|
||||
// force client address resolution.
|
||||
fs.exists(new Path("/"));
|
||||
|
||||
// Derive the nameservice ID from the filesystem connection. The URI may
|
||||
// have been provided by a human, the server name may be aliased, or there
|
||||
// may be multiple possible actual addresses (e.g. in an HA setup) so
|
||||
// compare InetSocketAddresses instead of URI strings, and test against both
|
||||
// possible configurations of RPC address (DFS_NAMENODE_RPC_ADDRESS_KEY and
|
||||
// DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY).
|
||||
DistributedFileSystem dfs = (DistributedFileSystem) fs;
|
||||
DFSClient dfsClient = dfs.getClient();
|
||||
InetSocketAddress addr = RPC.getServerAddress(dfsClient.getNamenode());
|
||||
|
||||
return DFSUtil.getInfoServer(addr, conf, true);
|
||||
}
|
||||
|
||||
private int doWork(final String[] args) throws IOException {
|
||||
|
|
|
@ -175,6 +175,7 @@ public abstract class HATestUtil {
|
|||
nameNodeId1 + "," + nameNodeId2);
|
||||
conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + logicalName,
|
||||
ConfiguredFailoverProxyProvider.class.getName());
|
||||
conf.set("fs.defaultFS", "hdfs://" + logicalName);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,103 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.tools.DFSck;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestHAFsck {
|
||||
|
||||
static {
|
||||
((Log4JLogger)LogFactory.getLog(DFSUtil.class)).getLogger().setLevel(Level.ALL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that fsck still works with HA enabled.
|
||||
*/
|
||||
@Test
|
||||
public void testHaFsck() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
|
||||
// need some HTTP ports
|
||||
MiniDFSNNTopology topology = new MiniDFSNNTopology()
|
||||
.addNameservice(new MiniDFSNNTopology.NSConf("ha-nn-uri-0")
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001))
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002)));
|
||||
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.nnTopology(topology)
|
||||
.numDataNodes(0)
|
||||
.build();
|
||||
FileSystem fs = null;
|
||||
try {
|
||||
cluster.waitActive();
|
||||
|
||||
cluster.transitionToActive(0);
|
||||
|
||||
// Make sure conf has the relevant HA configs.
|
||||
HATestUtil.setFailoverConfigurations(cluster, conf, "ha-nn-uri-0", 0);
|
||||
|
||||
fs = HATestUtil.configureFailoverFs(cluster, conf);
|
||||
fs.mkdirs(new Path("/test1"));
|
||||
fs.mkdirs(new Path("/test2"));
|
||||
|
||||
runFsck(conf);
|
||||
|
||||
cluster.transitionToStandby(0);
|
||||
cluster.transitionToActive(1);
|
||||
|
||||
runFsck(conf);
|
||||
} finally {
|
||||
if (fs != null) {
|
||||
fs.close();
|
||||
}
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void runFsck(Configuration conf) throws Exception {
|
||||
ByteArrayOutputStream bStream = new ByteArrayOutputStream();
|
||||
PrintStream out = new PrintStream(bStream, true);
|
||||
int errCode = ToolRunner.run(new DFSck(conf, out),
|
||||
new String[]{"/", "-files"});
|
||||
String result = bStream.toString();
|
||||
System.out.println("output from fsck:\n" + result);
|
||||
Assert.assertEquals(0, errCode);
|
||||
assertTrue(result.contains("/test1"));
|
||||
assertTrue(result.contains("/test2"));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue