From 97b75f47fd9dd4597b21f50eb2921fb7dc06c006 Mon Sep 17 00:00:00 2001 From: Brahma Reddy Battula Date: Wed, 26 Sep 2018 22:40:48 +0530 Subject: [PATCH] HDFS-13790. RBF: Move ClientProtocol APIs to its own module Contributed by Chao Sun. --- .../router/RouterClientProtocol.java | 1803 +++++++++++++++++ .../federation/router/RouterRpcServer.java | 1338 ++---------- 2 files changed, 1951 insertions(+), 1190 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java new file mode 100644 index 00000000000..35279bf4094 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -0,0 +1,1803 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.CryptoProtocolVersion; +import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; +import org.apache.hadoop.fs.CacheFlag; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.QuotaUsage; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.fs.XAttr; +import org.apache.hadoop.fs.XAttrSetFlag; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.AclStatus; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.AddBlockFlag; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.inotify.EventBatchList; +import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; +import org.apache.hadoop.hdfs.protocol.CachePoolEntry; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats; +import org.apache.hadoop.hdfs.protocol.EncryptionZone; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; +import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator; +import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats; +import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing; +import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; +import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +/** + * Module that implements all the RPC calls in {@link ClientProtocol} in the + * {@link RouterRpcServer}. + */ +public class RouterClientProtocol implements ClientProtocol { + private static final Logger LOG = + LoggerFactory.getLogger(RouterClientProtocol.class.getName()); + + private final RouterRpcServer rpcServer; + private final RouterRpcClient rpcClient; + private final FileSubclusterResolver subclusterResolver; + private final ActiveNamenodeResolver namenodeResolver; + + /** Identifier for the super user. */ + private final String superUser; + /** Identifier for the super group. */ + private final String superGroup; + /** Erasure coding calls. */ + private final ErasureCoding erasureCoding; + + RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) { + this.rpcServer = rpcServer; + this.rpcClient = rpcServer.getRPCClient(); + this.subclusterResolver = rpcServer.getSubclusterResolver(); + this.namenodeResolver = rpcServer.getNamenodeResolver(); + + // User and group for reporting + this.superUser = System.getProperty("user.name"); + this.superGroup = conf.get( + DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY, + DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT); + this.erasureCoding = new ErasureCoding(rpcServer); + } + + @Override + public Token getDelegationToken(Text renewer) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + return null; + } + + /** + * The the delegation token from each name service. + * + * @param renewer + * @return Name service -> Token. + * @throws IOException + */ + public Map> + getDelegationTokens(Text renewer) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + return null; + } + + @Override + public long renewDelegationToken(Token token) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + return 0; + } + + @Override + public void cancelDelegationToken(Token token) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + } + + @Override + public LocatedBlocks getBlockLocations(String src, final long offset, + final long length) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + List locations = rpcServer.getLocationsForPath(src, false); + RemoteMethod remoteMethod = new RemoteMethod("getBlockLocations", + new Class[] {String.class, long.class, long.class}, + new RemoteParam(), offset, length); + return rpcClient.invokeSequential(locations, remoteMethod, + LocatedBlocks.class, null); + } + + @Override + public FsServerDefaults getServerDefaults() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + RemoteMethod method = new RemoteMethod("getServerDefaults"); + String ns = subclusterResolver.getDefaultNamespace(); + return (FsServerDefaults) rpcClient.invokeSingle(ns, method); + } + + @Override + public HdfsFileStatus create(String src, FsPermission masked, + String clientName, EnumSetWritable flag, + boolean createParent, short replication, long blockSize, + CryptoProtocolVersion[] supportedVersions, String ecPolicyName) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + if (createParent && isPathAll(src)) { + int index = src.lastIndexOf(Path.SEPARATOR); + String parent = src.substring(0, index); + LOG.debug("Creating {} requires creating parent {}", src, parent); + FsPermission parentPermissions = getParentPermission(masked); + boolean success = mkdirs(parent, parentPermissions, createParent); + if (!success) { + // This shouldn't happen as mkdirs returns true or exception + LOG.error("Couldn't create parents for {}", src); + } + } + + RemoteLocation createLocation = rpcServer.getCreateLocation(src); + RemoteMethod method = new RemoteMethod("create", + new Class[] {String.class, FsPermission.class, String.class, + EnumSetWritable.class, boolean.class, short.class, + long.class, CryptoProtocolVersion[].class, + String.class}, + createLocation.getDest(), masked, clientName, flag, createParent, + replication, blockSize, supportedVersions, ecPolicyName); + return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method); + } + + @Override + public LastBlockWithStatus append(String src, final String clientName, + final EnumSetWritable flag) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + List locations = rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("append", + new Class[] {String.class, String.class, EnumSetWritable.class}, + new RemoteParam(), clientName, flag); + return rpcClient.invokeSequential( + locations, method, LastBlockWithStatus.class, null); + } + + @Override + public boolean recoverLease(String src, String clientName) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("recoverLease", + new Class[] {String.class, String.class}, new RemoteParam(), + clientName); + Object result = rpcClient.invokeSequential( + locations, method, Boolean.class, Boolean.TRUE); + return (boolean) result; + } + + @Override + public boolean setReplication(String src, short replication) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + List locations = rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("setReplication", + new Class[] {String.class, short.class}, new RemoteParam(), + replication); + Object result = rpcClient.invokeSequential( + locations, method, Boolean.class, Boolean.TRUE); + return (boolean) result; + } + + @Override + public void setStoragePolicy(String src, String policyName) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + List locations = rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("setStoragePolicy", + new Class[] {String.class, String.class}, + new RemoteParam(), policyName); + rpcClient.invokeSequential(locations, method, null, null); + } + + @Override + public BlockStoragePolicy[] getStoragePolicies() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + RemoteMethod method = new RemoteMethod("getStoragePolicies"); + String ns = subclusterResolver.getDefaultNamespace(); + return (BlockStoragePolicy[]) rpcClient.invokeSingle(ns, method); + } + + @Override + public void setPermission(String src, FsPermission permissions) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("setPermission", + new Class[] {String.class, FsPermission.class}, + new RemoteParam(), permissions); + if (isPathAll(src)) { + rpcClient.invokeConcurrent(locations, method); + } else { + rpcClient.invokeSequential(locations, method); + } + } + + @Override + public void setOwner(String src, String username, String groupname) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("setOwner", + new Class[] {String.class, String.class, String.class}, + new RemoteParam(), username, groupname); + if (isPathAll(src)) { + rpcClient.invokeConcurrent(locations, method); + } else { + rpcClient.invokeSequential(locations, method); + } + } + + /** + * Excluded and favored nodes are not verified and will be ignored by + * placement policy if they are not in the same nameservice as the file. + */ + @Override + public LocatedBlock addBlock(String src, String clientName, + ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId, + String[] favoredNodes, EnumSet addBlockFlags) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("addBlock", + new Class[] {String.class, String.class, ExtendedBlock.class, + DatanodeInfo[].class, long.class, String[].class, + EnumSet.class}, + new RemoteParam(), clientName, previous, excludedNodes, fileId, + favoredNodes, addBlockFlags); + // TODO verify the excludedNodes and favoredNodes are acceptable to this NN + return rpcClient.invokeSequential( + locations, method, LocatedBlock.class, null); + } + + /** + * Excluded nodes are not verified and will be ignored by placement if they + * are not in the same nameservice as the file. + */ + @Override + public LocatedBlock getAdditionalDatanode(final String src, final long fileId, + final ExtendedBlock blk, final DatanodeInfo[] existings, + final String[] existingStorageIDs, final DatanodeInfo[] excludes, + final int numAdditionalNodes, final String clientName) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + final List locations = + rpcServer.getLocationsForPath(src, false); + RemoteMethod method = new RemoteMethod("getAdditionalDatanode", + new Class[] {String.class, long.class, ExtendedBlock.class, + DatanodeInfo[].class, String[].class, + DatanodeInfo[].class, int.class, String.class}, + new RemoteParam(), fileId, blk, existings, existingStorageIDs, excludes, + numAdditionalNodes, clientName); + return rpcClient.invokeSequential( + locations, method, LocatedBlock.class, null); + } + + @Override + public void abandonBlock(ExtendedBlock b, long fileId, String src, + String holder) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + RemoteMethod method = new RemoteMethod("abandonBlock", + new Class[] {ExtendedBlock.class, long.class, String.class, + String.class}, + b, fileId, new RemoteParam(), holder); + rpcClient.invokeSingle(b, method); + } + + @Override + public boolean complete(String src, String clientName, ExtendedBlock last, + long fileId) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("complete", + new Class[] {String.class, String.class, ExtendedBlock.class, + long.class}, + new RemoteParam(), clientName, last, fileId); + // Complete can return true/false, so don't expect a result + return rpcClient.invokeSequential(locations, method, Boolean.class, null); + } + + @Override + public LocatedBlock updateBlockForPipeline( + ExtendedBlock block, String clientName) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + RemoteMethod method = new RemoteMethod("updateBlockForPipeline", + new Class[] {ExtendedBlock.class, String.class}, + block, clientName); + return (LocatedBlock) rpcClient.invokeSingle(block, method); + } + + /** + * Datanode are not verified to be in the same nameservice as the old block. + * TODO This may require validation. + */ + @Override + public void updatePipeline(String clientName, ExtendedBlock oldBlock, + ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + RemoteMethod method = new RemoteMethod("updatePipeline", + new Class[] {String.class, ExtendedBlock.class, ExtendedBlock.class, + DatanodeID[].class, String[].class}, + clientName, oldBlock, newBlock, newNodes, newStorageIDs); + rpcClient.invokeSingle(oldBlock, method); + } + + @Override + public long getPreferredBlockSize(String src) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("getPreferredBlockSize", + new Class[] {String.class}, new RemoteParam()); + return rpcClient.invokeSequential(locations, method, Long.class, null); + } + + @Deprecated + @Override + public boolean rename(final String src, final String dst) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + final List srcLocations = + rpcServer.getLocationsForPath(src, true, false); + // srcLocations may be trimmed by getRenameDestinations() + final List locs = new LinkedList<>(srcLocations); + RemoteParam dstParam = getRenameDestinations(locs, dst); + if (locs.isEmpty()) { + throw new IOException( + "Rename of " + src + " to " + dst + " is not allowed," + + " no eligible destination in the same namespace was found."); + } + RemoteMethod method = new RemoteMethod("rename", + new Class[] {String.class, String.class}, + new RemoteParam(), dstParam); + return rpcClient.invokeSequential(locs, method, Boolean.class, + Boolean.TRUE); + } + + @Override + public void rename2(final String src, final String dst, + final Options.Rename... options) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + final List srcLocations = + rpcServer.getLocationsForPath(src, true, false); + // srcLocations may be trimmed by getRenameDestinations() + final List locs = new LinkedList<>(srcLocations); + RemoteParam dstParam = getRenameDestinations(locs, dst); + if (locs.isEmpty()) { + throw new IOException( + "Rename of " + src + " to " + dst + " is not allowed," + + " no eligible destination in the same namespace was found."); + } + RemoteMethod method = new RemoteMethod("rename2", + new Class[] {String.class, String.class, options.getClass()}, + new RemoteParam(), dstParam, options); + rpcClient.invokeSequential(locs, method, null, null); + } + + @Override + public void concat(String trg, String[] src) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + // See if the src and target files are all in the same namespace + LocatedBlocks targetBlocks = getBlockLocations(trg, 0, 1); + if (targetBlocks == null) { + throw new IOException("Cannot locate blocks for target file - " + trg); + } + LocatedBlock lastLocatedBlock = targetBlocks.getLastLocatedBlock(); + String targetBlockPoolId = lastLocatedBlock.getBlock().getBlockPoolId(); + for (String source : src) { + LocatedBlocks sourceBlocks = getBlockLocations(source, 0, 1); + if (sourceBlocks == null) { + throw new IOException( + "Cannot located blocks for source file " + source); + } + String sourceBlockPoolId = + sourceBlocks.getLastLocatedBlock().getBlock().getBlockPoolId(); + if (!sourceBlockPoolId.equals(targetBlockPoolId)) { + throw new IOException("Cannot concatenate source file " + source + + " because it is located in a different namespace" + + " with block pool id " + sourceBlockPoolId + + " from the target file with block pool id " + + targetBlockPoolId); + } + } + + // Find locations in the matching namespace. + final RemoteLocation targetDestination = + rpcServer.getLocationForPath(trg, true, targetBlockPoolId); + String[] sourceDestinations = new String[src.length]; + for (int i = 0; i < src.length; i++) { + String sourceFile = src[i]; + RemoteLocation location = + rpcServer.getLocationForPath(sourceFile, true, targetBlockPoolId); + sourceDestinations[i] = location.getDest(); + } + // Invoke + RemoteMethod method = new RemoteMethod("concat", + new Class[] {String.class, String[].class}, + targetDestination.getDest(), sourceDestinations); + rpcClient.invokeSingle(targetDestination, method); + } + + @Override + public boolean truncate(String src, long newLength, String clientName) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("truncate", + new Class[] {String.class, long.class, String.class}, + new RemoteParam(), newLength, clientName); + return rpcClient.invokeSequential(locations, method, Boolean.class, + Boolean.TRUE); + } + + @Override + public boolean delete(String src, boolean recursive) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + final List locations = + rpcServer.getLocationsForPath(src, true, false); + RemoteMethod method = new RemoteMethod("delete", + new Class[] {String.class, boolean.class}, new RemoteParam(), + recursive); + if (isPathAll(src)) { + return rpcClient.invokeAll(locations, method); + } else { + return rpcClient.invokeSequential(locations, method, + Boolean.class, Boolean.TRUE); + } + } + + @Override + public boolean mkdirs(String src, FsPermission masked, boolean createParent) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("mkdirs", + new Class[] {String.class, FsPermission.class, boolean.class}, + new RemoteParam(), masked, createParent); + + // Create in all locations + if (isPathAll(src)) { + return rpcClient.invokeAll(locations, method); + } + + if (locations.size() > 1) { + // Check if this directory already exists + try { + HdfsFileStatus fileStatus = getFileInfo(src); + if (fileStatus != null) { + // When existing, the NN doesn't return an exception; return true + return true; + } + } catch (IOException ioe) { + // Can't query if this file exists or not. + LOG.error("Error requesting file info for path {} while proxing mkdirs", + src, ioe); + } + } + + RemoteLocation firstLocation = locations.get(0); + return (boolean) rpcClient.invokeSingle(firstLocation, method); + } + + @Override + public void renewLease(String clientName) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + RemoteMethod method = new RemoteMethod("renewLease", + new Class[] {String.class}, clientName); + Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, false, false); + } + + @Override + public DirectoryListing getListing(String src, byte[] startAfter, + boolean needLocation) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + // Locate the dir and fetch the listing + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("getListing", + new Class[] {String.class, startAfter.getClass(), boolean.class}, + new RemoteParam(), startAfter, needLocation); + Map listings = + rpcClient.invokeConcurrent( + locations, method, false, false, DirectoryListing.class); + + Map nnListing = new TreeMap<>(); + int totalRemainingEntries = 0; + int remainingEntries = 0; + boolean namenodeListingExists = false; + if (listings != null) { + // Check the subcluster listing with the smallest name + String lastName = null; + for (Map.Entry entry : + listings.entrySet()) { + RemoteLocation location = entry.getKey(); + DirectoryListing listing = entry.getValue(); + if (listing == null) { + LOG.debug("Cannot get listing from {}", location); + } else { + totalRemainingEntries += listing.getRemainingEntries(); + HdfsFileStatus[] partialListing = listing.getPartialListing(); + int length = partialListing.length; + if (length > 0) { + HdfsFileStatus lastLocalEntry = partialListing[length-1]; + String lastLocalName = lastLocalEntry.getLocalName(); + if (lastName == null || lastName.compareTo(lastLocalName) > 0) { + lastName = lastLocalName; + } + } + } + } + + // Add existing entries + for (Object value : listings.values()) { + DirectoryListing listing = (DirectoryListing) value; + if (listing != null) { + namenodeListingExists = true; + for (HdfsFileStatus file : listing.getPartialListing()) { + String filename = file.getLocalName(); + if (totalRemainingEntries > 0 && filename.compareTo(lastName) > 0) { + // Discarding entries further than the lastName + remainingEntries++; + } else { + nnListing.put(filename, file); + } + } + remainingEntries += listing.getRemainingEntries(); + } + } + } + + // Add mount points at this level in the tree + final List children = subclusterResolver.getMountPoints(src); + if (children != null) { + // Get the dates for each mount point + Map dates = getMountPointDates(src); + + // Create virtual folder with the mount name + for (String child : children) { + long date = 0; + if (dates != null && dates.containsKey(child)) { + date = dates.get(child); + } + // TODO add number of children + HdfsFileStatus dirStatus = getMountPointStatus(child, 0, date); + + // This may overwrite existing listing entries with the mount point + // TODO don't add if already there? + nnListing.put(child, dirStatus); + } + } + + if (!namenodeListingExists && nnListing.size() == 0) { + // NN returns a null object if the directory cannot be found and has no + // listing. If we didn't retrieve any NN listing data, and there are no + // mount points here, return null. + return null; + } + + // Generate combined listing + HdfsFileStatus[] combinedData = new HdfsFileStatus[nnListing.size()]; + combinedData = nnListing.values().toArray(combinedData); + return new DirectoryListing(combinedData, remainingEntries); + } + + @Override + public HdfsFileStatus getFileInfo(String src) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + final List locations = + rpcServer.getLocationsForPath(src, false); + RemoteMethod method = new RemoteMethod("getFileInfo", + new Class[] {String.class}, new RemoteParam()); + + HdfsFileStatus ret = null; + // If it's a directory, we check in all locations + if (isPathAll(src)) { + ret = getFileInfoAll(locations, method); + } else { + // Check for file information sequentially + ret = rpcClient.invokeSequential( + locations, method, HdfsFileStatus.class, null); + } + + // If there is no real path, check mount points + if (ret == null) { + List children = subclusterResolver.getMountPoints(src); + if (children != null && !children.isEmpty()) { + Map dates = getMountPointDates(src); + long date = 0; + if (dates != null && dates.containsKey(src)) { + date = dates.get(src); + } + ret = getMountPointStatus(src, children.size(), date); + } + } + + return ret; + } + + @Override + public boolean isFileClosed(String src) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + final List locations = + rpcServer.getLocationsForPath(src, false); + RemoteMethod method = new RemoteMethod("isFileClosed", + new Class[] {String.class}, new RemoteParam()); + return rpcClient.invokeSequential(locations, method, Boolean.class, + Boolean.TRUE); + } + + @Override + public HdfsFileStatus getFileLinkInfo(String src) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + final List locations = + rpcServer.getLocationsForPath(src, false); + RemoteMethod method = new RemoteMethod("getFileLinkInfo", + new Class[] {String.class}, new RemoteParam()); + return rpcClient.invokeSequential(locations, method, HdfsFileStatus.class, + null); + } + + @Override + public HdfsLocatedFileStatus getLocatedFileInfo(String src, + boolean needBlockToken) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + final List locations = + rpcServer.getLocationsForPath(src, false); + RemoteMethod method = new RemoteMethod("getLocatedFileInfo", + new Class[] {String.class, boolean.class}, new RemoteParam(), + needBlockToken); + return (HdfsLocatedFileStatus) rpcClient.invokeSequential( + locations, method, HdfsFileStatus.class, null); + } + + @Override + public long[] getStats() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + + RemoteMethod method = new RemoteMethod("getStats"); + Set nss = namenodeResolver.getNamespaces(); + Map results = + rpcClient.invokeConcurrent(nss, method, true, false, long[].class); + long[] combinedData = new long[STATS_ARRAY_LENGTH]; + for (long[] data : results.values()) { + for (int i = 0; i < combinedData.length && i < data.length; i++) { + if (data[i] >= 0) { + combinedData[i] += data[i]; + } + } + } + return combinedData; + } + + @Override + public DatanodeInfo[] getDatanodeReport(HdfsConstants.DatanodeReportType type) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + return rpcServer.getDatanodeReport(type, true, 0); + } + + @Override + public DatanodeStorageReport[] getDatanodeStorageReport( + HdfsConstants.DatanodeReportType type) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + + Map dnSubcluster = + rpcServer.getDatanodeStorageReportMap(type); + + // Avoid repeating machines in multiple subclusters + Map datanodesMap = new LinkedHashMap<>(); + for (DatanodeStorageReport[] dns : dnSubcluster.values()) { + for (DatanodeStorageReport dn : dns) { + DatanodeInfo dnInfo = dn.getDatanodeInfo(); + String nodeId = dnInfo.getXferAddr(); + if (!datanodesMap.containsKey(nodeId)) { + datanodesMap.put(nodeId, dn); + } + // TODO merge somehow, right now it just takes the first one + } + } + + Collection datanodes = datanodesMap.values(); + DatanodeStorageReport[] combinedData = + new DatanodeStorageReport[datanodes.size()]; + combinedData = datanodes.toArray(combinedData); + return combinedData; + } + + @Override + public boolean setSafeMode(HdfsConstants.SafeModeAction action, + boolean isChecked) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + // Set safe mode in all the name spaces + RemoteMethod method = new RemoteMethod("setSafeMode", + new Class[] {HdfsConstants.SafeModeAction.class, boolean.class}, + action, isChecked); + Set nss = namenodeResolver.getNamespaces(); + Map results = + rpcClient.invokeConcurrent( + nss, method, true, !isChecked, Boolean.class); + + // We only report true if all the name space are in safe mode + int numSafemode = 0; + for (boolean safemode : results.values()) { + if (safemode) { + numSafemode++; + } + } + return numSafemode == results.size(); + } + + @Override + public boolean restoreFailedStorage(String arg) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + + RemoteMethod method = new RemoteMethod("restoreFailedStorage", + new Class[] {String.class}, arg); + final Set nss = namenodeResolver.getNamespaces(); + Map ret = + rpcClient.invokeConcurrent(nss, method, true, false, Boolean.class); + + boolean success = true; + for (boolean s : ret.values()) { + if (!s) { + success = false; + break; + } + } + return success; + } + + @Override + public boolean saveNamespace(long timeWindow, long txGap) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + + RemoteMethod method = new RemoteMethod("saveNamespace", + new Class[] {Long.class, Long.class}, timeWindow, txGap); + final Set nss = namenodeResolver.getNamespaces(); + Map ret = + rpcClient.invokeConcurrent(nss, method, true, false, boolean.class); + + boolean success = true; + for (boolean s : ret.values()) { + if (!s) { + success = false; + break; + } + } + return success; + } + + @Override + public long rollEdits() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + RemoteMethod method = new RemoteMethod("rollEdits", new Class[] {}); + final Set nss = namenodeResolver.getNamespaces(); + Map ret = + rpcClient.invokeConcurrent(nss, method, true, false, long.class); + + // Return the maximum txid + long txid = 0; + for (long t : ret.values()) { + if (t > txid) { + txid = t; + } + } + return txid; + } + + @Override + public void refreshNodes() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + + RemoteMethod method = new RemoteMethod("refreshNodes", new Class[] {}); + final Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, true); + } + + @Override + public void finalizeUpgrade() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + + RemoteMethod method = new RemoteMethod("finalizeUpgrade", + new Class[] {}); + final Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, false); + } + + @Override + public boolean upgradeStatus() throws IOException { + String methodName = RouterRpcServer.getMethodName(); + throw new UnsupportedOperationException( + "Operation \"" + methodName + "\" is not supported"); + } + + @Override + public RollingUpgradeInfo rollingUpgrade( + HdfsConstants.RollingUpgradeAction action) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + RemoteMethod method = new RemoteMethod("rollingUpgrade", + new Class[] {HdfsConstants.RollingUpgradeAction.class}, action); + final Set nss = namenodeResolver.getNamespaces(); + Map ret = + rpcClient.invokeConcurrent( + nss, method, true, false, RollingUpgradeInfo.class); + + // Return the first rolling upgrade info + RollingUpgradeInfo info = null; + for (RollingUpgradeInfo infoNs : ret.values()) { + if (info == null && infoNs != null) { + info = infoNs; + } + } + return info; + } + + @Override + public void metaSave(String filename) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + + RemoteMethod method = new RemoteMethod("metaSave", + new Class[] {String.class}, filename); + final Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, false); + } + + @Override + public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + final List locations = + rpcServer.getLocationsForPath(path, false); + RemoteMethod method = new RemoteMethod("listCorruptFileBlocks", + new Class[] {String.class, String.class}, + new RemoteParam(), cookie); + return rpcClient.invokeSequential( + locations, method, CorruptFileBlocks.class, null); + } + + @Override + public void setBalancerBandwidth(long bandwidth) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED); + + RemoteMethod method = new RemoteMethod("setBalancerBandwidth", + new Class[] {Long.class}, bandwidth); + final Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, true, false); + } + + @Override + public ContentSummary getContentSummary(String path) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + // Get the summaries from regular files + Collection summaries = new LinkedList<>(); + FileNotFoundException notFoundException = null; + try { + final List locations = + rpcServer.getLocationsForPath(path, false); + RemoteMethod method = new RemoteMethod("getContentSummary", + new Class[] {String.class}, new RemoteParam()); + Map results = + rpcClient.invokeConcurrent( + locations, method, false, false, ContentSummary.class); + summaries.addAll(results.values()); + } catch (FileNotFoundException e) { + notFoundException = e; + } + + // Add mount points at this level in the tree + final List children = subclusterResolver.getMountPoints(path); + if (children != null) { + for (String child : children) { + Path childPath = new Path(path, child); + try { + ContentSummary mountSummary = getContentSummary(childPath.toString()); + if (mountSummary != null) { + summaries.add(mountSummary); + } + } catch (Exception e) { + LOG.error("Cannot get content summary for mount {}: {}", + childPath, e.getMessage()); + } + } + } + + // Throw original exception if no original nor mount points + if (summaries.isEmpty() && notFoundException != null) { + throw notFoundException; + } + + return aggregateContentSummary(summaries); + } + + @Override + public void fsync(String src, long fileId, String clientName, + long lastBlockLength) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("fsync", + new Class[] {String.class, long.class, String.class, long.class }, + new RemoteParam(), fileId, clientName, lastBlockLength); + rpcClient.invokeSequential(locations, method); + } + + @Override + public void setTimes(String src, long mtime, long atime) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("setTimes", + new Class[] {String.class, long.class, long.class}, + new RemoteParam(), mtime, atime); + rpcClient.invokeSequential(locations, method); + } + + @Override + public void createSymlink(String target, String link, FsPermission dirPerms, + boolean createParent) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + // TODO Verify that the link location is in the same NS as the targets + final List targetLocations = + rpcServer.getLocationsForPath(target, true); + final List linkLocations = + rpcServer.getLocationsForPath(link, true); + RemoteLocation linkLocation = linkLocations.get(0); + RemoteMethod method = new RemoteMethod("createSymlink", + new Class[] {String.class, String.class, FsPermission.class, + boolean.class}, + new RemoteParam(), linkLocation.getDest(), dirPerms, createParent); + rpcClient.invokeSequential(targetLocations, method); + } + + @Override + public String getLinkTarget(String path) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + final List locations = + rpcServer.getLocationsForPath(path, true); + RemoteMethod method = new RemoteMethod("getLinkTarget", + new Class[] {String.class}, new RemoteParam()); + return rpcClient.invokeSequential(locations, method, String.class, null); + } + + @Override // Client Protocol + public void allowSnapshot(String snapshotRoot) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + } + + @Override // Client Protocol + public void disallowSnapshot(String snapshot) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + } + + @Override + public void renameSnapshot(String snapshotRoot, String snapshotOldName, + String snapshotNewName) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + } + + @Override + public SnapshottableDirectoryStatus[] getSnapshottableDirListing() + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ, false); + return null; + } + + @Override + public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot, + String earlierSnapshotName, String laterSnapshotName) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ, false); + return null; + } + + @Override + public SnapshotDiffReportListing getSnapshotDiffReportListing( + String snapshotRoot, String earlierSnapshotName, String laterSnapshotName, + byte[] startPath, int index) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ, false); + return null; + } + + @Override + public long addCacheDirective(CacheDirectiveInfo path, + EnumSet flags) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + return 0; + } + + @Override + public void modifyCacheDirective(CacheDirectiveInfo directive, + EnumSet flags) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + } + + @Override + public void removeCacheDirective(long id) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + } + + @Override + public BatchedEntries listCacheDirectives( + long prevId, CacheDirectiveInfo filter) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ, false); + return null; + } + + @Override + public void addCachePool(CachePoolInfo info) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + } + + @Override + public void modifyCachePool(CachePoolInfo info) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + } + + @Override + public void removeCachePool(String cachePoolName) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + } + + @Override + public BatchedEntries listCachePools(String prevKey) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ, false); + return null; + } + + @Override + public void modifyAclEntries(String src, List aclSpec) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + // TODO handle virtual directories + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("modifyAclEntries", + new Class[] {String.class, List.class}, + new RemoteParam(), aclSpec); + rpcClient.invokeSequential(locations, method, null, null); + } + + @Override + public void removeAclEntries(String src, List aclSpec) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + // TODO handle virtual directories + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("removeAclEntries", + new Class[] {String.class, List.class}, + new RemoteParam(), aclSpec); + rpcClient.invokeSequential(locations, method, null, null); + } + + @Override + public void removeDefaultAcl(String src) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + // TODO handle virtual directories + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("removeDefaultAcl", + new Class[] {String.class}, new RemoteParam()); + rpcClient.invokeSequential(locations, method); + } + + @Override + public void removeAcl(String src) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + // TODO handle virtual directories + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("removeAcl", + new Class[] {String.class}, new RemoteParam()); + rpcClient.invokeSequential(locations, method); + } + + @Override + public void setAcl(String src, List aclSpec) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + // TODO handle virtual directories + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod( + "setAcl", new Class[] {String.class, List.class}, + new RemoteParam(), aclSpec); + rpcClient.invokeSequential(locations, method); + } + + @Override + public AclStatus getAclStatus(String src) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + // TODO handle virtual directories + final List locations = + rpcServer.getLocationsForPath(src, false); + RemoteMethod method = new RemoteMethod("getAclStatus", + new Class[] {String.class}, new RemoteParam()); + return rpcClient.invokeSequential(locations, method, AclStatus.class, null); + } + + @Override + public void createEncryptionZone(String src, String keyName) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + // TODO handle virtual directories + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("createEncryptionZone", + new Class[] {String.class, String.class}, + new RemoteParam(), keyName); + rpcClient.invokeSequential(locations, method); + } + + @Override + public EncryptionZone getEZForPath(String src) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + // TODO handle virtual directories + final List locations = + rpcServer.getLocationsForPath(src, false); + RemoteMethod method = new RemoteMethod("getEZForPath", + new Class[] {String.class}, new RemoteParam()); + return rpcClient.invokeSequential( + locations, method, EncryptionZone.class, null); + } + + @Override + public BatchedEntries listEncryptionZones(long prevId) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ, false); + return null; + } + + @Override + public void reencryptEncryptionZone(String zone, + HdfsConstants.ReencryptAction action) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + } + + @Override + public BatchedEntries listReencryptionStatus( + long prevId) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ, false); + return null; + } + + @Override + public void setXAttr(String src, XAttr xAttr, EnumSet flag) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + // TODO handle virtual directories + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("setXAttr", + new Class[] {String.class, XAttr.class, EnumSet.class}, + new RemoteParam(), xAttr, flag); + rpcClient.invokeSequential(locations, method); + } + + @SuppressWarnings("unchecked") + @Override + public List getXAttrs(String src, List xAttrs) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + // TODO handle virtual directories + final List locations = + rpcServer.getLocationsForPath(src, false); + RemoteMethod method = new RemoteMethod("getXAttrs", + new Class[] {String.class, List.class}, new RemoteParam(), xAttrs); + return (List) rpcClient.invokeSequential( + locations, method, List.class, null); + } + + @SuppressWarnings("unchecked") + @Override + public List listXAttrs(String src) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + // TODO handle virtual directories + final List locations = + rpcServer.getLocationsForPath(src, false); + RemoteMethod method = new RemoteMethod("listXAttrs", + new Class[] {String.class}, new RemoteParam()); + return (List) rpcClient.invokeSequential( + locations, method, List.class, null); + } + + @Override + public void removeXAttr(String src, XAttr xAttr) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + // TODO handle virtual directories + final List locations = + rpcServer.getLocationsForPath(src, true); + RemoteMethod method = new RemoteMethod("removeXAttr", + new Class[] {String.class, XAttr.class}, new RemoteParam(), xAttr); + rpcClient.invokeSequential(locations, method); + } + + @Override + public void checkAccess(String path, FsAction mode) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + // TODO handle virtual directories + final List locations = + rpcServer.getLocationsForPath(path, true); + RemoteMethod method = new RemoteMethod("checkAccess", + new Class[] {String.class, FsAction.class}, + new RemoteParam(), mode); + rpcClient.invokeSequential(locations, method); + } + + @Override + public long getCurrentEditLogTxid() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ); + + RemoteMethod method = new RemoteMethod( + "getCurrentEditLogTxid", new Class[] {}); + final Set nss = namenodeResolver.getNamespaces(); + Map ret = + rpcClient.invokeConcurrent(nss, method, true, false, long.class); + + // Return the maximum txid + long txid = 0; + for (long t : ret.values()) { + if (t > txid) { + txid = t; + } + } + return txid; + } + + @Override + public EventBatchList getEditsFromTxid(long txid) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ, false); + return null; + } + + @Override + public DataEncryptionKey getDataEncryptionKey() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ, false); + return null; + } + + @Override + public String createSnapshot(String snapshotRoot, String snapshotName) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + return null; + } + + @Override + public void deleteSnapshot(String snapshotRoot, String snapshotName) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + } + + @Override + public void setQuota(String path, long namespaceQuota, long storagespaceQuota, + StorageType type) throws IOException { + rpcServer.getQuotaModule() + .setQuota(path, namespaceQuota, storagespaceQuota, type); + } + + @Override + public QuotaUsage getQuotaUsage(String path) throws IOException { + return rpcServer.getQuotaModule().getQuotaUsage(path); + } + + @Override + public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE); + + // Block pool id -> blocks + Map> blockLocations = new HashMap<>(); + for (LocatedBlock block : blocks) { + String bpId = block.getBlock().getBlockPoolId(); + List bpBlocks = blockLocations.get(bpId); + if (bpBlocks == null) { + bpBlocks = new LinkedList<>(); + blockLocations.put(bpId, bpBlocks); + } + bpBlocks.add(block); + } + + // Invoke each block pool + for (Map.Entry> entry : + blockLocations.entrySet()) { + String bpId = entry.getKey(); + List bpBlocks = entry.getValue(); + + LocatedBlock[] bpBlocksArray = + bpBlocks.toArray(new LocatedBlock[bpBlocks.size()]); + RemoteMethod method = new RemoteMethod("reportBadBlocks", + new Class[] {LocatedBlock[].class}, + new Object[] {bpBlocksArray}); + rpcClient.invokeSingleBlockPool(bpId, method); + } + } + + @Override + public void unsetStoragePolicy(String src) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false); + } + + @Override + public BlockStoragePolicy getStoragePolicy(String path) throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ, false); + return null; + } + + @Override + public ErasureCodingPolicyInfo[] getErasureCodingPolicies() + throws IOException { + return erasureCoding.getErasureCodingPolicies(); + } + + @Override + public Map getErasureCodingCodecs() throws IOException { + return erasureCoding.getErasureCodingCodecs(); + } + + @Override + public AddErasureCodingPolicyResponse[] addErasureCodingPolicies( + ErasureCodingPolicy[] policies) throws IOException { + return erasureCoding.addErasureCodingPolicies(policies); + } + + @Override + public void removeErasureCodingPolicy(String ecPolicyName) + throws IOException { + erasureCoding.removeErasureCodingPolicy(ecPolicyName); + } + + @Override + public void disableErasureCodingPolicy(String ecPolicyName) + throws IOException { + erasureCoding.disableErasureCodingPolicy(ecPolicyName); + } + + @Override + public void enableErasureCodingPolicy(String ecPolicyName) + throws IOException { + erasureCoding.enableErasureCodingPolicy(ecPolicyName); + } + + @Override + public ErasureCodingPolicy getErasureCodingPolicy(String src) + throws IOException { + return erasureCoding.getErasureCodingPolicy(src); + } + + @Override + public void setErasureCodingPolicy(String src, String ecPolicyName) + throws IOException { + erasureCoding.setErasureCodingPolicy(src, ecPolicyName); + } + + @Override + public void unsetErasureCodingPolicy(String src) throws IOException { + erasureCoding.unsetErasureCodingPolicy(src); + } + + @Override + public ECBlockGroupStats getECBlockGroupStats() throws IOException { + return erasureCoding.getECBlockGroupStats(); + } + + @Override + public ReplicatedBlockStats getReplicatedBlockStats() throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ, false); + return null; + } + + @Deprecated + @Override + public BatchedEntries listOpenFiles(long prevId) + throws IOException { + return listOpenFiles(prevId, + EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES), + OpenFilesIterator.FILTER_PATH_DEFAULT); + } + + @Override + public BatchedEntries listOpenFiles(long prevId, + EnumSet openFilesTypes, String path) + throws IOException { + rpcServer.checkOperation(NameNode.OperationCategory.READ, false); + return null; + } + + /** + * Determines combinations of eligible src/dst locations for a rename. A + * rename cannot change the namespace. Renames are only allowed if there is an + * eligible dst location in the same namespace as the source. + * + * @param srcLocations List of all potential source destinations where the + * path may be located. On return this list is trimmed to include + * only the paths that have corresponding destinations in the same + * namespace. + * @param dst The destination path + * @return A map of all eligible source namespaces and their corresponding + * replacement value. + * @throws IOException If the dst paths could not be determined. + */ + private RemoteParam getRenameDestinations( + final List srcLocations, final String dst) + throws IOException { + + final List dstLocations = + rpcServer.getLocationsForPath(dst, true); + final Map dstMap = new HashMap<>(); + + Iterator iterator = srcLocations.iterator(); + while (iterator.hasNext()) { + RemoteLocation srcLocation = iterator.next(); + RemoteLocation eligibleDst = + getFirstMatchingLocation(srcLocation, dstLocations); + if (eligibleDst != null) { + // Use this dst for this source location + dstMap.put(srcLocation, eligibleDst.getDest()); + } else { + // This src destination is not valid, remove from the source list + iterator.remove(); + } + } + return new RemoteParam(dstMap); + } + + /** + * Get first matching location. + * + * @param location Location we are looking for. + * @param locations List of locations. + * @return The first matchin location in the list. + */ + private RemoteLocation getFirstMatchingLocation(RemoteLocation location, + List locations) { + for (RemoteLocation loc : locations) { + if (loc.getNameserviceId().equals(location.getNameserviceId())) { + // Return first matching location + return loc; + } + } + return null; + } + + /** + * Aggregate content summaries for each subcluster. + * + * @param summaries Collection of individual summaries. + * @return Aggregated content summary. + */ + private ContentSummary aggregateContentSummary( + Collection summaries) { + if (summaries.size() == 1) { + return summaries.iterator().next(); + } + + long length = 0; + long fileCount = 0; + long directoryCount = 0; + long quota = 0; + long spaceConsumed = 0; + long spaceQuota = 0; + + for (ContentSummary summary : summaries) { + length += summary.getLength(); + fileCount += summary.getFileCount(); + directoryCount += summary.getDirectoryCount(); + quota += summary.getQuota(); + spaceConsumed += summary.getSpaceConsumed(); + spaceQuota += summary.getSpaceQuota(); + } + + ContentSummary ret = new ContentSummary.Builder() + .length(length) + .fileCount(fileCount) + .directoryCount(directoryCount) + .quota(quota) + .spaceConsumed(spaceConsumed) + .spaceQuota(spaceQuota) + .build(); + return ret; + } + + /** + * Get the file info from all the locations. + * + * @param locations Locations to check. + * @param method The file information method to run. + * @return The first file info if it's a file, the directory if it's + * everywhere. + * @throws IOException If all the locations throw an exception. + */ + private HdfsFileStatus getFileInfoAll(final List locations, + final RemoteMethod method) throws IOException { + + // Get the file info from everybody + Map results = + rpcClient.invokeConcurrent(locations, method, HdfsFileStatus.class); + + // We return the first file + HdfsFileStatus dirStatus = null; + for (RemoteLocation loc : locations) { + HdfsFileStatus fileStatus = results.get(loc); + if (fileStatus != null) { + if (!fileStatus.isDirectory()) { + return fileStatus; + } else if (dirStatus == null) { + dirStatus = fileStatus; + } + } + } + return dirStatus; + } + + /** + * Get the permissions for the parent of a child with given permissions. + * Add implicit u+wx permission for parent. This is based on + * @{FSDirMkdirOp#addImplicitUwx}. + * @param mask The permission mask of the child. + * @return The permission mask of the parent. + */ + private static FsPermission getParentPermission(final FsPermission mask) { + FsPermission ret = new FsPermission( + mask.getUserAction().or(FsAction.WRITE_EXECUTE), + mask.getGroupAction(), + mask.getOtherAction()); + return ret; + } + + /** + * Check if a path should be in all subclusters. + * + * @param path Path to check. + * @return If a path should be in all subclusters. + */ + private boolean isPathAll(final String path) { + if (subclusterResolver instanceof MountTableResolver) { + try { + MountTableResolver mountTable = (MountTableResolver)subclusterResolver; + MountTable entry = mountTable.getMountPoint(path); + if (entry != null) { + return entry.isAll(); + } + } catch (IOException e) { + LOG.error("Cannot get mount point", e); + } + } + return false; + } + + /** + * Create a new file status for a mount point. + * + * @param name Name of the mount point. + * @param childrenNum Number of children. + * @param date Map with the dates. + * @return New HDFS file status representing a mount point. + */ + private HdfsFileStatus getMountPointStatus( + String name, int childrenNum, long date) { + long modTime = date; + long accessTime = date; + FsPermission permission = FsPermission.getDirDefault(); + String owner = this.superUser; + String group = this.superGroup; + try { + // TODO support users, it should be the user for the pointed folder + UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); + owner = ugi.getUserName(); + group = ugi.getPrimaryGroupName(); + } catch (IOException e) { + LOG.error("Cannot get the remote user: {}", e.getMessage()); + } + long inodeId = 0; + return new HdfsFileStatus.Builder() + .isdir(true) + .mtime(modTime) + .atime(accessTime) + .perm(permission) + .owner(owner) + .group(group) + .symlink(new byte[0]) + .path(DFSUtil.string2Bytes(name)) + .fileId(inodeId) + .children(childrenNum) + .build(); + } + + /** + * Get the modification dates for mount points. + * + * @param path Name of the path to start checking dates from. + * @return Map with the modification dates for all sub-entries. + */ + private Map getMountPointDates(String path) { + Map ret = new TreeMap<>(); + if (subclusterResolver instanceof MountTableResolver) { + try { + final List children = subclusterResolver.getMountPoints(path); + for (String child : children) { + Long modTime = getModifiedTime(ret, path, child); + ret.put(child, modTime); + } + } catch (IOException e) { + LOG.error("Cannot get mount point", e); + } + } + return ret; + } + + /** + * Get modified time for child. If the child is present in mount table it + * will return the modified time. If the child is not present but subdirs of + * this child are present then it will return latest modified subdir's time + * as modified time of the requested child. + * + * @param ret contains children and modified times. + * @param path Name of the path to start checking dates from. + * @param child child of the requested path. + * @return modified time. + */ + private long getModifiedTime(Map ret, String path, + String child) { + MountTableResolver mountTable = (MountTableResolver)subclusterResolver; + String srcPath; + if (path.equals(Path.SEPARATOR)) { + srcPath = Path.SEPARATOR + child; + } else { + srcPath = path + Path.SEPARATOR + child; + } + Long modTime = 0L; + try { + // Get mount table entry for the srcPath + MountTable entry = mountTable.getMountPoint(srcPath); + // if srcPath is not in mount table but its subdirs are in mount + // table we will display latest modified subdir date/time. + if (entry == null) { + List entries = mountTable.getMounts(srcPath); + for (MountTable eachEntry : entries) { + // Get the latest date + if (ret.get(child) == null || + ret.get(child) < eachEntry.getDateModified()) { + modTime = eachEntry.getDateModified(); + } + } + } else { + modTime = entry.getDateModified(); + } + } catch (IOException e) { + LOG.error("Cannot get mount point", e); + } + return modTime; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 50ec67003fe..1f2f79a36c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -33,16 +33,12 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; -import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.CryptoProtocolVersion; @@ -54,7 +50,6 @@ import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.Options; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.QuotaUsage; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.XAttr; @@ -64,7 +59,6 @@ import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.AddBlockFlag; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; @@ -93,7 +87,6 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; -import org.apache.hadoop.hdfs.protocol.OpenFilesIterator; import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType; import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; @@ -101,8 +94,8 @@ import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; @@ -167,11 +160,6 @@ public class RouterRpcServer extends AbstractService /** Configuration for the RPC server. */ private Configuration conf; - /** Identifier for the super user. */ - private final String superUser; - /** Identifier for the super group. */ - private final String superGroup; - /** Router using this RPC server. */ private final Router router; @@ -199,11 +187,10 @@ public class RouterRpcServer extends AbstractService // Modules implementing groups of RPC calls /** Router Quota calls. */ private final Quota quotaCall; - /** Erasure coding calls. */ - private final ErasureCoding erasureCoding; /** NamenodeProtocol calls. */ private final RouterNamenodeProtocol nnProto; - + /** ClientProtocol calls. */ + private final RouterClientProtocol clientProto; /** * Construct a router RPC server. @@ -224,12 +211,6 @@ public class RouterRpcServer extends AbstractService this.namenodeResolver = nnResolver; this.subclusterResolver = fileResolver; - // User and group for reporting - this.superUser = System.getProperty("user.name"); - this.superGroup = this.conf.get( - DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY, - DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT); - // RPC server settings int handlerCount = this.conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT); @@ -316,8 +297,8 @@ public class RouterRpcServer extends AbstractService // Initialize modules this.quotaCall = new Quota(this.router, this); - this.erasureCoding = new ErasureCoding(this); this.nnProto = new RouterNamenodeProtocol(this); + this.clientProto = new RouterClientProtocol(conf, this); } @Override @@ -371,6 +352,13 @@ public class RouterRpcServer extends AbstractService return subclusterResolver; } + /** + * Get the active namenode resolver. + */ + public ActiveNamenodeResolver getNamenodeResolver() { + return namenodeResolver; + } + /** * Get the RPC monitor and metrics. * @@ -412,7 +400,7 @@ public class RouterRpcServer extends AbstractService * client requests. * @throws UnsupportedOperationException If the operation is not supported. */ - protected void checkOperation(OperationCategory op, boolean supported) + void checkOperation(OperationCategory op, boolean supported) throws StandbyException, UnsupportedOperationException { checkOperation(op); @@ -434,7 +422,7 @@ public class RouterRpcServer extends AbstractService * @throws SafeModeException If the Router is in safe mode and cannot serve * client requests. */ - protected void checkOperation(OperationCategory op) + void checkOperation(OperationCategory op) throws StandbyException { // Log the function we are currently calling. if (rpcMonitor != null) { @@ -465,58 +453,44 @@ public class RouterRpcServer extends AbstractService } } + /** + * Get the name of the method that is calling this function. + * + * @return Name of the method calling this function. + */ + static String getMethodName() { + final StackTraceElement[] stack = Thread.currentThread().getStackTrace(); + String methodName = stack[3].getMethodName(); + return methodName; + } + @Override // ClientProtocol public Token getDelegationToken(Text renewer) throws IOException { - checkOperation(OperationCategory.WRITE, false); - return null; - } - - /** - * The the delegation token from each name service. - * @param renewer - * @return Name service -> Token. - * @throws IOException - */ - public Map> - getDelegationTokens(Text renewer) throws IOException { - checkOperation(OperationCategory.WRITE, false); - return null; + return clientProto.getDelegationToken(renewer); } @Override // ClientProtocol public long renewDelegationToken(Token token) throws IOException { - checkOperation(OperationCategory.WRITE, false); - return 0; + return clientProto.renewDelegationToken(token); } @Override // ClientProtocol public void cancelDelegationToken(Token token) throws IOException { - checkOperation(OperationCategory.WRITE, false); + clientProto.cancelDelegationToken(token); } @Override // ClientProtocol public LocatedBlocks getBlockLocations(String src, final long offset, final long length) throws IOException { - checkOperation(OperationCategory.READ); - - List locations = getLocationsForPath(src, false); - RemoteMethod remoteMethod = new RemoteMethod("getBlockLocations", - new Class[] {String.class, long.class, long.class}, - new RemoteParam(), offset, length); - return (LocatedBlocks) rpcClient.invokeSequential(locations, remoteMethod, - LocatedBlocks.class, null); + return clientProto.getBlockLocations(src, offset, length); } @Override // ClientProtocol public FsServerDefaults getServerDefaults() throws IOException { - checkOperation(OperationCategory.READ); - - RemoteMethod method = new RemoteMethod("getServerDefaults"); - String ns = subclusterResolver.getDefaultNamespace(); - return (FsServerDefaults) rpcClient.invokeSingle(ns, method); + return clientProto.getServerDefaults(); } @Override // ClientProtocol @@ -525,44 +499,8 @@ public class RouterRpcServer extends AbstractService boolean createParent, short replication, long blockSize, CryptoProtocolVersion[] supportedVersions, String ecPolicyName) throws IOException { - checkOperation(OperationCategory.WRITE); - - if (createParent && isPathAll(src)) { - int index = src.lastIndexOf(Path.SEPARATOR); - String parent = src.substring(0, index); - LOG.debug("Creating {} requires creating parent {}", src, parent); - FsPermission parentPermissions = getParentPermission(masked); - boolean success = mkdirs(parent, parentPermissions, createParent); - if (!success) { - // This shouldn't happen as mkdirs returns true or exception - LOG.error("Couldn't create parents for {}", src); - } - } - - RemoteLocation createLocation = getCreateLocation(src); - RemoteMethod method = new RemoteMethod("create", - new Class[] {String.class, FsPermission.class, String.class, - EnumSetWritable.class, boolean.class, short.class, - long.class, CryptoProtocolVersion[].class, - String.class}, - createLocation.getDest(), masked, clientName, flag, createParent, + return clientProto.create(src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions, ecPolicyName); - return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method); - } - - /** - * Get the permissions for the parent of a child with given permissions. - * Add implicit u+wx permission for parent. This is based on - * @{FSDirMkdirOp#addImplicitUwx}. - * @param mask The permission mask of the child. - * @return The permission mask of the parent. - */ - private static FsPermission getParentPermission(final FsPermission mask) { - FsPermission ret = new FsPermission( - mask.getUserAction().or(FsAction.WRITE_EXECUTE), - mask.getGroupAction(), - mask.getOtherAction()); - return ret; } /** @@ -573,7 +511,7 @@ public class RouterRpcServer extends AbstractService * @return The remote location for this file. * @throws IOException If the file has no creation location. */ - protected RemoteLocation getCreateLocation(final String src) + RemoteLocation getCreateLocation(final String src) throws IOException { final List locations = getLocationsForPath(src, true); @@ -614,100 +552,45 @@ public class RouterRpcServer extends AbstractService return createLocation; } - // Medium @Override // ClientProtocol public LastBlockWithStatus append(String src, final String clientName, final EnumSetWritable flag) throws IOException { - checkOperation(OperationCategory.WRITE); - - List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("append", - new Class[] {String.class, String.class, EnumSetWritable.class}, - new RemoteParam(), clientName, flag); - return rpcClient.invokeSequential( - locations, method, LastBlockWithStatus.class, null); + return clientProto.append(src, clientName, flag); } - // Low @Override // ClientProtocol public boolean recoverLease(String src, String clientName) throws IOException { - checkOperation(OperationCategory.WRITE); - - final List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("recoverLease", - new Class[] {String.class, String.class}, new RemoteParam(), - clientName); - Object result = rpcClient.invokeSequential( - locations, method, Boolean.class, Boolean.TRUE); - return (boolean) result; + return clientProto.recoverLease(src, clientName); } @Override // ClientProtocol public boolean setReplication(String src, short replication) throws IOException { - checkOperation(OperationCategory.WRITE); - - List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("setReplication", - new Class[] {String.class, short.class}, new RemoteParam(), - replication); - Object result = rpcClient.invokeSequential( - locations, method, Boolean.class, Boolean.TRUE); - return (boolean) result; + return clientProto.setReplication(src, replication); } - @Override + @Override // ClientProtocol public void setStoragePolicy(String src, String policyName) throws IOException { - checkOperation(OperationCategory.WRITE); - - List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("setStoragePolicy", - new Class[] {String.class, String.class}, - new RemoteParam(), policyName); - rpcClient.invokeSequential(locations, method, null, null); + clientProto.setStoragePolicy(src, policyName); } - @Override + @Override // ClientProtocol public BlockStoragePolicy[] getStoragePolicies() throws IOException { - checkOperation(OperationCategory.READ); - - RemoteMethod method = new RemoteMethod("getStoragePolicies"); - String ns = subclusterResolver.getDefaultNamespace(); - return (BlockStoragePolicy[]) rpcClient.invokeSingle(ns, method); + return clientProto.getStoragePolicies(); } @Override // ClientProtocol public void setPermission(String src, FsPermission permissions) throws IOException { - checkOperation(OperationCategory.WRITE); - - final List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("setPermission", - new Class[] {String.class, FsPermission.class}, - new RemoteParam(), permissions); - if (isPathAll(src)) { - rpcClient.invokeConcurrent(locations, method); - } else { - rpcClient.invokeSequential(locations, method); - } + clientProto.setPermission(src, permissions); } @Override // ClientProtocol public void setOwner(String src, String username, String groupname) throws IOException { - checkOperation(OperationCategory.WRITE); - - final List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("setOwner", - new Class[] {String.class, String.class, String.class}, - new RemoteParam(), username, groupname); - if (isPathAll(src)) { - rpcClient.invokeConcurrent(locations, method); - } else { - rpcClient.invokeSequential(locations, method); - } + clientProto.setOwner(src, username, groupname); } /** @@ -719,18 +602,8 @@ public class RouterRpcServer extends AbstractService ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId, String[] favoredNodes, EnumSet addBlockFlags) throws IOException { - checkOperation(OperationCategory.WRITE); - - final List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("addBlock", - new Class[] {String.class, String.class, ExtendedBlock.class, - DatanodeInfo[].class, long.class, String[].class, - EnumSet.class}, - new RemoteParam(), clientName, previous, excludedNodes, fileId, - favoredNodes, addBlockFlags); - // TODO verify the excludedNodes and favoredNodes are acceptable to this NN - return (LocatedBlock) rpcClient.invokeSequential( - locations, method, LocatedBlock.class, null); + return clientProto.addBlock(src, clientName, previous, excludedNodes, + fileId, favoredNodes, addBlockFlags); } /** @@ -743,55 +616,26 @@ public class RouterRpcServer extends AbstractService final String[] existingStorageIDs, final DatanodeInfo[] excludes, final int numAdditionalNodes, final String clientName) throws IOException { - checkOperation(OperationCategory.READ); - - final List locations = getLocationsForPath(src, false); - RemoteMethod method = new RemoteMethod("getAdditionalDatanode", - new Class[] {String.class, long.class, ExtendedBlock.class, - DatanodeInfo[].class, String[].class, - DatanodeInfo[].class, int.class, String.class}, - new RemoteParam(), fileId, blk, existings, existingStorageIDs, excludes, - numAdditionalNodes, clientName); - return (LocatedBlock) rpcClient.invokeSequential( - locations, method, LocatedBlock.class, null); + return clientProto.getAdditionalDatanode(src, fileId, blk, existings, + existingStorageIDs, excludes, numAdditionalNodes, clientName); } @Override // ClientProtocol public void abandonBlock(ExtendedBlock b, long fileId, String src, String holder) throws IOException { - checkOperation(OperationCategory.WRITE); - - RemoteMethod method = new RemoteMethod("abandonBlock", - new Class[] {ExtendedBlock.class, long.class, String.class, - String.class}, - b, fileId, new RemoteParam(), holder); - rpcClient.invokeSingle(b, method); + clientProto.abandonBlock(b, fileId, src, holder); } @Override // ClientProtocol public boolean complete(String src, String clientName, ExtendedBlock last, long fileId) throws IOException { - checkOperation(OperationCategory.WRITE); - - final List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("complete", - new Class[] {String.class, String.class, ExtendedBlock.class, - long.class}, - new RemoteParam(), clientName, last, fileId); - // Complete can return true/false, so don't expect a result - return ((Boolean) rpcClient.invokeSequential( - locations, method, Boolean.class, null)).booleanValue(); + return clientProto.complete(src, clientName, last, fileId); } @Override // ClientProtocol public LocatedBlock updateBlockForPipeline( ExtendedBlock block, String clientName) throws IOException { - checkOperation(OperationCategory.WRITE); - - RemoteMethod method = new RemoteMethod("updateBlockForPipeline", - new Class[] {ExtendedBlock.class, String.class}, - block, clientName); - return (LocatedBlock) rpcClient.invokeSingle(block, method); + return clientProto.updateBlockForPipeline(block, clientName); } /** @@ -802,462 +646,91 @@ public class RouterRpcServer extends AbstractService public void updatePipeline(String clientName, ExtendedBlock oldBlock, ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs) throws IOException { - checkOperation(OperationCategory.WRITE); - - RemoteMethod method = new RemoteMethod("updatePipeline", - new Class[] {String.class, ExtendedBlock.class, ExtendedBlock.class, - DatanodeID[].class, String[].class}, - clientName, oldBlock, newBlock, newNodes, newStorageIDs); - rpcClient.invokeSingle(oldBlock, method); + clientProto.updatePipeline(clientName, oldBlock, newBlock, newNodes, + newStorageIDs); } @Override // ClientProtocol public long getPreferredBlockSize(String src) throws IOException { - checkOperation(OperationCategory.READ); - - final List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("getPreferredBlockSize", - new Class[] {String.class}, new RemoteParam()); - return ((Long) rpcClient.invokeSequential( - locations, method, Long.class, null)).longValue(); - } - - /** - * Determines combinations of eligible src/dst locations for a rename. A - * rename cannot change the namespace. Renames are only allowed if there is an - * eligible dst location in the same namespace as the source. - * - * @param srcLocations List of all potential source destinations where the - * path may be located. On return this list is trimmed to include - * only the paths that have corresponding destinations in the same - * namespace. - * @param dst The destination path - * @return A map of all eligible source namespaces and their corresponding - * replacement value. - * @throws IOException If the dst paths could not be determined. - */ - private RemoteParam getRenameDestinations( - final List srcLocations, final String dst) - throws IOException { - - final List dstLocations = getLocationsForPath(dst, true); - final Map dstMap = new HashMap<>(); - - Iterator iterator = srcLocations.iterator(); - while (iterator.hasNext()) { - RemoteLocation srcLocation = iterator.next(); - RemoteLocation eligibleDst = - getFirstMatchingLocation(srcLocation, dstLocations); - if (eligibleDst != null) { - // Use this dst for this source location - dstMap.put(srcLocation, eligibleDst.getDest()); - } else { - // This src destination is not valid, remove from the source list - iterator.remove(); - } - } - return new RemoteParam(dstMap); - } - - /** - * Get first matching location. - * - * @param location Location we are looking for. - * @param locations List of locations. - * @return The first matchin location in the list. - */ - private RemoteLocation getFirstMatchingLocation(RemoteLocation location, - List locations) { - for (RemoteLocation loc : locations) { - if (loc.getNameserviceId().equals(location.getNameserviceId())) { - // Return first matching location - return loc; - } - } - return null; + return clientProto.getPreferredBlockSize(src); } @Deprecated @Override // ClientProtocol public boolean rename(final String src, final String dst) throws IOException { - checkOperation(OperationCategory.WRITE); - - final List srcLocations = - getLocationsForPath(src, true, false); - // srcLocations may be trimmed by getRenameDestinations() - final List locs = new LinkedList<>(srcLocations); - RemoteParam dstParam = getRenameDestinations(locs, dst); - if (locs.isEmpty()) { - throw new IOException( - "Rename of " + src + " to " + dst + " is not allowed," + - " no eligible destination in the same namespace was found."); - } - RemoteMethod method = new RemoteMethod("rename", - new Class[] {String.class, String.class}, - new RemoteParam(), dstParam); - return ((Boolean) rpcClient.invokeSequential( - locs, method, Boolean.class, Boolean.TRUE)).booleanValue(); + return clientProto.rename(src, dst); } @Override // ClientProtocol public void rename2(final String src, final String dst, final Options.Rename... options) throws IOException { - checkOperation(OperationCategory.WRITE); - - final List srcLocations = - getLocationsForPath(src, true, false); - // srcLocations may be trimmed by getRenameDestinations() - final List locs = new LinkedList<>(srcLocations); - RemoteParam dstParam = getRenameDestinations(locs, dst); - if (locs.isEmpty()) { - throw new IOException( - "Rename of " + src + " to " + dst + " is not allowed," + - " no eligible destination in the same namespace was found."); - } - RemoteMethod method = new RemoteMethod("rename2", - new Class[] {String.class, String.class, options.getClass()}, - new RemoteParam(), dstParam, options); - rpcClient.invokeSequential(locs, method, null, null); + clientProto.rename2(src, dst, options); } @Override // ClientProtocol public void concat(String trg, String[] src) throws IOException { - checkOperation(OperationCategory.WRITE); - - // See if the src and target files are all in the same namespace - LocatedBlocks targetBlocks = getBlockLocations(trg, 0, 1); - if (targetBlocks == null) { - throw new IOException("Cannot locate blocks for target file - " + trg); - } - LocatedBlock lastLocatedBlock = targetBlocks.getLastLocatedBlock(); - String targetBlockPoolId = lastLocatedBlock.getBlock().getBlockPoolId(); - for (String source : src) { - LocatedBlocks sourceBlocks = getBlockLocations(source, 0, 1); - if (sourceBlocks == null) { - throw new IOException( - "Cannot located blocks for source file " + source); - } - String sourceBlockPoolId = - sourceBlocks.getLastLocatedBlock().getBlock().getBlockPoolId(); - if (!sourceBlockPoolId.equals(targetBlockPoolId)) { - throw new IOException("Cannot concatenate source file " + source - + " because it is located in a different namespace" - + " with block pool id " + sourceBlockPoolId - + " from the target file with block pool id " - + targetBlockPoolId); - } - } - - // Find locations in the matching namespace. - final RemoteLocation targetDestination = - getLocationForPath(trg, true, targetBlockPoolId); - String[] sourceDestinations = new String[src.length]; - for (int i = 0; i < src.length; i++) { - String sourceFile = src[i]; - RemoteLocation location = - getLocationForPath(sourceFile, true, targetBlockPoolId); - sourceDestinations[i] = location.getDest(); - } - // Invoke - RemoteMethod method = new RemoteMethod("concat", - new Class[] {String.class, String[].class}, - targetDestination.getDest(), sourceDestinations); - rpcClient.invokeSingle(targetDestination, method); + clientProto.concat(trg, src); } @Override // ClientProtocol public boolean truncate(String src, long newLength, String clientName) throws IOException { - checkOperation(OperationCategory.WRITE); - - final List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("truncate", - new Class[] {String.class, long.class, String.class}, - new RemoteParam(), newLength, clientName); - return ((Boolean) rpcClient.invokeSequential(locations, method, - Boolean.class, Boolean.TRUE)).booleanValue(); + return clientProto.truncate(src, newLength, clientName); } @Override // ClientProtocol public boolean delete(String src, boolean recursive) throws IOException { - checkOperation(OperationCategory.WRITE); - - final List locations = - getLocationsForPath(src, true, false); - RemoteMethod method = new RemoteMethod("delete", - new Class[] {String.class, boolean.class}, new RemoteParam(), - recursive); - if (isPathAll(src)) { - return rpcClient.invokeAll(locations, method); - } else { - return rpcClient.invokeSequential(locations, method, - Boolean.class, Boolean.TRUE).booleanValue(); - } + return clientProto.delete(src, recursive); } @Override // ClientProtocol public boolean mkdirs(String src, FsPermission masked, boolean createParent) throws IOException { - checkOperation(OperationCategory.WRITE); - - final List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("mkdirs", - new Class[] {String.class, FsPermission.class, boolean.class}, - new RemoteParam(), masked, createParent); - - // Create in all locations - if (isPathAll(src)) { - return rpcClient.invokeAll(locations, method); - } - - if (locations.size() > 1) { - // Check if this directory already exists - try { - HdfsFileStatus fileStatus = getFileInfo(src); - if (fileStatus != null) { - // When existing, the NN doesn't return an exception; return true - return true; - } - } catch (IOException ioe) { - // Can't query if this file exists or not. - LOG.error("Error requesting file info for path {} while proxing mkdirs", - src, ioe); - } - } - - RemoteLocation firstLocation = locations.get(0); - return ((Boolean) rpcClient.invokeSingle(firstLocation, method)) - .booleanValue(); + return clientProto.mkdirs(src, masked, createParent); } @Override // ClientProtocol public void renewLease(String clientName) throws IOException { - checkOperation(OperationCategory.WRITE); - - RemoteMethod method = new RemoteMethod("renewLease", - new Class[] {String.class}, clientName); - Set nss = namenodeResolver.getNamespaces(); - rpcClient.invokeConcurrent(nss, method, false, false); + clientProto.renewLease(clientName); } @Override // ClientProtocol public DirectoryListing getListing(String src, byte[] startAfter, boolean needLocation) throws IOException { - checkOperation(OperationCategory.READ); - - // Locate the dir and fetch the listing - final List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("getListing", - new Class[] {String.class, startAfter.getClass(), boolean.class}, - new RemoteParam(), startAfter, needLocation); - Map listings = - rpcClient.invokeConcurrent( - locations, method, false, false, DirectoryListing.class); - - Map nnListing = new TreeMap<>(); - int totalRemainingEntries = 0; - int remainingEntries = 0; - boolean namenodeListingExists = false; - if (listings != null) { - // Check the subcluster listing with the smallest name - String lastName = null; - for (Entry entry : - listings.entrySet()) { - RemoteLocation location = entry.getKey(); - DirectoryListing listing = entry.getValue(); - if (listing == null) { - LOG.debug("Cannot get listing from {}", location); - } else { - totalRemainingEntries += listing.getRemainingEntries(); - HdfsFileStatus[] partialListing = listing.getPartialListing(); - int length = partialListing.length; - if (length > 0) { - HdfsFileStatus lastLocalEntry = partialListing[length-1]; - String lastLocalName = lastLocalEntry.getLocalName(); - if (lastName == null || lastName.compareTo(lastLocalName) > 0) { - lastName = lastLocalName; - } - } - } - } - - // Add existing entries - for (Object value : listings.values()) { - DirectoryListing listing = (DirectoryListing) value; - if (listing != null) { - namenodeListingExists = true; - for (HdfsFileStatus file : listing.getPartialListing()) { - String filename = file.getLocalName(); - if (totalRemainingEntries > 0 && filename.compareTo(lastName) > 0) { - // Discarding entries further than the lastName - remainingEntries++; - } else { - nnListing.put(filename, file); - } - } - remainingEntries += listing.getRemainingEntries(); - } - } - } - - // Add mount points at this level in the tree - final List children = subclusterResolver.getMountPoints(src); - if (children != null) { - // Get the dates for each mount point - Map dates = getMountPointDates(src); - - // Create virtual folder with the mount name - for (String child : children) { - long date = 0; - if (dates != null && dates.containsKey(child)) { - date = dates.get(child); - } - // TODO add number of children - HdfsFileStatus dirStatus = getMountPointStatus(child, 0, date); - - // This may overwrite existing listing entries with the mount point - // TODO don't add if already there? - nnListing.put(child, dirStatus); - } - } - - if (!namenodeListingExists && nnListing.size() == 0) { - // NN returns a null object if the directory cannot be found and has no - // listing. If we didn't retrieve any NN listing data, and there are no - // mount points here, return null. - return null; - } - - // Generate combined listing - HdfsFileStatus[] combinedData = new HdfsFileStatus[nnListing.size()]; - combinedData = nnListing.values().toArray(combinedData); - return new DirectoryListing(combinedData, remainingEntries); + return clientProto.getListing(src, startAfter, needLocation); } @Override // ClientProtocol public HdfsFileStatus getFileInfo(String src) throws IOException { - checkOperation(OperationCategory.READ); - - final List locations = getLocationsForPath(src, false); - RemoteMethod method = new RemoteMethod("getFileInfo", - new Class[] {String.class}, new RemoteParam()); - - HdfsFileStatus ret = null; - // If it's a directory, we check in all locations - if (isPathAll(src)) { - ret = getFileInfoAll(locations, method); - } else { - // Check for file information sequentially - ret = (HdfsFileStatus) rpcClient.invokeSequential( - locations, method, HdfsFileStatus.class, null); - } - - // If there is no real path, check mount points - if (ret == null) { - List children = subclusterResolver.getMountPoints(src); - if (children != null && !children.isEmpty()) { - Map dates = getMountPointDates(src); - long date = 0; - if (dates != null && dates.containsKey(src)) { - date = dates.get(src); - } - ret = getMountPointStatus(src, children.size(), date); - } - } - - return ret; - } - - /** - * Get the file info from all the locations. - * - * @param locations Locations to check. - * @param method The file information method to run. - * @return The first file info if it's a file, the directory if it's - * everywhere. - * @throws IOException If all the locations throw an exception. - */ - private HdfsFileStatus getFileInfoAll(final List locations, - final RemoteMethod method) throws IOException { - - // Get the file info from everybody - Map results = - rpcClient.invokeConcurrent(locations, method, HdfsFileStatus.class); - - // We return the first file - HdfsFileStatus dirStatus = null; - for (RemoteLocation loc : locations) { - HdfsFileStatus fileStatus = results.get(loc); - if (fileStatus != null) { - if (!fileStatus.isDirectory()) { - return fileStatus; - } else if (dirStatus == null) { - dirStatus = fileStatus; - } - } - } - return dirStatus; + return clientProto.getFileInfo(src); } @Override // ClientProtocol public boolean isFileClosed(String src) throws IOException { - checkOperation(OperationCategory.READ); - - final List locations = getLocationsForPath(src, false); - RemoteMethod method = new RemoteMethod("isFileClosed", - new Class[] {String.class}, new RemoteParam()); - return ((Boolean) rpcClient.invokeSequential( - locations, method, Boolean.class, Boolean.TRUE)).booleanValue(); + return clientProto.isFileClosed(src); } @Override // ClientProtocol public HdfsFileStatus getFileLinkInfo(String src) throws IOException { - checkOperation(OperationCategory.READ); - - final List locations = getLocationsForPath(src, false); - RemoteMethod method = new RemoteMethod("getFileLinkInfo", - new Class[] {String.class}, new RemoteParam()); - return (HdfsFileStatus) rpcClient.invokeSequential( - locations, method, HdfsFileStatus.class, null); + return clientProto.getFileLinkInfo(src); } - @Override + @Override // ClientProtocol public HdfsLocatedFileStatus getLocatedFileInfo(String src, boolean needBlockToken) throws IOException { - checkOperation(OperationCategory.READ); - final List locations = getLocationsForPath(src, false); - RemoteMethod method = new RemoteMethod("getLocatedFileInfo", - new Class[] {String.class, boolean.class}, new RemoteParam(), - Boolean.valueOf(needBlockToken)); - return (HdfsLocatedFileStatus) rpcClient.invokeSequential( - locations, method, HdfsFileStatus.class, null); + return clientProto.getLocatedFileInfo(src, needBlockToken); } @Override // ClientProtocol public long[] getStats() throws IOException { - checkOperation(OperationCategory.UNCHECKED); - - RemoteMethod method = new RemoteMethod("getStats"); - Set nss = namenodeResolver.getNamespaces(); - Map results = - rpcClient.invokeConcurrent(nss, method, true, false, long[].class); - long[] combinedData = new long[STATS_ARRAY_LENGTH]; - for (long[] data : results.values()) { - for (int i = 0; i < combinedData.length && i < data.length; i++) { - if (data[i] >= 0) { - combinedData[i] += data[i]; - } - } - } - return combinedData; + return clientProto.getStats(); } @Override // ClientProtocol public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) throws IOException { - checkOperation(OperationCategory.UNCHECKED); - return getDatanodeReport(type, true, 0); + return clientProto.getDatanodeReport(type); } /** @@ -1306,29 +779,7 @@ public class RouterRpcServer extends AbstractService @Override // ClientProtocol public DatanodeStorageReport[] getDatanodeStorageReport( DatanodeReportType type) throws IOException { - checkOperation(OperationCategory.UNCHECKED); - - Map dnSubcluster = - getDatanodeStorageReportMap(type); - - // Avoid repeating machines in multiple subclusters - Map datanodesMap = new LinkedHashMap<>(); - for (DatanodeStorageReport[] dns : dnSubcluster.values()) { - for (DatanodeStorageReport dn : dns) { - DatanodeInfo dnInfo = dn.getDatanodeInfo(); - String nodeId = dnInfo.getXferAddr(); - if (!datanodesMap.containsKey(nodeId)) { - datanodesMap.put(nodeId, dn); - } - // TODO merge somehow, right now it just takes the first one - } - } - - Collection datanodes = datanodesMap.values(); - DatanodeStorageReport[] combinedData = - new DatanodeStorageReport[datanodes.size()]; - combinedData = datanodes.toArray(combinedData); - return combinedData; + return clientProto.getDatanodeStorageReport(type); } /** @@ -1361,740 +812,383 @@ public class RouterRpcServer extends AbstractService @Override // ClientProtocol public boolean setSafeMode(SafeModeAction action, boolean isChecked) throws IOException { - checkOperation(OperationCategory.WRITE); - - // Set safe mode in all the name spaces - RemoteMethod method = new RemoteMethod("setSafeMode", - new Class[] {SafeModeAction.class, boolean.class}, - action, isChecked); - Set nss = namenodeResolver.getNamespaces(); - Map results = - rpcClient.invokeConcurrent( - nss, method, true, !isChecked, Boolean.class); - - // We only report true if all the name space are in safe mode - int numSafemode = 0; - for (boolean safemode : results.values()) { - if (safemode) { - numSafemode++; - } - } - return numSafemode == results.size(); + return clientProto.setSafeMode(action, isChecked); } @Override // ClientProtocol public boolean restoreFailedStorage(String arg) throws IOException { - checkOperation(OperationCategory.UNCHECKED); - - RemoteMethod method = new RemoteMethod("restoreFailedStorage", - new Class[] {String.class}, arg); - final Set nss = namenodeResolver.getNamespaces(); - Map ret = - rpcClient.invokeConcurrent(nss, method, true, false, Boolean.class); - - boolean success = true; - for (boolean s : ret.values()) { - if (!s) { - success = false; - break; - } - } - return success; + return clientProto.restoreFailedStorage(arg); } @Override // ClientProtocol public boolean saveNamespace(long timeWindow, long txGap) throws IOException { - checkOperation(OperationCategory.UNCHECKED); - - RemoteMethod method = new RemoteMethod("saveNamespace", - new Class[] {Long.class, Long.class}, timeWindow, txGap); - final Set nss = namenodeResolver.getNamespaces(); - Map ret = - rpcClient.invokeConcurrent(nss, method, true, false, boolean.class); - - boolean success = true; - for (boolean s : ret.values()) { - if (!s) { - success = false; - break; - } - } - return success; + return clientProto.saveNamespace(timeWindow, txGap); } @Override // ClientProtocol public long rollEdits() throws IOException { - checkOperation(OperationCategory.WRITE); - - RemoteMethod method = new RemoteMethod("rollEdits", new Class[] {}); - final Set nss = namenodeResolver.getNamespaces(); - Map ret = - rpcClient.invokeConcurrent(nss, method, true, false, long.class); - - // Return the maximum txid - long txid = 0; - for (long t : ret.values()) { - if (t > txid) { - txid = t; - } - } - return txid; + return clientProto.rollEdits(); } @Override // ClientProtocol public void refreshNodes() throws IOException { - checkOperation(OperationCategory.UNCHECKED); - - RemoteMethod method = new RemoteMethod("refreshNodes", new Class[] {}); - final Set nss = namenodeResolver.getNamespaces(); - rpcClient.invokeConcurrent(nss, method, true, true); + clientProto.refreshNodes(); } @Override // ClientProtocol public void finalizeUpgrade() throws IOException { - checkOperation(OperationCategory.UNCHECKED); - - RemoteMethod method = new RemoteMethod("finalizeUpgrade", - new Class[] {}); - final Set nss = namenodeResolver.getNamespaces(); - rpcClient.invokeConcurrent(nss, method, true, false); + clientProto.finalizeUpgrade(); } @Override // ClientProtocol public boolean upgradeStatus() throws IOException { - String methodName = getMethodName(); - throw new UnsupportedOperationException( - "Operation \"" + methodName + "\" is not supported"); + return clientProto.upgradeStatus(); } @Override // ClientProtocol public RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action) throws IOException { - checkOperation(OperationCategory.READ); - - RemoteMethod method = new RemoteMethod("rollingUpgrade", - new Class[] {RollingUpgradeAction.class}, action); - final Set nss = namenodeResolver.getNamespaces(); - Map ret = - rpcClient.invokeConcurrent( - nss, method, true, false, RollingUpgradeInfo.class); - - // Return the first rolling upgrade info - RollingUpgradeInfo info = null; - for (RollingUpgradeInfo infoNs : ret.values()) { - if (info == null && infoNs != null) { - info = infoNs; - } - } - return info; + return clientProto.rollingUpgrade(action); } @Override // ClientProtocol public void metaSave(String filename) throws IOException { - checkOperation(OperationCategory.UNCHECKED); - - RemoteMethod method = new RemoteMethod("metaSave", - new Class[] {String.class}, filename); - final Set nss = namenodeResolver.getNamespaces(); - rpcClient.invokeConcurrent(nss, method, true, false); + clientProto.metaSave(filename); } @Override // ClientProtocol public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) throws IOException { - checkOperation(OperationCategory.READ); - - final List locations = getLocationsForPath(path, false); - RemoteMethod method = new RemoteMethod("listCorruptFileBlocks", - new Class[] {String.class, String.class}, - new RemoteParam(), cookie); - return (CorruptFileBlocks) rpcClient.invokeSequential( - locations, method, CorruptFileBlocks.class, null); + return clientProto.listCorruptFileBlocks(path, cookie); } @Override // ClientProtocol public void setBalancerBandwidth(long bandwidth) throws IOException { - checkOperation(OperationCategory.UNCHECKED); - - RemoteMethod method = new RemoteMethod("setBalancerBandwidth", - new Class[] {Long.class}, bandwidth); - final Set nss = namenodeResolver.getNamespaces(); - rpcClient.invokeConcurrent(nss, method, true, false); + clientProto.setBalancerBandwidth(bandwidth); } @Override // ClientProtocol public ContentSummary getContentSummary(String path) throws IOException { - checkOperation(OperationCategory.READ); - - // Get the summaries from regular files - Collection summaries = new LinkedList<>(); - FileNotFoundException notFoundException = null; - try { - final List locations = getLocationsForPath(path, false); - RemoteMethod method = new RemoteMethod("getContentSummary", - new Class[] {String.class}, new RemoteParam()); - Map results = - rpcClient.invokeConcurrent( - locations, method, false, false, ContentSummary.class); - summaries.addAll(results.values()); - } catch (FileNotFoundException e) { - notFoundException = e; - } - - // Add mount points at this level in the tree - final List children = subclusterResolver.getMountPoints(path); - if (children != null) { - for (String child : children) { - Path childPath = new Path(path, child); - try { - ContentSummary mountSummary = getContentSummary(childPath.toString()); - if (mountSummary != null) { - summaries.add(mountSummary); - } - } catch (Exception e) { - LOG.error("Cannot get content summary for mount {}: {}", - childPath, e.getMessage()); - } - } - } - - // Throw original exception if no original nor mount points - if (summaries.isEmpty() && notFoundException != null) { - throw notFoundException; - } - - return aggregateContentSummary(summaries); - } - - /** - * Aggregate content summaries for each subcluster. - * - * @param summaries Collection of individual summaries. - * @return Aggregated content summary. - */ - private ContentSummary aggregateContentSummary( - Collection summaries) { - if (summaries.size() == 1) { - return summaries.iterator().next(); - } - - long length = 0; - long fileCount = 0; - long directoryCount = 0; - long quota = 0; - long spaceConsumed = 0; - long spaceQuota = 0; - - for (ContentSummary summary : summaries) { - length += summary.getLength(); - fileCount += summary.getFileCount(); - directoryCount += summary.getDirectoryCount(); - quota += summary.getQuota(); - spaceConsumed += summary.getSpaceConsumed(); - spaceQuota += summary.getSpaceQuota(); - } - - ContentSummary ret = new ContentSummary.Builder() - .length(length) - .fileCount(fileCount) - .directoryCount(directoryCount) - .quota(quota) - .spaceConsumed(spaceConsumed) - .spaceQuota(spaceQuota) - .build(); - return ret; + return clientProto.getContentSummary(path); } @Override // ClientProtocol public void fsync(String src, long fileId, String clientName, long lastBlockLength) throws IOException { - checkOperation(OperationCategory.WRITE); - - final List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("fsync", - new Class[] {String.class, long.class, String.class, long.class }, - new RemoteParam(), fileId, clientName, lastBlockLength); - rpcClient.invokeSequential(locations, method); + clientProto.fsync(src, fileId, clientName, lastBlockLength); } @Override // ClientProtocol public void setTimes(String src, long mtime, long atime) throws IOException { - checkOperation(OperationCategory.WRITE); - - final List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("setTimes", - new Class[] {String.class, long.class, long.class}, - new RemoteParam(), mtime, atime); - rpcClient.invokeSequential(locations, method); + clientProto.setTimes(src, mtime, atime); } @Override // ClientProtocol public void createSymlink(String target, String link, FsPermission dirPerms, boolean createParent) throws IOException { - checkOperation(OperationCategory.WRITE); - - // TODO Verify that the link location is in the same NS as the targets - final List targetLocations = - getLocationsForPath(target, true); - final List linkLocations = - getLocationsForPath(link, true); - RemoteLocation linkLocation = linkLocations.get(0); - RemoteMethod method = new RemoteMethod("createSymlink", - new Class[] {String.class, String.class, FsPermission.class, - boolean.class}, - new RemoteParam(), linkLocation.getDest(), dirPerms, createParent); - rpcClient.invokeSequential(targetLocations, method); + clientProto.createSymlink(target, link, dirPerms, createParent); } @Override // ClientProtocol public String getLinkTarget(String path) throws IOException { - checkOperation(OperationCategory.READ); - - final List locations = getLocationsForPath(path, true); - RemoteMethod method = new RemoteMethod("getLinkTarget", - new Class[] {String.class}, new RemoteParam()); - return (String) rpcClient.invokeSequential( - locations, method, String.class, null); + return clientProto.getLinkTarget(path); } @Override // Client Protocol public void allowSnapshot(String snapshotRoot) throws IOException { - checkOperation(OperationCategory.WRITE, false); + clientProto.allowSnapshot(snapshotRoot); } @Override // Client Protocol public void disallowSnapshot(String snapshot) throws IOException { - checkOperation(OperationCategory.WRITE, false); + clientProto.disallowSnapshot(snapshot); } @Override // ClientProtocol public void renameSnapshot(String snapshotRoot, String snapshotOldName, String snapshotNewName) throws IOException { - checkOperation(OperationCategory.WRITE, false); + clientProto.renameSnapshot(snapshotRoot, snapshotOldName, snapshotNewName); } @Override // Client Protocol public SnapshottableDirectoryStatus[] getSnapshottableDirListing() throws IOException { - checkOperation(OperationCategory.READ, false); - return null; + return clientProto.getSnapshottableDirListing(); } @Override // ClientProtocol public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot, String earlierSnapshotName, String laterSnapshotName) throws IOException { - checkOperation(OperationCategory.READ, false); - return null; + return clientProto.getSnapshotDiffReport( + snapshotRoot, earlierSnapshotName, laterSnapshotName); } @Override // ClientProtocol public SnapshotDiffReportListing getSnapshotDiffReportListing( String snapshotRoot, String earlierSnapshotName, String laterSnapshotName, byte[] startPath, int index) throws IOException { - checkOperation(OperationCategory.READ, false); - return null; + return clientProto.getSnapshotDiffReportListing(snapshotRoot, + earlierSnapshotName, laterSnapshotName, startPath, index); } @Override // ClientProtocol public long addCacheDirective(CacheDirectiveInfo path, EnumSet flags) throws IOException { - checkOperation(OperationCategory.WRITE, false); - return 0; + return clientProto.addCacheDirective(path, flags); } @Override // ClientProtocol public void modifyCacheDirective(CacheDirectiveInfo directive, EnumSet flags) throws IOException { - checkOperation(OperationCategory.WRITE, false); + clientProto.modifyCacheDirective(directive, flags); } @Override // ClientProtocol public void removeCacheDirective(long id) throws IOException { - checkOperation(OperationCategory.WRITE, false); + clientProto.removeCacheDirective(id); } @Override // ClientProtocol public BatchedEntries listCacheDirectives( long prevId, CacheDirectiveInfo filter) throws IOException { - checkOperation(OperationCategory.READ, false); - return null; + return clientProto.listCacheDirectives(prevId, filter); } @Override // ClientProtocol public void addCachePool(CachePoolInfo info) throws IOException { - checkOperation(OperationCategory.WRITE, false); + clientProto.addCachePool(info); } @Override // ClientProtocol public void modifyCachePool(CachePoolInfo info) throws IOException { - checkOperation(OperationCategory.WRITE, false); + clientProto.modifyCachePool(info); } @Override // ClientProtocol public void removeCachePool(String cachePoolName) throws IOException { - checkOperation(OperationCategory.WRITE, false); + clientProto.removeCachePool(cachePoolName); } @Override // ClientProtocol public BatchedEntries listCachePools(String prevKey) throws IOException { - checkOperation(OperationCategory.READ, false); - return null; + return clientProto.listCachePools(prevKey); } @Override // ClientProtocol public void modifyAclEntries(String src, List aclSpec) throws IOException { - checkOperation(OperationCategory.WRITE); - - // TODO handle virtual directories - final List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("modifyAclEntries", - new Class[] {String.class, List.class}, - new RemoteParam(), aclSpec); - rpcClient.invokeSequential(locations, method, null, null); + clientProto.modifyAclEntries(src, aclSpec); } @Override // ClienProtocol public void removeAclEntries(String src, List aclSpec) throws IOException { - checkOperation(OperationCategory.WRITE); - - // TODO handle virtual directories - final List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("removeAclEntries", - new Class[] {String.class, List.class}, - new RemoteParam(), aclSpec); - rpcClient.invokeSequential(locations, method, null, null); + clientProto.removeAclEntries(src, aclSpec); } @Override // ClientProtocol public void removeDefaultAcl(String src) throws IOException { - checkOperation(OperationCategory.WRITE); - - // TODO handle virtual directories - final List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("removeDefaultAcl", - new Class[] {String.class}, new RemoteParam()); - rpcClient.invokeSequential(locations, method); + clientProto.removeDefaultAcl(src); } @Override // ClientProtocol public void removeAcl(String src) throws IOException { - checkOperation(OperationCategory.WRITE); - - // TODO handle virtual directories - final List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("removeAcl", - new Class[] {String.class}, new RemoteParam()); - rpcClient.invokeSequential(locations, method); + clientProto.removeAcl(src); } @Override // ClientProtocol public void setAcl(String src, List aclSpec) throws IOException { - checkOperation(OperationCategory.WRITE); - - // TODO handle virtual directories - final List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod( - "setAcl", new Class[] {String.class, List.class}, - new RemoteParam(), aclSpec); - rpcClient.invokeSequential(locations, method); + clientProto.setAcl(src, aclSpec); } @Override // ClientProtocol public AclStatus getAclStatus(String src) throws IOException { - checkOperation(OperationCategory.READ); - - // TODO handle virtual directories - final List locations = getLocationsForPath(src, false); - RemoteMethod method = new RemoteMethod("getAclStatus", - new Class[] {String.class}, new RemoteParam()); - return (AclStatus) rpcClient.invokeSequential( - locations, method, AclStatus.class, null); + return clientProto.getAclStatus(src); } @Override // ClientProtocol public void createEncryptionZone(String src, String keyName) throws IOException { - checkOperation(OperationCategory.WRITE); - - // TODO handle virtual directories - final List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("createEncryptionZone", - new Class[] {String.class, String.class}, - new RemoteParam(), keyName); - rpcClient.invokeSequential(locations, method); + clientProto.createEncryptionZone(src, keyName); } @Override // ClientProtocol public EncryptionZone getEZForPath(String src) throws IOException { - checkOperation(OperationCategory.READ); - - // TODO handle virtual directories - final List locations = getLocationsForPath(src, false); - RemoteMethod method = new RemoteMethod("getEZForPath", - new Class[] {String.class}, new RemoteParam()); - return (EncryptionZone) rpcClient.invokeSequential( - locations, method, EncryptionZone.class, null); + return clientProto.getEZForPath(src); } @Override // ClientProtocol public BatchedEntries listEncryptionZones(long prevId) throws IOException { - checkOperation(OperationCategory.READ, false); - return null; + return clientProto.listEncryptionZones(prevId); } @Override // ClientProtocol public void reencryptEncryptionZone(String zone, ReencryptAction action) throws IOException { - checkOperation(OperationCategory.WRITE, false); + clientProto.reencryptEncryptionZone(zone, action); } @Override // ClientProtocol public BatchedEntries listReencryptionStatus( long prevId) throws IOException { - checkOperation(OperationCategory.READ, false); - return null; + return clientProto.listReencryptionStatus(prevId); } @Override // ClientProtocol public void setXAttr(String src, XAttr xAttr, EnumSet flag) throws IOException { - checkOperation(OperationCategory.WRITE); - - // TODO handle virtual directories - final List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("setXAttr", - new Class[] {String.class, XAttr.class, EnumSet.class}, - new RemoteParam(), xAttr, flag); - rpcClient.invokeSequential(locations, method); + clientProto.setXAttr(src, xAttr, flag); } - @SuppressWarnings("unchecked") @Override // ClientProtocol public List getXAttrs(String src, List xAttrs) throws IOException { - checkOperation(OperationCategory.READ); - - // TODO handle virtual directories - final List locations = getLocationsForPath(src, false); - RemoteMethod method = new RemoteMethod("getXAttrs", - new Class[] {String.class, List.class}, new RemoteParam(), xAttrs); - return (List) rpcClient.invokeSequential( - locations, method, List.class, null); + return clientProto.getXAttrs(src, xAttrs); } - @SuppressWarnings("unchecked") @Override // ClientProtocol public List listXAttrs(String src) throws IOException { - checkOperation(OperationCategory.READ); - - // TODO handle virtual directories - final List locations = getLocationsForPath(src, false); - RemoteMethod method = new RemoteMethod("listXAttrs", - new Class[] {String.class}, new RemoteParam()); - return (List) rpcClient.invokeSequential( - locations, method, List.class, null); + return clientProto.listXAttrs(src); } @Override // ClientProtocol public void removeXAttr(String src, XAttr xAttr) throws IOException { - checkOperation(OperationCategory.WRITE); - - // TODO handle virtual directories - final List locations = getLocationsForPath(src, true); - RemoteMethod method = new RemoteMethod("removeXAttr", - new Class[] {String.class, XAttr.class}, new RemoteParam(), xAttr); - rpcClient.invokeSequential(locations, method); + clientProto.removeXAttr(src, xAttr); } @Override // ClientProtocol public void checkAccess(String path, FsAction mode) throws IOException { - checkOperation(OperationCategory.READ); - - // TODO handle virtual directories - final List locations = getLocationsForPath(path, true); - RemoteMethod method = new RemoteMethod("checkAccess", - new Class[] {String.class, FsAction.class}, - new RemoteParam(), mode); - rpcClient.invokeSequential(locations, method); + clientProto.checkAccess(path, mode); } @Override // ClientProtocol public long getCurrentEditLogTxid() throws IOException { - checkOperation(OperationCategory.READ); - - RemoteMethod method = new RemoteMethod( - "getCurrentEditLogTxid", new Class[] {}); - final Set nss = namenodeResolver.getNamespaces(); - Map ret = - rpcClient.invokeConcurrent(nss, method, true, false, long.class); - - // Return the maximum txid - long txid = 0; - for (long t : ret.values()) { - if (t > txid) { - txid = t; - } - } - return txid; + return clientProto.getCurrentEditLogTxid(); } @Override // ClientProtocol public EventBatchList getEditsFromTxid(long txid) throws IOException { - checkOperation(OperationCategory.READ, false); - return null; + return clientProto.getEditsFromTxid(txid); } - @Override + @Override // ClientProtocol public DataEncryptionKey getDataEncryptionKey() throws IOException { - checkOperation(OperationCategory.READ, false); - return null; + return clientProto.getDataEncryptionKey(); } - @Override + @Override // ClientProtocol public String createSnapshot(String snapshotRoot, String snapshotName) throws IOException { - checkOperation(OperationCategory.WRITE); - return null; + return clientProto.createSnapshot(snapshotRoot, snapshotName); } - @Override + @Override // ClientProtocol public void deleteSnapshot(String snapshotRoot, String snapshotName) throws IOException { - checkOperation(OperationCategory.WRITE, false); + clientProto.deleteSnapshot(snapshotRoot, snapshotName); } @Override // ClientProtocol public void setQuota(String path, long namespaceQuota, long storagespaceQuota, StorageType type) throws IOException { - this.quotaCall.setQuota(path, namespaceQuota, storagespaceQuota, type); + clientProto.setQuota(path, namespaceQuota, storagespaceQuota, type); } @Override // ClientProtocol public QuotaUsage getQuotaUsage(String path) throws IOException { - return this.quotaCall.getQuotaUsage(path); + return clientProto.getQuotaUsage(path); } - @Override + @Override // ClientProtocol public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { - checkOperation(OperationCategory.WRITE); - - // Block pool id -> blocks - Map> blockLocations = new HashMap<>(); - for (LocatedBlock block : blocks) { - String bpId = block.getBlock().getBlockPoolId(); - List bpBlocks = blockLocations.get(bpId); - if (bpBlocks == null) { - bpBlocks = new LinkedList<>(); - blockLocations.put(bpId, bpBlocks); - } - bpBlocks.add(block); - } - - // Invoke each block pool - for (Entry> entry : blockLocations.entrySet()) { - String bpId = entry.getKey(); - List bpBlocks = entry.getValue(); - - LocatedBlock[] bpBlocksArray = - bpBlocks.toArray(new LocatedBlock[bpBlocks.size()]); - RemoteMethod method = new RemoteMethod("reportBadBlocks", - new Class[] {LocatedBlock[].class}, - new Object[] {bpBlocksArray}); - rpcClient.invokeSingleBlockPool(bpId, method); - } + clientProto.reportBadBlocks(blocks); } - @Override + @Override // ClientProtocol public void unsetStoragePolicy(String src) throws IOException { - checkOperation(OperationCategory.WRITE, false); + clientProto.unsetStoragePolicy(src); } - @Override + @Override // ClientProtocol public BlockStoragePolicy getStoragePolicy(String path) throws IOException { - checkOperation(OperationCategory.READ, false); - return null; + return clientProto.getStoragePolicy(path); } @Override // ClientProtocol public ErasureCodingPolicyInfo[] getErasureCodingPolicies() throws IOException { - return erasureCoding.getErasureCodingPolicies(); + return clientProto.getErasureCodingPolicies(); } @Override // ClientProtocol public Map getErasureCodingCodecs() throws IOException { - return erasureCoding.getErasureCodingCodecs(); + return clientProto.getErasureCodingCodecs(); } @Override // ClientProtocol public AddErasureCodingPolicyResponse[] addErasureCodingPolicies( ErasureCodingPolicy[] policies) throws IOException { - return erasureCoding.addErasureCodingPolicies(policies); + return clientProto.addErasureCodingPolicies(policies); } @Override // ClientProtocol public void removeErasureCodingPolicy(String ecPolicyName) throws IOException { - erasureCoding.removeErasureCodingPolicy(ecPolicyName); + clientProto.removeErasureCodingPolicy(ecPolicyName); } @Override // ClientProtocol public void disableErasureCodingPolicy(String ecPolicyName) throws IOException { - erasureCoding.disableErasureCodingPolicy(ecPolicyName); + clientProto.disableErasureCodingPolicy(ecPolicyName); } @Override // ClientProtocol public void enableErasureCodingPolicy(String ecPolicyName) throws IOException { - erasureCoding.enableErasureCodingPolicy(ecPolicyName); + clientProto.enableErasureCodingPolicy(ecPolicyName); } @Override // ClientProtocol public ErasureCodingPolicy getErasureCodingPolicy(String src) throws IOException { - return erasureCoding.getErasureCodingPolicy(src); + return clientProto.getErasureCodingPolicy(src); } @Override // ClientProtocol public void setErasureCodingPolicy(String src, String ecPolicyName) throws IOException { - erasureCoding.setErasureCodingPolicy(src, ecPolicyName); + clientProto.setErasureCodingPolicy(src, ecPolicyName); } @Override // ClientProtocol public void unsetErasureCodingPolicy(String src) throws IOException { - erasureCoding.unsetErasureCodingPolicy(src); + clientProto.unsetErasureCodingPolicy(src); } - @Override + @Override // ClientProtocol public ECBlockGroupStats getECBlockGroupStats() throws IOException { - return erasureCoding.getECBlockGroupStats(); + return clientProto.getECBlockGroupStats(); } - @Override + @Override // ClientProtocol public ReplicatedBlockStats getReplicatedBlockStats() throws IOException { - checkOperation(OperationCategory.READ, false); - return null; + return clientProto.getReplicatedBlockStats(); } @Deprecated - @Override + @Override // ClientProtocol public BatchedEntries listOpenFiles(long prevId) throws IOException { - return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES), - OpenFilesIterator.FILTER_PATH_DEFAULT); + return clientProto.listOpenFiles(prevId); } - @Override + @Override // ClientProtocol public BatchedEntries listOpenFiles(long prevId, EnumSet openFilesTypes, String path) throws IOException { - checkOperation(OperationCategory.READ, false); - return null; + return clientProto.listOpenFiles(prevId, openFilesTypes, path); } @Override // NamenodeProtocol @@ -2177,7 +1271,7 @@ public class RouterRpcServer extends AbstractService * @return Prioritized list of locations in the federated cluster. * @throws IOException if the location for this path cannot be determined. */ - private RemoteLocation getLocationForPath( + protected RemoteLocation getLocationForPath( String path, boolean failIfLocked, String blockPoolId) throws IOException { @@ -2276,27 +1370,6 @@ public class RouterRpcServer extends AbstractService } } - /** - * Check if a path should be in all subclusters. - * - * @param path Path to check. - * @return If a path should be in all subclusters. - */ - private boolean isPathAll(final String path) { - if (subclusterResolver instanceof MountTableResolver) { - try { - MountTableResolver mountTable = (MountTableResolver)subclusterResolver; - MountTable entry = mountTable.getMountPoint(path); - if (entry != null) { - return entry.isAll(); - } - } catch (IOException e) { - LOG.error("Cannot get mount point", e); - } - } - return false; - } - /** * Check if a path is in a read only mount point. * @@ -2318,121 +1391,6 @@ public class RouterRpcServer extends AbstractService return false; } - /** - * Get the modification dates for mount points. - * - * @param path Name of the path to start checking dates from. - * @return Map with the modification dates for all sub-entries. - */ - private Map getMountPointDates(String path) { - Map ret = new TreeMap<>(); - if (subclusterResolver instanceof MountTableResolver) { - try { - final List children = subclusterResolver.getMountPoints(path); - for (String child : children) { - Long modTime = getModifiedTime(ret, path, child); - ret.put(child, modTime); - } - } catch (IOException e) { - LOG.error("Cannot get mount point", e); - } - } - return ret; - } - - /** - * Get modified time for child. If the child is present in mount table it - * will return the modified time. If the child is not present but subdirs of - * this child are present then it will return latest modified subdir's time - * as modified time of the requested child. - * @param ret contains children and modified times. - * @param mountTable. - * @param path Name of the path to start checking dates from. - * @param child child of the requested path. - * @return modified time. - */ - private long getModifiedTime(Map ret, String path, - String child) { - MountTableResolver mountTable = (MountTableResolver)subclusterResolver; - String srcPath; - if (path.equals(Path.SEPARATOR)) { - srcPath = Path.SEPARATOR + child; - } else { - srcPath = path + Path.SEPARATOR + child; - } - Long modTime = 0L; - try { - // Get mount table entry for the srcPath - MountTable entry = mountTable.getMountPoint(srcPath); - // if srcPath is not in mount table but its subdirs are in mount - // table we will display latest modified subdir date/time. - if (entry == null) { - List entries = mountTable.getMounts(srcPath); - for (MountTable eachEntry : entries) { - // Get the latest date - if (ret.get(child) == null || - ret.get(child) < eachEntry.getDateModified()) { - modTime = eachEntry.getDateModified(); - } - } - } else { - modTime = entry.getDateModified(); - } - } catch (IOException e) { - LOG.error("Cannot get mount point", e); - } - return modTime; - } - - /** - * Create a new file status for a mount point. - * - * @param name Name of the mount point. - * @param childrenNum Number of children. - * @param date Map with the dates. - * @return New HDFS file status representing a mount point. - */ - private HdfsFileStatus getMountPointStatus( - String name, int childrenNum, long date) { - long modTime = date; - long accessTime = date; - FsPermission permission = FsPermission.getDirDefault(); - String owner = this.superUser; - String group = this.superGroup; - try { - // TODO support users, it should be the user for the pointed folder - UserGroupInformation ugi = getRemoteUser(); - owner = ugi.getUserName(); - group = ugi.getPrimaryGroupName(); - } catch (IOException e) { - LOG.error("Cannot get the remote user: {}", e.getMessage()); - } - long inodeId = 0; - return new HdfsFileStatus.Builder() - .isdir(true) - .mtime(modTime) - .atime(accessTime) - .perm(permission) - .owner(owner) - .group(group) - .symlink(new byte[0]) - .path(DFSUtil.string2Bytes(name)) - .fileId(inodeId) - .children(childrenNum) - .build(); - } - - /** - * Get the name of the method that is calling this function. - * - * @return Name of the method calling this function. - */ - private static String getMethodName() { - final StackTraceElement[] stack = Thread.currentThread().getStackTrace(); - String methodName = stack[3].getMethodName(); - return methodName; - } - /** * Get the user that is invoking this operation. *