HDFS-13528. RBF: If a directory exceeds quota limit then quota usage is not refreshed for other mount entries. Contributed by Dibyendu Karmakar.
This commit is contained in:
parent
7ca4f0cefa
commit
3b637155a4
|
@ -199,7 +199,7 @@ public class Quota {
|
||||||
if (manager != null) {
|
if (manager != null) {
|
||||||
Set<String> childrenPaths = manager.getPaths(path);
|
Set<String> childrenPaths = manager.getPaths(path);
|
||||||
for (String childPath : childrenPaths) {
|
for (String childPath : childrenPaths) {
|
||||||
locations.addAll(rpcServer.getLocationsForPath(childPath, true));
|
locations.addAll(rpcServer.getLocationsForPath(childPath, true, false));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.QuotaUsage;
|
import org.apache.hadoop.fs.QuotaUsage;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
|
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
|
||||||
|
@ -83,13 +84,40 @@ public class RouterQuotaUpdateService extends PeriodicService {
|
||||||
RouterQuotaUsage oldQuota = entry.getQuota();
|
RouterQuotaUsage oldQuota = entry.getQuota();
|
||||||
long nsQuota = oldQuota.getQuota();
|
long nsQuota = oldQuota.getQuota();
|
||||||
long ssQuota = oldQuota.getSpaceQuota();
|
long ssQuota = oldQuota.getSpaceQuota();
|
||||||
// Call RouterRpcServer#getQuotaUsage for getting current quota usage.
|
|
||||||
QuotaUsage currentQuotaUsage = this.rpcServer.getQuotaModule()
|
QuotaUsage currentQuotaUsage = null;
|
||||||
.getQuotaUsage(src);
|
|
||||||
|
// Check whether destination path exists in filesystem. If destination
|
||||||
|
// is not present, reset the usage. For other mount entry get current
|
||||||
|
// quota usage
|
||||||
|
HdfsFileStatus ret = this.rpcServer.getFileInfo(src);
|
||||||
|
if (ret == null) {
|
||||||
|
currentQuotaUsage = new RouterQuotaUsage.Builder()
|
||||||
|
.fileAndDirectoryCount(0)
|
||||||
|
.quota(nsQuota)
|
||||||
|
.spaceConsumed(0)
|
||||||
|
.spaceQuota(ssQuota).build();
|
||||||
|
} else {
|
||||||
|
// Call RouterRpcServer#getQuotaUsage for getting current quota usage.
|
||||||
|
// If any exception occurs catch it and proceed with other entries.
|
||||||
|
try {
|
||||||
|
currentQuotaUsage = this.rpcServer.getQuotaModule()
|
||||||
|
.getQuotaUsage(src);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.error("Unable to get quota usage for " + src, ioe);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// If quota is not set in some subclusters under federation path,
|
// If quota is not set in some subclusters under federation path,
|
||||||
// set quota for this path.
|
// set quota for this path.
|
||||||
if (currentQuotaUsage.getQuota() == HdfsConstants.QUOTA_DONT_SET) {
|
if (currentQuotaUsage.getQuota() == HdfsConstants.QUOTA_DONT_SET) {
|
||||||
this.rpcServer.setQuota(src, nsQuota, ssQuota, null);
|
try {
|
||||||
|
this.rpcServer.setQuota(src, nsQuota, ssQuota, null);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.error("Unable to set quota at remote location for "
|
||||||
|
+ src, ioe);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
RouterQuotaUsage newQuota = generateNewQuota(oldQuota,
|
RouterQuotaUsage newQuota = generateNewQuota(oldQuota,
|
||||||
|
@ -221,7 +249,12 @@ public class RouterQuotaUpdateService extends PeriodicService {
|
||||||
for (MountTable entry : updateMountTables) {
|
for (MountTable entry : updateMountTables) {
|
||||||
UpdateMountTableEntryRequest updateRequest = UpdateMountTableEntryRequest
|
UpdateMountTableEntryRequest updateRequest = UpdateMountTableEntryRequest
|
||||||
.newInstance(entry);
|
.newInstance(entry);
|
||||||
getMountTableStore().updateMountTableEntry(updateRequest);
|
try {
|
||||||
|
getMountTableStore().updateMountTableEntry(updateRequest);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Quota update error for mount entry "
|
||||||
|
+ entry.getSourcePath(), e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.federation.router;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotEquals;
|
import static org.junit.Assert.assertNotEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -410,8 +411,7 @@ public class TestRouterQuota {
|
||||||
updateService.periodicInvoke();
|
updateService.periodicInvoke();
|
||||||
|
|
||||||
// verify initial quota value
|
// verify initial quota value
|
||||||
List<MountTable> results = getMountTable(path);
|
MountTable updatedMountTable = getMountTable(path);
|
||||||
MountTable updatedMountTable = !results.isEmpty() ? results.get(0) : null;
|
|
||||||
RouterQuotaUsage quota = updatedMountTable.getQuota();
|
RouterQuotaUsage quota = updatedMountTable.getQuota();
|
||||||
assertEquals(nsQuota, quota.getQuota());
|
assertEquals(nsQuota, quota.getQuota());
|
||||||
assertEquals(ssQuota, quota.getSpaceQuota());
|
assertEquals(ssQuota, quota.getSpaceQuota());
|
||||||
|
@ -426,8 +426,7 @@ public class TestRouterQuota {
|
||||||
appendData(path + "/file", routerClient, BLOCK_SIZE);
|
appendData(path + "/file", routerClient, BLOCK_SIZE);
|
||||||
|
|
||||||
updateService.periodicInvoke();
|
updateService.periodicInvoke();
|
||||||
results = getMountTable(path);
|
updatedMountTable = getMountTable(path);
|
||||||
updatedMountTable = !results.isEmpty() ? results.get(0) : null;
|
|
||||||
quota = updatedMountTable.getQuota();
|
quota = updatedMountTable.getQuota();
|
||||||
|
|
||||||
// verify if quota has been updated in state store
|
// verify if quota has been updated in state store
|
||||||
|
@ -443,17 +442,18 @@ public class TestRouterQuota {
|
||||||
* @return If it was successfully got.
|
* @return If it was successfully got.
|
||||||
* @throws IOException Problems getting entries.
|
* @throws IOException Problems getting entries.
|
||||||
*/
|
*/
|
||||||
private List<MountTable> getMountTable(String path) throws IOException {
|
private MountTable getMountTable(String path) throws IOException {
|
||||||
// Reload the Router cache
|
// Reload the Router cache
|
||||||
resolver.loadCache(true);
|
resolver.loadCache(true);
|
||||||
RouterClient client = routerContext.getAdminClient();
|
RouterClient client = routerContext.getAdminClient();
|
||||||
MountTableManager mountTableManager = client.getMountTableManager();
|
MountTableManager mountTableManager = client.getMountTableManager();
|
||||||
GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
|
GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest
|
||||||
.newInstance(path);
|
.newInstance(path);
|
||||||
GetMountTableEntriesResponse removeResponse = mountTableManager
|
GetMountTableEntriesResponse response = mountTableManager
|
||||||
.getMountTableEntries(getRequest);
|
.getMountTableEntries(getRequest);
|
||||||
|
List<MountTable> results = response.getEntries();
|
||||||
|
|
||||||
return removeResponse.getEntries();
|
return !results.isEmpty() ? results.get(0) : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -493,4 +493,200 @@ public class TestRouterQuota {
|
||||||
assertEquals(updateNsQuota, realQuota.getQuota());
|
assertEquals(updateNsQuota, realQuota.getQuota());
|
||||||
assertEquals(updateSsQuota, realQuota.getSpaceQuota());
|
assertEquals(updateSsQuota, realQuota.getSpaceQuota());
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
@Test
|
||||||
|
public void testQuotaRefreshAfterQuotaExceed() throws Exception {
|
||||||
|
long nsQuota = 3;
|
||||||
|
long ssQuota = 100;
|
||||||
|
final FileSystem nnFs1 = nnContext1.getFileSystem();
|
||||||
|
final FileSystem nnFs2 = nnContext2.getFileSystem();
|
||||||
|
|
||||||
|
// Add two mount tables:
|
||||||
|
// /setquota1 --> ns0---testdir11
|
||||||
|
// /setquota2 --> ns1---testdir12
|
||||||
|
nnFs1.mkdirs(new Path("/testdir11"));
|
||||||
|
nnFs2.mkdirs(new Path("/testdir12"));
|
||||||
|
MountTable mountTable1 = MountTable.newInstance("/setquota1",
|
||||||
|
Collections.singletonMap("ns0", "/testdir11"));
|
||||||
|
mountTable1
|
||||||
|
.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
|
||||||
|
.spaceQuota(ssQuota).build());
|
||||||
|
addMountTable(mountTable1);
|
||||||
|
|
||||||
|
MountTable mountTable2 = MountTable.newInstance("/setquota2",
|
||||||
|
Collections.singletonMap("ns1", "/testdir12"));
|
||||||
|
mountTable2
|
||||||
|
.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
|
||||||
|
.spaceQuota(ssQuota).build());
|
||||||
|
addMountTable(mountTable2);
|
||||||
|
|
||||||
|
final FileSystem routerFs = routerContext.getFileSystem();
|
||||||
|
// Create directory to make directory count equals to nsQuota
|
||||||
|
routerFs.mkdirs(new Path("/setquota1/" + UUID.randomUUID()));
|
||||||
|
routerFs.mkdirs(new Path("/setquota1/" + UUID.randomUUID()));
|
||||||
|
|
||||||
|
// create one more directory to exceed the nsQuota
|
||||||
|
routerFs.mkdirs(new Path("/setquota1/" + UUID.randomUUID()));
|
||||||
|
|
||||||
|
RouterQuotaUpdateService updateService = routerContext.getRouter()
|
||||||
|
.getQuotaCacheUpdateService();
|
||||||
|
// Call RouterQuotaUpdateService#periodicInvoke to update quota cache
|
||||||
|
updateService.periodicInvoke();
|
||||||
|
// Reload the Router cache
|
||||||
|
resolver.loadCache(true);
|
||||||
|
|
||||||
|
RouterQuotaManager quotaManager =
|
||||||
|
routerContext.getRouter().getQuotaManager();
|
||||||
|
ClientProtocol client1 = nnContext1.getClient().getNamenode();
|
||||||
|
ClientProtocol client2 = nnContext2.getClient().getNamenode();
|
||||||
|
QuotaUsage quota1 = client1.getQuotaUsage("/testdir11");
|
||||||
|
QuotaUsage quota2 = client2.getQuotaUsage("/testdir12");
|
||||||
|
QuotaUsage cacheQuota1 = quotaManager.getQuotaUsage("/setquota1");
|
||||||
|
QuotaUsage cacheQuota2 = quotaManager.getQuotaUsage("/setquota2");
|
||||||
|
|
||||||
|
// Verify quota usage
|
||||||
|
assertEquals(4, quota1.getFileAndDirectoryCount());
|
||||||
|
assertEquals(4, cacheQuota1.getFileAndDirectoryCount());
|
||||||
|
assertEquals(1, quota2.getFileAndDirectoryCount());
|
||||||
|
assertEquals(1, cacheQuota2.getFileAndDirectoryCount());
|
||||||
|
|
||||||
|
try {
|
||||||
|
// create new directory to trigger NSQuotaExceededException
|
||||||
|
routerFs.mkdirs(new Path("/testdir11/" + UUID.randomUUID()));
|
||||||
|
fail("Mkdir should be failed under dir /testdir11.");
|
||||||
|
} catch (NSQuotaExceededException ignored) {
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create directory under the other mount point
|
||||||
|
routerFs.mkdirs(new Path("/setquota2/" + UUID.randomUUID()));
|
||||||
|
routerFs.mkdirs(new Path("/setquota2/" + UUID.randomUUID()));
|
||||||
|
|
||||||
|
// Call RouterQuotaUpdateService#periodicInvoke to update quota cache
|
||||||
|
updateService.periodicInvoke();
|
||||||
|
|
||||||
|
quota1 = client1.getQuotaUsage("/testdir11");
|
||||||
|
cacheQuota1 = quotaManager.getQuotaUsage("/setquota1");
|
||||||
|
quota2 = client2.getQuotaUsage("/testdir12");
|
||||||
|
cacheQuota2 = quotaManager.getQuotaUsage("/setquota2");
|
||||||
|
|
||||||
|
// Verify whether quota usage cache is update by periodicInvoke().
|
||||||
|
assertEquals(4, quota1.getFileAndDirectoryCount());
|
||||||
|
assertEquals(4, cacheQuota1.getFileAndDirectoryCount());
|
||||||
|
assertEquals(3, quota2.getFileAndDirectoryCount());
|
||||||
|
assertEquals(3, cacheQuota2.getFileAndDirectoryCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify whether mount table and quota usage cache is updated properly.
|
||||||
|
* {@link RouterQuotaUpdateService#periodicInvoke()} should be able to update
|
||||||
|
* the cache and the mount table even if the destination directory for some
|
||||||
|
* mount entry is not present in the filesystem.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testQuotaRefreshWhenDestinationNotPresent() throws Exception {
|
||||||
|
long nsQuota = 5;
|
||||||
|
long ssQuota = 3*BLOCK_SIZE;
|
||||||
|
final FileSystem nnFs = nnContext1.getFileSystem();
|
||||||
|
|
||||||
|
// Add three mount tables:
|
||||||
|
// /setdir1 --> ns0---testdir13
|
||||||
|
// /setdir2 --> ns0---testdir14
|
||||||
|
// Create destination directory
|
||||||
|
nnFs.mkdirs(new Path("/testdir13"));
|
||||||
|
nnFs.mkdirs(new Path("/testdir14"));
|
||||||
|
|
||||||
|
MountTable mountTable = MountTable.newInstance("/setdir1",
|
||||||
|
Collections.singletonMap("ns0", "/testdir13"));
|
||||||
|
mountTable
|
||||||
|
.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
|
||||||
|
.spaceQuota(ssQuota).build());
|
||||||
|
addMountTable(mountTable);
|
||||||
|
|
||||||
|
mountTable = MountTable.newInstance("/setdir2",
|
||||||
|
Collections.singletonMap("ns0", "/testdir14"));
|
||||||
|
mountTable
|
||||||
|
.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota)
|
||||||
|
.spaceQuota(ssQuota).build());
|
||||||
|
addMountTable(mountTable);
|
||||||
|
|
||||||
|
final DFSClient routerClient = routerContext.getClient();
|
||||||
|
// Create file
|
||||||
|
routerClient.create("/setdir1/file1", true).close();
|
||||||
|
routerClient.create("/setdir2/file2", true).close();
|
||||||
|
// append data to the file
|
||||||
|
appendData("/setdir1/file1", routerClient, BLOCK_SIZE);
|
||||||
|
appendData("/setdir2/file2", routerClient, BLOCK_SIZE);
|
||||||
|
|
||||||
|
RouterQuotaUpdateService updateService =
|
||||||
|
routerContext.getRouter().getQuotaCacheUpdateService();
|
||||||
|
// Update quota cache
|
||||||
|
updateService.periodicInvoke();
|
||||||
|
// Reload the Router cache
|
||||||
|
resolver.loadCache(true);
|
||||||
|
|
||||||
|
ClientProtocol client1 = nnContext1.getClient().getNamenode();
|
||||||
|
RouterQuotaManager quotaManager =
|
||||||
|
routerContext.getRouter().getQuotaManager();
|
||||||
|
QuotaUsage quota1 = client1.getQuotaUsage("/testdir13");
|
||||||
|
QuotaUsage quota2 = client1.getQuotaUsage("/testdir14");
|
||||||
|
QuotaUsage cacheQuota1 = quotaManager.getQuotaUsage("/setdir1");
|
||||||
|
QuotaUsage cacheQuota2 = quotaManager.getQuotaUsage("/setdir2");
|
||||||
|
|
||||||
|
// Get quota details in mount table
|
||||||
|
MountTable updatedMountTable = getMountTable("/setdir1");
|
||||||
|
RouterQuotaUsage mountQuota1 = updatedMountTable.getQuota();
|
||||||
|
updatedMountTable = getMountTable("/setdir2");
|
||||||
|
RouterQuotaUsage mountQuota2 = updatedMountTable.getQuota();
|
||||||
|
|
||||||
|
// Verify quota usage
|
||||||
|
assertEquals(2, quota1.getFileAndDirectoryCount());
|
||||||
|
assertEquals(2, cacheQuota1.getFileAndDirectoryCount());
|
||||||
|
assertEquals(2, mountQuota1.getFileAndDirectoryCount());
|
||||||
|
assertEquals(2, quota2.getFileAndDirectoryCount());
|
||||||
|
assertEquals(2, cacheQuota2.getFileAndDirectoryCount());
|
||||||
|
assertEquals(2, mountQuota2.getFileAndDirectoryCount());
|
||||||
|
assertEquals(BLOCK_SIZE, quota1.getSpaceConsumed());
|
||||||
|
assertEquals(BLOCK_SIZE, cacheQuota1.getSpaceConsumed());
|
||||||
|
assertEquals(BLOCK_SIZE, mountQuota1.getSpaceConsumed());
|
||||||
|
assertEquals(BLOCK_SIZE, quota2.getSpaceConsumed());
|
||||||
|
assertEquals(BLOCK_SIZE, cacheQuota2.getSpaceConsumed());
|
||||||
|
assertEquals(BLOCK_SIZE, mountQuota2.getSpaceConsumed());
|
||||||
|
|
||||||
|
FileSystem routerFs = routerContext.getFileSystem();
|
||||||
|
// Remove destination directory for the mount entry
|
||||||
|
routerFs.delete(new Path("/setdir1"), true);
|
||||||
|
|
||||||
|
// Create file
|
||||||
|
routerClient.create("/setdir2/file3", true).close();
|
||||||
|
// append data to the file
|
||||||
|
appendData("/setdir2/file3", routerClient, BLOCK_SIZE);
|
||||||
|
int updatedSpace = BLOCK_SIZE + BLOCK_SIZE;
|
||||||
|
|
||||||
|
// Update quota cache
|
||||||
|
updateService.periodicInvoke();
|
||||||
|
|
||||||
|
quota2 = client1.getQuotaUsage("/testdir14");
|
||||||
|
cacheQuota1 = quotaManager.getQuotaUsage("/setdir1");
|
||||||
|
cacheQuota2 = quotaManager.getQuotaUsage("/setdir2");
|
||||||
|
|
||||||
|
// Get quota details in mount table
|
||||||
|
updatedMountTable = getMountTable("/setdir1");
|
||||||
|
mountQuota1 = updatedMountTable.getQuota();
|
||||||
|
updatedMountTable = getMountTable("/setdir2");
|
||||||
|
mountQuota2 = updatedMountTable.getQuota();
|
||||||
|
|
||||||
|
// If destination is not present the quota usage should be reset to 0
|
||||||
|
assertEquals(0, cacheQuota1.getFileAndDirectoryCount());
|
||||||
|
assertEquals(0, mountQuota1.getFileAndDirectoryCount());
|
||||||
|
assertEquals(0, cacheQuota1.getSpaceConsumed());
|
||||||
|
assertEquals(0, mountQuota1.getSpaceConsumed());
|
||||||
|
|
||||||
|
// Verify current quota usage for other mount entries
|
||||||
|
assertEquals(3, quota2.getFileAndDirectoryCount());
|
||||||
|
assertEquals(3, cacheQuota2.getFileAndDirectoryCount());
|
||||||
|
assertEquals(3, mountQuota2.getFileAndDirectoryCount());
|
||||||
|
assertEquals(updatedSpace, quota2.getSpaceConsumed());
|
||||||
|
assertEquals(updatedSpace, cacheQuota2.getSpaceConsumed());
|
||||||
|
assertEquals(updatedSpace, mountQuota2.getSpaceConsumed());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue