HDFS-16283. RBF: reducing the load of renewLease() RPC (#4524). Contributed by ZanderXu.

Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
This commit is contained in:
xuzq 2022-07-14 09:56:40 +08:00 committed by GitHub
parent f1bd4e117e
commit 6f9c4359ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 255 additions and 32 deletions

View File

@ -41,6 +41,7 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
@ -579,6 +580,27 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
}
/**
* Get all namespaces of DFSOutputStreams.
*/
private List<String> getNamespaces() {
HashSet<String> namespaces = new HashSet<>();
synchronized (filesBeingWritten) {
for (DFSOutputStream outputStream : filesBeingWritten.values()) {
String namespace = outputStream.getNamespace();
if (namespace == null || namespace.isEmpty()) {
return null;
} else {
namespaces.add(namespace);
}
}
if (namespaces.isEmpty()) {
return null;
}
}
return new ArrayList<>(namespaces);
}
/**
* Renew leases.
* @return true if lease was renewed. May return false if this
@ -587,7 +609,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public boolean renewLease() throws IOException {
if (clientRunning && !isFilesBeingWrittenEmpty()) {
try {
namenode.renewLease(clientName);
namenode.renewLease(clientName, getNamespaces());
updateLastLeaseRenewal();
return true;
} catch (IOException e) {

View File

@ -113,6 +113,7 @@ public class DFSOutputStream extends FSOutputSummer
protected final String src;
protected final long fileId;
private final String namespace;
protected final long blockSize;
protected final int bytesPerChecksum;
@ -195,6 +196,7 @@ public class DFSOutputStream extends FSOutputSummer
this.dfsClient = dfsClient;
this.src = src;
this.fileId = stat.getFileId();
this.namespace = stat.getNamespace();
this.blockSize = stat.getBlockSize();
this.blockReplication = stat.getReplication();
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
@ -1084,6 +1086,11 @@ public class DFSOutputStream extends FSOutputSummer
return fileId;
}
@VisibleForTesting
public String getNamespace() {
return namespace;
}
/**
* Return the source of stream.
*/

View File

@ -759,11 +759,19 @@ public interface ClientProtocol {
* the last call to renewLease(), the NameNode assumes the
* client has died.
*
* @param namespaces The full Namespace list that the renewLease rpc
* should be forwarded by RBF.
* Tips: NN side, this value should be null.
* RBF side, if this value is null, this rpc will
* be forwarded to all available namespaces,
* else this rpc will be forwarded to
* the special namespaces.
*
* @throws org.apache.hadoop.security.AccessControlException permission denied
* @throws IOException If an I/O error occurred
*/
@Idempotent
void renewLease(String clientName) throws IOException;
void renewLease(String clientName, List<String> namespaces) throws IOException;
/**
* Start lease recovery.

View File

@ -490,6 +490,10 @@ public interface HdfsFileStatus
*/
int compareTo(FileStatus stat);
void setNamespace(String namespace);
String getNamespace();
/**
* Set redundant flags for compatibility with existing applications.
*/

View File

@ -54,6 +54,8 @@ public class HdfsLocatedFileStatus
// BlockLocations[] is the user-facing type
private transient LocatedBlocks hdfsloc;
private String namespace = null;
/**
* Constructor.
* @param length the number of bytes the file has
@ -217,4 +219,14 @@ public class HdfsLocatedFileStatus
return this;
}
@Override
public String getNamespace() {
return namespace;
}
@Override
public void setNamespace(String namespace) {
this.namespace = namespace;
}
}

View File

@ -44,6 +44,8 @@ public class HdfsNamedFileStatus extends FileStatus implements HdfsFileStatus {
private final int childrenNum;
private final byte storagePolicy;
private String namespace = null;
/**
* Constructor.
* @param length the number of bytes the file has
@ -177,4 +179,13 @@ public class HdfsNamedFileStatus extends FileStatus implements HdfsFileStatus {
return super.hashCode();
}
@Override
public String getNamespace() {
return namespace;
}
@Override
public void setNamespace(String namespace) {
this.namespace = namespace;
}
}

View File

@ -744,11 +744,15 @@ public class ClientNamenodeProtocolTranslatorPB implements
@Override
public void renewLease(String clientName) throws IOException {
RenewLeaseRequestProto req = RenewLeaseRequestProto.newBuilder()
.setClientName(clientName).build();
public void renewLease(String clientName, List<String> namespaces)
throws IOException {
RenewLeaseRequestProto.Builder builder = RenewLeaseRequestProto
.newBuilder().setClientName(clientName);
if (namespaces != null && !namespaces.isEmpty()) {
builder.addAllNamespaces(namespaces);
}
try {
rpcProxy.renewLease(null, req);
rpcProxy.renewLease(null, builder.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}

View File

@ -1764,7 +1764,7 @@ public class PBHelperClient {
EnumSet<HdfsFileStatus.Flags> flags = fs.hasFlags()
? convertFlags(fs.getFlags())
: convertFlags(fs.getPermission());
return new HdfsFileStatus.Builder()
HdfsFileStatus hdfsFileStatus = new HdfsFileStatus.Builder()
.length(fs.getLength())
.isdir(fs.getFileType().equals(FileType.IS_DIR))
.replication(fs.getBlockReplication())
@ -1794,6 +1794,10 @@ public class PBHelperClient {
? convertErasureCodingPolicy(fs.getEcPolicy())
: null)
.build();
if (fs.hasNamespace()) {
hdfsFileStatus.setNamespace(fs.getNamespace());
}
return hdfsFileStatus;
}
private static EnumSet<HdfsFileStatus.Flags> convertFlags(int flags) {
@ -2399,6 +2403,9 @@ public class PBHelperClient {
flags |= fs.isSnapshotEnabled() ? HdfsFileStatusProto.Flags
.SNAPSHOT_ENABLED_VALUE : 0;
builder.setFlags(flags);
if (fs.getNamespace() != null && !fs.getNamespace().isEmpty()) {
builder.setNamespace(fs.getNamespace());
}
return builder.build();
}

View File

@ -332,6 +332,7 @@ message GetSnapshotDiffReportListingResponseProto {
}
message RenewLeaseRequestProto {
required string clientName = 1;
repeated string namespaces = 2;
}
message RenewLeaseResponseProto { //void response

View File

@ -481,6 +481,7 @@ message HdfsFileStatusProto {
// Set of flags
optional uint32 flags = 18 [default = 0];
optional string namespace = 19;
}
/**

View File

@ -291,8 +291,10 @@ public class RouterClientProtocol implements ClientProtocol {
RemoteLocation createLocation = null;
try {
createLocation = rpcServer.getCreateLocation(src, locations);
return rpcClient.invokeSingle(createLocation, method,
HdfsFileStatus status = rpcClient.invokeSingle(createLocation, method,
HdfsFileStatus.class);
status.setNamespace(createLocation.getNameserviceId());
return status;
} catch (IOException ioe) {
final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
method, src, ioe, createLocation, locations);
@ -377,8 +379,11 @@ public class RouterClientProtocol implements ClientProtocol {
RemoteMethod method = new RemoteMethod("append",
new Class<?>[] {String.class, String.class, EnumSetWritable.class},
new RemoteParam(), clientName, flag);
return rpcClient.invokeSequential(
locations, method, LastBlockWithStatus.class, null);
RemoteResult result = rpcClient.invokeSequential(
method, locations, LastBlockWithStatus.class, null);
LastBlockWithStatus lbws = (LastBlockWithStatus) result.getResult();
lbws.getFileStatus().setNamespace(result.getLocation().getNameserviceId());
return lbws;
}
@Override
@ -759,14 +764,49 @@ public class RouterClientProtocol implements ClientProtocol {
}
}
private Map<String, FederationNamespaceInfo> getAvailableNamespaces()
throws IOException {
Map<String, FederationNamespaceInfo> allAvailableNamespaces =
new HashMap<>();
namenodeResolver.getNamespaces().forEach(
k -> allAvailableNamespaces.put(k.getNameserviceId(), k));
return allAvailableNamespaces;
}
/**
* Try to get a list of FederationNamespaceInfo for renewLease RPC.
*/
private List<FederationNamespaceInfo> getRenewLeaseNSs(List<String> namespaces)
throws IOException {
if (namespaces == null || namespaces.isEmpty()) {
return new ArrayList<>(namenodeResolver.getNamespaces());
}
List<FederationNamespaceInfo> result = new ArrayList<>();
Map<String, FederationNamespaceInfo> allAvailableNamespaces =
getAvailableNamespaces();
for (String namespace : namespaces) {
if (!allAvailableNamespaces.containsKey(namespace)) {
return new ArrayList<>(namenodeResolver.getNamespaces());
} else {
result.add(allAvailableNamespaces.get(namespace));
}
}
return result;
}
@Override
public void renewLease(String clientName) throws IOException {
public void renewLease(String clientName, List<String> namespaces)
throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
RemoteMethod method = new RemoteMethod("renewLease",
new Class<?>[] {String.class}, clientName);
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
rpcClient.invokeConcurrent(nss, method, false, false);
new Class<?>[] {String.class, List.class}, clientName, null);
List<FederationNamespaceInfo> nss = getRenewLeaseNSs(namespaces);
if (nss.size() == 1) {
rpcClient.invokeSingle(nss.get(0).getNameserviceId(), method);
} else {
rpcClient.invokeConcurrent(nss, method, false, false);
}
}
@Override

View File

@ -980,8 +980,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
}
@Override // ClientProtocol
public void renewLease(String clientName) throws IOException {
clientProto.renewLease(clientName);
public void renewLease(String clientName, List<String> namespaces)
throws IOException {
clientProto.renewLease(clientName, namespaces);
}
@Override // ClientProtocol

View File

@ -194,7 +194,7 @@ public class TestRouterHandlersFairness {
private void invokeConcurrent(ClientProtocol routerProto, String clientName)
throws IOException {
routerProto.renewLease(clientName);
routerProto.renewLease(clientName, null);
}
private int getTotalRejectedPermits(RouterContext routerContext) {

View File

@ -156,7 +156,7 @@ public class TestRouterClientMetrics {
@Test
public void testRenewLease() throws Exception {
router.getRpcServer().renewLease("test");
router.getRpcServer().renewLease("test", null);
assertCounter("RenewLeaseOps", 2L, getMetrics(ROUTER_METRICS));
assertCounter("ConcurrentRenewLeaseOps", 1L, getMetrics(ROUTER_METRICS));
}

View File

@ -159,7 +159,7 @@ public class TestDisableNameservices {
public void testWithoutDisabling() throws IOException {
// ns0 is slow and renewLease should take a long time
long t0 = monotonicNow();
routerProtocol.renewLease("client0");
routerProtocol.renewLease("client0", null);
long t = monotonicNow() - t0;
assertTrue("It took too little: " + t + "ms",
t > TimeUnit.SECONDS.toMillis(1));
@ -178,7 +178,7 @@ public class TestDisableNameservices {
// renewLease should be fast as we are skipping ns0
long t0 = monotonicNow();
routerProtocol.renewLease("client0");
routerProtocol.renewLease("client0", null);
long t = monotonicNow() - t0;
assertTrue("It took too long: " + t + "ms",
t < TimeUnit.SECONDS.toMillis(1));

View File

@ -215,7 +215,7 @@ public class TestRouterClientRejectOverload {
routerClient = new DFSClient(address, conf);
String clientName = routerClient.getClientName();
ClientProtocol routerProto = routerClient.getNamenode();
routerProto.renewLease(clientName);
routerProto.renewLease(clientName, null);
} catch (RemoteException re) {
IOException ioe = re.unwrapRemoteException();
assertTrue("Wrong exception: " + ioe,
@ -390,7 +390,7 @@ public class TestRouterClientRejectOverload {
cluster.getRouterClientConf());
String clientName = routerClient.getClientName();
ClientProtocol routerProto = routerClient.getNamenode();
routerProto.renewLease(clientName);
routerProto.renewLease(clientName, null);
} catch (Exception e) {
fail("Client request failed: " + e);
} finally {

View File

@ -153,7 +153,7 @@ public class TestRouterRPCClientRetries {
DFSClient client = nnContext1.getClient();
// Renew lease for the DFS client, it will succeed.
routerProtocol.renewLease(client.getClientName());
routerProtocol.renewLease(client.getClientName(), null);
// Verify the retry times, it will retry one time for ns0.
FederationRPCMetrics rpcMetrics = routerContext.getRouter()

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -107,6 +108,7 @@ import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeCon
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
import org.apache.hadoop.hdfs.server.federation.metrics.RBFMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
@ -1450,6 +1452,101 @@ public class TestRouterRpc {
assertEquals(nnSuccess, routerSuccess);
}
private void testRenewLeaseInternal(DistributedFileSystem dfs,
FederationRPCMetrics rpcMetrics, Path testPath, boolean createFlag)
throws Exception {
FSDataOutputStream outputStream = null;
try {
if (createFlag) {
outputStream = dfs.create(testPath);
} else {
outputStream = dfs.append(testPath);
}
outputStream.write("hello world. \n".getBytes());
long proxyOpBeforeRenewLease = rpcMetrics.getProxyOps();
assertTrue(dfs.getClient().renewLease());
long proxyOpAfterRenewLease = rpcMetrics.getProxyOps();
assertEquals((proxyOpBeforeRenewLease + 1), proxyOpAfterRenewLease);
} finally {
if (outputStream != null) {
outputStream.close();
}
}
}
@Test
public void testRenewLeaseForECFile() throws Exception {
String ecName = "RS-6-3-1024k";
FederationRPCMetrics metrics = router.getRouterRpcServer().getRPCMetrics();
// Install a mount point to a different path to check
MockResolver resolver =
(MockResolver)router.getRouter().getSubclusterResolver();
String ns0 = cluster.getNameservices().get(0);
resolver.addLocation("/testRenewLease0", ns0, "/testRenewLease0");
// Stop LeaseRenewer
DistributedFileSystem routerDFS = (DistributedFileSystem) routerFS;
routerDFS.getClient().getLeaseRenewer().interruptAndJoin();
Path testECPath = new Path("/testRenewLease0/ecDirectory/test_ec.txt");
routerDFS.mkdirs(testECPath.getParent());
routerDFS.setErasureCodingPolicy(
testECPath.getParent(), ecName);
testRenewLeaseInternal(routerDFS, metrics, testECPath, true);
ErasureCodingPolicy ecPolicy = routerDFS.getErasureCodingPolicy(testECPath);
assertNotNull(ecPolicy);
assertEquals(ecName, ecPolicy.getName());
}
@Test
public void testRenewLeaseForReplicaFile() throws Exception {
FederationRPCMetrics metrics = router.getRouterRpcServer().getRPCMetrics();
// Install a mount point to a different path to check
MockResolver resolver =
(MockResolver)router.getRouter().getSubclusterResolver();
String ns0 = cluster.getNameservices().get(0);
resolver.addLocation("/testRenewLease0", ns0, "/testRenewLease0");
// Stop LeaseRenewer
DistributedFileSystem routerDFS = (DistributedFileSystem) routerFS;
routerDFS.getClient().getLeaseRenewer().interruptAndJoin();
// Test Replica File
Path testPath = new Path("/testRenewLease0/test_replica.txt");
testRenewLeaseInternal(routerDFS, metrics, testPath, true);
testRenewLeaseInternal(routerDFS, metrics, testPath, false);
}
@Test
public void testRenewLeaseWithMultiStream() throws Exception {
FederationRPCMetrics metrics = router.getRouterRpcServer().getRPCMetrics();
// Install a mount point to a different path to check
MockResolver resolver =
(MockResolver)router.getRouter().getSubclusterResolver();
String ns0 = cluster.getNameservices().get(0);
String ns1 = cluster.getNameservices().get(1);
resolver.addLocation("/testRenewLease0", ns0, "/testRenewLease0");
resolver.addLocation("/testRenewLease1", ns1, "/testRenewLease1");
// Stop LeaseRenewer
DistributedFileSystem routerDFS = (DistributedFileSystem) routerFS;
routerDFS.getClient().getLeaseRenewer().interruptAndJoin();
Path newTestPath0 = new Path("/testRenewLease0/test1.txt");
Path newTestPath1 = new Path("/testRenewLease1/test1.txt");
try (FSDataOutputStream outStream1 = routerDFS.create(newTestPath0);
FSDataOutputStream outStream2 = routerDFS.create(newTestPath1)) {
outStream1.write("hello world \n".getBytes());
outStream2.write("hello world \n".getBytes());
long proxyOpBeforeRenewLease2 = metrics.getProxyOps();
assertTrue(routerDFS.getClient().renewLease());
long proxyOpAfterRenewLease2 = metrics.getProxyOps();
assertEquals((proxyOpBeforeRenewLease2 + 2), proxyOpAfterRenewLease2);
}
}
@Test
public void testProxyExceptionMessages() throws IOException {

View File

@ -818,7 +818,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
public RenewLeaseResponseProto renewLease(RpcController controller,
RenewLeaseRequestProto req) throws ServiceException {
try {
server.renewLease(req.getClientName());
server.renewLease(req.getClientName(), req.getNamespacesList());
return VOID_RENEWLEASE_RESPONSE;
} catch (IOException e) {
throw new ServiceException(e);

View File

@ -1174,7 +1174,14 @@ public class NameNodeRpcServer implements NamenodeProtocols {
}
@Override // ClientProtocol
public void renewLease(String clientName) throws IOException {
public void renewLease(String clientName, List<String> namespaces)
throws IOException {
if (namespaces != null && namespaces.size() > 0) {
LOG.warn("namespaces({}) should be null or empty "
+ "on NameNode side, please check it.", namespaces);
throw new IOException("namespaces(" + namespaces
+ ") should be null or empty");
}
checkNNStartup();
namesystem.renewLease(clientName);
}

View File

@ -384,7 +384,7 @@ public class TestDFSClientRetries {
cluster.waitActive();
NamenodeProtocols spyNN = spy(cluster.getNameNodeRpc());
Mockito.doThrow(new SocketTimeoutException()).when(spyNN).renewLease(
Mockito.anyString());
Mockito.anyString(), any());
DFSClient client = new DFSClient(null, spyNN, conf, null);
// Get hold of the lease renewer instance used by the client
final LeaseRenewer leaseRenewer1 = client.getLeaseRenewer();
@ -392,7 +392,7 @@ public class TestDFSClientRetries {
OutputStream out1 = client.create(file1, false);
Mockito.verify(spyNN, timeout(10000).times(1)).renewLease(
Mockito.anyString());
Mockito.anyString(), any());
verifyEmptyLease(leaseRenewer1);
GenericTestUtils.waitFor(() -> !(leaseRenewer1.isRunning()), 100, 10000);
try {
@ -406,12 +406,12 @@ public class TestDFSClientRetries {
// Verify DFSClient can do write operation after renewLease no longer
// throws SocketTimeoutException.
Mockito.doNothing().when(spyNN).renewLease(
Mockito.anyString());
Mockito.anyString(), any());
final LeaseRenewer leaseRenewer2 = client.getLeaseRenewer();
leaseRenewer2.setRenewalTime(100);
OutputStream out2 = client.create(file2, false);
Mockito.verify(spyNN, timeout(10000).times(2)).renewLease(
Mockito.anyString());
Mockito.anyString(), any());
out2.write(new byte[256]);
out2.close();
verifyEmptyLease(leaseRenewer2);
@ -1309,7 +1309,7 @@ public class TestDFSClientRetries {
try {
//1. trigger get LeaseRenewer lock
Mockito.doThrow(new SocketTimeoutException()).when(spyNN)
.renewLease(Mockito.anyString());
.renewLease(Mockito.anyString(), any());
} catch (IOException e) {
e.printStackTrace();
}

View File

@ -90,7 +90,8 @@ public class TestLease {
// stub the renew method.
doThrow(new RemoteException(InvalidToken.class.getName(),
"Your token is worthless")).when(spyNN).renewLease(anyString());
"Your token is worthless")).when(spyNN).renewLease(
anyString(), any());
// We don't need to wait the lease renewer thread to act.
// call renewLease() manually.
@ -131,7 +132,7 @@ public class TestLease {
Assert.assertTrue(originalRenewer.isEmpty());
// unstub
doNothing().when(spyNN).renewLease(anyString());
doNothing().when(spyNN).renewLease(anyString(), any());
// existing input streams should work
try {