HDFS-14814. RBF: RouterQuotaUpdateService supports inherited rule. Contributed by Jinglun.
This commit is contained in:
parent
4fdf016358
commit
761594549e
|
@ -23,6 +23,8 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.TreeMap;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.fs.QuotaUsage;
|
||||
|
@ -70,13 +72,30 @@ public class Quota {
|
|||
*/
|
||||
public void setQuota(String path, long namespaceQuota,
|
||||
long storagespaceQuota, StorageType type) throws IOException {
|
||||
setQuotaInternal(path, null, namespaceQuota, storagespaceQuota, type);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set quota for the federation path.
|
||||
* @param path Federation path.
|
||||
* @param locations Locations of the Federation path.
|
||||
* @param namespaceQuota Name space quota.
|
||||
* @param storagespaceQuota Storage space quota.
|
||||
* @param type StorageType that the space quota is intended to be set on.
|
||||
* @throws IOException If the quota system is disabled.
|
||||
*/
|
||||
void setQuotaInternal(String path, List<RemoteLocation> locations,
|
||||
long namespaceQuota, long storagespaceQuota, StorageType type)
|
||||
throws IOException {
|
||||
rpcServer.checkOperation(OperationCategory.WRITE);
|
||||
if (!router.isQuotaEnabled()) {
|
||||
throw new IOException("The quota system is disabled in Router.");
|
||||
}
|
||||
|
||||
// Set quota for current path and its children mount table path.
|
||||
final List<RemoteLocation> locations = getQuotaRemoteLocations(path);
|
||||
if (locations == null) {
|
||||
locations = getQuotaRemoteLocations(path);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
for (RemoteLocation loc : locations) {
|
||||
LOG.debug("Set quota for path: nsId: {}, dest: {}.",
|
||||
|
@ -92,12 +111,23 @@ public class Quota {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get quota usage for the federation path.
|
||||
* Get aggregated quota usage for the federation path.
|
||||
* @param path Federation path.
|
||||
* @return Aggregated quota.
|
||||
* @throws IOException If the quota system is disabled.
|
||||
*/
|
||||
public QuotaUsage getQuotaUsage(String path) throws IOException {
|
||||
return aggregateQuota(getEachQuotaUsage(path));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get quota usage for the federation path.
|
||||
* @param path Federation path.
|
||||
* @return quota usage for each remote location.
|
||||
* @throws IOException If the quota system is disabled.
|
||||
*/
|
||||
Map<RemoteLocation, QuotaUsage> getEachQuotaUsage(String path)
|
||||
throws IOException {
|
||||
rpcServer.checkOperation(OperationCategory.READ);
|
||||
if (!router.isQuotaEnabled()) {
|
||||
throw new IOException("The quota system is disabled in Router.");
|
||||
|
@ -109,7 +139,39 @@ public class Quota {
|
|||
Map<RemoteLocation, QuotaUsage> results = rpcClient.invokeConcurrent(
|
||||
quotaLocs, method, true, false, QuotaUsage.class);
|
||||
|
||||
return aggregateQuota(results);
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get global quota for the federation path.
|
||||
* @param path Federation path.
|
||||
* @return global quota for path.
|
||||
* @throws IOException If the quota system is disabled.
|
||||
*/
|
||||
QuotaUsage getGlobalQuota(String path) throws IOException {
|
||||
if (!router.isQuotaEnabled()) {
|
||||
throw new IOException("The quota system is disabled in Router.");
|
||||
}
|
||||
|
||||
long nQuota = HdfsConstants.QUOTA_RESET;
|
||||
long sQuota = HdfsConstants.QUOTA_RESET;
|
||||
RouterQuotaManager manager = this.router.getQuotaManager();
|
||||
TreeMap<String, RouterQuotaUsage> pts =
|
||||
manager.getParentsContainingQuota(path);
|
||||
Entry<String, RouterQuotaUsage> entry = pts.lastEntry();
|
||||
while (entry != null && (nQuota == HdfsConstants.QUOTA_RESET
|
||||
|| sQuota == HdfsConstants.QUOTA_RESET)) {
|
||||
String ppath = entry.getKey();
|
||||
QuotaUsage quota = entry.getValue();
|
||||
if (nQuota == HdfsConstants.QUOTA_RESET) {
|
||||
nQuota = quota.getQuota();
|
||||
}
|
||||
if (sQuota == HdfsConstants.QUOTA_RESET) {
|
||||
sQuota = quota.getSpaceQuota();
|
||||
}
|
||||
entry = pts.lowerEntry(ppath);
|
||||
}
|
||||
return new QuotaUsage.Builder().quota(nQuota).spaceQuota(sQuota).build();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -157,7 +219,7 @@ public class Quota {
|
|||
* @param results Quota query result.
|
||||
* @return Aggregated Quota.
|
||||
*/
|
||||
private QuotaUsage aggregateQuota(Map<RemoteLocation, QuotaUsage> results) {
|
||||
QuotaUsage aggregateQuota(Map<RemoteLocation, QuotaUsage> results) {
|
||||
long nsCount = 0;
|
||||
long ssCount = 0;
|
||||
long nsQuota = HdfsConstants.QUOTA_RESET;
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.federation.router;
|
|||
import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.isParentEntry;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
|
@ -113,6 +114,32 @@ public class RouterQuotaManager {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get parent paths (including itself) and quotas of the specified federation
|
||||
* path. Only parents containing quota are returned.
|
||||
* @param childPath Federated path.
|
||||
* @return TreeMap of parent paths and quotas.
|
||||
*/
|
||||
TreeMap<String, RouterQuotaUsage> getParentsContainingQuota(
|
||||
String childPath) {
|
||||
TreeMap<String, RouterQuotaUsage> res = new TreeMap<>();
|
||||
readLock.lock();
|
||||
try {
|
||||
Entry<String, RouterQuotaUsage> entry = this.cache.floorEntry(childPath);
|
||||
while (entry != null) {
|
||||
String mountPath = entry.getKey();
|
||||
RouterQuotaUsage quota = entry.getValue();
|
||||
if (isQuotaSet(quota) && isParentEntry(childPath, mountPath)) {
|
||||
res.put(mountPath, quota);
|
||||
}
|
||||
entry = this.cache.lowerEntry(mountPath);
|
||||
}
|
||||
return res;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Put new entity into cache.
|
||||
* @param path Mount table path.
|
||||
|
|
|
@ -18,16 +18,20 @@
|
|||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.QuotaUsage;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
|
||||
|
@ -79,6 +83,7 @@ public class RouterQuotaUpdateService extends PeriodicService {
|
|||
try {
|
||||
List<MountTable> updateMountTables = new LinkedList<>();
|
||||
List<MountTable> mountTables = getQuotaSetMountTables();
|
||||
Map<RemoteLocation, QuotaUsage> remoteQuotaUsage = new HashMap<>();
|
||||
for (MountTable entry : mountTables) {
|
||||
String src = entry.getSourcePath();
|
||||
RouterQuotaUsage oldQuota = entry.getQuota();
|
||||
|
@ -102,25 +107,17 @@ public class RouterQuotaUpdateService extends PeriodicService {
|
|||
// Call RouterRpcServer#getQuotaUsage for getting current quota usage.
|
||||
// If any exception occurs catch it and proceed with other entries.
|
||||
try {
|
||||
currentQuotaUsage = this.rpcServer.getQuotaModule()
|
||||
.getQuotaUsage(src);
|
||||
Quota quotaModule = this.rpcServer.getQuotaModule();
|
||||
Map<RemoteLocation, QuotaUsage> usageMap =
|
||||
quotaModule.getEachQuotaUsage(src);
|
||||
currentQuotaUsage = quotaModule.aggregateQuota(usageMap);
|
||||
remoteQuotaUsage.putAll(usageMap);
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Unable to get quota usage for " + src, ioe);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// If quota is not set in some subclusters under federation path,
|
||||
// set quota for this path.
|
||||
if (currentQuotaUsage.getQuota() == HdfsConstants.QUOTA_RESET) {
|
||||
try {
|
||||
this.rpcServer.setQuota(src, nsQuota, ssQuota, null);
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Unable to set quota at remote location for "
|
||||
+ src, ioe);
|
||||
}
|
||||
}
|
||||
|
||||
RouterQuotaUsage newQuota = generateNewQuota(oldQuota,
|
||||
currentQuotaUsage);
|
||||
this.quotaManager.put(src, newQuota);
|
||||
|
@ -139,12 +136,36 @@ public class RouterQuotaUpdateService extends PeriodicService {
|
|||
}
|
||||
}
|
||||
|
||||
// Fix inconsistent quota.
|
||||
for (Entry<RemoteLocation, QuotaUsage> en : remoteQuotaUsage
|
||||
.entrySet()) {
|
||||
RemoteLocation remoteLocation = en.getKey();
|
||||
QuotaUsage currentQuota = en.getValue();
|
||||
fixGlobalQuota(remoteLocation, currentQuota);
|
||||
}
|
||||
|
||||
updateMountTableEntries(updateMountTables);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Quota cache updated error.", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void fixGlobalQuota(RemoteLocation location, QuotaUsage remoteQuota)
|
||||
throws IOException {
|
||||
QuotaUsage gQuota =
|
||||
this.rpcServer.getQuotaModule().getGlobalQuota(location.getSrc());
|
||||
if (remoteQuota.getQuota() != gQuota.getQuota()
|
||||
|| remoteQuota.getSpaceQuota() != gQuota.getSpaceQuota()) {
|
||||
this.rpcServer.getQuotaModule()
|
||||
.setQuotaInternal(location.getSrc(), Arrays.asList(location),
|
||||
gQuota.getQuota(), gQuota.getSpaceQuota(), null);
|
||||
LOG.info("[Fix Quota] src={} dst={} oldQuota={}/{} newQuota={}/{}",
|
||||
location.getSrc(), location, remoteQuota.getQuota(),
|
||||
remoteQuota.getSpaceQuota(), gQuota.getQuota(),
|
||||
gQuota.getSpaceQuota());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get mount table store management interface.
|
||||
* @return MountTableStore instance.
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
|||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -92,4 +93,13 @@ public class TestDisableRouterQuota {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetGlobalQuota() throws Exception {
|
||||
LambdaTestUtils.intercept(IOException.class,
|
||||
"The quota system is disabled in Router.",
|
||||
"The getGlobalQuota call should fail.", () -> {
|
||||
Quota quotaModule = router.getRpcServer().getQuotaModule();
|
||||
quotaModule.getGlobalQuota("/test");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -868,4 +868,92 @@ public class TestRouterQuota {
|
|||
routerFs.listStatus(path);
|
||||
routerFs.getContentSummary(path);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetGlobalQuota() throws Exception {
|
||||
long nsQuota = 5;
|
||||
long ssQuota = 3 * BLOCK_SIZE;
|
||||
prepareGlobalQuotaTestMountTable(nsQuota, ssQuota);
|
||||
|
||||
Quota qModule = routerContext.getRouter().getRpcServer().getQuotaModule();
|
||||
QuotaUsage qu = qModule.getGlobalQuota("/dir-1");
|
||||
assertEquals(nsQuota, qu.getQuota());
|
||||
assertEquals(ssQuota, qu.getSpaceQuota());
|
||||
qu = qModule.getGlobalQuota("/dir-1/dir-2");
|
||||
assertEquals(nsQuota, qu.getQuota());
|
||||
assertEquals(ssQuota * 2, qu.getSpaceQuota());
|
||||
qu = qModule.getGlobalQuota("/dir-1/dir-2/dir-3");
|
||||
assertEquals(nsQuota, qu.getQuota());
|
||||
assertEquals(ssQuota * 2, qu.getSpaceQuota());
|
||||
qu = qModule.getGlobalQuota("/dir-4");
|
||||
assertEquals(-1, qu.getQuota());
|
||||
assertEquals(-1, qu.getSpaceQuota());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFixGlobalQuota() throws Exception {
|
||||
long nsQuota = 5;
|
||||
long ssQuota = 3 * BLOCK_SIZE;
|
||||
final FileSystem nnFs = nnContext1.getFileSystem();
|
||||
prepareGlobalQuotaTestMountTable(nsQuota, ssQuota);
|
||||
|
||||
QuotaUsage qu = nnFs.getQuotaUsage(new Path("/dir-1"));
|
||||
assertEquals(nsQuota, qu.getQuota());
|
||||
assertEquals(ssQuota, qu.getSpaceQuota());
|
||||
qu = nnFs.getQuotaUsage(new Path("/dir-2"));
|
||||
assertEquals(nsQuota, qu.getQuota());
|
||||
assertEquals(ssQuota * 2, qu.getSpaceQuota());
|
||||
qu = nnFs.getQuotaUsage(new Path("/dir-3"));
|
||||
assertEquals(nsQuota, qu.getQuota());
|
||||
assertEquals(ssQuota * 2, qu.getSpaceQuota());
|
||||
qu = nnFs.getQuotaUsage(new Path("/dir-4"));
|
||||
assertEquals(-1, qu.getQuota());
|
||||
assertEquals(-1, qu.getSpaceQuota());
|
||||
}
|
||||
|
||||
/**
|
||||
* Add three mount tables.
|
||||
* /dir-1 --> ns0---/dir-1 [nsQuota, ssQuota]
|
||||
* /dir-1/dir-2 --> ns0---/dir-2 [QUOTA_UNSET, ssQuota * 2]
|
||||
* /dir-1/dir-2/dir-3 --> ns0---/dir-3 [QUOTA_UNSET, QUOTA_UNSET]
|
||||
* /dir-4 --> ns0---/dir-4 [QUOTA_UNSET, QUOTA_UNSET]
|
||||
*
|
||||
* Expect three remote locations' global quota.
|
||||
* ns0---/dir-1 --> [nsQuota, ssQuota]
|
||||
* ns0---/dir-2 --> [nsQuota, ssQuota * 2]
|
||||
* ns0---/dir-3 --> [nsQuota, ssQuota * 2]
|
||||
* ns0---/dir-4 --> [-1, -1]
|
||||
*/
|
||||
private void prepareGlobalQuotaTestMountTable(long nsQuota, long ssQuota)
|
||||
throws IOException {
|
||||
final FileSystem nnFs = nnContext1.getFileSystem();
|
||||
|
||||
// Create destination directory
|
||||
nnFs.mkdirs(new Path("/dir-1"));
|
||||
nnFs.mkdirs(new Path("/dir-2"));
|
||||
nnFs.mkdirs(new Path("/dir-3"));
|
||||
nnFs.mkdirs(new Path("/dir-4"));
|
||||
|
||||
MountTable mountTable = MountTable.newInstance("/dir-1",
|
||||
Collections.singletonMap("ns0", "/dir-1"));
|
||||
mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
|
||||
.spaceQuota(ssQuota).build());
|
||||
addMountTable(mountTable);
|
||||
mountTable = MountTable.newInstance("/dir-1/dir-2",
|
||||
Collections.singletonMap("ns0", "/dir-2"));
|
||||
mountTable.setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota * 2)
|
||||
.build());
|
||||
addMountTable(mountTable);
|
||||
mountTable = MountTable.newInstance("/dir-1/dir-2/dir-3",
|
||||
Collections.singletonMap("ns0", "/dir-3"));
|
||||
addMountTable(mountTable);
|
||||
mountTable = MountTable.newInstance("/dir-4",
|
||||
Collections.singletonMap("ns0", "/dir-4"));
|
||||
addMountTable(mountTable);
|
||||
|
||||
// Ensure setQuota RPC was invoked and mount table was updated.
|
||||
RouterQuotaUpdateService updateService = routerContext.getRouter()
|
||||
.getQuotaCacheUpdateService();
|
||||
updateService.periodicInvoke();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue