diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
index 660c63198a0..f5cc4ded288 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
@@ -497,6 +497,16 @@ public class BaseRegionObserver implements RegionObserver {
       List<Pair<byte[], String>> familyPaths) throws IOException {
   }
 
+  @Override
+  public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+      final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
+  }
+
+  @Override
+  public void postCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+      final byte[] family, Path srcPath, Path dstPath) throws IOException {
+  }
+
   @Override
   public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
     List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 0d01baac9ec..390b723c4ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -1335,6 +1335,31 @@ public interface RegionObserver extends Coprocessor {
   void preBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
     List<Pair<byte[], String>> familyPaths) throws IOException;
 
+  /**
+   * Called before moving a bulk loaded hfile to the region directory.
+   *
+   * @param ctx
+   * @param family column family
+   * @param pairs List of pairs of { HFile location in staging dir, HFile path in region dir }.
+   *          Each pair is for the same hfile.
+   * @throws IOException
+   */
+  default void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+      final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
+  }
+
+  /**
+   * Called after moving a bulk loaded hfile to the region directory.
+   *
+   * @param ctx
+   * @param family column family
+   * @param srcPath Path to the file before the move
+   * @param dstPath Path to the file after the move
+   */
+  default void postCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+      final byte[] family, Path srcPath, Path dstPath) throws IOException {
+  }
+
   /**
    * Called after bulkLoadHFile.
    *
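Not part of the patch itself: a minimal sketch of a coprocessor consuming the two hooks added above. The class name and log messages are hypothetical; the signatures come straight from the new RegionObserver methods.

import java.io.IOException;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

// Hypothetical observer that logs every bulk-load rename; it relies only on
// the hook signatures introduced by this patch.
public class BulkLoadAuditObserver extends BaseRegionObserver {
  private static final Log LOG = LogFactory.getLog(BulkLoadAuditObserver.class);

  @Override
  public void preCommitStoreFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
      byte[] family, List<Pair<Path, Path>> pairs) throws IOException {
    for (Pair<Path, Path> p : pairs) {
      LOG.info("Family " + Bytes.toString(family) + ": will move " + p.getFirst()
          + " to " + p.getSecond());
    }
  }

  @Override
  public void postCommitStoreFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
      byte[] family, Path srcPath, Path dstPath) throws IOException {
    LOG.info("Family " + Bytes.toString(family) + ": moved " + srcPath + " to " + dstPath);
  }
}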
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 788a9d9f60d..a93b1c0e7be 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5599,42 +5599,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
         }
       }
 
+      Map<byte[], List<Pair<Path, Path>>> familyWithFinalPath =
+          new TreeMap<>(Bytes.BYTES_COMPARATOR);
       for (Pair<byte[], String> p : familyPaths) {
         byte[] familyName = p.getFirst();
         String path = p.getSecond();
         HStore store = getHStore(familyName);
+        if (!familyWithFinalPath.containsKey(familyName)) {
+          familyWithFinalPath.put(familyName, new ArrayList<>());
+        }
+        List<Pair<Path, Path>> lst = familyWithFinalPath.get(familyName);
         try {
           String finalPath = path;
           if (bulkLoadListener != null) {
             finalPath = bulkLoadListener.prepareBulkLoad(familyName, path, copyFile);
           }
-          Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);
-
-          // Note the size of the store file
-          try {
-            FileSystem fs = commitedStoreFile.getFileSystem(baseConf);
-            storeFilesSizes.put(commitedStoreFile.getName(), fs.getFileStatus(commitedStoreFile)
-              .getLen());
-          } catch (IOException e) {
-            LOG.warn("Failed to find the size of hfile " + commitedStoreFile);
-            storeFilesSizes.put(commitedStoreFile.getName(), 0L);
-          }
-
-          if(storeFiles.containsKey(familyName)) {
-            storeFiles.get(familyName).add(commitedStoreFile);
-          } else {
-            List<Path> storeFileNames = new ArrayList<Path>();
-            storeFileNames.add(commitedStoreFile);
-            storeFiles.put(familyName, storeFileNames);
-          }
-          if (bulkLoadListener != null) {
-            bulkLoadListener.doneBulkLoad(familyName, path);
-          }
+          Path commitedStoreFile = store.preBulkLoadHFile(finalPath, seqId);
+          lst.add(new Pair<Path, Path>(new Path(finalPath), commitedStoreFile));
         } catch (IOException ioe) {
           // A failure here can cause an atomicity violation that we currently
           // cannot recover from since it is likely a failed HDFS operation.
 
-          // TODO Need a better story for reverting partial failures due to HDFS.
           LOG.error("There was a partial failure due to IO when attempting to" +
             " load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe);
           if (bulkLoadListener != null) {
@@ -5649,6 +5634,59 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
         }
       }
 
+      if (this.getCoprocessorHost() != null) {
+        for (Map.Entry<byte[], List<Pair<Path, Path>>> entry : familyWithFinalPath.entrySet()) {
+          this.getCoprocessorHost().preCommitStoreFile(entry.getKey(), entry.getValue());
+        }
+      }
+      for (Map.Entry<byte[], List<Pair<Path, Path>>> entry : familyWithFinalPath.entrySet()) {
+        byte[] familyName = entry.getKey();
+        for (Pair<Path, Path> p : entry.getValue()) {
+          String path = p.getFirst().toString();
+          Path commitedStoreFile = p.getSecond();
+          HStore store = getHStore(familyName);
+          try {
+            store.bulkLoadHFile(familyName, path, commitedStoreFile);
+            // Note the size of the store file
+            try {
+              FileSystem fs = commitedStoreFile.getFileSystem(baseConf);
+              storeFilesSizes.put(commitedStoreFile.getName(), fs.getFileStatus(commitedStoreFile)
+                .getLen());
+            } catch (IOException e) {
+              LOG.warn("Failed to find the size of hfile " + commitedStoreFile);
+              storeFilesSizes.put(commitedStoreFile.getName(), 0L);
+            }
+
+            if(storeFiles.containsKey(familyName)) {
+              storeFiles.get(familyName).add(commitedStoreFile);
+            } else {
+              List<Path> storeFileNames = new ArrayList<Path>();
+              storeFileNames.add(commitedStoreFile);
+              storeFiles.put(familyName, storeFileNames);
+            }
+            if (bulkLoadListener != null) {
+              bulkLoadListener.doneBulkLoad(familyName, path);
+            }
+          } catch (IOException ioe) {
+            // A failure here can cause an atomicity violation that we currently
+            // cannot recover from since it is likely a failed HDFS operation.
+
+            // TODO Need a better story for reverting partial failures due to HDFS.
+            LOG.error("There was a partial failure due to IO when attempting to" +
+              " load " + Bytes.toString(familyName) + " : " + p.getSecond(), ioe);
+            if (bulkLoadListener != null) {
+              try {
+                bulkLoadListener.failedBulkLoad(familyName, path);
+              } catch (Exception ex) {
+                LOG.error("Error while calling failedBulkLoad for family " +
+                  Bytes.toString(familyName) + " with path " + path, ex);
+              }
+            }
+            throw ioe;
+          }
+        }
+      }
+
       isSuccessful = true;
     } finally {
       if (wal != null && !storeFiles.isEmpty()) {
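A note on the familyWithFinalPath structure introduced above: because Java byte[] compares by identity, the map of per-family (staged path, final path) pairs must use Bytes.BYTES_COMPARATOR. A standalone sketch of the idiom, with made-up paths:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

public class FamilyGroupingDemo {
  public static void main(String[] args) {
    // byte[] has no value-based equals/hashCode, so a HashMap would treat two
    // copies of "cf" as different keys; BYTES_COMPARATOR orders by content.
    Map<byte[], List<Pair<Path, Path>>> familyWithFinalPath =
        new TreeMap<>(Bytes.BYTES_COMPARATOR);
    byte[] family = Bytes.toBytes("cf");
    if (!familyWithFinalPath.containsKey(family)) {
      familyWithFinalPath.put(family, new ArrayList<Pair<Path, Path>>());
    }
    familyWithFinalPath.get(family).add(new Pair<Path, Path>(
        new Path("/staging/cf/hfile1"), new Path("/data/ns/table/region/cf/hfile1")));
    // A fresh array with the same bytes still finds the entry:
    System.out.println(familyWithFinalPath.containsKey(Bytes.toBytes("cf"))); // true
  }
}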
LOG.error("There was a partial failure due to IO when attempting to" + " load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe); if (bulkLoadListener != null) { @@ -5649,6 +5634,59 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + if (this.getCoprocessorHost() != null) { + for (Map.Entry>> entry : familyWithFinalPath.entrySet()) { + this.getCoprocessorHost().preCommitStoreFile(entry.getKey(), entry.getValue()); + } + } + for (Map.Entry>> entry : familyWithFinalPath.entrySet()) { + byte[] familyName = entry.getKey(); + for (Pair p : entry.getValue()) { + String path = p.getFirst().toString(); + Path commitedStoreFile = p.getSecond(); + HStore store = getHStore(familyName); + try { + store.bulkLoadHFile(familyName, path, commitedStoreFile); + // Note the size of the store file + try { + FileSystem fs = commitedStoreFile.getFileSystem(baseConf); + storeFilesSizes.put(commitedStoreFile.getName(), fs.getFileStatus(commitedStoreFile) + .getLen()); + } catch (IOException e) { + LOG.warn("Failed to find the size of hfile " + commitedStoreFile); + storeFilesSizes.put(commitedStoreFile.getName(), 0L); + } + + if(storeFiles.containsKey(familyName)) { + storeFiles.get(familyName).add(commitedStoreFile); + } else { + List storeFileNames = new ArrayList(); + storeFileNames.add(commitedStoreFile); + storeFiles.put(familyName, storeFileNames); + } + if (bulkLoadListener != null) { + bulkLoadListener.doneBulkLoad(familyName, path); + } + } catch (IOException ioe) { + // A failure here can cause an atomicity violation that we currently + // cannot recover from since it is likely a failed HDFS operation. + + // TODO Need a better story for reverting partial failures due to HDFS. + LOG.error("There was a partial failure due to IO when attempting to" + + " load " + Bytes.toString(familyName) + " : " + p.getSecond(), ioe); + if (bulkLoadListener != null) { + try { + bulkLoadListener.failedBulkLoad(familyName, path); + } catch (Exception ex) { + LOG.error("Error while calling failedBulkLoad for family " + + Bytes.toString(familyName) + " with path " + path, ex); + } + } + throw ioe; + } + } + } + isSuccessful = true; } finally { if (wal != null && !storeFiles.isEmpty()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index d4e80c31586..80e3d18345d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -365,19 +365,21 @@ public class HRegionFileSystem { * @throws IOException */ public Path commitStoreFile(final String familyName, final Path buildPath) throws IOException { - return commitStoreFile(familyName, buildPath, -1, false); + Path dstPath = preCommitStoreFile(familyName, buildPath, -1, false); + return commitStoreFile(buildPath, dstPath); } /** - * Move the file from a build/temp location to the main family store directory. + * Generate the filename in the main family store directory for moving the file from a build/temp + * location. * @param familyName Family that will gain the file * @param buildPath {@link Path} to the file to commit. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index c56abaa6288..f8e91cf00ee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -780,9 +780,20 @@ public class HStore implements Store {
    * @param srcPathStr
    * @param seqNum sequence Id associated with the HFile
    */
-  public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
+  public Path preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
     Path srcPath = new Path(srcPathStr);
-    Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
+    return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
+  }
+
+  public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
+    Path srcPath = new Path(srcPathStr);
+    try {
+      fs.commitStoreFile(srcPath, dstPath);
+    } finally {
+      if (this.getCoprocessorHost() != null) {
+        this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
+      }
+    }
 
     LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
         + dstPath + " - updating store file list.");
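The HStore change above calls postCommitStoreFile from a finally block, so the hook fires whether or not the rename succeeds. A self-contained illustration of that guarantee; all names here are hypothetical:

import java.io.IOException;

public class FinallyHookDemo {
  static void commit() throws IOException {
    throw new IOException("simulated failed rename"); // force the failure path
  }

  static void postHook() {
    System.out.println("post hook still runs");
  }

  public static void main(String[] args) {
    try {
      try {
        commit();
      } finally {
        postHook(); // runs on success and on failure, like postCommitStoreFile
      }
    } catch (IOException e) {
      System.out.println("caller still sees: " + e.getMessage());
    }
  }
}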
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 7ec4f6831dd..4c5c9350968 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2153,15 +2153,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         if (region.getCoprocessorHost() != null) {
           bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
         }
-        if (!bypass) {
-          map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
-            request.getCopyFile());
-          if (map != null) {
-            loaded = true;
+        try {
+          if (!bypass) {
+            map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
+              request.getCopyFile());
+            if (map != null) {
+              loaded = true;
+            }
+          }
+        } finally {
+          if (region.getCoprocessorHost() != null) {
+            loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded);
           }
-        }
-        if (region.getCoprocessorHost() != null) {
-          loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded);
         }
       } else {
         // secure bulk load
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 2ddb1c02b4a..94508b669a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -1470,6 +1470,26 @@ public class RegionCoprocessorHost
     });
   }
 
+  public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
+      throws IOException {
+    return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+      @Override
+      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+          throws IOException {
+        oserver.preCommitStoreFile(ctx, family, pairs);
+      }
+    });
+  }
+  public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) throws IOException {
+    execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
+      @Override
+      public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
+          throws IOException {
+        oserver.postCommitStoreFile(ctx, family, srcPath, dstPath);
+      }
+    });
+  }
+
   /**
    * @param familyPaths pairs of { CF, file path } submitted for bulk load
    * @param map Map of CF to List of file paths for the final loaded files
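Background on the RegionCoprocessorHost additions above: each hook is wrapped in an operation object and dispatched to every registered observer, and the coprocessors.isEmpty() ? null : ... guard skips the dispatch entirely when no observers are loaded. A schematic, plain-Java analogue (not HBase code):

import java.util.ArrayList;
import java.util.List;

public class HookDispatchDemo {
  interface Observer {
    void preCommitStoreFile(String family);
  }

  // Analogue of execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {...})
  static void execOperation(List<Observer> observers, String family) {
    if (observers.isEmpty()) {
      return; // nothing registered, skip entirely
    }
    for (Observer o : observers) {
      o.preCommitStoreFile(family);
    }
  }

  public static void main(String[] args) {
    List<Observer> observers = new ArrayList<Observer>();
    observers.add(new Observer() {
      @Override
      public void preCommitStoreFile(String family) {
        System.out.println("observer saw family " + family);
      }
    });
    execOperation(observers, "cf");
  }
}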
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index 8b44f22b37d..7a18cbb7edb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -202,52 +202,55 @@ public class SecureBulkLoadManager {
     boolean loaded = false;
     Map<byte[], List<Path>> map = null;
 
-    if (!bypass) {
-      // Get the target fs (HBase region server fs) delegation token
-      // Since we have checked the permission via 'preBulkLoadHFile', now let's give
-      // the 'request user' necessary token to operate on the target fs.
-      // After this point the 'doAs' user will hold two tokens, one for the source fs
-      // ('request user'), another for the target fs (HBase region server principal).
-      if (userProvider.isHadoopSecurityEnabled()) {
-        FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer");
-        targetfsDelegationToken.acquireDelegationToken(fs);
+    try {
+      if (!bypass) {
+        // Get the target fs (HBase region server fs) delegation token
+        // Since we have checked the permission via 'preBulkLoadHFile', now let's give
+        // the 'request user' necessary token to operate on the target fs.
+        // After this point the 'doAs' user will hold two tokens, one for the source fs
+        // ('request user'), another for the target fs (HBase region server principal).
+        if (userProvider.isHadoopSecurityEnabled()) {
+          FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider,"renewer");
+          targetfsDelegationToken.acquireDelegationToken(fs);
 
-        Token<?> targetFsToken = targetfsDelegationToken.getUserToken();
-        if (targetFsToken != null
-            && (userToken == null || !targetFsToken.getService().equals(userToken.getService()))) {
-          ugi.addToken(targetFsToken);
-        }
-      }
-
-      map = ugi.doAs(new PrivilegedAction<Map<byte[], List<Path>>>() {
-        @Override
-        public Map<byte[], List<Path>> run() {
-          FileSystem fs = null;
-          try {
-            fs = FileSystem.get(conf);
-            for(Pair<byte[], String> el: familyPaths) {
-              Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
-              if(!fs.exists(stageFamily)) {
-                fs.mkdirs(stageFamily);
-                fs.setPermission(stageFamily, PERM_ALL_ACCESS);
-              }
-            }
-            //We call bulkLoadHFiles as requesting user
-            //To enable access prior to staging
-            return region.bulkLoadHFiles(familyPaths, true,
-                new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
-          } catch (Exception e) {
-            LOG.error("Failed to complete bulk load", e);
+          Token<?> targetFsToken = targetfsDelegationToken.getUserToken();
+          if (targetFsToken != null
+              && (userToken == null || !targetFsToken.getService().equals(userToken.getService()))){
+            ugi.addToken(targetFsToken);
           }
-          return null;
         }
-      });
-      if (map != null) {
-        loaded = true;
+
+        map = ugi.doAs(new PrivilegedAction<Map<byte[], List<Path>>>() {
+          @Override
+          public Map<byte[], List<Path>> run() {
+            FileSystem fs = null;
+            try {
+              fs = FileSystem.get(conf);
+              for(Pair<byte[], String> el: familyPaths) {
+                Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
+                if(!fs.exists(stageFamily)) {
+                  fs.mkdirs(stageFamily);
+                  fs.setPermission(stageFamily, PERM_ALL_ACCESS);
+                }
+              }
+              //We call bulkLoadHFiles as requesting user
+              //To enable access prior to staging
+              return region.bulkLoadHFiles(familyPaths, true,
+                  new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
+            } catch (Exception e) {
+              LOG.error("Failed to complete bulk load", e);
+            }
+            return null;
+          }
+        });
+        if (map != null) {
+          loaded = true;
+        }
+      }
+    } finally {
+      if (region.getCoprocessorHost() != null) {
+        region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded);
       }
-    }
-    if (region.getCoprocessorHost() != null) {
-      region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded);
     }
     return map;
   }
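Closing note on the secure path: staging runs inside ugi.doAs so that filesystem work happens as the requesting user, and the new try/finally guarantees postBulkLoadHFile fires however the doAs block exits. A minimal doAs sketch (user names are made up):

import java.io.IOException;
import java.security.PrivilegedAction;

import org.apache.hadoop.security.UserGroupInformation;

public class DoAsDemo {
  public static void main(String[] args) throws IOException {
    UserGroupInformation ugi = UserGroupInformation.createRemoteUser("bulkload_user");
    String result = ugi.doAs(new PrivilegedAction<String>() {
      @Override
      public String run() {
        // Anything executed here (e.g. FileSystem.get(conf)) acts as 'bulkload_user'.
        try {
          return UserGroupInformation.getCurrentUser().getUserName();
        } catch (IOException e) {
          return "unknown";
        }
      }
    });
    System.out.println(result); // prints bulkload_user
  }
}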