HDFS-15543. RBF: Write Should allow, when a subcluster is unavailable for RANDOM mount points with fault Tolerance enabled. Contributed by Hemanth Boyina.

This commit is contained in:
hemanthboyina 2020-10-07 09:58:53 +05:30
parent 1cfe5916e2
commit 921ca1f554
3 changed files with 91 additions and 7 deletions

View File

@ -306,7 +306,7 @@ public class RouterClientProtocol implements ClientProtocol {
* @return If caused by an unavailable subcluster. False if the should not be
* retried (e.g., NSQuotaExceededException).
*/
private static boolean isUnavailableSubclusterException(
protected static boolean isUnavailableSubclusterException(
final IOException ioe) {
if (ioe instanceof ConnectException ||
ioe instanceof ConnectTimeoutException ||

View File

@ -37,6 +37,7 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
@ -587,6 +588,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
/**
* Invokes the method at default namespace, if default namespace is not
* available then at the first available namespace.
* If the namespace is unavailable, retry once with other namespace.
* @param <T> expected return type.
* @param method the remote method.
* @return the response received after invoking method.
@ -595,18 +597,61 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
<T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz)
throws IOException {
String nsId = subclusterResolver.getDefaultNamespace();
if (!nsId.isEmpty()) {
return rpcClient.invokeSingle(nsId, method, clazz);
}
// If default Ns is not present return result from first namespace.
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
if (nss.isEmpty()) {
throw new IOException("No namespace available.");
try {
if (!nsId.isEmpty()) {
return rpcClient.invokeSingle(nsId, method, clazz);
}
// If no namespace is available, throw IOException.
IOException io = new IOException("No namespace available.");
return invokeOnNs(method, clazz, io, nss);
} catch (IOException ioe) {
if (!clientProto.isUnavailableSubclusterException(ioe)) {
LOG.debug("{} exception cannot be retried",
ioe.getClass().getSimpleName());
throw ioe;
}
Set<FederationNamespaceInfo> nssWithoutFailed = getNameSpaceInfo(nsId);
return invokeOnNs(method, clazz, ioe, nssWithoutFailed);
}
nsId = nss.iterator().next().getNameserviceId();
}
/**
* Invoke the method on first available namespace,
* throw no namespace available exception, if no namespaces are available.
* @param method the remote method.
* @param clazz Class for the return type.
* @param ioe IOException .
* @param nss List of name spaces in the federation
* @return the response received after invoking method.
* @throws IOException
*/
<T> T invokeOnNs(RemoteMethod method, Class<T> clazz, IOException ioe,
Set<FederationNamespaceInfo> nss) throws IOException {
if (nss.isEmpty()) {
throw ioe;
}
String nsId = nss.iterator().next().getNameserviceId();
return rpcClient.invokeSingle(nsId, method, clazz);
}
/**
* Get set of namespace info's removing the already invoked namespaceinfo.
* @param nsId already invoked namespace id
* @return List of name spaces in the federation on
* removing the already invoked namespaceinfo.
*/
private Set<FederationNamespaceInfo> getNameSpaceInfo(String nsId) {
Set<FederationNamespaceInfo> namespaceInfos = new HashSet<>();
for (FederationNamespaceInfo ns : namespaceInfos) {
if (!nsId.equals(ns.getNameserviceId())) {
namespaceInfos.add(ns);
}
}
return namespaceInfos;
}
@Override // ClientProtocol
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException {

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
@ -45,6 +46,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.SnapshotStatus;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
@ -60,6 +62,7 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationRes
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.tools.federation.RouterAdmin;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
@ -640,6 +643,42 @@ public class TestRouterRPCMultipleDestinationMountTableResolver {
assertEquals(ssQuota, cs.getSpaceQuota());
}
/**
* Test write on mount point with multiple destinations
* and making a one of the destination's subcluster unavailable.
*/
@Test
public void testWriteWithUnavailableSubCluster() throws IOException {
//create a mount point with multiple destinations
Path path = new Path("/testWriteWithUnavailableSubCluster");
Map<String, String> destMap = new HashMap<>();
destMap.put("ns0", "/testWriteWithUnavailableSubCluster");
destMap.put("ns1", "/testWriteWithUnavailableSubCluster");
nnFs0.mkdirs(path);
nnFs1.mkdirs(path);
MountTable addEntry =
MountTable.newInstance("/testWriteWithUnavailableSubCluster", destMap);
addEntry.setQuota(new RouterQuotaUsage.Builder().build());
addEntry.setDestOrder(DestinationOrder.RANDOM);
addEntry.setFaultTolerant(true);
assertTrue(addMountTable(addEntry));
//make one subcluster unavailable and perform write on mount point
MiniDFSCluster dfsCluster = cluster.getCluster();
dfsCluster.shutdownNameNode(0);
FSDataOutputStream out = null;
Path filePath = new Path(path, "aa");
try {
out = routerFs.create(filePath);
out.write("hello".getBytes());
out.hflush();
assertTrue(routerFs.exists(filePath));
} finally {
IOUtils.closeStream(out);
dfsCluster.restartNameNode(0);
}
}
/**
* Test to verify rename operation on directories in case of multiple
* destinations.