HBASE-23679 FileSystem objects leak when cleaned up in cleanupBulkLoad

The cleanupBulkLoad method is only called for the first Region of the
table being bulk loaded. This means that potentially N-1 other
RegionServers (where N is the number of RegionServers) will each leak
one FileSystem object into the FileSystem cache, which will never be
cleaned up. We need to do this clean-up as a part of secureBulkLoadHFiles;
otherwise, we cannot guarantee that heap usage won't grow unbounded.

Closes #1029

Signed-off-by: Sean Busbey <busbey@apache.org>
This commit is contained in:
Josh Elser 2020-01-13 18:30:30 -05:00
parent caaaf0810e
commit 4bf7fb8613
1 changed file with 20 additions and 17 deletions

View File

@ -152,26 +152,15 @@ public class SecureBulkLoadManager {
public void cleanupBulkLoad(final HRegion region, final CleanupBulkLoadRequest request) public void cleanupBulkLoad(final HRegion region, final CleanupBulkLoadRequest request)
throws IOException { throws IOException {
try { region.getCoprocessorHost().preCleanupBulkLoad(getActiveUser());
region.getCoprocessorHost().preCleanupBulkLoad(getActiveUser());
Path path = new Path(request.getBulkToken()); Path path = new Path(request.getBulkToken());
if (!fs.delete(path, true)) { if (!fs.delete(path, true)) {
if (fs.exists(path)) { if (fs.exists(path)) {
throw new IOException("Failed to clean up " + path); throw new IOException("Failed to clean up " + path);
}
}
LOG.info("Cleaned up " + path + " successfully.");
} finally {
UserGroupInformation ugi = getActiveUser().getUGI();
try {
if (!UserGroupInformation.getLoginUser().equals(ugi) && !isUserReferenced(ugi)) {
FileSystem.closeAllForUGI(ugi);
}
} catch (IOException e) {
LOG.error("Failed to close FileSystem for: " + ugi, e);
} }
} }
LOG.trace("Cleaned up {} successfully.", path);
} }
private Consumer<HRegion> fsCreatedListener; private Consumer<HRegion> fsCreatedListener;
@ -280,6 +269,13 @@ public class SecureBulkLoadManager {
public Map<byte[], List<Path>> run() { public Map<byte[], List<Path>> run() {
FileSystem fs = null; FileSystem fs = null;
try { try {
/*
* This is creating and caching a new FileSystem instance. Other code called
* "beneath" this method will rely on this FileSystem instance being in the
* cache. This is important as those methods make _no_ attempt to close this
* FileSystem instance. It is critical that here, in SecureBulkLoadManager,
* we are tracking the lifecycle and closing the FS when safe to do so.
*/
fs = FileSystem.get(conf); fs = FileSystem.get(conf);
for(Pair<byte[], String> el: familyPaths) { for(Pair<byte[], String> el: familyPaths) {
Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst())); Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
@ -304,6 +300,13 @@ public class SecureBulkLoadManager {
}); });
} finally { } finally {
decrementUgiReference(ugi); decrementUgiReference(ugi);
try {
if (!UserGroupInformation.getLoginUser().equals(ugi) && !isUserReferenced(ugi)) {
FileSystem.closeAllForUGI(ugi);
}
} catch (IOException e) {
LOG.error("Failed to close FileSystem for: {}", ugi, e);
}
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map); region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
} }