HDFS-12620. Backporting HDFS-10467 to branch-2. Contributed by Inigo Goiri.
This commit is contained in:
parent
629b88b4dd
commit
12c81c67d7
|
@ -47,6 +47,8 @@ function print_usage(){
|
|||
echo " datanode run a DFS datanode"
|
||||
echo " debug run a Debug Admin to execute HDFS debug commands"
|
||||
echo " dfsadmin run a DFS admin client"
|
||||
echo " dfsrouter run the DFS router"
|
||||
echo " dfsrouteradmin manage Router-based federation"
|
||||
echo " haadmin run a DFS HA admin client"
|
||||
echo " fsck run a DFS filesystem checking utility"
|
||||
echo " balancer run a cluster balancing utility"
|
||||
|
@ -157,6 +159,11 @@ elif [ "$COMMAND" = "dfs" ] ; then
|
|||
elif [ "$COMMAND" = "dfsadmin" ] ; then
|
||||
CLASS=org.apache.hadoop.hdfs.tools.DFSAdmin
|
||||
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
|
||||
elif [ "$COMMAND" = "dfsrouter" ] ; then
|
||||
CLASS='org.apache.hadoop.hdfs.server.federation.router.DFSRouter'
|
||||
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_ROUTER_OPTS"
|
||||
elif [ "$COMMAND" = "dfsrouteradmin" ] ; then
|
||||
CLASS='org.apache.hadoop.hdfs.tools.federation.RouterAdmin'
|
||||
elif [ "$COMMAND" = "haadmin" ] ; then
|
||||
CLASS=org.apache.hadoop.hdfs.tools.DFSHAAdmin
|
||||
CLASSPATH=${CLASSPATH}:${TOOL_PATH}
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.LinkedList;
|
||||
|
@ -38,10 +39,6 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.ToIntFunction;
|
||||
import java.util.function.ToLongFunction;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.management.NotCompliantMBeanException;
|
||||
import javax.management.ObjectName;
|
||||
|
@ -72,7 +69,7 @@ import org.apache.hadoop.metrics2.util.MBeans;
|
|||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.VersionInfo;
|
||||
import org.codehaus.jettison.json.JSONObject;
|
||||
import org.eclipse.jetty.util.ajax.JSON;
|
||||
import org.mortbay.util.ajax.JSON;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -263,12 +260,12 @@ public class FederationMetrics implements FederationMBean {
|
|||
|
||||
@Override
|
||||
public long getTotalCapacity() {
|
||||
return getNameserviceAggregatedLong(MembershipStats::getTotalSpace);
|
||||
return getNameserviceAggregatedLong("getTotalSpace");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRemainingCapacity() {
|
||||
return getNameserviceAggregatedLong(MembershipStats::getAvailableSpace);
|
||||
return getNameserviceAggregatedLong("getAvailableSpace");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -323,31 +320,27 @@ public class FederationMetrics implements FederationMBean {
|
|||
|
||||
@Override
|
||||
public int getNumLiveNodes() {
|
||||
return getNameserviceAggregatedInt(
|
||||
MembershipStats::getNumOfActiveDatanodes);
|
||||
return getNameserviceAggregatedInt("getNumOfActiveDatanodes");
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumDeadNodes() {
|
||||
return getNameserviceAggregatedInt(MembershipStats::getNumOfDeadDatanodes);
|
||||
return getNameserviceAggregatedInt("getNumOfDeadDatanodes");
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumDecommissioningNodes() {
|
||||
return getNameserviceAggregatedInt(
|
||||
MembershipStats::getNumOfDecommissioningDatanodes);
|
||||
return getNameserviceAggregatedInt("getNumOfDecommissioningDatanodes");
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumDecomLiveNodes() {
|
||||
return getNameserviceAggregatedInt(
|
||||
MembershipStats::getNumOfDecomActiveDatanodes);
|
||||
return getNameserviceAggregatedInt("getNumOfDecomActiveDatanodes");
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumDecomDeadNodes() {
|
||||
return getNameserviceAggregatedInt(
|
||||
MembershipStats::getNumOfDecomDeadDatanodes);
|
||||
return getNameserviceAggregatedInt("getNumOfDecomDeadDatanodes");
|
||||
}
|
||||
|
||||
@Override // NameNodeMXBean
|
||||
|
@ -398,35 +391,32 @@ public class FederationMetrics implements FederationMBean {
|
|||
|
||||
@Override
|
||||
public long getNumBlocks() {
|
||||
return getNameserviceAggregatedLong(MembershipStats::getNumOfBlocks);
|
||||
return getNameserviceAggregatedLong("getNumOfBlocks");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumOfMissingBlocks() {
|
||||
return getNameserviceAggregatedLong(MembershipStats::getNumOfBlocksMissing);
|
||||
return getNameserviceAggregatedLong("getNumOfBlocksMissing");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumOfBlocksPendingReplication() {
|
||||
return getNameserviceAggregatedLong(
|
||||
MembershipStats::getNumOfBlocksPendingReplication);
|
||||
return getNameserviceAggregatedLong("getNumOfBlocksPendingReplication");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumOfBlocksUnderReplicated() {
|
||||
return getNameserviceAggregatedLong(
|
||||
MembershipStats::getNumOfBlocksUnderReplicated);
|
||||
return getNameserviceAggregatedLong("getNumOfBlocksUnderReplicated");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumOfBlocksPendingDeletion() {
|
||||
return getNameserviceAggregatedLong(
|
||||
MembershipStats::getNumOfBlocksPendingDeletion);
|
||||
return getNameserviceAggregatedLong("getNumOfBlocksPendingDeletion");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumFiles() {
|
||||
return getNameserviceAggregatedLong(MembershipStats::getNumOfFiles);
|
||||
return getNameserviceAggregatedLong("getNumOfFiles");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -472,8 +462,7 @@ public class FederationMetrics implements FederationMBean {
|
|||
@Override
|
||||
public String getClusterId() {
|
||||
try {
|
||||
Collection<String> clusterIds =
|
||||
getNamespaceInfo(FederationNamespaceInfo::getClusterId);
|
||||
Collection<String> clusterIds = getNamespaceInfo("getClusterId");
|
||||
return clusterIds.toString();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot fetch cluster ID metrics: {}", e.getMessage());
|
||||
|
@ -484,8 +473,7 @@ public class FederationMetrics implements FederationMBean {
|
|||
@Override
|
||||
public String getBlockPoolId() {
|
||||
try {
|
||||
Collection<String> blockpoolIds =
|
||||
getNamespaceInfo(FederationNamespaceInfo::getBlockPoolId);
|
||||
Collection<String> blockpoolIds = getNamespaceInfo("getBlockPoolId");
|
||||
return blockpoolIds.toString();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot fetch block pool ID metrics: {}", e.getMessage());
|
||||
|
@ -501,19 +489,31 @@ public class FederationMetrics implements FederationMBean {
|
|||
/**
|
||||
* Build a set of unique values found in all namespaces.
|
||||
*
|
||||
* @param f Method reference of the appropriate FederationNamespaceInfo
|
||||
* @param getterName String name of the appropriate FederationNamespaceInfo
|
||||
* getter function
|
||||
* @return Set of unique string values found in all discovered namespaces.
|
||||
* @throws IOException if the query could not be executed.
|
||||
*/
|
||||
private Collection<String> getNamespaceInfo(
|
||||
Function<FederationNamespaceInfo, String> f) throws IOException {
|
||||
public Collection<String> getNamespaceInfo(String getterName)
|
||||
throws IOException {
|
||||
|
||||
GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
|
||||
GetNamespaceInfoResponse response =
|
||||
membershipStore.getNamespaceInfo(request);
|
||||
return response.getNamespaceInfo().stream()
|
||||
.map(f)
|
||||
.collect(Collectors.toSet());
|
||||
Set<FederationNamespaceInfo> namespacesInfo = response.getNamespaceInfo();
|
||||
|
||||
Set<String> ret = new HashSet<>();
|
||||
for (FederationNamespaceInfo namespace : namespacesInfo) {
|
||||
try {
|
||||
Method m = FederationNamespaceInfo.class.getDeclaredMethod(getterName);
|
||||
String data = (String) m.invoke(namespace);
|
||||
ret.add(data);
|
||||
} catch (SecurityException | ReflectiveOperationException e) {
|
||||
throw new IOException(
|
||||
"Cannot invoke " + getterName + " from " + namespace);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -521,15 +521,19 @@ public class FederationMetrics implements FederationMBean {
|
|||
* @param f Method reference
|
||||
* @return Aggregated integer.
|
||||
*/
|
||||
private int getNameserviceAggregatedInt(ToIntFunction<MembershipStats> f) {
|
||||
private int getNameserviceAggregatedInt(String methodName) {
|
||||
int total = 0;
|
||||
try {
|
||||
return getActiveNamenodeRegistrations().stream()
|
||||
.map(MembershipState::getStats)
|
||||
.collect(Collectors.summingInt(f));
|
||||
Collection<Object> data = getNameservicesStats(methodName);
|
||||
for (Object o : data) {
|
||||
Integer l = (Integer) o;
|
||||
total += l;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to extract metrics: {}", e.getMessage());
|
||||
LOG.error("Cannot invoke {} for JMX: {}", methodName, e.getMessage());
|
||||
return 0;
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -537,15 +541,60 @@ public class FederationMetrics implements FederationMBean {
|
|||
* @param f Method reference
|
||||
* @return Aggregated long.
|
||||
*/
|
||||
private long getNameserviceAggregatedLong(ToLongFunction<MembershipStats> f) {
|
||||
private long getNameserviceAggregatedLong(String methodName) {
|
||||
long total = 0;
|
||||
try {
|
||||
return getActiveNamenodeRegistrations().stream()
|
||||
.map(MembershipState::getStats)
|
||||
.collect(Collectors.summingLong(f));
|
||||
Collection<Object> data = getNameservicesStats(methodName);
|
||||
for (Object o : data) {
|
||||
Long l = (Long) o;
|
||||
total += l;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to extract metrics: {}", e.getMessage());
|
||||
LOG.error("Cannot invoke {} for JMX: {}", methodName, e.getMessage());
|
||||
return 0;
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
/**
|
||||
* Aggregate a namenode data element from the most active namenode in each
|
||||
* registered nameservice.
|
||||
*
|
||||
* @param getter String name of the getter function to invoke on the
|
||||
* discovered NamenodeMembershipRecord object.
|
||||
* @return Aggregated getter return values from all registered nameservices,
|
||||
* one per nameservice.
|
||||
* @throws IOException if the query could not be performed.
|
||||
*/
|
||||
private Collection<Object> getNameservicesStats(String getter)
|
||||
throws IOException {
|
||||
|
||||
List<Object> ret = new ArrayList<>();
|
||||
try {
|
||||
Method metricsGetter = MembershipStats.class.getDeclaredMethod(getter);
|
||||
List<MembershipState> namenodes = getActiveNamenodeRegistrations();
|
||||
for (MembershipState namenode : namenodes) {
|
||||
try {
|
||||
MembershipStats stats = namenode.getStats();
|
||||
if (stats != null) {
|
||||
Object data = metricsGetter.invoke(stats);
|
||||
ret.add(data);
|
||||
}
|
||||
} catch (ReflectiveOperationException e) {
|
||||
throw new IOException(
|
||||
"Cannot invoke " + getter + " from " + namenode);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new IOException("Bad arguments invoking " + getter);
|
||||
}
|
||||
}
|
||||
} catch (NoSuchMethodException e) {
|
||||
throw new IOException(
|
||||
"Cannot invoke " + getter + " from membership stats record");
|
||||
} catch (SecurityException e) {
|
||||
throw new IOException(
|
||||
"Cannot invoke " + getter + " from membership stats record");
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -21,12 +21,14 @@ import static org.apache.hadoop.util.Time.now;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.management.NotCompliantMBeanException;
|
||||
import javax.management.ObjectName;
|
||||
|
@ -52,7 +54,7 @@ import org.apache.hadoop.ipc.StandbyException;
|
|||
import org.apache.hadoop.metrics2.util.MBeans;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.util.VersionInfo;
|
||||
import org.eclipse.jetty.util.ajax.JSON;
|
||||
import org.mortbay.util.ajax.JSON;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -232,27 +234,15 @@ public class NamenodeBeanMetrics
|
|||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public long getPendingReplicationBlocks() {
|
||||
return getFederationMetrics().getNumOfBlocksPendingReplication();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getPendingReconstructionBlocks() {
|
||||
return getFederationMetrics().getNumOfBlocksPendingReplication();
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public long getUnderReplicatedBlocks() {
|
||||
return getFederationMetrics().getNumOfBlocksUnderReplicated();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLowRedundancyBlocks() {
|
||||
return getFederationMetrics().getNumOfBlocksUnderReplicated();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getPendingDeletionBlocks() {
|
||||
return getFederationMetrics().getNumOfBlocksPendingDeletion();
|
||||
|
@ -338,7 +328,7 @@ public class NamenodeBeanMetrics
|
|||
@Override
|
||||
public String getClusterId() {
|
||||
try {
|
||||
return getNamespaceInfo(FederationNamespaceInfo::getClusterId).toString();
|
||||
return getNamespaceInfo("getClusterId").toString();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot fetch cluster ID metrics {}", e.getMessage());
|
||||
return "";
|
||||
|
@ -348,8 +338,7 @@ public class NamenodeBeanMetrics
|
|||
@Override
|
||||
public String getBlockPoolId() {
|
||||
try {
|
||||
return
|
||||
getNamespaceInfo(FederationNamespaceInfo::getBlockPoolId).toString();
|
||||
return getNamespaceInfo("getBlockPoolId").toString();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot fetch block pool ID metrics {}", e.getMessage());
|
||||
return "";
|
||||
|
@ -359,13 +348,14 @@ public class NamenodeBeanMetrics
|
|||
/**
|
||||
* Build a set of unique values found in all namespaces.
|
||||
*
|
||||
* @param f Method reference of the appropriate FederationNamespaceInfo
|
||||
* @param getterName String name of the appropriate FederationNamespaceInfo
|
||||
* getter function
|
||||
* @return Set of unique string values found in all discovered namespaces.
|
||||
* @throws IOException if the query could not be executed.
|
||||
*/
|
||||
private Collection<String> getNamespaceInfo(
|
||||
Function<FederationNamespaceInfo, String> f) throws IOException {
|
||||
public Collection<String> getNamespaceInfo(String getterName)
|
||||
throws IOException {
|
||||
|
||||
StateStoreService stateStore = router.getStateStore();
|
||||
MembershipStore membershipStore =
|
||||
stateStore.getRegisteredRecordStore(MembershipStore.class);
|
||||
|
@ -373,9 +363,20 @@ public class NamenodeBeanMetrics
|
|||
GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
|
||||
GetNamespaceInfoResponse response =
|
||||
membershipStore.getNamespaceInfo(request);
|
||||
return response.getNamespaceInfo().stream()
|
||||
.map(f)
|
||||
.collect(Collectors.toSet());
|
||||
Set<FederationNamespaceInfo> namespacesInfo = response.getNamespaceInfo();
|
||||
|
||||
Set<String> ret = new HashSet<String>();
|
||||
for (FederationNamespaceInfo namespace : namespacesInfo) {
|
||||
try {
|
||||
Method m = FederationNamespaceInfo.class.getDeclaredMethod(getterName);
|
||||
String data = (String) m.invoke(namespace);
|
||||
ret.add(data);
|
||||
} catch (SecurityException | ReflectiveOperationException ex) {
|
||||
throw new IOException(
|
||||
"Cannot invoke " + getterName + " from " + namespace);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -403,6 +404,12 @@ public class NamenodeBeanMetrics
|
|||
return this.router.getStartTime();
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
@Override
|
||||
public String getNNStarted() {
|
||||
return new Date(this.router.getStartTime()).toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getCompileInfo() {
|
||||
return VersionInfo.getDate() + " by " + VersionInfo.getUser() +
|
||||
|
@ -454,6 +461,12 @@ public class NamenodeBeanMetrics
|
|||
return getFederationMetrics().getNumFiles();
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
@Override
|
||||
public long getTotalFiles() {
|
||||
return getFederationMetrics().getNumFiles();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTotalLoad() {
|
||||
return -1;
|
||||
|
|
|
@ -323,20 +323,22 @@ public class MountTableResolver
|
|||
verifyMountTable();
|
||||
readLock.lock();
|
||||
try {
|
||||
return this.locationCache.computeIfAbsent(
|
||||
path, this::lookupLocation);
|
||||
PathLocation ret = this.locationCache.get(path);
|
||||
if (ret == null) {
|
||||
ret = buildPathLocation(path);
|
||||
this.locationCache.put(path, ret);
|
||||
}
|
||||
return ret;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the path location to insert into the cache atomically. It must hold
|
||||
* the read lock.
|
||||
* @param path Path to check/insert.
|
||||
* @return New remote location.
|
||||
* Builder to insert the path location into the cache atomically. It must
|
||||
* hold the read lock.
|
||||
*/
|
||||
public PathLocation lookupLocation(final String path) {
|
||||
private PathLocation buildPathLocation(String path) {
|
||||
PathLocation ret = null;
|
||||
MountTable entry = findDeepest(path);
|
||||
if (entry != null) {
|
||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.eclipse.jetty.util.ajax.JSON;
|
||||
import org.mortbay.util.ajax.JSON;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
|
@ -48,7 +48,7 @@ import org.apache.hadoop.security.SaslRpcServer;
|
|||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.eclipse.jetty.util.ajax.JSON;
|
||||
import org.mortbay.util.ajax.JSON;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
|
@ -64,7 +64,6 @@ import org.apache.hadoop.hdfs.AddBlockFlag;
|
|||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.inotify.EventBatchList;
|
||||
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||
|
@ -75,12 +74,9 @@ import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
|
|||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
|
||||
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
|
@ -88,11 +84,9 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
|||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
|
||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
|
||||
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
|
||||
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
|
||||
|
@ -457,18 +451,16 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|||
public HdfsFileStatus create(String src, FsPermission masked,
|
||||
String clientName, EnumSetWritable<CreateFlag> flag,
|
||||
boolean createParent, short replication, long blockSize,
|
||||
CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
|
||||
throws IOException {
|
||||
CryptoProtocolVersion[] supportedVersions) throws IOException {
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
|
||||
RemoteLocation createLocation = getCreateLocation(src);
|
||||
RemoteMethod method = new RemoteMethod("create",
|
||||
new Class<?>[] {String.class, FsPermission.class, String.class,
|
||||
EnumSetWritable.class, boolean.class, short.class,
|
||||
long.class, CryptoProtocolVersion[].class,
|
||||
String.class},
|
||||
long.class, CryptoProtocolVersion[].class},
|
||||
createLocation.getDest(), masked, clientName, flag, createParent,
|
||||
replication, blockSize, supportedVersions, ecPolicyName);
|
||||
replication, blockSize, supportedVersions);
|
||||
return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method);
|
||||
}
|
||||
|
||||
|
@ -1216,27 +1208,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public boolean saveNamespace(long timeWindow, long txGap) throws IOException {
|
||||
public void saveNamespace() throws IOException {
|
||||
checkOperation(OperationCategory.UNCHECKED);
|
||||
|
||||
RemoteMethod method = new RemoteMethod("saveNamespace",
|
||||
new Class<?>[] {Long.class, Long.class}, timeWindow, txGap);
|
||||
RemoteMethod method = new RemoteMethod("saveNamespace", new Class<?>[] {});
|
||||
final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
|
||||
Map<FederationNamespaceInfo, Object> ret =
|
||||
rpcClient.invokeConcurrent(nss, method, true, false);
|
||||
|
||||
boolean success = true;
|
||||
Object obj = ret;
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<FederationNamespaceInfo, Boolean> results =
|
||||
(Map<FederationNamespaceInfo, Boolean>)obj;
|
||||
Collection<Boolean> sucesses = results.values();
|
||||
for (boolean s : sucesses) {
|
||||
if (!s) {
|
||||
success = false;
|
||||
}
|
||||
}
|
||||
return success;
|
||||
rpcClient.invokeConcurrent(nss, method, true, false);
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
|
@ -1658,19 +1635,6 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public void reencryptEncryptionZone(String zone, ReencryptAction action)
|
||||
throws IOException {
|
||||
checkOperation(OperationCategory.WRITE, false);
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public BatchedEntries<ZoneReencryptionStatus> listReencryptionStatus(
|
||||
long prevId) throws IOException {
|
||||
checkOperation(OperationCategory.READ, false);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
|
||||
throws IOException {
|
||||
|
@ -1784,30 +1748,6 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|||
checkOperation(OperationCategory.WRITE, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ErasureCodingPolicy[] getErasureCodingPolicies() throws IOException {
|
||||
checkOperation(OperationCategory.READ, false);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public ErasureCodingPolicy getErasureCodingPolicy(String src)
|
||||
throws IOException {
|
||||
checkOperation(OperationCategory.READ, false);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public void setErasureCodingPolicy(String src, String ecPolicyName)
|
||||
throws IOException {
|
||||
checkOperation(OperationCategory.WRITE, false);
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public void unsetErasureCodingPolicy(String src) throws IOException {
|
||||
checkOperation(OperationCategory.WRITE, false);
|
||||
}
|
||||
|
||||
@Override // ClientProtocol
|
||||
public void setQuota(String path, long namespaceQuota, long storagespaceQuota,
|
||||
StorageType type) throws IOException {
|
||||
|
@ -1869,46 +1809,6 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
|
||||
ErasureCodingPolicy[] policies) throws IOException {
|
||||
checkOperation(OperationCategory.WRITE, false);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeErasureCodingPolicy(String arg0) throws IOException {
|
||||
checkOperation(OperationCategory.WRITE, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disableErasureCodingPolicy(String arg0) throws IOException {
|
||||
checkOperation(OperationCategory.WRITE, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void enableErasureCodingPolicy(String arg0) throws IOException {
|
||||
checkOperation(OperationCategory.WRITE, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ECBlockGroupStats getECBlockGroupStats() throws IOException {
|
||||
checkOperation(OperationCategory.READ, false);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getErasureCodingCodecs() throws IOException {
|
||||
checkOperation(OperationCategory.READ, false);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicatedBlockStats getReplicatedBlockStats() throws IOException {
|
||||
checkOperation(OperationCategory.READ, false);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BatchedEntries<OpenFileEntry> listOpenFiles(long arg0)
|
||||
throws IOException {
|
||||
|
@ -2017,9 +1917,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|||
}
|
||||
long inodeId = 0;
|
||||
return new HdfsFileStatus(0, true, 0, 0, modTime, accessTime, permission,
|
||||
EnumSet.noneOf(HdfsFileStatus.Flags.class),
|
||||
owner, group, new byte[0], DFSUtil.string2Bytes(name), inodeId,
|
||||
childrenNum, null, (byte) 0, null);
|
||||
childrenNum, null, (byte) 0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -264,7 +264,10 @@ public class MockResolver
|
|||
@Override
|
||||
public PathLocation getDestinationForPath(String path) throws IOException {
|
||||
List<RemoteLocation> remoteLocations = new LinkedList<>();
|
||||
for (String key : this.locations.keySet()) {
|
||||
// We go from the leaves to the root
|
||||
List<String> keys = new ArrayList<>(this.locations.keySet());
|
||||
Collections.sort(keys, Collections.reverseOrder());
|
||||
for (String key : keys) {
|
||||
if (path.startsWith(key)) {
|
||||
for (RemoteLocation location : this.locations.get(key)) {
|
||||
String finalPath = location.getDest() + path.substring(key.length());
|
||||
|
|
|
@ -800,7 +800,7 @@ public class TestRouterRpc {
|
|||
HdfsFileStatus status = routerProtocol.create(
|
||||
newRouterFile, new FsPermission("777"), clientName,
|
||||
new EnumSetWritable<CreateFlag>(createFlag), true, (short) 1,
|
||||
(long) 1024, CryptoProtocolVersion.supported(), null);
|
||||
(long) 1024, CryptoProtocolVersion.supported());
|
||||
|
||||
// Add a block via router (requires client to have same lease)
|
||||
LocatedBlock block = routerProtocol.addBlock(
|
||||
|
|
Loading…
Reference in New Issue