HDFS-6478. Merge r1611410 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1611412 13f79535-47bb-0310-9956-ffa450edef68
commit 27e2008484
parent 4143eec554
CHANGES.txt
@@ -69,6 +69,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6689. NFS doesn't return correct lookup access for direcories (brandonli)
 
+    HDFS-6478. RemoteException can't be retried properly for non-HA scenario.
+    (Ming Ma via jing9)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
 
NameNodeProxies.java
@@ -333,19 +333,18 @@ public class NameNodeProxies {
         address, conf, ugi, NamenodeProtocolPB.class);
     if (withRetries) { // create the proxy with retries
       RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200,
           TimeUnit.MILLISECONDS);
-      Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap
-          = new HashMap<Class<? extends Exception>, RetryPolicy>();
-      RetryPolicy methodPolicy = RetryPolicies.retryByException(timeoutPolicy,
-          exceptionToPolicyMap);
       Map<String, RetryPolicy> methodNameToPolicyMap
           = new HashMap<String, RetryPolicy>();
-      methodNameToPolicyMap.put("getBlocks", methodPolicy);
-      methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
-      proxy = (NamenodeProtocolPB) RetryProxy.create(NamenodeProtocolPB.class,
-          proxy, methodNameToPolicyMap);
+      methodNameToPolicyMap.put("getBlocks", timeoutPolicy);
+      methodNameToPolicyMap.put("getAccessKeys", timeoutPolicy);
+      NamenodeProtocol translatorProxy =
+          new NamenodeProtocolTranslatorPB(proxy);
+      return (NamenodeProtocol) RetryProxy.create(
+          NamenodeProtocol.class, translatorProxy, methodNameToPolicyMap);
+    } else {
+      return new NamenodeProtocolTranslatorPB(proxy);
     }
-    return new NamenodeProtocolTranslatorPB(proxy);
   }
 
   private static ClientProtocol createNNProxyWithClientProtocol(
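
Half of the fix is visible in the hunk above: the retry proxy now wraps the NamenodeProtocol translator rather than the raw NamenodeProtocolPB proxy, so retry policies evaluate the IOException the translator has already unwrapped instead of a protobuf-level exception. A minimal sketch of that layering, using a plain java.lang.reflect.Proxy rather than Hadoop's actual RetryProxy/RetryInvocationHandler (class and method names here are made up):

    import java.io.IOException;
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Proxy;
    import java.util.Map;

    final class RetryLayeringSketch {
      /** Wrap an already-translating target with per-method bounded retries. */
      @SuppressWarnings("unchecked")
      static <T> T withRetries(Class<T> iface, T target,
                               Map<String, Integer> maxRetriesByMethod) {
        InvocationHandler handler = (proxyObj, method, args) -> {
          int attempts = 1 + maxRetriesByMethod.getOrDefault(method.getName(), 0);
          IOException last = null;
          for (int i = 0; i < attempts; i++) {
            try {
              // The target translator has already unwrapped RPC-level errors,
              // so what we catch here is the IOException a policy can act on.
              return method.invoke(target, args);
            } catch (InvocationTargetException e) {
              if (!(e.getCause() instanceof IOException)) {
                throw e.getCause();
              }
              last = (IOException) e.getCause();
            }
          }
          throw last;
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(),
            new Class<?>[] { iface }, handler);
      }
    }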
@@ -380,26 +379,24 @@ public class NameNodeProxies {
       remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class,
           createPolicy);
 
-      Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap
-          = new HashMap<Class<? extends Exception>, RetryPolicy>();
-      exceptionToPolicyMap.put(RemoteException.class, RetryPolicies
-          .retryByRemoteException(defaultPolicy,
-              remoteExceptionToPolicyMap));
-      RetryPolicy methodPolicy = RetryPolicies.retryByException(
-          defaultPolicy, exceptionToPolicyMap);
+      RetryPolicy methodPolicy = RetryPolicies.retryByRemoteException(
+          defaultPolicy, remoteExceptionToPolicyMap);
       Map<String, RetryPolicy> methodNameToPolicyMap
           = new HashMap<String, RetryPolicy>();
 
       methodNameToPolicyMap.put("create", methodPolicy);
 
-      proxy = (ClientNamenodeProtocolPB) RetryProxy.create(
-          ClientNamenodeProtocolPB.class,
-          new DefaultFailoverProxyProvider<ClientNamenodeProtocolPB>(
-              ClientNamenodeProtocolPB.class, proxy),
+      ClientProtocol translatorProxy =
+          new ClientNamenodeProtocolTranslatorPB(proxy);
+      return (ClientProtocol) RetryProxy.create(
+          ClientProtocol.class,
+          new DefaultFailoverProxyProvider<ClientProtocol>(
+              ClientProtocol.class, translatorProxy),
           methodNameToPolicyMap,
           defaultPolicy);
+    } else {
+      return new ClientNamenodeProtocolTranslatorPB(proxy);
     }
-    return new ClientNamenodeProtocolTranslatorPB(proxy);
   }
 
   private static Object createNameNodeProxy(InetSocketAddress address,
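
This hunk carries the core of HDFS-6478: exceptions thrown by the NameNode reach the client wrapped in a RemoteException that records only the class name of the server-side exception, so the old retryByException lookup keyed on AlreadyBeingCreatedException.class never matched in the non-HA path. retryByRemoteException resolves the wrapped class name before consulting the policy map. A toy illustration of that distinction (not the RetryPolicies source; names are illustrative):

    import java.io.IOException;
    import java.util.Map;

    final class RemoteUnwrapSketch {
      /** Stand-in for org.apache.hadoop.ipc.RemoteException. */
      static class RemoteException extends IOException {
        final String className; // wire-level name of the server-side exception
        RemoteException(String className, String msg) {
          super(msg);
          this.className = className;
        }
      }

      /** Decide by the wrapped class name, the way retryByRemoteException does. */
      static boolean shouldRetry(IOException e,
                                 Map<String, Boolean> retryByWrappedClass,
                                 boolean defaultDecision) {
        if (e instanceof RemoteException) {
          String wrapped = ((RemoteException) e).className;
          return retryByWrappedClass.getOrDefault(wrapped, defaultDecision);
        }
        // A lookup keyed on e.getClass() here would only ever see
        // RemoteException, never AlreadyBeingCreatedException -- the bug.
        return defaultDecision;
      }
    }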
DatanodeProtocolClientSideTranslatorPB.java
@@ -97,7 +97,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
     RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
         ProtobufRpcEngine.class);
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    rpcProxy = createNamenodeWithRetry(createNamenode(nameNodeAddr, conf, ugi));
+    rpcProxy = createNamenode(nameNodeAddr, conf, ugi);
   }
 
   private static DatanodeProtocolPB createNamenode(
@@ -109,33 +109,6 @@ public class DatanodeProtocolClientSideTranslatorPB implements
         org.apache.hadoop.ipc.Client.getPingInterval(conf), null).getProxy();
   }
 
-  /** Create a {@link NameNode} proxy */
-  static DatanodeProtocolPB createNamenodeWithRetry(
-      DatanodeProtocolPB rpcNamenode) {
-    RetryPolicy createPolicy = RetryPolicies
-        .retryUpToMaximumCountWithFixedSleep(5,
-            HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
-
-    Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap =
-        new HashMap<Class<? extends Exception>, RetryPolicy>();
-    remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class,
-        createPolicy);
-
-    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
-        new HashMap<Class<? extends Exception>, RetryPolicy>();
-    exceptionToPolicyMap.put(RemoteException.class, RetryPolicies
-        .retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
-            remoteExceptionToPolicyMap));
-    RetryPolicy methodPolicy = RetryPolicies.retryByException(
-        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
-    Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
-
-    methodNameToPolicyMap.put("create", methodPolicy);
-
-    return (DatanodeProtocolPB) RetryProxy.create(DatanodeProtocolPB.class,
-        rpcNamenode, methodNameToPolicyMap);
-  }
-
   @Override
   public void close() throws IOException {
     RPC.stopProxy(rpcProxy);
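
The helper deleted above was never effective: its policy map was keyed on a "create" method, but DatanodeProtocol declares no create(), and its retryByException lookup suffered the same RemoteException-wrapping mismatch described earlier. A quick reflective check of the first point (illustrative; assumes the Hadoop classes are on the classpath):

    import java.lang.reflect.Method;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;

    final class DeadPolicyCheck {
      public static void main(String[] args) {
        boolean hasCreate = false;
        for (Method m : DatanodeProtocol.class.getMethods()) {
          if (m.getName().equals("create")) {
            hasCreate = true;
          }
        }
        // Expected: false -- no RPC call ever consulted the "create" policy.
        System.out.println("DatanodeProtocol has create(): " + hasCreate);
      }
    }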
NamenodeProtocolTranslatorPB.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
+import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcClientUtil;
 
@@ -61,7 +62,7 @@ import com.google.protobuf.ServiceException;
 @InterfaceAudience.Private
 @InterfaceStability.Stable
 public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
-    ProtocolMetaInterface, Closeable {
+    ProtocolMetaInterface, Closeable, ProtocolTranslator {
   /** RpcController is not used and hence is set to null */
   private final static RpcController NULL_CONTROLLER = null;
 
@@ -88,6 +89,11 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
     RPC.stopProxy(rpcProxy);
   }
 
+  @Override
+  public Object getUnderlyingProxyObject() {
+    return rpcProxy;
+  }
+
   @Override
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
       throws IOException {
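
Implementing ProtocolTranslator is what lets generic IPC utilities reach the real RPC proxy beneath the translator, now that callers receive the translator wrapped in a retry proxy. A simplified sketch of the unwrapping pattern (not the actual RpcClientUtil/RPC code):

    import org.apache.hadoop.ipc.ProtocolTranslator;

    final class UnwrapSketch {
      /** Follow getUnderlyingProxyObject() until the raw RPC proxy appears. */
      static Object unwrap(Object proxy) {
        while (proxy instanceof ProtocolTranslator) {
          proxy = ((ProtocolTranslator) proxy).getUnderlyingProxyObject();
        }
        return proxy;
      }
    }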
TestFileCreation.java
@@ -30,6 +30,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHEC
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -79,6 +81,7 @@ import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
@@ -97,6 +100,8 @@ public class TestFileCreation {
     ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
   }
+  private static final String RPC_DETAILED_METRICS =
+      "RpcDetailedActivityForPort";
 
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 8192;
@@ -381,11 +386,15 @@
       }
     });
 
+    String metricsName = RPC_DETAILED_METRICS + cluster.getNameNodePort();
+
     try {
       Path p = new Path("/testfile");
       FSDataOutputStream stm1 = fs.create(p);
       stm1.write(1);
 
+      assertCounter("CreateNumOps", 1L, getMetrics(metricsName));
+
       // Create file again without overwrite
       try {
         fs2.create(p, false);
@@ -394,7 +403,9 @@
         GenericTestUtils.assertExceptionContains("already being created by",
             abce);
       }
-
+      // NameNodeProxies' createNNProxyWithClientProtocol has 5 retries.
+      assertCounter("AlreadyBeingCreatedExceptionNumOps",
+          6L, getMetrics(metricsName));
       FSDataOutputStream stm2 = fs2.create(p, true);
       stm2.write(2);
       stm2.close();
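
The asserted value of 6 follows from the policy set up in createNNProxyWithClientProtocol: retryUpToMaximumCountWithFixedSleep(5, ...) permits five retries on top of the initial call, and every attempt reaches the NameNode and bumps the per-method metric once. Spelled out (illustrative arithmetic only):

    final class ExpectedOpsSketch {
      public static void main(String[] args) {
        int maxRetries = 5;               // retryUpToMaximumCountWithFixedSleep(5, ...)
        int expectedOps = 1 + maxRetries; // the initial create() plus each retry
        System.out.println(expectedOps);  // 6 -> assertCounter(..., 6L, ...)
      }
    }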
TestIsMethodSupported.java
@@ -25,14 +25,16 @@ import java.net.InetSocketAddress;
 
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcClientUtil;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -76,13 +78,19 @@ public class TestIsMethodSupported {
 
   @Test
   public void testNamenodeProtocol() throws IOException {
-    NamenodeProtocolTranslatorPB translator =
-        (NamenodeProtocolTranslatorPB) NameNodeProxies.createNonHAProxy(conf,
+    NamenodeProtocol np =
+        NameNodeProxies.createNonHAProxy(conf,
             nnAddress, NamenodeProtocol.class, UserGroupInformation.getCurrentUser(),
             true).getProxy();
-    boolean exists = translator.isMethodSupported("rollEditLog");
+
+    boolean exists = RpcClientUtil.isMethodSupported(np,
+        NamenodeProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+        RPC.getProtocolVersion(NamenodeProtocolPB.class), "rollEditLog");
+
     assertTrue(exists);
-    exists = translator.isMethodSupported("bogusMethod");
+    exists = RpcClientUtil.isMethodSupported(np,
+        NamenodeProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+        RPC.getProtocolVersion(NamenodeProtocolPB.class), "bogusMethod");
     assertFalse(exists);
   }
 
@@ -110,11 +118,13 @@
 
   @Test
   public void testClientNamenodeProtocol() throws IOException {
-    ClientNamenodeProtocolTranslatorPB translator =
-        (ClientNamenodeProtocolTranslatorPB) NameNodeProxies.createNonHAProxy(
+    ClientProtocol cp =
+        NameNodeProxies.createNonHAProxy(
             conf, nnAddress, ClientProtocol.class,
             UserGroupInformation.getCurrentUser(), true).getProxy();
-    assertTrue(translator.isMethodSupported("mkdirs"));
+    RpcClientUtil.isMethodSupported(cp,
+        ClientNamenodeProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+        RPC.getProtocolVersion(ClientNamenodeProtocolPB.class), "mkdirs");
   }
 
   @Test
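
With the translator-typed proxies, the tests now probe server-side method support through the generic RpcClientUtil instead of the removed translator convenience method. The repeated call shape could be captured in a small helper like this (hypothetical, not part of the patch; the signature mirrors the calls in the hunks above):

    import java.io.IOException;
    import org.apache.hadoop.ipc.RPC;
    import org.apache.hadoop.ipc.RpcClientUtil;

    final class MethodProbe {
      /** Ask the server whether a protobuf method is implemented. */
      static boolean supported(Object proxy, Class<?> pbClass, String method)
          throws IOException {
        return RpcClientUtil.isMethodSupported(proxy, pbClass,
            RPC.RpcKind.RPC_PROTOCOL_BUFFER,
            RPC.getProtocolVersion(pbClass), method);
      }
    }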