HDFS-12934. RBF: Federation supports global quota. Contributed by Yiqun Lin.

This commit is contained in:
Yiqun Lin 2018-01-10 13:59:11 +08:00
parent d9006d8a4e
commit d98a2e6e23
18 changed files with 1639 additions and 16 deletions

View File

@ -1322,6 +1322,15 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_ROUTER_HTTPS_ADDRESS_DEFAULT =
"0.0.0.0:" + DFS_ROUTER_HTTPS_PORT_DEFAULT;
// HDFS Router-based federation quota
public static final String DFS_ROUTER_QUOTA_ENABLE =
FEDERATION_ROUTER_PREFIX + "quota.enable";
public static final boolean DFS_ROUTER_QUOTA_ENABLED_DEFAULT = false;
public static final String DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL =
FEDERATION_ROUTER_PREFIX + "quota-cache.update.interval";
public static final long DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL_DEFAULT =
60000;
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY

View File

@ -0,0 +1,208 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Module that implements the quota relevant RPC calls
* {@link ClientProtocol#setQuota(String, long, long, StorageType)}
* and
* {@link ClientProtocol#getQuotaUsage(String)}
* in the {@link RouterRpcServer}.
*/
public class Quota {
private static final Logger LOG = LoggerFactory.getLogger(Quota.class);
/** RPC server to receive client calls. */
private final RouterRpcServer rpcServer;
/** RPC clients to connect to the Namenodes. */
private final RouterRpcClient rpcClient;
/** Router used in RouterRpcServer. */
private final Router router;
public Quota(Router router, RouterRpcServer server) {
this.router = router;
this.rpcServer = server;
this.rpcClient = server.getRPCClient();
}
/**
* Set quota for the federation path.
* @param path Federation path.
* @param namespaceQuota Name space quota.
* @param storagespaceQuota Storage space quota.
* @param type StorageType that the space quota is intended to be set on.
* @throws IOException
*/
public void setQuota(String path, long namespaceQuota,
long storagespaceQuota, StorageType type) throws IOException {
rpcServer.checkOperation(OperationCategory.WRITE);
// Set quota for current path and its children mount table path.
final List<RemoteLocation> locations = getQuotaRemoteLocations(path);
if (LOG.isDebugEnabled()) {
for (RemoteLocation loc : locations) {
LOG.debug("Set quota for path: nsId: {}, dest: {}.",
loc.getNameserviceId(), loc.getDest());
}
}
RemoteMethod method = new RemoteMethod("setQuota",
new Class<?>[] {String.class, long.class, long.class,
StorageType.class},
new RemoteParam(), namespaceQuota, storagespaceQuota, type);
rpcClient.invokeConcurrent(locations, method, false, false);
}
/**
* Get quota usage for the federation path.
* @param path Federation path.
* @return Aggregated quota.
* @throws IOException
*/
public QuotaUsage getQuotaUsage(String path) throws IOException {
final List<RemoteLocation> quotaLocs = getValidQuotaLocations(path);
RemoteMethod method = new RemoteMethod("getQuotaUsage",
new Class<?>[] {String.class}, new RemoteParam());
Map<RemoteLocation, Object> results = rpcClient.invokeConcurrent(quotaLocs,
method, true, false);
return aggregateQuota(results);
}
/**
* Get valid quota remote locations used in {@link #getQuotaUsage(String)}.
* Differentiate the method {@link #getQuotaRemoteLocations(String)}, this
* method will do some additional filtering.
* @param path Federation path.
* @return List of valid quota remote locations.
* @throws IOException
*/
private List<RemoteLocation> getValidQuotaLocations(String path)
throws IOException {
final List<RemoteLocation> locations = getQuotaRemoteLocations(path);
// NameService -> Locations
Map<String, List<RemoteLocation>> validLocations = new HashMap<>();
for (RemoteLocation loc : locations) {
String nsId = loc.getNameserviceId();
List<RemoteLocation> dests = validLocations.get(nsId);
if (dests == null) {
dests = new LinkedList<>();
dests.add(loc);
validLocations.put(nsId, dests);
} else {
// Ensure the paths in the same nameservice is different.
// Don't include parent-child paths.
boolean isChildPath = false;
for (RemoteLocation d : dests) {
if (loc.getDest().startsWith(d.getDest())) {
isChildPath = true;
break;
}
}
if (!isChildPath) {
dests.add(loc);
}
}
}
List<RemoteLocation> quotaLocs = new LinkedList<>();
for (List<RemoteLocation> locs : validLocations.values()) {
quotaLocs.addAll(locs);
}
return quotaLocs;
}
/**
* Aggregate quota that queried from sub-clusters.
* @param results Quota query result.
* @return Aggregated Quota.
*/
private QuotaUsage aggregateQuota(Map<RemoteLocation, Object> results) {
long nsCount = 0;
long ssCount = 0;
boolean hasQuotaUnSet = false;
for (Map.Entry<RemoteLocation, Object> entry : results.entrySet()) {
RemoteLocation loc = entry.getKey();
QuotaUsage usage = (QuotaUsage) entry.getValue();
if (usage != null) {
// If quota is not set in real FileSystem, the usage
// value will return -1.
if (usage.getQuota() == -1 && usage.getSpaceQuota() == -1) {
hasQuotaUnSet = true;
}
nsCount += usage.getFileAndDirectoryCount();
ssCount += usage.getSpaceConsumed();
LOG.debug(
"Get quota usage for path: nsId: {}, dest: {},"
+ " nsCount: {}, ssCount: {}.",
loc.getNameserviceId(), loc.getDest(),
usage.getFileAndDirectoryCount(), usage.getSpaceConsumed());
}
}
QuotaUsage.Builder builder = new QuotaUsage.Builder()
.fileAndDirectoryCount(nsCount).spaceConsumed(ssCount);
if (hasQuotaUnSet) {
builder.quota(HdfsConstants.QUOTA_DONT_SET);
}
return builder.build();
}
/**
* Get all quota remote locations across subclusters under given
* federation path.
* @param path Federation path.
* @return List of quota remote locations.
* @throws IOException
*/
private List<RemoteLocation> getQuotaRemoteLocations(String path)
throws IOException {
List<RemoteLocation> locations = new LinkedList<>();
RouterQuotaManager manager = this.router.getQuotaManager();
if (manager != null) {
Set<String> childrenPaths = manager.getPaths(path);
for (String childPath : childrenPaths) {
locations.addAll(rpcServer.getLocationsForPath(childPath, true));
}
}
return locations;
}
}

View File

@ -45,6 +45,8 @@ import org.apache.hadoop.util.JvmPauseMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Router that provides a unified view of multiple federated HDFS clusters. It
* has two main roles: (1) federated interface and (2) NameNode heartbeat.
@ -105,7 +107,10 @@ public class Router extends CompositeService {
/** JVM pauses (GC and others). */
private JvmPauseMonitor pauseMonitor;
/** Quota usage update service. */
private RouterQuotaUpdateService quotaUpdateService;
/** Quota cache manager. */
private RouterQuotaManager quotaManager;
/////////////////////////////////////////////////////////
// Constructor
@ -200,6 +205,14 @@ public class Router extends CompositeService {
this.pauseMonitor.init(conf);
}
// Initial quota relevant service
if (conf.getBoolean(DFSConfigKeys.DFS_ROUTER_QUOTA_ENABLE,
DFSConfigKeys.DFS_ROUTER_QUOTA_ENABLED_DEFAULT)) {
this.quotaManager = new RouterQuotaManager();
this.quotaUpdateService = new RouterQuotaUpdateService(this);
addService(this.quotaUpdateService);
}
super.serviceInit(conf);
}
@ -524,4 +537,27 @@ public class Router extends CompositeService {
this.namenodeResolver.setRouterId(this.routerId);
}
}
/**
* If the quota system is enabled in Router.
*/
public boolean isQuotaEnabled() {
return this.quotaManager != null;
}
/**
* Get route quota manager.
* @return RouterQuotaManager Quota manager.
*/
public RouterQuotaManager getQuotaManager() {
return this.quotaManager;
}
/**
* Get quota cache update service.
*/
@VisibleForTesting
RouterQuotaUpdateService getQuotaCacheUpdateService() {
return this.quotaUpdateService;
}
}

View File

@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
/**
* Router quota manager in Router. The manager maintains
* {@link RouterQuotaUsage} cache of mount tables and do management
* for the quota caches.
*/
public class RouterQuotaManager {
/** Quota usage <MountTable Path, Aggregated QuotaUsage> cache. */
private TreeMap<String, RouterQuotaUsage> cache;
/** Lock to access the quota cache. */
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
public RouterQuotaManager() {
this.cache = new TreeMap<>();
}
/**
* Get all the mount quota paths.
*/
public Set<String> getAll() {
readLock.lock();
try {
return this.cache.keySet();
} finally {
readLock.unlock();
}
}
/**
* Get the nearest ancestor's quota usage, and meanwhile its quota was set.
* @param path The path being written.
* @return RouterQuotaUsage Quota usage.
*/
public RouterQuotaUsage getQuotaUsage(String path) {
readLock.lock();
try {
RouterQuotaUsage quotaUsage = this.cache.get(path);
if (quotaUsage != null && isQuotaSet(quotaUsage)) {
return quotaUsage;
}
// If not found, look for its parent path usage value.
int pos = path.lastIndexOf(Path.SEPARATOR);
if (pos != -1) {
String parentPath = path.substring(0, pos);
return getQuotaUsage(parentPath);
}
} finally {
readLock.unlock();
}
return null;
}
/**
* Get children paths (can including itself) under specified federation path.
* @param parentPath
* @return Set<String> Children path set.
*/
public Set<String> getPaths(String parentPath) {
readLock.lock();
try {
String from = parentPath;
String to = parentPath + Character.MAX_VALUE;
SortedMap<String, RouterQuotaUsage> subMap = this.cache.subMap(from, to);
return subMap.keySet();
} finally {
readLock.unlock();
}
}
/**
* Put new entity into cache.
* @param path Mount table path.
* @param quotaUsage Corresponding cache value.
*/
public void put(String path, RouterQuotaUsage quotaUsage) {
writeLock.lock();
try {
this.cache.put(path, quotaUsage);
} finally {
writeLock.unlock();
}
}
/**
* Remove the entity from cache.
* @param path Mount table path.
*/
public void remove(String path) {
writeLock.lock();
try {
this.cache.remove(path);
} finally {
writeLock.unlock();
}
}
/**
* Clean up the cache.
*/
public void clear() {
writeLock.lock();
try {
this.cache.clear();
} finally {
writeLock.unlock();
}
}
/**
* Check if the quota was set.
* @param quota RouterQuotaUsage set in mount table.
*/
public boolean isQuotaSet(RouterQuotaUsage quota) {
if (quota != null) {
long nsQuota = quota.getQuota();
long ssQuota = quota.getSpaceQuota();
// once nsQuota or ssQuota was set, this mount table is quota set
if (nsQuota != HdfsConstants.QUOTA_DONT_SET
|| ssQuota != HdfsConstants.QUOTA_DONT_SET) {
return true;
}
}
return false;
}
}

View File

@ -0,0 +1,228 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Service to periodically update the {@link RouterQuotaUsage}
* cached information in the {@link Router} and update corresponding
* mount table in State Store.
*/
public class RouterQuotaUpdateService extends PeriodicService {
private static final Logger LOG =
LoggerFactory.getLogger(RouterQuotaUpdateService.class);
private MountTableStore mountTableStore;
private RouterRpcServer rpcServer;
/** Router using this Service. */
private final Router router;
/** Router Quota manager. */
private RouterQuotaManager quotaManager;
public RouterQuotaUpdateService(final Router router) throws IOException {
super(RouterQuotaUpdateService.class.getName());
this.router = router;
this.rpcServer = router.getRpcServer();
this.quotaManager = router.getQuotaManager();
if (this.quotaManager == null) {
throw new IOException("Router quota manager is not initialized.");
}
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.setIntervalMs(conf.getTimeDuration(
DFSConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL,
DFSConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS));
super.serviceInit(conf);
}
@Override
protected void periodicInvoke() {
LOG.debug("Start to update quota cache.");
try {
List<MountTable> updateMountTables = new LinkedList<>();
List<MountTable> mountTables = getQuotaSetMountTables();
for (MountTable entry : mountTables) {
String src = entry.getSourcePath();
RouterQuotaUsage oldQuota = entry.getQuota();
long nsQuota = oldQuota.getQuota();
long ssQuota = oldQuota.getSpaceQuota();
// Call RouterRpcServer#getQuotaUsage for getting current quota usage.
QuotaUsage currentQuotaUsage = this.rpcServer.getQuotaModule()
.getQuotaUsage(src);
// If quota is not set in some subclusters under federation path,
// set quota for this path.
if (currentQuotaUsage.getQuota() == HdfsConstants.QUOTA_DONT_SET) {
this.rpcServer.setQuota(src, nsQuota, ssQuota, null);
}
RouterQuotaUsage newQuota = generateNewQuota(oldQuota,
currentQuotaUsage);
this.quotaManager.put(src, newQuota);
entry.setQuota(newQuota);
// only update mount tables which quota was changed
if (!oldQuota.equals(newQuota)) {
updateMountTables.add(entry);
LOG.debug(
"Update quota usage entity of path: {}, nsCount: {},"
+ " nsQuota: {}, ssCount: {}, ssQuota: {}.",
src, newQuota.getFileAndDirectoryCount(),
newQuota.getQuota(), newQuota.getSpaceConsumed(),
newQuota.getSpaceQuota());
}
}
updateMountTableEntries(updateMountTables);
} catch (IOException e) {
LOG.error("Quota cache updated error.", e);
}
}
/**
* Get mount table store management interface.
* @return MountTableStore instance.
* @throws IOException
*/
private MountTableStore getMountTableStore() throws IOException {
if (this.mountTableStore == null) {
this.mountTableStore = router.getStateStore().getRegisteredRecordStore(
MountTableStore.class);
if (this.mountTableStore == null) {
throw new IOException("Mount table state store is not available.");
}
}
return this.mountTableStore;
}
/**
* Get all the existing mount tables.
* @return List of mount tables.
* @throws IOException
*/
private List<MountTable> getMountTableEntries() throws IOException {
// scan mount tables from root path
GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
.newInstance("/");
GetMountTableEntriesResponse getResponse = getMountTableStore()
.getMountTableEntries(getRequest);
return getResponse.getEntries();
}
/**
* Get mount tables which quota was set.
* During this time, the quota usage cache will also be updated by
* quota manager:
* 1. Stale paths (entries) will be removed.
* 2. Existing entries will be override and updated.
* @return List of mount tables which quota was set.
* @throws IOException
*/
private List<MountTable> getQuotaSetMountTables() throws IOException {
List<MountTable> mountTables = getMountTableEntries();
Set<String> stalePaths = new HashSet<>();
for (String path : this.quotaManager.getAll()) {
stalePaths.add(path);
}
List<MountTable> neededMountTables = new LinkedList<>();
for (MountTable entry : mountTables) {
// select mount tables which is quota set
if (isQuotaSet(entry)) {
neededMountTables.add(entry);
}
// update mount table entries info in quota cache
String src = entry.getSourcePath();
this.quotaManager.put(src, entry.getQuota());
stalePaths.remove(src);
}
// remove stale paths that currently cached
for (String stalePath : stalePaths) {
this.quotaManager.remove(stalePath);
}
return neededMountTables;
}
/**
* Check if the quota was set in given MountTable.
* @param mountTable Mount table entry.
*/
private boolean isQuotaSet(MountTable mountTable) {
if (mountTable != null) {
return this.quotaManager.isQuotaSet(mountTable.getQuota());
}
return false;
}
/**
* Generate a new quota based on old quota and current quota usage value.
* @param oldQuota Old quota stored in State Store.
* @param currentQuotaUsage Current quota usage value queried from
* subcluster.
* @return A new RouterQuotaUsage.
*/
private RouterQuotaUsage generateNewQuota(RouterQuotaUsage oldQuota,
QuotaUsage currentQuotaUsage) {
RouterQuotaUsage newQuota = new RouterQuotaUsage.Builder()
.fileAndDirectoryCount(currentQuotaUsage.getFileAndDirectoryCount())
.quota(oldQuota.getQuota())
.spaceConsumed(currentQuotaUsage.getSpaceConsumed())
.spaceQuota(oldQuota.getSpaceQuota()).build();
return newQuota;
}
/**
* Write out updated mount table entries into State Store.
* @param updateMountTables Mount tables to be updated.
* @throws IOException
*/
private void updateMountTableEntries(List<MountTable> updateMountTables)
throws IOException {
for (MountTable entry : updateMountTables) {
UpdateMountTableEntryRequest updateRequest = UpdateMountTableEntryRequest
.newInstance(entry);
getMountTableStore().updateMountTableEntry(updateRequest);
}
}
}

View File

@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature;
import org.apache.hadoop.hdfs.server.namenode.Quota;
/**
* The subclass of {@link QuotaUsage} used in Router-based federation.
*/
public final class RouterQuotaUsage extends QuotaUsage {
private RouterQuotaUsage(Builder builder) {
super(builder);
}
/** Build the instance based on the builder. */
public static class Builder extends QuotaUsage.Builder {
public RouterQuotaUsage build() {
return new RouterQuotaUsage(this);
}
@Override
public Builder fileAndDirectoryCount(long count) {
super.fileAndDirectoryCount(count);
return this;
}
@Override
public Builder quota(long quota) {
super.quota(quota);
return this;
}
@Override
public Builder spaceConsumed(long spaceConsumed) {
super.spaceConsumed(spaceConsumed);
return this;
}
@Override
public Builder spaceQuota(long spaceQuota) {
super.spaceQuota(spaceQuota);
return this;
}
}
/**
* Verify if namespace quota is violated once quota is set. Relevant
* method {@link DirectoryWithQuotaFeature#verifyNamespaceQuota}.
* @throws NSQuotaExceededException
*/
public void verifyNamespaceQuota() throws NSQuotaExceededException {
if (Quota.isViolated(getQuota(), getFileAndDirectoryCount())) {
throw new NSQuotaExceededException(getQuota(),
getFileAndDirectoryCount());
}
}
/**
* Verify if storage space quota is violated once quota is set. Relevant
* method {@link DirectoryWithQuotaFeature#verifyStoragespaceQuota}.
* @throws DSQuotaExceededException
*/
public void verifyStoragespaceQuota() throws DSQuotaExceededException {
if (Quota.isViolated(getSpaceQuota(), getSpaceConsumed())) {
throw new DSQuotaExceededException(getSpaceQuota(), getSpaceConsumed());
}
}
}

View File

@ -181,6 +181,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
/** Category of the operation that a thread is executing. */
private final ThreadLocal<OperationCategory> opCategory = new ThreadLocal<>();
/** Router Quota calls. */
private final Quota quotaCall;
/**
* Construct a router RPC server.
@ -277,6 +279,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
// Create the client
this.rpcClient = new RouterRpcClient(this.conf, this.router.getRouterId(),
this.namenodeResolver, this.rpcMonitor);
// Initialize modules
this.quotaCall = new Quota(this.router, this);
}
@Override
@ -384,7 +389,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
* @throws StandbyException If the Router is in safe mode and cannot serve
* client requests.
*/
private void checkOperation(OperationCategory op) throws StandbyException {
protected void checkOperation(OperationCategory op) throws StandbyException {
// Log the function we are currently calling.
if (rpcMonitor != null) {
rpcMonitor.startOp();
@ -1839,21 +1844,13 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
@Override // ClientProtocol
public void setQuota(String path, long namespaceQuota, long storagespaceQuota,
StorageType type) throws IOException {
checkOperation(OperationCategory.WRITE);
// TODO assign global replicas instead of applying them to each folder
final List<RemoteLocation> locations = getLocationsForPath(path, true);
RemoteMethod method = new RemoteMethod("setQuota",
new Class<?>[] {String.class, Long.class, Long.class,
StorageType.class},
new RemoteParam(), namespaceQuota, storagespaceQuota, type);
rpcClient.invokeConcurrent(locations, method, false, false);
this.quotaCall.setQuota(path, namespaceQuota, storagespaceQuota, type);
}
@Override // ClientProtocol
public QuotaUsage getQuotaUsage(String path) throws IOException {
checkOperation(OperationCategory.READ, false);
return null;
checkOperation(OperationCategory.READ);
return this.quotaCall.getQuotaUsage(path);
}
@Override
@ -1996,7 +1993,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
* @return Prioritized list of locations in the federated cluster.
* @throws IOException If the location for this path cannot be determined.
*/
private List<RemoteLocation> getLocationsForPath(
protected List<RemoteLocation> getLocationsForPath(
String path, boolean failIfLocked) throws IOException {
try {
// Check the location for this path
@ -2016,6 +2013,16 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
}
throw new IOException(path + " is in a read only mount point");
}
// Check quota
if (this.router.isQuotaEnabled()) {
RouterQuotaUsage quotaUsage = this.router.getQuotaManager()
.getQuotaUsage(path);
if (quotaUsage != null) {
quotaUsage.verifyNamespaceQuota();
quotaUsage.verifyStoragespaceQuota();
}
}
}
return location.getDestinations();
@ -2119,4 +2126,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
UserGroupInformation ugi = Server.getRemoteUser();
return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
}
/**
* Get quota module implement.
*/
public Quota getQuotaModule() {
return this.quotaCall;
}
}

View File

@ -29,9 +29,11 @@ import java.util.TreeMap;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.router.RouterPermissionChecker;
import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.security.UserGroupInformation;
@ -140,6 +142,12 @@ public abstract class MountTable extends BaseRecord {
record.setMode(new FsPermission(
RouterPermissionChecker.MOUNT_TABLE_PERMISSION_DEFAULT));
// Set quota for mount table
RouterQuotaUsage quota = new RouterQuotaUsage.Builder()
.fileAndDirectoryCount(0).quota(HdfsConstants.QUOTA_DONT_SET)
.spaceConsumed(0).spaceQuota(HdfsConstants.QUOTA_DONT_SET).build();
record.setQuota(quota);
// Validate
record.validate();
return record;
@ -248,6 +256,20 @@ public abstract class MountTable extends BaseRecord {
*/
public abstract void setMode(FsPermission mode);
/**
* Get quota of this mount table entry.
*
* @return RouterQuotaUsage quota usage
*/
public abstract RouterQuotaUsage getQuota();
/**
* Set quota for this mount table entry.
*
* @param quota QuotaUsage for mount table entry
*/
public abstract void setQuota(RouterQuotaUsage quota);
/**
* Get the default location.
* @return The default location.

View File

@ -27,10 +27,12 @@ import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProt
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto.DestOrder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProtoOrBuilder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoteLocationProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.QuotaUsageProto;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.router.RouterAdminServer;
import org.apache.hadoop.hdfs.server.federation.router.RouterPermissionChecker;
import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.FederationProtocolPBTranslator;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
@ -250,6 +252,36 @@ public class MountTablePBImpl extends MountTable implements PBRecord {
}
}
@Override
public RouterQuotaUsage getQuota() {
MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
if (!proto.hasQuota()) {
return null;
}
QuotaUsageProto quotaProto = proto.getQuota();
RouterQuotaUsage.Builder builder = new RouterQuotaUsage.Builder()
.fileAndDirectoryCount(quotaProto.getFileAndDirectoryCount())
.quota(quotaProto.getQuota())
.spaceConsumed(quotaProto.getSpaceConsumed())
.spaceQuota(quotaProto.getSpaceQuota());
return builder.build();
}
@Override
public void setQuota(RouterQuotaUsage quota) {
Builder builder = this.translator.getBuilder();
if (quota == null) {
builder.clearQuota();
} else {
QuotaUsageProto quotaUsage = QuotaUsageProto.newBuilder()
.setFileAndDirectoryCount(quota.getFileAndDirectoryCount())
.setQuota(quota.getQuota()).setSpaceConsumed(quota.getSpaceConsumed())
.setSpaceQuota(quota.getSpaceQuota()).build();
builder.setQuota(quotaUsage);
}
}
private DestinationOrder convert(DestOrder order) {
switch (order) {
case LOCAL:

View File

@ -49,7 +49,7 @@ public enum Quota {
* Is quota violated?
* The quota is violated if quota is set and usage > quota.
*/
static boolean isViolated(final long quota, final long usage) {
public static boolean isViolated(final long quota, final long usage) {
return quota >= 0 && usage > quota;
}

View File

@ -29,10 +29,12 @@ import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
@ -80,7 +82,9 @@ public class RouterAdmin extends Configured implements Tool {
+ "\t[-add <source> <nameservice> <destination> "
+ "[-readonly] -owner <owner> -group <group> -mode <mode>]\n"
+ "\t[-rm <source>]\n"
+ "\t[-ls <path>]\n";
+ "\t[-ls <path>]\n"
+ "\t[-setQuota <path> -ns <nsQuota> -ss <ssQuota>]\n"
+ "\t[-clrQuota <path>\n";
System.out.println(usage);
}
@ -109,6 +113,18 @@ public class RouterAdmin extends Configured implements Tool {
printUsage();
return exitCode;
}
} else if ("-setQuota".equalsIgnoreCase(cmd)) {
if (argv.length < 4) {
System.err.println("Not enough parameters specificed for cmd " + cmd);
printUsage();
return exitCode;
}
} else if ("-clrQuota".equalsIgnoreCase(cmd)) {
if (argv.length < 2) {
System.err.println("Not enough parameters specificed for cmd " + cmd);
printUsage();
return exitCode;
}
}
// Initialize RouterClient
@ -144,6 +160,16 @@ public class RouterAdmin extends Configured implements Tool {
} else {
listMounts("/");
}
} else if ("-setQuota".equals(cmd)) {
if (setQuota(argv, i)) {
System.out.println(
"Successfully set quota for mount point " + argv[i]);
}
} else if ("-clrQuota".equals(cmd)) {
if (clrQuota(argv[i])) {
System.out.println(
"Successfully clear quota for mount point " + argv[i]);
}
} else {
printUsage();
return exitCode;
@ -388,6 +414,111 @@ public class RouterAdmin extends Configured implements Tool {
}
}
/**
* Set quota for a mount table entry.
*
* @param parameters Parameters of the quota.
* @param i Index in the parameters.
*/
private boolean setQuota(String[] parameters, int i) throws IOException {
long nsQuota = HdfsConstants.QUOTA_DONT_SET;
long ssQuota = HdfsConstants.QUOTA_DONT_SET;
String mount = parameters[i++];
while (i < parameters.length) {
if (parameters[i].equals("-nsQuota")) {
i++;
try {
nsQuota = Long.parseLong(parameters[i]);
} catch (Exception e) {
System.err.println("Cannot parse nsQuota: " + parameters[i]);
}
} else if (parameters[i].equals("-ssQuota")) {
i++;
try {
ssQuota = Long.parseLong(parameters[i]);
} catch (Exception e) {
System.err.println("Cannot parse ssQuota: " + parameters[i]);
}
}
i++;
}
if (nsQuota <= 0 || ssQuota <= 0) {
System.err.println("Input quota value should be a positive number.");
return false;
}
return updateQuota(mount, nsQuota, ssQuota);
}
/**
* Clear quota of the mount point.
*
* @param mount Mount table to clear
* @return If the quota was cleared.
* @throws IOException Error clearing the mount point.
*/
private boolean clrQuota(String mount) throws IOException {
return updateQuota(mount, HdfsConstants.QUOTA_DONT_SET,
HdfsConstants.QUOTA_DONT_SET);
}
/**
* Update quota of specified mount table.
*
* @param mount Specified mount table to update.
* @param nsQuota Namespace quota.
* @param ssQuota Storage space quota.
* @return If the quota was updated.
* @throws IOException Error updating quota.
*/
private boolean updateQuota(String mount, long nsQuota, long ssQuota)
throws IOException {
// Get existing entry
MountTableManager mountTable = client.getMountTableManager();
GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
.newInstance(mount);
GetMountTableEntriesResponse getResponse = mountTable
.getMountTableEntries(getRequest);
List<MountTable> results = getResponse.getEntries();
MountTable existingEntry = null;
for (MountTable result : results) {
if (mount.equals(result.getSourcePath())) {
existingEntry = result;
break;
}
}
if (existingEntry == null) {
return false;
} else {
long nsCount = existingEntry.getQuota().getFileAndDirectoryCount();
long ssCount = existingEntry.getQuota().getSpaceConsumed();
// If nsQuota or ssQuota was unset, reset corresponding usage
// value to zero.
if (nsQuota == HdfsConstants.QUOTA_DONT_SET) {
nsCount = 0;
}
if (nsQuota == HdfsConstants.QUOTA_DONT_SET) {
ssCount = 0;
}
RouterQuotaUsage updatedQuota = new RouterQuotaUsage.Builder()
.fileAndDirectoryCount(nsCount).quota(nsQuota)
.spaceConsumed(ssCount).spaceQuota(ssQuota).build();
existingEntry.setQuota(updatedQuota);
}
UpdateMountTableEntryRequest updateRequest =
UpdateMountTableEntryRequest.newInstance(existingEntry);
UpdateMountTableEntryResponse updateResponse = mountTable
.updateMountTableEntry(updateRequest);
return updateResponse.getStatus();
}
/**
* Inner class that stores ACL info of mount table.
*/

View File

@ -22,6 +22,7 @@ option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.hdfs;
import "hdfs.proto";
/////////////////////////////////////////////////
// Membership
@ -134,6 +135,8 @@ message MountTableRecordProto {
optional string ownerName = 10;
optional string groupName = 11;
optional int32 mode = 12;
optional QuotaUsageProto quota = 13;
}
message AddMountTableEntryRequestProto {

View File

@ -5126,4 +5126,24 @@
</description>
</property>
<property>
<name>dfs.federation.router.quota.enable</name>
<value>false</value>
<description>
Set to true to enable quota system in Router.
</description>
</property>
<property>
<name>dfs.federation.router.quota-cache.update.interval</name>
<value>60s</value>
<description>
Interval time for updating quota usage cache in Router.
This property is used only if the value of
dfs.federation.router.quota.enable is true.
This setting supports multiple time unit suffixes as described
in dfs.heartbeat.interval. If no suffix is specified then milliseconds
is assumed.
</description>
</property>
</configuration>

View File

@ -34,6 +34,7 @@ public class RouterConfigBuilder {
private boolean enableLocalHeartbeat = false;
private boolean enableStateStore = false;
private boolean enableMetrics = false;
private boolean enableQuota = false;
public RouterConfigBuilder(Configuration configuration) {
this.conf = configuration;
@ -89,6 +90,11 @@ public class RouterConfigBuilder {
return this;
}
public RouterConfigBuilder quota(boolean enable) {
this.enableQuota = enable;
return this;
}
public RouterConfigBuilder rpc() {
return this.rpc(true);
}
@ -113,6 +119,10 @@ public class RouterConfigBuilder {
return this.metrics(true);
}
public RouterConfigBuilder quota() {
return this.quota(true);
}
public Configuration build() {
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_STORE_ENABLE,
this.enableStateStore);
@ -127,6 +137,8 @@ public class RouterConfigBuilder {
this.enableLocalHeartbeat);
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE,
this.enableMetrics);
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_QUOTA_ENABLE,
this.enableQuota);
return conf;
}
}

View File

@ -28,6 +28,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
@ -294,4 +295,57 @@ public class TestRouterAdminCLI {
argv = new String[] {"-rm", mount};
assertEquals(rmCommandCode, ToolRunner.run(admin, argv));
}
@Test
public void testSetAndClearQuota() throws Exception {
String nsId = "ns0";
String src = "/test-QuotaMounttable";
String dest = "/QuotaMounttable";
String[] argv = new String[] {"-add", src, nsId, dest};
assertEquals(0, ToolRunner.run(admin, argv));
stateStore.loadCache(MountTableStoreImpl.class, true);
GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
.newInstance(src);
GetMountTableEntriesResponse getResponse = client.getMountTableManager()
.getMountTableEntries(getRequest);
MountTable mountTable = getResponse.getEntries().get(0);
RouterQuotaUsage quotaUsage = mountTable.getQuota();
// verify the default quota set
assertEquals(0, quotaUsage.getFileAndDirectoryCount());
assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getQuota());
assertEquals(0, quotaUsage.getSpaceConsumed());
assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getSpaceQuota());
long nsQuota = 50;
long ssQuota = 100;
argv = new String[] {"-setQuota", src, "-nsQuota", String.valueOf(nsQuota),
"-ssQuota", String.valueOf(ssQuota)};
assertEquals(0, ToolRunner.run(admin, argv));
stateStore.loadCache(MountTableStoreImpl.class, true);
getResponse = client.getMountTableManager()
.getMountTableEntries(getRequest);
mountTable = getResponse.getEntries().get(0);
quotaUsage = mountTable.getQuota();
// verify if the quota is set
assertEquals(nsQuota, quotaUsage.getQuota());
assertEquals(ssQuota, quotaUsage.getSpaceQuota());
// test clrQuota command
argv = new String[] {"-clrQuota", src};
assertEquals(0, ToolRunner.run(admin, argv));
stateStore.loadCache(MountTableStoreImpl.class, true);
getResponse = client.getMountTableManager()
.getMountTableEntries(getRequest);
mountTable = getResponse.getEntries().get(0);
quotaUsage = mountTable.getQuota();
// verify if quota unset successfully
assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getQuota());
assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getSpaceQuota());
}
}

View File

@ -0,0 +1,452 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.base.Supplier;
/**
* Tests quota behaviors in Router-based Federation.
*/
public class TestRouterQuota {
private static StateStoreDFSCluster cluster;
private static NamenodeContext nnContext1;
private static NamenodeContext nnContext2;
private static RouterContext routerContext;
private static MountTableResolver resolver;
private static final int BLOCK_SIZE = 512;
@Before
public void setUp() throws Exception {
// Build and start a federated cluster
cluster = new StateStoreDFSCluster(false, 2);
Configuration routerConf = new RouterConfigBuilder()
.stateStore()
.admin()
.quota()
.rpc()
.build();
routerConf.set(DFSConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL, "2s");
// override some hdfs settings that used in testing space quota
Configuration hdfsConf = new Configuration(false);
hdfsConf.setInt(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
hdfsConf.setInt(HdfsClientConfigKeys.DFS_REPLICATION_KEY, 1);
cluster.addRouterOverrides(routerConf);
cluster.addNamenodeOverrides(hdfsConf);
cluster.startCluster();
cluster.startRouters();
cluster.waitClusterUp();
nnContext1 = cluster.getNamenode(cluster.getNameservices().get(0), null);
nnContext2 = cluster.getNamenode(cluster.getNameservices().get(1), null);
routerContext = cluster.getRandomRouter();
Router router = routerContext.getRouter();
resolver = (MountTableResolver) router.getSubclusterResolver();
}
@After
public void tearDown() {
if (cluster != null) {
cluster.stopRouter(routerContext);
cluster.shutdown();
cluster = null;
}
}
@Test
public void testNamespaceQuotaExceed() throws Exception {
long nsQuota = 3;
final FileSystem nnFs1 = nnContext1.getFileSystem();
final FileSystem nnFs2 = nnContext2.getFileSystem();
// Add two mount tables:
// /nsquota --> ns0---testdir1
// /nsquota/subdir --> ns1---testdir2
nnFs1.mkdirs(new Path("/testdir1"));
nnFs2.mkdirs(new Path("/testdir2"));
MountTable mountTable1 = MountTable.newInstance("/nsquota",
Collections.singletonMap("ns0", "/testdir1"));
mountTable1.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota).build());
addMountTable(mountTable1);
MountTable mountTable2 = MountTable.newInstance("/nsquota/subdir",
Collections.singletonMap("ns1", "/testdir2"));
mountTable2.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota).build());
addMountTable(mountTable2);
final FileSystem routerFs = routerContext.getFileSystem();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
boolean isNsQuotaViolated = false;
try {
// create new directory to trigger NSQuotaExceededException
routerFs.mkdirs(new Path("/nsquota/" + UUID.randomUUID()));
routerFs.mkdirs(new Path("/nsquota/subdir/" + UUID.randomUUID()));
} catch (NSQuotaExceededException e) {
isNsQuotaViolated = true;
} catch (IOException ignored) {
}
return isNsQuotaViolated;
}
}, 5000, 60000);
// mkdir in real FileSystem should be okay
nnFs1.mkdirs(new Path("/testdir1/" + UUID.randomUUID()));
nnFs2.mkdirs(new Path("/testdir2/" + UUID.randomUUID()));
}
@Test
public void testStorageSpaceQuotaaExceed() throws Exception {
long ssQuota = 3071;
final FileSystem nnFs1 = nnContext1.getFileSystem();
final FileSystem nnFs2 = nnContext2.getFileSystem();
// Add two mount tables:
// /ssquota --> ns0---testdir3
// /ssquota/subdir --> ns1---testdir4
nnFs1.mkdirs(new Path("/testdir3"));
nnFs2.mkdirs(new Path("/testdir4"));
MountTable mountTable1 = MountTable.newInstance("/ssquota",
Collections.singletonMap("ns0", "/testdir3"));
mountTable1
.setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota).build());
addMountTable(mountTable1);
MountTable mountTable2 = MountTable.newInstance("/ssquota/subdir",
Collections.singletonMap("ns1", "/testdir4"));
mountTable2
.setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota).build());
addMountTable(mountTable2);
DFSClient routerClient = routerContext.getClient();
routerClient.create("/ssquota/file", true).close();
routerClient.create("/ssquota/subdir/file", true).close();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
boolean isDsQuotaViolated = false;
try {
// append data to trigger NSQuotaExceededException
appendData("/ssquota/file", routerClient, BLOCK_SIZE);
appendData("/ssquota/subdir/file", routerClient, BLOCK_SIZE);
} catch (DSQuotaExceededException e) {
isDsQuotaViolated = true;
} catch (IOException ignored) {
}
return isDsQuotaViolated;
}
}, 5000, 60000);
// append data to destination path in real FileSystem should be okay
appendData("/testdir3/file", nnContext1.getClient(), BLOCK_SIZE);
appendData("/testdir4/file", nnContext2.getClient(), BLOCK_SIZE);
}
/**
* Add a mount table entry to the mount table through the admin API.
* @param entry Mount table entry to add.
* @return If it was successfully added.
* @throws IOException Problems adding entries.
*/
private boolean addMountTable(final MountTable entry) throws IOException {
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTableManager = client.getMountTableManager();
AddMountTableEntryRequest addRequest =
AddMountTableEntryRequest.newInstance(entry);
AddMountTableEntryResponse addResponse =
mountTableManager.addMountTableEntry(addRequest);
// Reload the Router cache
resolver.loadCache(true);
return addResponse.getStatus();
}
/**
* Append data in specified file.
* @param path Path of file.
* @param client DFS Client.
* @param dataLen The length of write data.
* @throws IOException
*/
private void appendData(String path, DFSClient client, int dataLen)
throws IOException {
EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.APPEND);
HdfsDataOutputStream stream = client.append(path, 1024, createFlag, null,
null);
byte[] data = new byte[dataLen];
stream.write(data);
stream.close();
}
@Test
public void testSetQuota() throws Exception {
long nsQuota = 5;
long ssQuota = 100;
final FileSystem nnFs1 = nnContext1.getFileSystem();
final FileSystem nnFs2 = nnContext2.getFileSystem();
// Add two mount tables:
// /setquota --> ns0---testdir5
// /setquota/subdir --> ns1---testdir6
nnFs1.mkdirs(new Path("/testdir5"));
nnFs2.mkdirs(new Path("/testdir6"));
MountTable mountTable1 = MountTable.newInstance("/setquota",
Collections.singletonMap("ns0", "/testdir5"));
mountTable1
.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
.spaceQuota(ssQuota).build());
addMountTable(mountTable1);
// don't set quota for subpath of mount table
MountTable mountTable2 = MountTable.newInstance("/setquota/subdir",
Collections.singletonMap("ns1", "/testdir6"));
addMountTable(mountTable2);
RouterQuotaUpdateService updateService = routerContext.getRouter()
.getQuotaCacheUpdateService();
// ensure setQuota RPC call was invoked
updateService.periodicInvoke();
ClientProtocol client1 = nnContext1.getClient().getNamenode();
ClientProtocol client2 = nnContext2.getClient().getNamenode();
final QuotaUsage quota1 = client1.getQuotaUsage("/testdir5");
final QuotaUsage quota2 = client2.getQuotaUsage("/testdir6");
assertEquals(nsQuota, quota1.getQuota());
assertEquals(ssQuota, quota1.getSpaceQuota());
assertEquals(nsQuota, quota2.getQuota());
assertEquals(ssQuota, quota2.getSpaceQuota());
}
@Test
public void testGetQuota() throws Exception {
long nsQuota = 10;
long ssQuota = 100;
final FileSystem nnFs1 = nnContext1.getFileSystem();
final FileSystem nnFs2 = nnContext2.getFileSystem();
// Add two mount tables:
// /getquota --> ns0---/testdir7
// /getquota/subdir1 --> ns0---/testdir7/subdir
// /getquota/subdir2 --> ns1---/testdir8
nnFs1.mkdirs(new Path("/testdir7"));
nnFs1.mkdirs(new Path("/testdir7/subdir"));
nnFs2.mkdirs(new Path("/testdir8"));
MountTable mountTable1 = MountTable.newInstance("/getquota",
Collections.singletonMap("ns0", "/testdir7"));
mountTable1
.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
.spaceQuota(ssQuota).build());
addMountTable(mountTable1);
MountTable mountTable2 = MountTable.newInstance("/getquota/subdir1",
Collections.singletonMap("ns0", "/testdir7/subdir"));
addMountTable(mountTable2);
MountTable mountTable3 = MountTable.newInstance("/getquota/subdir2",
Collections.singletonMap("ns1", "/testdir8"));
addMountTable(mountTable3);
// use router client to create new files
DFSClient routerClient = routerContext.getClient();
routerClient.create("/getquota/file", true).close();
routerClient.create("/getquota/subdir1/file", true).close();
routerClient.create("/getquota/subdir2/file", true).close();
ClientProtocol clientProtocol = routerContext.getClient().getNamenode();
RouterQuotaUpdateService updateService = routerContext.getRouter()
.getQuotaCacheUpdateService();
updateService.periodicInvoke();
final QuotaUsage quota = clientProtocol.getQuotaUsage("/getquota");
// the quota should be aggregated
assertEquals(6, quota.getFileAndDirectoryCount());
}
@Test
public void testStaleQuotaRemoving() throws Exception {
long nsQuota = 20;
long ssQuota = 200;
String stalePath = "/stalequota";
final FileSystem nnFs1 = nnContext1.getFileSystem();
// Add one mount tables:
// /stalequota --> ns0---/testdir9
nnFs1.mkdirs(new Path("/testdir9"));
MountTable mountTable = MountTable.newInstance(stalePath,
Collections.singletonMap("ns0", "/testdir9"));
mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
.spaceQuota(ssQuota).build());
addMountTable(mountTable);
// Call periodicInvoke to ensure quota for stalePath was
// loaded into quota manager.
RouterQuotaUpdateService updateService = routerContext.getRouter()
.getQuotaCacheUpdateService();
updateService.periodicInvoke();
// use quota manager to get its quota usage and do verification
RouterQuotaManager quotaManager = routerContext.getRouter()
.getQuotaManager();
RouterQuotaUsage quota = quotaManager.getQuotaUsage(stalePath);
assertEquals(nsQuota, quota.getQuota());
assertEquals(ssQuota, quota.getSpaceQuota());
// remove stale path entry
removeMountTable(stalePath);
updateService.periodicInvoke();
// the stale entry should be removed and we will get null
quota = quotaManager.getQuotaUsage(stalePath);
assertNull(quota);
}
/**
* Remove a mount table entry to the mount table through the admin API.
* @param entry Mount table entry to remove.
* @return If it was successfully removed.
* @throws IOException Problems removing entries.
*/
private boolean removeMountTable(String path) throws IOException {
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTableManager = client.getMountTableManager();
RemoveMountTableEntryRequest removeRequest = RemoveMountTableEntryRequest
.newInstance(path);
RemoveMountTableEntryResponse removeResponse = mountTableManager
.removeMountTableEntry(removeRequest);
// Reload the Router cache
resolver.loadCache(true);
return removeResponse.getStatus();
}
@Test
public void testQuotaUpdating() throws Exception {
long nsQuota = 30;
long ssQuota = 1024;
String path = "/updatequota";
final FileSystem nnFs1 = nnContext1.getFileSystem();
// Add one mount table:
// /updatequota --> ns0---/testdir10
nnFs1.mkdirs(new Path("/testdir10"));
MountTable mountTable = MountTable.newInstance(path,
Collections.singletonMap("ns0", "/testdir10"));
mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
.spaceQuota(ssQuota).build());
addMountTable(mountTable);
// Call periodicInvoke to ensure quota updated in quota manager
// and state store.
RouterQuotaUpdateService updateService = routerContext.getRouter()
.getQuotaCacheUpdateService();
updateService.periodicInvoke();
// verify initial quota value
List<MountTable> results = getMountTable(path);
MountTable updatedMountTable = !results.isEmpty() ? results.get(0) : null;
RouterQuotaUsage quota = updatedMountTable.getQuota();
assertEquals(nsQuota, quota.getQuota());
assertEquals(ssQuota, quota.getSpaceQuota());
assertEquals(1, quota.getFileAndDirectoryCount());
assertEquals(0, quota.getSpaceConsumed());
// mkdir and write a new file
final FileSystem routerFs = routerContext.getFileSystem();
routerFs.mkdirs(new Path(path + UUID.randomUUID()));
DFSClient routerClient = routerContext.getClient();
routerClient.create(path + "/file", true).close();
appendData(path + "/file", routerClient, BLOCK_SIZE);
updateService.periodicInvoke();
results = getMountTable(path);
updatedMountTable = !results.isEmpty() ? results.get(0) : null;
quota = updatedMountTable.getQuota();
// verify if quota has been updated in state store
assertEquals(nsQuota, quota.getQuota());
assertEquals(ssQuota, quota.getSpaceQuota());
assertEquals(3, quota.getFileAndDirectoryCount());
assertEquals(BLOCK_SIZE, quota.getSpaceConsumed());
}
/**
* Get the mount table entries of specified path through the admin API.
* @param path Mount table entry to get.
* @return If it was successfully got.
* @throws IOException Problems getting entries.
*/
private List<MountTable> getMountTable(String path) throws IOException {
// Reload the Router cache
resolver.loadCache(true);
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTableManager = client.getMountTableManager();
GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
.newInstance(path);
GetMountTableEntriesResponse removeResponse = mountTableManager
.getMountTableEntries(getRequest);
return removeResponse.getEntries();
}
}

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.Set;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Tests for class {@link RouterQuotaManager}.
*/
public class TestRouterQuotaManager {
private static RouterQuotaManager manager;
@Before
public void setup() {
manager = new RouterQuotaManager();
}
@After
public void cleanup() {
manager.clear();
}
@Test
public void testGetChildrenPaths() {
RouterQuotaUsage quotaUsage = new RouterQuotaUsage.Builder().build();
manager.put("/path1", quotaUsage);
manager.put("/path2", quotaUsage);
manager.put("/path1/subdir", quotaUsage);
manager.put("/path1/subdir/subdir", quotaUsage);
Set<String> childrenPaths = manager.getPaths("/path1");
assertEquals(3, childrenPaths.size());
assertTrue(childrenPaths.contains("/path1/subdir")
&& childrenPaths.contains("/path1/subdir/subdir")
&& childrenPaths.contains("/path1"));
}
@Test
public void testGetQuotaUsage() {
RouterQuotaUsage quotaGet;
// test case1: get quota with an non-exist path
quotaGet = manager.getQuotaUsage("/non-exist-path");
assertNull(quotaGet);
// test case2: get quota from an no-quota set path
RouterQuotaUsage.Builder quota = new RouterQuotaUsage.Builder()
.quota(HdfsConstants.QUOTA_DONT_SET)
.spaceQuota(HdfsConstants.QUOTA_DONT_SET);
manager.put("/noQuotaSet", quota.build());
quotaGet = manager.getQuotaUsage("/noQuotaSet");
// it should return null
assertNull(quotaGet);
// test case3: get quota from an quota-set path
quota.quota(1);
quota.spaceQuota(HdfsConstants.QUOTA_DONT_SET);
manager.put("/hasQuotaSet", quota.build());
quotaGet = manager.getQuotaUsage("/hasQuotaSet");
assertEquals(1, quotaGet.getQuota());
assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota());
// test case4: get quota with an non-exist child path
quotaGet = manager.getQuotaUsage("/hasQuotaSet/file");
// it will return the nearest ancestor which quota was set
assertEquals(1, quotaGet.getQuota());
assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota());
// test case5: get quota with an child path which its parent
// wasn't quota set
quota.quota(HdfsConstants.QUOTA_DONT_SET);
quota.spaceQuota(HdfsConstants.QUOTA_DONT_SET);
manager.put("/hasQuotaSet/noQuotaSet", quota.build());
// here should returns the quota of path /hasQuotaSet
// (the nearest ancestor which quota was set)
quotaGet = manager.getQuotaUsage("/hasQuotaSet/noQuotaSet/file");
assertEquals(1, quotaGet.getQuota());
assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota());
// test case6: get quota with an child path which its parent was quota set
quota.quota(2);
quota.spaceQuota(HdfsConstants.QUOTA_DONT_SET);
manager.put("/hasQuotaSet/hasQuotaSet", quota.build());
// here should return the quota of path /hasQuotaSet/hasQuotaSet
quotaGet = manager.getQuotaUsage("/hasQuotaSet/hasQuotaSet/file");
assertEquals(2, quotaGet.getQuota());
assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota());
}
}

View File

@ -27,8 +27,10 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.junit.Test;
@ -56,6 +58,14 @@ public class TestMountTable {
private static final long DATE_CREATED = 100;
private static final long DATE_MOD = 200;
private static final long NS_COUNT = 1;
private static final long NS_QUOTA = 5;
private static final long SS_COUNT = 10;
private static final long SS_QUOTA = 100;
private static final RouterQuotaUsage QUOTA = new RouterQuotaUsage.Builder()
.fileAndDirectoryCount(NS_COUNT).quota(NS_QUOTA).spaceConsumed(SS_COUNT)
.spaceQuota(SS_QUOTA).build();
@Test
public void testGetterSetter() throws IOException {
@ -68,6 +78,12 @@ public class TestMountTable {
assertTrue(DATE_CREATED > 0);
assertTrue(DATE_MOD > 0);
RouterQuotaUsage quota = record.getQuota();
assertEquals(0, quota.getFileAndDirectoryCount());
assertEquals(HdfsConstants.QUOTA_DONT_SET, quota.getQuota());
assertEquals(0, quota.getSpaceConsumed());
assertEquals(HdfsConstants.QUOTA_DONT_SET, quota.getSpaceQuota());
MountTable record2 =
MountTable.newInstance(SRC, DST_MAP, DATE_CREATED, DATE_MOD);
@ -94,6 +110,7 @@ public class TestMountTable {
SRC, DST_MAP, DATE_CREATED, DATE_MOD);
record.setReadOnly(true);
record.setDestOrder(order);
record.setQuota(QUOTA);
StateStoreSerializer serializer = StateStoreSerializer.getSerializer();
String serializedString = serializer.serializeString(record);
@ -107,6 +124,12 @@ public class TestMountTable {
assertEquals(DATE_MOD, record2.getDateModified());
assertTrue(record2.isReadOnly());
assertEquals(order, record2.getDestOrder());
RouterQuotaUsage quotaGet = record2.getQuota();
assertEquals(NS_COUNT, quotaGet.getFileAndDirectoryCount());
assertEquals(NS_QUOTA, quotaGet.getQuota());
assertEquals(SS_COUNT, quotaGet.getSpaceConsumed());
assertEquals(SS_QUOTA, quotaGet.getSpaceQuota());
}
@Test
@ -172,4 +195,22 @@ public class TestMountTable {
assertEquals(DST_NS_1, location2.getNameserviceId());
assertEquals(DST_PATH_1, location2.getDest());
}
@Test
public void testQuota() throws IOException {
MountTable record = MountTable.newInstance(SRC, DST_MAP);
record.setQuota(QUOTA);
validateDestinations(record);
assertEquals(SRC, record.getSourcePath());
assertEquals(DST, record.getDestinations());
assertTrue(DATE_CREATED > 0);
assertTrue(DATE_MOD > 0);
RouterQuotaUsage quotaGet = record.getQuota();
assertEquals(NS_COUNT, quotaGet.getFileAndDirectoryCount());
assertEquals(NS_QUOTA, quotaGet.getQuota());
assertEquals(SS_COUNT, quotaGet.getSpaceConsumed());
assertEquals(SS_QUOTA, quotaGet.getSpaceQuota());
}
}