HBASE-15291 FileSystem not closed in secure bulkLoad
Signed-off-by: Ashish Singhi <ashishsinghi@apache.org>
This commit is contained in:
parent
9bf087d28b
commit
4bcb560e22
|
@ -145,15 +145,26 @@ public class SecureBulkLoadManager {
|
||||||
|
|
||||||
public void cleanupBulkLoad(final HRegion region, final CleanupBulkLoadRequest request)
|
public void cleanupBulkLoad(final HRegion region, final CleanupBulkLoadRequest request)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
region.getCoprocessorHost().preCleanupBulkLoad(getActiveUser());
|
try {
|
||||||
|
region.getCoprocessorHost().preCleanupBulkLoad(getActiveUser());
|
||||||
|
|
||||||
Path path = new Path(request.getBulkToken());
|
Path path = new Path(request.getBulkToken());
|
||||||
if (!fs.delete(path, true)) {
|
if (!fs.delete(path, true)) {
|
||||||
if (fs.exists(path)) {
|
if (fs.exists(path)) {
|
||||||
throw new IOException("Failed to clean up " + path);
|
throw new IOException("Failed to clean up " + path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info("Cleaned up " + path + " successfully.");
|
||||||
|
} finally {
|
||||||
|
UserGroupInformation ugi = getActiveUser().getUGI();
|
||||||
|
try {
|
||||||
|
if (!UserGroupInformation.getLoginUser().equals(ugi)) {
|
||||||
|
FileSystem.closeAllForUGI(ugi);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Failed to close FileSystem for: " + ugi, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.info("Cleaned up " + path + " successfully.");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
|
public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
|
||||||
|
@ -304,7 +315,7 @@ public class SecureBulkLoadManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (srcFs == null) {
|
if (srcFs == null) {
|
||||||
srcFs = FileSystem.get(p.toUri(), conf);
|
srcFs = FileSystem.newInstance(p.toUri(), conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
if(!isFile(p)) {
|
if(!isFile(p)) {
|
||||||
|
@ -334,34 +345,49 @@ public class SecureBulkLoadManager {
|
||||||
@Override
|
@Override
|
||||||
public void doneBulkLoad(byte[] family, String srcPath) throws IOException {
|
public void doneBulkLoad(byte[] family, String srcPath) throws IOException {
|
||||||
LOG.debug("Bulk Load done for: " + srcPath);
|
LOG.debug("Bulk Load done for: " + srcPath);
|
||||||
|
closeSrcFs();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void closeSrcFs() throws IOException {
|
||||||
|
if (srcFs != null) {
|
||||||
|
srcFs.close();
|
||||||
|
srcFs = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException {
|
public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException {
|
||||||
if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
|
try {
|
||||||
// files are copied so no need to move them back
|
Path p = new Path(srcPath);
|
||||||
return;
|
if (srcFs == null) {
|
||||||
}
|
srcFs = FileSystem.newInstance(p.toUri(), conf);
|
||||||
Path p = new Path(srcPath);
|
}
|
||||||
Path stageP = new Path(stagingDir,
|
if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
|
||||||
new Path(Bytes.toString(family), p.getName()));
|
// files are copied so no need to move them back
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
|
||||||
|
|
||||||
// In case of Replication for bulk load files, hfiles are not renamed by end point during
|
// In case of Replication for bulk load files, hfiles are not renamed by end point during
|
||||||
// prepare stage, so no need of rename here again
|
// prepare stage, so no need of rename here again
|
||||||
if (p.equals(stageP)) {
|
if (p.equals(stageP)) {
|
||||||
LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");
|
LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.debug("Moving " + stageP + " back to " + p);
|
LOG.debug("Moving " + stageP + " back to " + p);
|
||||||
if(!fs.rename(stageP, p))
|
if (!fs.rename(stageP, p)) {
|
||||||
throw new IOException("Failed to move HFile: " + stageP + " to " + p);
|
throw new IOException("Failed to move HFile: " + stageP + " to " + p);
|
||||||
|
}
|
||||||
|
|
||||||
// restore original permission
|
// restore original permission
|
||||||
if (origPermissions.containsKey(srcPath)) {
|
if (origPermissions.containsKey(srcPath)) {
|
||||||
fs.setPermission(p, origPermissions.get(srcPath));
|
fs.setPermission(p, origPermissions.get(srcPath));
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Can't find previous permission for path=" + srcPath);
|
LOG.warn("Can't find previous permission for path=" + srcPath);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
closeSrcFs();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue