From d98a2e6e2383f8b66def346409b0517aa32d298d Mon Sep 17 00:00:00 2001 From: Yiqun Lin Date: Wed, 10 Jan 2018 13:59:11 +0800 Subject: [PATCH] HDFS-12934. RBF: Federation supports global quota. Contributed by Yiqun Lin. --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 9 + .../hdfs/server/federation/router/Quota.java | 208 ++++++++ .../hdfs/server/federation/router/Router.java | 38 +- .../federation/router/RouterQuotaManager.java | 160 +++++++ .../router/RouterQuotaUpdateService.java | 228 +++++++++ .../federation/router/RouterQuotaUsage.java | 88 ++++ .../federation/router/RouterRpcServer.java | 40 +- .../federation/store/records/MountTable.java | 22 + .../records/impl/pb/MountTablePBImpl.java | 32 ++ .../hadoop/hdfs/server/namenode/Quota.java | 2 +- .../hdfs/tools/federation/RouterAdmin.java | 133 +++++- .../src/main/proto/FederationProtocol.proto | 3 + .../src/main/resources/hdfs-default.xml | 20 + .../federation/RouterConfigBuilder.java | 12 + .../federation/router/TestRouterAdminCLI.java | 54 +++ .../federation/router/TestRouterQuota.java | 452 ++++++++++++++++++ .../router/TestRouterQuotaManager.java | 113 +++++ .../store/records/TestMountTable.java | 41 ++ 18 files changed, 1639 insertions(+), 16 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuotaManager.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index a3808331b51..2825cc9b5d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1322,6 +1322,15 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_ROUTER_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_ROUTER_HTTPS_PORT_DEFAULT; + // HDFS Router-based federation quota + public static final String DFS_ROUTER_QUOTA_ENABLE = + FEDERATION_ROUTER_PREFIX + "quota.enable"; + public static final boolean DFS_ROUTER_QUOTA_ENABLED_DEFAULT = false; + public static final String DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL = + FEDERATION_ROUTER_PREFIX + "quota-cache.update.interval"; + public static final long DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL_DEFAULT = + 60000; + // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry @Deprecated public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java new file mode 100644 index 00000000000..f5e5272ac1c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.fs.QuotaUsage; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Module that implements the quota relevant RPC calls + * {@link ClientProtocol#setQuota(String, long, long, StorageType)} + * and + * {@link ClientProtocol#getQuotaUsage(String)} + * in the {@link RouterRpcServer}. + */ +public class Quota { + private static final Logger LOG = LoggerFactory.getLogger(Quota.class); + + /** RPC server to receive client calls. */ + private final RouterRpcServer rpcServer; + /** RPC clients to connect to the Namenodes. */ + private final RouterRpcClient rpcClient; + /** Router used in RouterRpcServer. */ + private final Router router; + + public Quota(Router router, RouterRpcServer server) { + this.router = router; + this.rpcServer = server; + this.rpcClient = server.getRPCClient(); + } + + /** + * Set quota for the federation path. + * @param path Federation path. + * @param namespaceQuota Name space quota. + * @param storagespaceQuota Storage space quota. + * @param type StorageType that the space quota is intended to be set on. + * @throws IOException + */ + public void setQuota(String path, long namespaceQuota, + long storagespaceQuota, StorageType type) throws IOException { + rpcServer.checkOperation(OperationCategory.WRITE); + + // Set quota for current path and its children mount table path. + final List locations = getQuotaRemoteLocations(path); + if (LOG.isDebugEnabled()) { + for (RemoteLocation loc : locations) { + LOG.debug("Set quota for path: nsId: {}, dest: {}.", + loc.getNameserviceId(), loc.getDest()); + } + } + + RemoteMethod method = new RemoteMethod("setQuota", + new Class[] {String.class, long.class, long.class, + StorageType.class}, + new RemoteParam(), namespaceQuota, storagespaceQuota, type); + rpcClient.invokeConcurrent(locations, method, false, false); + } + + /** + * Get quota usage for the federation path. + * @param path Federation path. + * @return Aggregated quota. + * @throws IOException + */ + public QuotaUsage getQuotaUsage(String path) throws IOException { + final List quotaLocs = getValidQuotaLocations(path); + RemoteMethod method = new RemoteMethod("getQuotaUsage", + new Class[] {String.class}, new RemoteParam()); + Map results = rpcClient.invokeConcurrent(quotaLocs, + method, true, false); + + return aggregateQuota(results); + } + + /** + * Get valid quota remote locations used in {@link #getQuotaUsage(String)}. + * Differentiate the method {@link #getQuotaRemoteLocations(String)}, this + * method will do some additional filtering. + * @param path Federation path. + * @return List of valid quota remote locations. + * @throws IOException + */ + private List getValidQuotaLocations(String path) + throws IOException { + final List locations = getQuotaRemoteLocations(path); + + // NameService -> Locations + Map> validLocations = new HashMap<>(); + for (RemoteLocation loc : locations) { + String nsId = loc.getNameserviceId(); + List dests = validLocations.get(nsId); + if (dests == null) { + dests = new LinkedList<>(); + dests.add(loc); + validLocations.put(nsId, dests); + } else { + // Ensure the paths in the same nameservice is different. + // Don't include parent-child paths. + boolean isChildPath = false; + for (RemoteLocation d : dests) { + if (loc.getDest().startsWith(d.getDest())) { + isChildPath = true; + break; + } + } + + if (!isChildPath) { + dests.add(loc); + } + } + } + + List quotaLocs = new LinkedList<>(); + for (List locs : validLocations.values()) { + quotaLocs.addAll(locs); + } + + return quotaLocs; + } + + /** + * Aggregate quota that queried from sub-clusters. + * @param results Quota query result. + * @return Aggregated Quota. + */ + private QuotaUsage aggregateQuota(Map results) { + long nsCount = 0; + long ssCount = 0; + boolean hasQuotaUnSet = false; + + for (Map.Entry entry : results.entrySet()) { + RemoteLocation loc = entry.getKey(); + QuotaUsage usage = (QuotaUsage) entry.getValue(); + if (usage != null) { + // If quota is not set in real FileSystem, the usage + // value will return -1. + if (usage.getQuota() == -1 && usage.getSpaceQuota() == -1) { + hasQuotaUnSet = true; + } + + nsCount += usage.getFileAndDirectoryCount(); + ssCount += usage.getSpaceConsumed(); + LOG.debug( + "Get quota usage for path: nsId: {}, dest: {}," + + " nsCount: {}, ssCount: {}.", + loc.getNameserviceId(), loc.getDest(), + usage.getFileAndDirectoryCount(), usage.getSpaceConsumed()); + } + } + + QuotaUsage.Builder builder = new QuotaUsage.Builder() + .fileAndDirectoryCount(nsCount).spaceConsumed(ssCount); + if (hasQuotaUnSet) { + builder.quota(HdfsConstants.QUOTA_DONT_SET); + } + + return builder.build(); + } + + /** + * Get all quota remote locations across subclusters under given + * federation path. + * @param path Federation path. + * @return List of quota remote locations. + * @throws IOException + */ + private List getQuotaRemoteLocations(String path) + throws IOException { + List locations = new LinkedList<>(); + RouterQuotaManager manager = this.router.getQuotaManager(); + if (manager != null) { + Set childrenPaths = manager.getPaths(path); + for (String childPath : childrenPaths) { + locations.addAll(rpcServer.getLocationsForPath(childPath, true)); + } + } + + return locations; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java index 413566ed054..ea8a1c05766 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java @@ -45,6 +45,8 @@ import org.apache.hadoop.util.JvmPauseMonitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * Router that provides a unified view of multiple federated HDFS clusters. It * has two main roles: (1) federated interface and (2) NameNode heartbeat. @@ -105,7 +107,10 @@ public class Router extends CompositeService { /** JVM pauses (GC and others). */ private JvmPauseMonitor pauseMonitor; - + /** Quota usage update service. */ + private RouterQuotaUpdateService quotaUpdateService; + /** Quota cache manager. */ + private RouterQuotaManager quotaManager; ///////////////////////////////////////////////////////// // Constructor @@ -200,6 +205,14 @@ public class Router extends CompositeService { this.pauseMonitor.init(conf); } + // Initial quota relevant service + if (conf.getBoolean(DFSConfigKeys.DFS_ROUTER_QUOTA_ENABLE, + DFSConfigKeys.DFS_ROUTER_QUOTA_ENABLED_DEFAULT)) { + this.quotaManager = new RouterQuotaManager(); + this.quotaUpdateService = new RouterQuotaUpdateService(this); + addService(this.quotaUpdateService); + } + super.serviceInit(conf); } @@ -524,4 +537,27 @@ public class Router extends CompositeService { this.namenodeResolver.setRouterId(this.routerId); } } + + /** + * If the quota system is enabled in Router. + */ + public boolean isQuotaEnabled() { + return this.quotaManager != null; + } + + /** + * Get route quota manager. + * @return RouterQuotaManager Quota manager. + */ + public RouterQuotaManager getQuotaManager() { + return this.quotaManager; + } + + /** + * Get quota cache update service. + */ + @VisibleForTesting + RouterQuotaUpdateService getQuotaCacheUpdateService() { + return this.quotaUpdateService; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java new file mode 100644 index 00000000000..fc3575c63d8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; + +/** + * Router quota manager in Router. The manager maintains + * {@link RouterQuotaUsage} cache of mount tables and do management + * for the quota caches. + */ +public class RouterQuotaManager { + /** Quota usage cache. */ + private TreeMap cache; + + /** Lock to access the quota cache. */ + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); + private final Lock writeLock = readWriteLock.writeLock(); + + public RouterQuotaManager() { + this.cache = new TreeMap<>(); + } + + /** + * Get all the mount quota paths. + */ + public Set getAll() { + readLock.lock(); + try { + return this.cache.keySet(); + } finally { + readLock.unlock(); + } + } + + /** + * Get the nearest ancestor's quota usage, and meanwhile its quota was set. + * @param path The path being written. + * @return RouterQuotaUsage Quota usage. + */ + public RouterQuotaUsage getQuotaUsage(String path) { + readLock.lock(); + try { + RouterQuotaUsage quotaUsage = this.cache.get(path); + if (quotaUsage != null && isQuotaSet(quotaUsage)) { + return quotaUsage; + } + + // If not found, look for its parent path usage value. + int pos = path.lastIndexOf(Path.SEPARATOR); + if (pos != -1) { + String parentPath = path.substring(0, pos); + return getQuotaUsage(parentPath); + } + } finally { + readLock.unlock(); + } + + return null; + } + + /** + * Get children paths (can including itself) under specified federation path. + * @param parentPath + * @return Set Children path set. + */ + public Set getPaths(String parentPath) { + readLock.lock(); + try { + String from = parentPath; + String to = parentPath + Character.MAX_VALUE; + SortedMap subMap = this.cache.subMap(from, to); + return subMap.keySet(); + } finally { + readLock.unlock(); + } + } + + /** + * Put new entity into cache. + * @param path Mount table path. + * @param quotaUsage Corresponding cache value. + */ + public void put(String path, RouterQuotaUsage quotaUsage) { + writeLock.lock(); + try { + this.cache.put(path, quotaUsage); + } finally { + writeLock.unlock(); + } + } + + /** + * Remove the entity from cache. + * @param path Mount table path. + */ + public void remove(String path) { + writeLock.lock(); + try { + this.cache.remove(path); + } finally { + writeLock.unlock(); + } + } + + /** + * Clean up the cache. + */ + public void clear() { + writeLock.lock(); + try { + this.cache.clear(); + } finally { + writeLock.unlock(); + } + } + + /** + * Check if the quota was set. + * @param quota RouterQuotaUsage set in mount table. + */ + public boolean isQuotaSet(RouterQuotaUsage quota) { + if (quota != null) { + long nsQuota = quota.getQuota(); + long ssQuota = quota.getSpaceQuota(); + + // once nsQuota or ssQuota was set, this mount table is quota set + if (nsQuota != HdfsConstants.QUOTA_DONT_SET + || ssQuota != HdfsConstants.QUOTA_DONT_SET) { + return true; + } + } + + return false; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java new file mode 100644 index 00000000000..80abc11b217 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.QuotaUsage; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service to periodically update the {@link RouterQuotaUsage} + * cached information in the {@link Router} and update corresponding + * mount table in State Store. + */ +public class RouterQuotaUpdateService extends PeriodicService { + private static final Logger LOG = + LoggerFactory.getLogger(RouterQuotaUpdateService.class); + + private MountTableStore mountTableStore; + private RouterRpcServer rpcServer; + /** Router using this Service. */ + private final Router router; + /** Router Quota manager. */ + private RouterQuotaManager quotaManager; + + public RouterQuotaUpdateService(final Router router) throws IOException { + super(RouterQuotaUpdateService.class.getName()); + this.router = router; + this.rpcServer = router.getRpcServer(); + this.quotaManager = router.getQuotaManager(); + + if (this.quotaManager == null) { + throw new IOException("Router quota manager is not initialized."); + } + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.setIntervalMs(conf.getTimeDuration( + DFSConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL, + DFSConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS)); + + super.serviceInit(conf); + } + + @Override + protected void periodicInvoke() { + LOG.debug("Start to update quota cache."); + try { + List updateMountTables = new LinkedList<>(); + List mountTables = getQuotaSetMountTables(); + for (MountTable entry : mountTables) { + String src = entry.getSourcePath(); + RouterQuotaUsage oldQuota = entry.getQuota(); + long nsQuota = oldQuota.getQuota(); + long ssQuota = oldQuota.getSpaceQuota(); + // Call RouterRpcServer#getQuotaUsage for getting current quota usage. + QuotaUsage currentQuotaUsage = this.rpcServer.getQuotaModule() + .getQuotaUsage(src); + // If quota is not set in some subclusters under federation path, + // set quota for this path. + if (currentQuotaUsage.getQuota() == HdfsConstants.QUOTA_DONT_SET) { + this.rpcServer.setQuota(src, nsQuota, ssQuota, null); + } + + RouterQuotaUsage newQuota = generateNewQuota(oldQuota, + currentQuotaUsage); + this.quotaManager.put(src, newQuota); + entry.setQuota(newQuota); + + // only update mount tables which quota was changed + if (!oldQuota.equals(newQuota)) { + updateMountTables.add(entry); + + LOG.debug( + "Update quota usage entity of path: {}, nsCount: {}," + + " nsQuota: {}, ssCount: {}, ssQuota: {}.", + src, newQuota.getFileAndDirectoryCount(), + newQuota.getQuota(), newQuota.getSpaceConsumed(), + newQuota.getSpaceQuota()); + } + } + + updateMountTableEntries(updateMountTables); + } catch (IOException e) { + LOG.error("Quota cache updated error.", e); + } + } + + /** + * Get mount table store management interface. + * @return MountTableStore instance. + * @throws IOException + */ + private MountTableStore getMountTableStore() throws IOException { + if (this.mountTableStore == null) { + this.mountTableStore = router.getStateStore().getRegisteredRecordStore( + MountTableStore.class); + if (this.mountTableStore == null) { + throw new IOException("Mount table state store is not available."); + } + } + return this.mountTableStore; + } + + /** + * Get all the existing mount tables. + * @return List of mount tables. + * @throws IOException + */ + private List getMountTableEntries() throws IOException { + // scan mount tables from root path + GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest + .newInstance("/"); + GetMountTableEntriesResponse getResponse = getMountTableStore() + .getMountTableEntries(getRequest); + return getResponse.getEntries(); + } + + /** + * Get mount tables which quota was set. + * During this time, the quota usage cache will also be updated by + * quota manager: + * 1. Stale paths (entries) will be removed. + * 2. Existing entries will be override and updated. + * @return List of mount tables which quota was set. + * @throws IOException + */ + private List getQuotaSetMountTables() throws IOException { + List mountTables = getMountTableEntries(); + Set stalePaths = new HashSet<>(); + for (String path : this.quotaManager.getAll()) { + stalePaths.add(path); + } + + List neededMountTables = new LinkedList<>(); + for (MountTable entry : mountTables) { + // select mount tables which is quota set + if (isQuotaSet(entry)) { + neededMountTables.add(entry); + } + + // update mount table entries info in quota cache + String src = entry.getSourcePath(); + this.quotaManager.put(src, entry.getQuota()); + stalePaths.remove(src); + } + + // remove stale paths that currently cached + for (String stalePath : stalePaths) { + this.quotaManager.remove(stalePath); + } + + return neededMountTables; + } + + /** + * Check if the quota was set in given MountTable. + * @param mountTable Mount table entry. + */ + private boolean isQuotaSet(MountTable mountTable) { + if (mountTable != null) { + return this.quotaManager.isQuotaSet(mountTable.getQuota()); + } + return false; + } + + /** + * Generate a new quota based on old quota and current quota usage value. + * @param oldQuota Old quota stored in State Store. + * @param currentQuotaUsage Current quota usage value queried from + * subcluster. + * @return A new RouterQuotaUsage. + */ + private RouterQuotaUsage generateNewQuota(RouterQuotaUsage oldQuota, + QuotaUsage currentQuotaUsage) { + RouterQuotaUsage newQuota = new RouterQuotaUsage.Builder() + .fileAndDirectoryCount(currentQuotaUsage.getFileAndDirectoryCount()) + .quota(oldQuota.getQuota()) + .spaceConsumed(currentQuotaUsage.getSpaceConsumed()) + .spaceQuota(oldQuota.getSpaceQuota()).build(); + return newQuota; + } + + /** + * Write out updated mount table entries into State Store. + * @param updateMountTables Mount tables to be updated. + * @throws IOException + */ + private void updateMountTableEntries(List updateMountTables) + throws IOException { + for (MountTable entry : updateMountTables) { + UpdateMountTableEntryRequest updateRequest = UpdateMountTableEntryRequest + .newInstance(entry); + getMountTableStore().updateMountTableEntry(updateRequest); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java new file mode 100644 index 00000000000..55bfc483dc3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.fs.QuotaUsage; +import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; +import org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature; +import org.apache.hadoop.hdfs.server.namenode.Quota; + +/** + * The subclass of {@link QuotaUsage} used in Router-based federation. + */ +public final class RouterQuotaUsage extends QuotaUsage { + private RouterQuotaUsage(Builder builder) { + super(builder); + } + + /** Build the instance based on the builder. */ + public static class Builder extends QuotaUsage.Builder { + + public RouterQuotaUsage build() { + return new RouterQuotaUsage(this); + } + + @Override + public Builder fileAndDirectoryCount(long count) { + super.fileAndDirectoryCount(count); + return this; + } + + @Override + public Builder quota(long quota) { + super.quota(quota); + return this; + } + + @Override + public Builder spaceConsumed(long spaceConsumed) { + super.spaceConsumed(spaceConsumed); + return this; + } + + @Override + public Builder spaceQuota(long spaceQuota) { + super.spaceQuota(spaceQuota); + return this; + } + } + + /** + * Verify if namespace quota is violated once quota is set. Relevant + * method {@link DirectoryWithQuotaFeature#verifyNamespaceQuota}. + * @throws NSQuotaExceededException + */ + public void verifyNamespaceQuota() throws NSQuotaExceededException { + if (Quota.isViolated(getQuota(), getFileAndDirectoryCount())) { + throw new NSQuotaExceededException(getQuota(), + getFileAndDirectoryCount()); + } + } + + /** + * Verify if storage space quota is violated once quota is set. Relevant + * method {@link DirectoryWithQuotaFeature#verifyStoragespaceQuota}. + * @throws DSQuotaExceededException + */ + public void verifyStoragespaceQuota() throws DSQuotaExceededException { + if (Quota.isViolated(getSpaceQuota(), getSpaceConsumed())) { + throw new DSQuotaExceededException(getSpaceQuota(), getSpaceConsumed()); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 11f7fa6492f..73b189ed9bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -181,6 +181,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { /** Category of the operation that a thread is executing. */ private final ThreadLocal opCategory = new ThreadLocal<>(); + /** Router Quota calls. */ + private final Quota quotaCall; /** * Construct a router RPC server. @@ -277,6 +279,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { // Create the client this.rpcClient = new RouterRpcClient(this.conf, this.router.getRouterId(), this.namenodeResolver, this.rpcMonitor); + + // Initialize modules + this.quotaCall = new Quota(this.router, this); } @Override @@ -384,7 +389,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { * @throws StandbyException If the Router is in safe mode and cannot serve * client requests. */ - private void checkOperation(OperationCategory op) throws StandbyException { + protected void checkOperation(OperationCategory op) throws StandbyException { // Log the function we are currently calling. if (rpcMonitor != null) { rpcMonitor.startOp(); @@ -1839,21 +1844,13 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { @Override // ClientProtocol public void setQuota(String path, long namespaceQuota, long storagespaceQuota, StorageType type) throws IOException { - checkOperation(OperationCategory.WRITE); - - // TODO assign global replicas instead of applying them to each folder - final List locations = getLocationsForPath(path, true); - RemoteMethod method = new RemoteMethod("setQuota", - new Class[] {String.class, Long.class, Long.class, - StorageType.class}, - new RemoteParam(), namespaceQuota, storagespaceQuota, type); - rpcClient.invokeConcurrent(locations, method, false, false); + this.quotaCall.setQuota(path, namespaceQuota, storagespaceQuota, type); } @Override // ClientProtocol public QuotaUsage getQuotaUsage(String path) throws IOException { - checkOperation(OperationCategory.READ, false); - return null; + checkOperation(OperationCategory.READ); + return this.quotaCall.getQuotaUsage(path); } @Override @@ -1996,7 +1993,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { * @return Prioritized list of locations in the federated cluster. * @throws IOException If the location for this path cannot be determined. */ - private List getLocationsForPath( + protected List getLocationsForPath( String path, boolean failIfLocked) throws IOException { try { // Check the location for this path @@ -2016,6 +2013,16 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { } throw new IOException(path + " is in a read only mount point"); } + + // Check quota + if (this.router.isQuotaEnabled()) { + RouterQuotaUsage quotaUsage = this.router.getQuotaManager() + .getQuotaUsage(path); + if (quotaUsage != null) { + quotaUsage.verifyNamespaceQuota(); + quotaUsage.verifyStoragespaceQuota(); + } + } } return location.getDestinations(); @@ -2119,4 +2126,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { UserGroupInformation ugi = Server.getRemoteUser(); return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser(); } + + /** + * Get quota module implement. + */ + public Quota getQuotaModule() { + return this.quotaCall; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java index 1b5d2d6b894..53ad1e19190 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java @@ -29,9 +29,11 @@ import java.util.TreeMap; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; import org.apache.hadoop.hdfs.server.federation.router.RouterPermissionChecker; +import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.security.UserGroupInformation; @@ -140,6 +142,12 @@ public abstract class MountTable extends BaseRecord { record.setMode(new FsPermission( RouterPermissionChecker.MOUNT_TABLE_PERMISSION_DEFAULT)); + // Set quota for mount table + RouterQuotaUsage quota = new RouterQuotaUsage.Builder() + .fileAndDirectoryCount(0).quota(HdfsConstants.QUOTA_DONT_SET) + .spaceConsumed(0).spaceQuota(HdfsConstants.QUOTA_DONT_SET).build(); + record.setQuota(quota); + // Validate record.validate(); return record; @@ -248,6 +256,20 @@ public abstract class MountTable extends BaseRecord { */ public abstract void setMode(FsPermission mode); + /** + * Get quota of this mount table entry. + * + * @return RouterQuotaUsage quota usage + */ + public abstract RouterQuotaUsage getQuota(); + + /** + * Set quota for this mount table entry. + * + * @param quota QuotaUsage for mount table entry + */ + public abstract void setQuota(RouterQuotaUsage quota); + /** * Get the default location. * @return The default location. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java index 372f2098613..d9e95550bc8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java @@ -27,10 +27,12 @@ import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProt import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto.DestOrder; import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProtoOrBuilder; import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoteLocationProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.QuotaUsageProto; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; import org.apache.hadoop.hdfs.server.federation.router.RouterAdminServer; import org.apache.hadoop.hdfs.server.federation.router.RouterPermissionChecker; +import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage; import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.FederationProtocolPBTranslator; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; @@ -250,6 +252,36 @@ public class MountTablePBImpl extends MountTable implements PBRecord { } } + @Override + public RouterQuotaUsage getQuota() { + MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder(); + if (!proto.hasQuota()) { + return null; + } + + QuotaUsageProto quotaProto = proto.getQuota(); + RouterQuotaUsage.Builder builder = new RouterQuotaUsage.Builder() + .fileAndDirectoryCount(quotaProto.getFileAndDirectoryCount()) + .quota(quotaProto.getQuota()) + .spaceConsumed(quotaProto.getSpaceConsumed()) + .spaceQuota(quotaProto.getSpaceQuota()); + return builder.build(); + } + + @Override + public void setQuota(RouterQuotaUsage quota) { + Builder builder = this.translator.getBuilder(); + if (quota == null) { + builder.clearQuota(); + } else { + QuotaUsageProto quotaUsage = QuotaUsageProto.newBuilder() + .setFileAndDirectoryCount(quota.getFileAndDirectoryCount()) + .setQuota(quota.getQuota()).setSpaceConsumed(quota.getSpaceConsumed()) + .setSpaceQuota(quota.getSpaceQuota()).build(); + builder.setQuota(quotaUsage); + } + } + private DestinationOrder convert(DestOrder order) { switch (order) { case LOCAL: diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java index 6d20e6c5ac4..5e708bea87d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java @@ -49,7 +49,7 @@ public enum Quota { * Is quota violated? * The quota is violated if quota is set and usage > quota. */ - static boolean isViolated(final long quota, final long usage) { + public static boolean isViolated(final long quota, final long usage) { return quota >= 0 && usage > quota; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java index fd961f292d5..0681ed52787 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java @@ -29,10 +29,12 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; import org.apache.hadoop.hdfs.server.federation.router.RouterClient; +import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage; import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; @@ -80,7 +82,9 @@ public class RouterAdmin extends Configured implements Tool { + "\t[-add " + "[-readonly] -owner -group -mode ]\n" + "\t[-rm ]\n" - + "\t[-ls ]\n"; + + "\t[-ls ]\n" + + "\t[-setQuota -ns -ss ]\n" + + "\t[-clrQuota \n"; System.out.println(usage); } @@ -109,6 +113,18 @@ public class RouterAdmin extends Configured implements Tool { printUsage(); return exitCode; } + } else if ("-setQuota".equalsIgnoreCase(cmd)) { + if (argv.length < 4) { + System.err.println("Not enough parameters specificed for cmd " + cmd); + printUsage(); + return exitCode; + } + } else if ("-clrQuota".equalsIgnoreCase(cmd)) { + if (argv.length < 2) { + System.err.println("Not enough parameters specificed for cmd " + cmd); + printUsage(); + return exitCode; + } } // Initialize RouterClient @@ -144,6 +160,16 @@ public class RouterAdmin extends Configured implements Tool { } else { listMounts("/"); } + } else if ("-setQuota".equals(cmd)) { + if (setQuota(argv, i)) { + System.out.println( + "Successfully set quota for mount point " + argv[i]); + } + } else if ("-clrQuota".equals(cmd)) { + if (clrQuota(argv[i])) { + System.out.println( + "Successfully clear quota for mount point " + argv[i]); + } } else { printUsage(); return exitCode; @@ -388,6 +414,111 @@ public class RouterAdmin extends Configured implements Tool { } } + /** + * Set quota for a mount table entry. + * + * @param parameters Parameters of the quota. + * @param i Index in the parameters. + */ + private boolean setQuota(String[] parameters, int i) throws IOException { + long nsQuota = HdfsConstants.QUOTA_DONT_SET; + long ssQuota = HdfsConstants.QUOTA_DONT_SET; + + String mount = parameters[i++]; + while (i < parameters.length) { + if (parameters[i].equals("-nsQuota")) { + i++; + try { + nsQuota = Long.parseLong(parameters[i]); + } catch (Exception e) { + System.err.println("Cannot parse nsQuota: " + parameters[i]); + } + } else if (parameters[i].equals("-ssQuota")) { + i++; + try { + ssQuota = Long.parseLong(parameters[i]); + } catch (Exception e) { + System.err.println("Cannot parse ssQuota: " + parameters[i]); + } + } + + i++; + } + + if (nsQuota <= 0 || ssQuota <= 0) { + System.err.println("Input quota value should be a positive number."); + return false; + } + + return updateQuota(mount, nsQuota, ssQuota); + } + + /** + * Clear quota of the mount point. + * + * @param mount Mount table to clear + * @return If the quota was cleared. + * @throws IOException Error clearing the mount point. + */ + private boolean clrQuota(String mount) throws IOException { + return updateQuota(mount, HdfsConstants.QUOTA_DONT_SET, + HdfsConstants.QUOTA_DONT_SET); + } + + /** + * Update quota of specified mount table. + * + * @param mount Specified mount table to update. + * @param nsQuota Namespace quota. + * @param ssQuota Storage space quota. + * @return If the quota was updated. + * @throws IOException Error updating quota. + */ + private boolean updateQuota(String mount, long nsQuota, long ssQuota) + throws IOException { + // Get existing entry + MountTableManager mountTable = client.getMountTableManager(); + GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest + .newInstance(mount); + GetMountTableEntriesResponse getResponse = mountTable + .getMountTableEntries(getRequest); + List results = getResponse.getEntries(); + MountTable existingEntry = null; + for (MountTable result : results) { + if (mount.equals(result.getSourcePath())) { + existingEntry = result; + break; + } + } + + if (existingEntry == null) { + return false; + } else { + long nsCount = existingEntry.getQuota().getFileAndDirectoryCount(); + long ssCount = existingEntry.getQuota().getSpaceConsumed(); + // If nsQuota or ssQuota was unset, reset corresponding usage + // value to zero. + if (nsQuota == HdfsConstants.QUOTA_DONT_SET) { + nsCount = 0; + } + + if (nsQuota == HdfsConstants.QUOTA_DONT_SET) { + ssCount = 0; + } + + RouterQuotaUsage updatedQuota = new RouterQuotaUsage.Builder() + .fileAndDirectoryCount(nsCount).quota(nsQuota) + .spaceConsumed(ssCount).spaceQuota(ssQuota).build(); + existingEntry.setQuota(updatedQuota); + } + + UpdateMountTableEntryRequest updateRequest = + UpdateMountTableEntryRequest.newInstance(existingEntry); + UpdateMountTableEntryResponse updateResponse = mountTable + .updateMountTableEntry(updateRequest); + return updateResponse.getStatus(); + } + /** * Inner class that stores ACL info of mount table. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto index 043a21a6679..5d9b9d4e81c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto @@ -22,6 +22,7 @@ option java_generic_services = true; option java_generate_equals_and_hash = true; package hadoop.hdfs; +import "hdfs.proto"; ///////////////////////////////////////////////// // Membership @@ -134,6 +135,8 @@ message MountTableRecordProto { optional string ownerName = 10; optional string groupName = 11; optional int32 mode = 12; + + optional QuotaUsageProto quota = 13; } message AddMountTableEntryRequestProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 4ca7b58c396..cd365be218d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -5126,4 +5126,24 @@ + + dfs.federation.router.quota.enable + false + + Set to true to enable quota system in Router. + + + + + dfs.federation.router.quota-cache.update.interval + 60s + + Interval time for updating quota usage cache in Router. + This property is used only if the value of + dfs.federation.router.quota.enable is true. + This setting supports multiple time unit suffixes as described + in dfs.heartbeat.interval. If no suffix is specified then milliseconds + is assumed. + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java index 88da77e6010..3d8b35ccbdb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java @@ -34,6 +34,7 @@ public class RouterConfigBuilder { private boolean enableLocalHeartbeat = false; private boolean enableStateStore = false; private boolean enableMetrics = false; + private boolean enableQuota = false; public RouterConfigBuilder(Configuration configuration) { this.conf = configuration; @@ -89,6 +90,11 @@ public class RouterConfigBuilder { return this; } + public RouterConfigBuilder quota(boolean enable) { + this.enableQuota = enable; + return this; + } + public RouterConfigBuilder rpc() { return this.rpc(true); } @@ -113,6 +119,10 @@ public class RouterConfigBuilder { return this.metrics(true); } + public RouterConfigBuilder quota() { + return this.quota(true); + } + public Configuration build() { conf.setBoolean(DFSConfigKeys.DFS_ROUTER_STORE_ENABLE, this.enableStateStore); @@ -127,6 +137,8 @@ public class RouterConfigBuilder { this.enableLocalHeartbeat); conf.setBoolean(DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE, this.enableMetrics); + conf.setBoolean(DFSConfigKeys.DFS_ROUTER_QUOTA_ENABLE, + this.enableQuota); return conf; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java index 9e82967b014..ec47a41d47b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java @@ -28,6 +28,7 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext; import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; @@ -294,4 +295,57 @@ public class TestRouterAdminCLI { argv = new String[] {"-rm", mount}; assertEquals(rmCommandCode, ToolRunner.run(admin, argv)); } + + @Test + public void testSetAndClearQuota() throws Exception { + String nsId = "ns0"; + String src = "/test-QuotaMounttable"; + String dest = "/QuotaMounttable"; + String[] argv = new String[] {"-add", src, nsId, dest}; + assertEquals(0, ToolRunner.run(admin, argv)); + + stateStore.loadCache(MountTableStoreImpl.class, true); + GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest + .newInstance(src); + GetMountTableEntriesResponse getResponse = client.getMountTableManager() + .getMountTableEntries(getRequest); + MountTable mountTable = getResponse.getEntries().get(0); + RouterQuotaUsage quotaUsage = mountTable.getQuota(); + + // verify the default quota set + assertEquals(0, quotaUsage.getFileAndDirectoryCount()); + assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getQuota()); + assertEquals(0, quotaUsage.getSpaceConsumed()); + assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getSpaceQuota()); + + long nsQuota = 50; + long ssQuota = 100; + argv = new String[] {"-setQuota", src, "-nsQuota", String.valueOf(nsQuota), + "-ssQuota", String.valueOf(ssQuota)}; + assertEquals(0, ToolRunner.run(admin, argv)); + + stateStore.loadCache(MountTableStoreImpl.class, true); + getResponse = client.getMountTableManager() + .getMountTableEntries(getRequest); + mountTable = getResponse.getEntries().get(0); + quotaUsage = mountTable.getQuota(); + + // verify if the quota is set + assertEquals(nsQuota, quotaUsage.getQuota()); + assertEquals(ssQuota, quotaUsage.getSpaceQuota()); + + // test clrQuota command + argv = new String[] {"-clrQuota", src}; + assertEquals(0, ToolRunner.run(admin, argv)); + + stateStore.loadCache(MountTableStoreImpl.class, true); + getResponse = client.getMountTableManager() + .getMountTableEntries(getRequest); + mountTable = getResponse.getEntries().get(0); + quotaUsage = mountTable.getQuota(); + + // verify if quota unset successfully + assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getQuota()); + assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaUsage.getSpaceQuota()); + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java new file mode 100644 index 00000000000..368b5e2aa6d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java @@ -0,0 +1,452 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.QuotaUsage; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.base.Supplier; + +/** + * Tests quota behaviors in Router-based Federation. + */ +public class TestRouterQuota { + private static StateStoreDFSCluster cluster; + private static NamenodeContext nnContext1; + private static NamenodeContext nnContext2; + private static RouterContext routerContext; + private static MountTableResolver resolver; + + private static final int BLOCK_SIZE = 512; + + @Before + public void setUp() throws Exception { + + // Build and start a federated cluster + cluster = new StateStoreDFSCluster(false, 2); + Configuration routerConf = new RouterConfigBuilder() + .stateStore() + .admin() + .quota() + .rpc() + .build(); + routerConf.set(DFSConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL, "2s"); + + // override some hdfs settings that used in testing space quota + Configuration hdfsConf = new Configuration(false); + hdfsConf.setInt(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + hdfsConf.setInt(HdfsClientConfigKeys.DFS_REPLICATION_KEY, 1); + + cluster.addRouterOverrides(routerConf); + cluster.addNamenodeOverrides(hdfsConf); + cluster.startCluster(); + cluster.startRouters(); + cluster.waitClusterUp(); + + nnContext1 = cluster.getNamenode(cluster.getNameservices().get(0), null); + nnContext2 = cluster.getNamenode(cluster.getNameservices().get(1), null); + routerContext = cluster.getRandomRouter(); + Router router = routerContext.getRouter(); + resolver = (MountTableResolver) router.getSubclusterResolver(); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.stopRouter(routerContext); + cluster.shutdown(); + cluster = null; + } + } + + @Test + public void testNamespaceQuotaExceed() throws Exception { + long nsQuota = 3; + final FileSystem nnFs1 = nnContext1.getFileSystem(); + final FileSystem nnFs2 = nnContext2.getFileSystem(); + + // Add two mount tables: + // /nsquota --> ns0---testdir1 + // /nsquota/subdir --> ns1---testdir2 + nnFs1.mkdirs(new Path("/testdir1")); + nnFs2.mkdirs(new Path("/testdir2")); + MountTable mountTable1 = MountTable.newInstance("/nsquota", + Collections.singletonMap("ns0", "/testdir1")); + + mountTable1.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota).build()); + addMountTable(mountTable1); + + MountTable mountTable2 = MountTable.newInstance("/nsquota/subdir", + Collections.singletonMap("ns1", "/testdir2")); + mountTable2.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota).build()); + addMountTable(mountTable2); + + final FileSystem routerFs = routerContext.getFileSystem(); + GenericTestUtils.waitFor(new Supplier() { + + @Override + public Boolean get() { + boolean isNsQuotaViolated = false; + try { + // create new directory to trigger NSQuotaExceededException + routerFs.mkdirs(new Path("/nsquota/" + UUID.randomUUID())); + routerFs.mkdirs(new Path("/nsquota/subdir/" + UUID.randomUUID())); + } catch (NSQuotaExceededException e) { + isNsQuotaViolated = true; + } catch (IOException ignored) { + } + return isNsQuotaViolated; + } + }, 5000, 60000); + // mkdir in real FileSystem should be okay + nnFs1.mkdirs(new Path("/testdir1/" + UUID.randomUUID())); + nnFs2.mkdirs(new Path("/testdir2/" + UUID.randomUUID())); + } + + @Test + public void testStorageSpaceQuotaaExceed() throws Exception { + long ssQuota = 3071; + final FileSystem nnFs1 = nnContext1.getFileSystem(); + final FileSystem nnFs2 = nnContext2.getFileSystem(); + + // Add two mount tables: + // /ssquota --> ns0---testdir3 + // /ssquota/subdir --> ns1---testdir4 + nnFs1.mkdirs(new Path("/testdir3")); + nnFs2.mkdirs(new Path("/testdir4")); + MountTable mountTable1 = MountTable.newInstance("/ssquota", + Collections.singletonMap("ns0", "/testdir3")); + + mountTable1 + .setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota).build()); + addMountTable(mountTable1); + + MountTable mountTable2 = MountTable.newInstance("/ssquota/subdir", + Collections.singletonMap("ns1", "/testdir4")); + mountTable2 + .setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota).build()); + addMountTable(mountTable2); + + DFSClient routerClient = routerContext.getClient(); + routerClient.create("/ssquota/file", true).close(); + routerClient.create("/ssquota/subdir/file", true).close(); + + GenericTestUtils.waitFor(new Supplier() { + + @Override + public Boolean get() { + boolean isDsQuotaViolated = false; + try { + // append data to trigger NSQuotaExceededException + appendData("/ssquota/file", routerClient, BLOCK_SIZE); + appendData("/ssquota/subdir/file", routerClient, BLOCK_SIZE); + } catch (DSQuotaExceededException e) { + isDsQuotaViolated = true; + } catch (IOException ignored) { + } + return isDsQuotaViolated; + } + }, 5000, 60000); + + // append data to destination path in real FileSystem should be okay + appendData("/testdir3/file", nnContext1.getClient(), BLOCK_SIZE); + appendData("/testdir4/file", nnContext2.getClient(), BLOCK_SIZE); + } + + /** + * Add a mount table entry to the mount table through the admin API. + * @param entry Mount table entry to add. + * @return If it was successfully added. + * @throws IOException Problems adding entries. + */ + private boolean addMountTable(final MountTable entry) throws IOException { + RouterClient client = routerContext.getAdminClient(); + MountTableManager mountTableManager = client.getMountTableManager(); + AddMountTableEntryRequest addRequest = + AddMountTableEntryRequest.newInstance(entry); + AddMountTableEntryResponse addResponse = + mountTableManager.addMountTableEntry(addRequest); + + // Reload the Router cache + resolver.loadCache(true); + + return addResponse.getStatus(); + } + + /** + * Append data in specified file. + * @param path Path of file. + * @param client DFS Client. + * @param dataLen The length of write data. + * @throws IOException + */ + private void appendData(String path, DFSClient client, int dataLen) + throws IOException { + EnumSet createFlag = EnumSet.of(CreateFlag.APPEND); + HdfsDataOutputStream stream = client.append(path, 1024, createFlag, null, + null); + byte[] data = new byte[dataLen]; + stream.write(data); + stream.close(); + } + + @Test + public void testSetQuota() throws Exception { + long nsQuota = 5; + long ssQuota = 100; + final FileSystem nnFs1 = nnContext1.getFileSystem(); + final FileSystem nnFs2 = nnContext2.getFileSystem(); + + // Add two mount tables: + // /setquota --> ns0---testdir5 + // /setquota/subdir --> ns1---testdir6 + nnFs1.mkdirs(new Path("/testdir5")); + nnFs2.mkdirs(new Path("/testdir6")); + MountTable mountTable1 = MountTable.newInstance("/setquota", + Collections.singletonMap("ns0", "/testdir5")); + mountTable1 + .setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) + .spaceQuota(ssQuota).build()); + addMountTable(mountTable1); + + // don't set quota for subpath of mount table + MountTable mountTable2 = MountTable.newInstance("/setquota/subdir", + Collections.singletonMap("ns1", "/testdir6")); + addMountTable(mountTable2); + + RouterQuotaUpdateService updateService = routerContext.getRouter() + .getQuotaCacheUpdateService(); + // ensure setQuota RPC call was invoked + updateService.periodicInvoke(); + + ClientProtocol client1 = nnContext1.getClient().getNamenode(); + ClientProtocol client2 = nnContext2.getClient().getNamenode(); + final QuotaUsage quota1 = client1.getQuotaUsage("/testdir5"); + final QuotaUsage quota2 = client2.getQuotaUsage("/testdir6"); + + assertEquals(nsQuota, quota1.getQuota()); + assertEquals(ssQuota, quota1.getSpaceQuota()); + assertEquals(nsQuota, quota2.getQuota()); + assertEquals(ssQuota, quota2.getSpaceQuota()); + } + + @Test + public void testGetQuota() throws Exception { + long nsQuota = 10; + long ssQuota = 100; + final FileSystem nnFs1 = nnContext1.getFileSystem(); + final FileSystem nnFs2 = nnContext2.getFileSystem(); + + // Add two mount tables: + // /getquota --> ns0---/testdir7 + // /getquota/subdir1 --> ns0---/testdir7/subdir + // /getquota/subdir2 --> ns1---/testdir8 + nnFs1.mkdirs(new Path("/testdir7")); + nnFs1.mkdirs(new Path("/testdir7/subdir")); + nnFs2.mkdirs(new Path("/testdir8")); + MountTable mountTable1 = MountTable.newInstance("/getquota", + Collections.singletonMap("ns0", "/testdir7")); + mountTable1 + .setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) + .spaceQuota(ssQuota).build()); + addMountTable(mountTable1); + + MountTable mountTable2 = MountTable.newInstance("/getquota/subdir1", + Collections.singletonMap("ns0", "/testdir7/subdir")); + addMountTable(mountTable2); + + MountTable mountTable3 = MountTable.newInstance("/getquota/subdir2", + Collections.singletonMap("ns1", "/testdir8")); + addMountTable(mountTable3); + + // use router client to create new files + DFSClient routerClient = routerContext.getClient(); + routerClient.create("/getquota/file", true).close(); + routerClient.create("/getquota/subdir1/file", true).close(); + routerClient.create("/getquota/subdir2/file", true).close(); + + ClientProtocol clientProtocol = routerContext.getClient().getNamenode(); + RouterQuotaUpdateService updateService = routerContext.getRouter() + .getQuotaCacheUpdateService(); + updateService.periodicInvoke(); + final QuotaUsage quota = clientProtocol.getQuotaUsage("/getquota"); + // the quota should be aggregated + assertEquals(6, quota.getFileAndDirectoryCount()); + } + + @Test + public void testStaleQuotaRemoving() throws Exception { + long nsQuota = 20; + long ssQuota = 200; + String stalePath = "/stalequota"; + final FileSystem nnFs1 = nnContext1.getFileSystem(); + + // Add one mount tables: + // /stalequota --> ns0---/testdir9 + nnFs1.mkdirs(new Path("/testdir9")); + MountTable mountTable = MountTable.newInstance(stalePath, + Collections.singletonMap("ns0", "/testdir9")); + mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) + .spaceQuota(ssQuota).build()); + addMountTable(mountTable); + + // Call periodicInvoke to ensure quota for stalePath was + // loaded into quota manager. + RouterQuotaUpdateService updateService = routerContext.getRouter() + .getQuotaCacheUpdateService(); + updateService.periodicInvoke(); + + // use quota manager to get its quota usage and do verification + RouterQuotaManager quotaManager = routerContext.getRouter() + .getQuotaManager(); + RouterQuotaUsage quota = quotaManager.getQuotaUsage(stalePath); + assertEquals(nsQuota, quota.getQuota()); + assertEquals(ssQuota, quota.getSpaceQuota()); + + // remove stale path entry + removeMountTable(stalePath); + updateService.periodicInvoke(); + // the stale entry should be removed and we will get null + quota = quotaManager.getQuotaUsage(stalePath); + assertNull(quota); + } + + /** + * Remove a mount table entry to the mount table through the admin API. + * @param entry Mount table entry to remove. + * @return If it was successfully removed. + * @throws IOException Problems removing entries. + */ + private boolean removeMountTable(String path) throws IOException { + RouterClient client = routerContext.getAdminClient(); + MountTableManager mountTableManager = client.getMountTableManager(); + RemoveMountTableEntryRequest removeRequest = RemoveMountTableEntryRequest + .newInstance(path); + RemoveMountTableEntryResponse removeResponse = mountTableManager + .removeMountTableEntry(removeRequest); + + // Reload the Router cache + resolver.loadCache(true); + return removeResponse.getStatus(); + } + + @Test + public void testQuotaUpdating() throws Exception { + long nsQuota = 30; + long ssQuota = 1024; + String path = "/updatequota"; + final FileSystem nnFs1 = nnContext1.getFileSystem(); + + // Add one mount table: + // /updatequota --> ns0---/testdir10 + nnFs1.mkdirs(new Path("/testdir10")); + MountTable mountTable = MountTable.newInstance(path, + Collections.singletonMap("ns0", "/testdir10")); + mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) + .spaceQuota(ssQuota).build()); + addMountTable(mountTable); + + // Call periodicInvoke to ensure quota updated in quota manager + // and state store. + RouterQuotaUpdateService updateService = routerContext.getRouter() + .getQuotaCacheUpdateService(); + updateService.periodicInvoke(); + + // verify initial quota value + List results = getMountTable(path); + MountTable updatedMountTable = !results.isEmpty() ? results.get(0) : null; + RouterQuotaUsage quota = updatedMountTable.getQuota(); + assertEquals(nsQuota, quota.getQuota()); + assertEquals(ssQuota, quota.getSpaceQuota()); + assertEquals(1, quota.getFileAndDirectoryCount()); + assertEquals(0, quota.getSpaceConsumed()); + + // mkdir and write a new file + final FileSystem routerFs = routerContext.getFileSystem(); + routerFs.mkdirs(new Path(path + UUID.randomUUID())); + DFSClient routerClient = routerContext.getClient(); + routerClient.create(path + "/file", true).close(); + appendData(path + "/file", routerClient, BLOCK_SIZE); + + updateService.periodicInvoke(); + results = getMountTable(path); + updatedMountTable = !results.isEmpty() ? results.get(0) : null; + quota = updatedMountTable.getQuota(); + + // verify if quota has been updated in state store + assertEquals(nsQuota, quota.getQuota()); + assertEquals(ssQuota, quota.getSpaceQuota()); + assertEquals(3, quota.getFileAndDirectoryCount()); + assertEquals(BLOCK_SIZE, quota.getSpaceConsumed()); + } + + /** + * Get the mount table entries of specified path through the admin API. + * @param path Mount table entry to get. + * @return If it was successfully got. + * @throws IOException Problems getting entries. + */ + private List getMountTable(String path) throws IOException { + // Reload the Router cache + resolver.loadCache(true); + RouterClient client = routerContext.getAdminClient(); + MountTableManager mountTableManager = client.getMountTableManager(); + GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest + .newInstance(path); + GetMountTableEntriesResponse removeResponse = mountTableManager + .getMountTableEntries(getRequest); + + return removeResponse.getEntries(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuotaManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuotaManager.java new file mode 100644 index 00000000000..346c881477c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuotaManager.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.Set; + +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests for class {@link RouterQuotaManager}. + */ +public class TestRouterQuotaManager { + private static RouterQuotaManager manager; + + @Before + public void setup() { + manager = new RouterQuotaManager(); + } + + @After + public void cleanup() { + manager.clear(); + } + + @Test + public void testGetChildrenPaths() { + RouterQuotaUsage quotaUsage = new RouterQuotaUsage.Builder().build(); + manager.put("/path1", quotaUsage); + manager.put("/path2", quotaUsage); + manager.put("/path1/subdir", quotaUsage); + manager.put("/path1/subdir/subdir", quotaUsage); + + Set childrenPaths = manager.getPaths("/path1"); + assertEquals(3, childrenPaths.size()); + assertTrue(childrenPaths.contains("/path1/subdir") + && childrenPaths.contains("/path1/subdir/subdir") + && childrenPaths.contains("/path1")); + } + + @Test + public void testGetQuotaUsage() { + RouterQuotaUsage quotaGet; + + // test case1: get quota with an non-exist path + quotaGet = manager.getQuotaUsage("/non-exist-path"); + assertNull(quotaGet); + + // test case2: get quota from an no-quota set path + RouterQuotaUsage.Builder quota = new RouterQuotaUsage.Builder() + .quota(HdfsConstants.QUOTA_DONT_SET) + .spaceQuota(HdfsConstants.QUOTA_DONT_SET); + manager.put("/noQuotaSet", quota.build()); + quotaGet = manager.getQuotaUsage("/noQuotaSet"); + // it should return null + assertNull(quotaGet); + + // test case3: get quota from an quota-set path + quota.quota(1); + quota.spaceQuota(HdfsConstants.QUOTA_DONT_SET); + manager.put("/hasQuotaSet", quota.build()); + quotaGet = manager.getQuotaUsage("/hasQuotaSet"); + assertEquals(1, quotaGet.getQuota()); + assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota()); + + // test case4: get quota with an non-exist child path + quotaGet = manager.getQuotaUsage("/hasQuotaSet/file"); + // it will return the nearest ancestor which quota was set + assertEquals(1, quotaGet.getQuota()); + assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota()); + + // test case5: get quota with an child path which its parent + // wasn't quota set + quota.quota(HdfsConstants.QUOTA_DONT_SET); + quota.spaceQuota(HdfsConstants.QUOTA_DONT_SET); + manager.put("/hasQuotaSet/noQuotaSet", quota.build()); + // here should returns the quota of path /hasQuotaSet + // (the nearest ancestor which quota was set) + quotaGet = manager.getQuotaUsage("/hasQuotaSet/noQuotaSet/file"); + assertEquals(1, quotaGet.getQuota()); + assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota()); + + // test case6: get quota with an child path which its parent was quota set + quota.quota(2); + quota.spaceQuota(HdfsConstants.QUOTA_DONT_SET); + manager.put("/hasQuotaSet/hasQuotaSet", quota.build()); + // here should return the quota of path /hasQuotaSet/hasQuotaSet + quotaGet = manager.getQuotaUsage("/hasQuotaSet/hasQuotaSet/file"); + assertEquals(2, quotaGet.getQuota()); + assertEquals(HdfsConstants.QUOTA_DONT_SET, quotaGet.getSpaceQuota()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java index 739d2e482b2..d306f773ae3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java @@ -27,8 +27,10 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; import org.junit.Test; @@ -56,6 +58,14 @@ public class TestMountTable { private static final long DATE_CREATED = 100; private static final long DATE_MOD = 200; + private static final long NS_COUNT = 1; + private static final long NS_QUOTA = 5; + private static final long SS_COUNT = 10; + private static final long SS_QUOTA = 100; + + private static final RouterQuotaUsage QUOTA = new RouterQuotaUsage.Builder() + .fileAndDirectoryCount(NS_COUNT).quota(NS_QUOTA).spaceConsumed(SS_COUNT) + .spaceQuota(SS_QUOTA).build(); @Test public void testGetterSetter() throws IOException { @@ -68,6 +78,12 @@ public class TestMountTable { assertTrue(DATE_CREATED > 0); assertTrue(DATE_MOD > 0); + RouterQuotaUsage quota = record.getQuota(); + assertEquals(0, quota.getFileAndDirectoryCount()); + assertEquals(HdfsConstants.QUOTA_DONT_SET, quota.getQuota()); + assertEquals(0, quota.getSpaceConsumed()); + assertEquals(HdfsConstants.QUOTA_DONT_SET, quota.getSpaceQuota()); + MountTable record2 = MountTable.newInstance(SRC, DST_MAP, DATE_CREATED, DATE_MOD); @@ -94,6 +110,7 @@ public class TestMountTable { SRC, DST_MAP, DATE_CREATED, DATE_MOD); record.setReadOnly(true); record.setDestOrder(order); + record.setQuota(QUOTA); StateStoreSerializer serializer = StateStoreSerializer.getSerializer(); String serializedString = serializer.serializeString(record); @@ -107,6 +124,12 @@ public class TestMountTable { assertEquals(DATE_MOD, record2.getDateModified()); assertTrue(record2.isReadOnly()); assertEquals(order, record2.getDestOrder()); + + RouterQuotaUsage quotaGet = record2.getQuota(); + assertEquals(NS_COUNT, quotaGet.getFileAndDirectoryCount()); + assertEquals(NS_QUOTA, quotaGet.getQuota()); + assertEquals(SS_COUNT, quotaGet.getSpaceConsumed()); + assertEquals(SS_QUOTA, quotaGet.getSpaceQuota()); } @Test @@ -172,4 +195,22 @@ public class TestMountTable { assertEquals(DST_NS_1, location2.getNameserviceId()); assertEquals(DST_PATH_1, location2.getDest()); } + + @Test + public void testQuota() throws IOException { + MountTable record = MountTable.newInstance(SRC, DST_MAP); + record.setQuota(QUOTA); + + validateDestinations(record); + assertEquals(SRC, record.getSourcePath()); + assertEquals(DST, record.getDestinations()); + assertTrue(DATE_CREATED > 0); + assertTrue(DATE_MOD > 0); + + RouterQuotaUsage quotaGet = record.getQuota(); + assertEquals(NS_COUNT, quotaGet.getFileAndDirectoryCount()); + assertEquals(NS_QUOTA, quotaGet.getQuota()); + assertEquals(SS_COUNT, quotaGet.getSpaceConsumed()); + assertEquals(SS_QUOTA, quotaGet.getSpaceQuota()); + } } \ No newline at end of file