HDFS-14833. RBF: Router Update Doesn't Sync Quota. Contributed by Ayush Saxena.
This commit is contained in:
parent
d7d6ec8fae
commit
aa938662f9
|
@ -269,16 +269,21 @@ public class RouterAdminServer extends AbstractService
|
||||||
@Override
|
@Override
|
||||||
public UpdateMountTableEntryResponse updateMountTableEntry(
|
public UpdateMountTableEntryResponse updateMountTableEntry(
|
||||||
UpdateMountTableEntryRequest request) throws IOException {
|
UpdateMountTableEntryRequest request) throws IOException {
|
||||||
|
MountTable updateEntry = request.getEntry();
|
||||||
|
MountTable oldEntry = null;
|
||||||
|
if (this.router.getSubclusterResolver() instanceof MountTableResolver) {
|
||||||
|
MountTableResolver mResolver =
|
||||||
|
(MountTableResolver) this.router.getSubclusterResolver();
|
||||||
|
oldEntry = mResolver.getMountPoint(updateEntry.getSourcePath());
|
||||||
|
}
|
||||||
UpdateMountTableEntryResponse response = getMountTableStore()
|
UpdateMountTableEntryResponse response = getMountTableStore()
|
||||||
.updateMountTableEntry(request);
|
.updateMountTableEntry(request);
|
||||||
try {
|
try {
|
||||||
MountTable mountTable = request.getEntry();
|
if (updateEntry != null && router.isQuotaEnabled()
|
||||||
if (mountTable != null && router.isQuotaEnabled()
|
&& isQuotaUpdated(request, oldEntry)) {
|
||||||
&& isQuotaUpdated(request, mountTable)) {
|
synchronizeQuota(updateEntry.getSourcePath(),
|
||||||
synchronizeQuota(mountTable.getSourcePath(),
|
updateEntry.getQuota().getQuota(),
|
||||||
mountTable.getQuota().getQuota(),
|
updateEntry.getQuota().getSpaceQuota());
|
||||||
mountTable.getQuota().getSpaceQuota());
|
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// Ignore exception, if any while reseting quota. Specifically to handle
|
// Ignore exception, if any while reseting quota. Specifically to handle
|
||||||
|
@ -289,29 +294,41 @@ public class RouterAdminServer extends AbstractService
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks whether quota needs to be synchronized with namespace or not. Quota
|
||||||
|
* needs to be synchronized either if there is change in mount entry quota or
|
||||||
|
* there is change in remote destinations.
|
||||||
|
* @param request the update request.
|
||||||
|
* @param oldEntry the mount entry before getting updated.
|
||||||
|
* @return true if quota needs to be updated.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
private boolean isQuotaUpdated(UpdateMountTableEntryRequest request,
|
private boolean isQuotaUpdated(UpdateMountTableEntryRequest request,
|
||||||
MountTable mountTable) throws IOException {
|
MountTable oldEntry) throws IOException {
|
||||||
long nsQuota = -1;
|
if (oldEntry != null) {
|
||||||
long ssQuota = -1;
|
MountTable updateEntry = request.getEntry();
|
||||||
|
// If locations are changed, the new destinations need to be in sync with
|
||||||
String path = request.getEntry().getSourcePath();
|
// the mount quota.
|
||||||
if (this.router.getSubclusterResolver() instanceof MountTableResolver) {
|
if (!oldEntry.getDestinations().equals(updateEntry.getDestinations())) {
|
||||||
MountTableResolver mResolver = (MountTableResolver) this.router
|
return true;
|
||||||
.getSubclusterResolver();
|
|
||||||
MountTable entry = mResolver.getMountPoint(path);
|
|
||||||
if (entry != null) {
|
|
||||||
RouterQuotaUsage preQuota = entry.getQuota();
|
|
||||||
nsQuota = preQuota.getQuota();
|
|
||||||
ssQuota = preQuota.getSpaceQuota();
|
|
||||||
}
|
}
|
||||||
}
|
// Previous quota.
|
||||||
RouterQuotaUsage mountQuota = mountTable.getQuota();
|
RouterQuotaUsage preQuota = oldEntry.getQuota();
|
||||||
|
long nsQuota = preQuota.getQuota();
|
||||||
|
long ssQuota = preQuota.getSpaceQuota();
|
||||||
|
// New quota
|
||||||
|
RouterQuotaUsage mountQuota = updateEntry.getQuota();
|
||||||
|
// If there is change in quota, the new quota needs to be synchronized.
|
||||||
if (nsQuota != mountQuota.getQuota()
|
if (nsQuota != mountQuota.getQuota()
|
||||||
|| ssQuota != mountQuota.getSpaceQuota()) {
|
|| ssQuota != mountQuota.getSpaceQuota()) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
|
} else {
|
||||||
|
// If old entry is not available, sync quota always, since we can't
|
||||||
|
// conclude no change in quota.
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -323,15 +340,30 @@ public class RouterAdminServer extends AbstractService
|
||||||
*/
|
*/
|
||||||
private void synchronizeQuota(String path, long nsQuota, long ssQuota)
|
private void synchronizeQuota(String path, long nsQuota, long ssQuota)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (router.isQuotaEnabled() &&
|
if (isQuotaSyncRequired(nsQuota, ssQuota)) {
|
||||||
(nsQuota != HdfsConstants.QUOTA_DONT_SET
|
if (iStateStoreCache) {
|
||||||
|
((StateStoreCache) this.router.getSubclusterResolver()).loadCache(true);
|
||||||
|
}
|
||||||
|
Quota routerQuota = this.router.getRpcServer().getQuotaModule();
|
||||||
|
routerQuota.setQuota(path, nsQuota, ssQuota, null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks if quota needs to be synchronized or not.
|
||||||
|
* @param nsQuota namespace quota to be set.
|
||||||
|
* @param ssQuota space quota to be set.
|
||||||
|
* @return true if the quota needs to be synchronized.
|
||||||
|
*/
|
||||||
|
private boolean isQuotaSyncRequired(long nsQuota, long ssQuota) {
|
||||||
|
// Check if quota is enabled for router or not.
|
||||||
|
if (router.isQuotaEnabled()) {
|
||||||
|
if ((nsQuota != HdfsConstants.QUOTA_DONT_SET
|
||||||
|| ssQuota != HdfsConstants.QUOTA_DONT_SET)) {
|
|| ssQuota != HdfsConstants.QUOTA_DONT_SET)) {
|
||||||
HdfsFileStatus ret = this.router.getRpcServer().getFileInfo(path);
|
return true;
|
||||||
if (ret != null) {
|
|
||||||
this.router.getRpcServer().getQuotaModule().setQuota(path, nsQuota,
|
|
||||||
ssQuota, null);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||||
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
||||||
|
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
|
||||||
|
@ -474,6 +475,14 @@ public class TestRouterQuota {
|
||||||
assertEquals(ssQuota, quota.getSpaceQuota());
|
assertEquals(ssQuota, quota.getSpaceQuota());
|
||||||
assertEquals(3, quota.getFileAndDirectoryCount());
|
assertEquals(3, quota.getFileAndDirectoryCount());
|
||||||
assertEquals(BLOCK_SIZE, quota.getSpaceConsumed());
|
assertEquals(BLOCK_SIZE, quota.getSpaceConsumed());
|
||||||
|
|
||||||
|
// verify quota sync on adding new destination to mount entry.
|
||||||
|
updatedMountTable = getMountTable(path);
|
||||||
|
nnFs1.mkdirs(new Path("/newPath"));
|
||||||
|
updatedMountTable.setDestinations(
|
||||||
|
Collections.singletonList(new RemoteLocation("ns0", "/newPath", path)));
|
||||||
|
updateMountTable(updatedMountTable);
|
||||||
|
assertEquals(nsQuota, nnFs1.getQuotaUsage(new Path("/newPath")).getQuota());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue