HBASE-17292 Add observer notification before bulk loaded hfile is moved to region directory

This commit is contained in:
tedyu 2016-12-15 09:43:22 -08:00
parent d1147eeb7e
commit 35f0718a41
8 changed files with 203 additions and 81 deletions

View File

@ -497,6 +497,16 @@ public class BaseRegionObserver implements RegionObserver {
List<Pair<byte[], String>> familyPaths) throws IOException { List<Pair<byte[], String>> familyPaths) throws IOException {
} }
@Override
public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
}
@Override
public void postCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
final byte[] family, Path srcPath, Path dstPath) throws IOException {
}
@Override @Override
public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx, public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths, List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths,

View File

@ -1335,6 +1335,31 @@ public interface RegionObserver extends Coprocessor {
void preBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx, void preBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> familyPaths) throws IOException; List<Pair<byte[], String>> familyPaths) throws IOException;
/**
* Called before moving bulk loaded hfile to region directory.
*
* @param ctx
* @param family column family
* @param pairs List of pairs of { HFile location in staging dir, HFile path in region dir }
* Each pair are for the same hfile.
* @throws IOException
*/
default void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
}
/**
* Called after moving bulk loaded hfile to region directory.
*
* @param ctx
* @param family column family
* @param srcPath Path to file before the move
* @param dstPath Path to file after the move
*/
default void postCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
final byte[] family, Path srcPath, Path dstPath) throws IOException {
}
/** /**
* Called after bulkLoadHFile. * Called after bulkLoadHFile.
* *

View File

@ -5599,17 +5599,54 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
} }
Map<byte[], List<Pair<Path, Path>>> familyWithFinalPath =
new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (Pair<byte[], String> p : familyPaths) { for (Pair<byte[], String> p : familyPaths) {
byte[] familyName = p.getFirst(); byte[] familyName = p.getFirst();
String path = p.getSecond(); String path = p.getSecond();
HStore store = getHStore(familyName); HStore store = getHStore(familyName);
if (!familyWithFinalPath.containsKey(familyName)) {
familyWithFinalPath.put(familyName, new ArrayList<>());
}
List<Pair<Path, Path>> lst = familyWithFinalPath.get(familyName);
try { try {
String finalPath = path; String finalPath = path;
if (bulkLoadListener != null) { if (bulkLoadListener != null) {
finalPath = bulkLoadListener.prepareBulkLoad(familyName, path, copyFile); finalPath = bulkLoadListener.prepareBulkLoad(familyName, path, copyFile);
} }
Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId); Path commitedStoreFile = store.preBulkLoadHFile(finalPath, seqId);
lst.add(new Pair<Path, Path>(new Path(finalPath), commitedStoreFile));
} catch (IOException ioe) {
// A failure here can cause an atomicity violation that we currently
// cannot recover from since it is likely a failed HDFS operation.
LOG.error("There was a partial failure due to IO when attempting to" +
" load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe);
if (bulkLoadListener != null) {
try {
bulkLoadListener.failedBulkLoad(familyName, path);
} catch (Exception ex) {
LOG.error("Error while calling failedBulkLoad for family " +
Bytes.toString(familyName) + " with path " + path, ex);
}
}
throw ioe;
}
}
if (this.getCoprocessorHost() != null) {
for (Map.Entry<byte[], List<Pair<Path, Path>>> entry : familyWithFinalPath.entrySet()) {
this.getCoprocessorHost().preCommitStoreFile(entry.getKey(), entry.getValue());
}
}
for (Map.Entry<byte[], List<Pair<Path, Path>>> entry : familyWithFinalPath.entrySet()) {
byte[] familyName = entry.getKey();
for (Pair<Path, Path> p : entry.getValue()) {
String path = p.getFirst().toString();
Path commitedStoreFile = p.getSecond();
HStore store = getHStore(familyName);
try {
store.bulkLoadHFile(familyName, path, commitedStoreFile);
// Note the size of the store file // Note the size of the store file
try { try {
FileSystem fs = commitedStoreFile.getFileSystem(baseConf); FileSystem fs = commitedStoreFile.getFileSystem(baseConf);
@ -5636,7 +5673,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// TODO Need a better story for reverting partial failures due to HDFS. // TODO Need a better story for reverting partial failures due to HDFS.
LOG.error("There was a partial failure due to IO when attempting to" + LOG.error("There was a partial failure due to IO when attempting to" +
" load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe); " load " + Bytes.toString(familyName) + " : " + p.getSecond(), ioe);
if (bulkLoadListener != null) { if (bulkLoadListener != null) {
try { try {
bulkLoadListener.failedBulkLoad(familyName, path); bulkLoadListener.failedBulkLoad(familyName, path);
@ -5648,6 +5685,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
throw ioe; throw ioe;
} }
} }
}
isSuccessful = true; isSuccessful = true;
} finally { } finally {

View File

@ -365,19 +365,21 @@ public class HRegionFileSystem {
* @throws IOException * @throws IOException
*/ */
public Path commitStoreFile(final String familyName, final Path buildPath) throws IOException { public Path commitStoreFile(final String familyName, final Path buildPath) throws IOException {
return commitStoreFile(familyName, buildPath, -1, false); Path dstPath = preCommitStoreFile(familyName, buildPath, -1, false);
return commitStoreFile(buildPath, dstPath);
} }
/** /**
* Move the file from a build/temp location to the main family store directory. * Generate the filename in the main family store directory for moving the file from a build/temp
* location.
* @param familyName Family that will gain the file * @param familyName Family that will gain the file
* @param buildPath {@link Path} to the file to commit. * @param buildPath {@link Path} to the file to commit.
* @param seqNum Sequence Number to append to the file name (less then 0 if no sequence number) * @param seqNum Sequence Number to append to the file name (less then 0 if no sequence number)
* @param generateNewName False if you want to keep the buildPath name * @param generateNewName False if you want to keep the buildPath name
* @return The new {@link Path} of the committed file * @return The new {@link Path} of the to be committed file
* @throws IOException * @throws IOException
*/ */
private Path commitStoreFile(final String familyName, final Path buildPath, private Path preCommitStoreFile(final String familyName, final Path buildPath,
final long seqNum, final boolean generateNewName) throws IOException { final long seqNum, final boolean generateNewName) throws IOException {
Path storeDir = getStoreDir(familyName); Path storeDir = getStoreDir(familyName);
if(!fs.exists(storeDir) && !createDir(storeDir)) if(!fs.exists(storeDir) && !createDir(storeDir))
@ -394,6 +396,17 @@ public class HRegionFileSystem {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Committing store file " + buildPath + " as " + dstPath); LOG.debug("Committing store file " + buildPath + " as " + dstPath);
} }
return dstPath;
}
/*
* Moves file from staging dir to region dir
* @param buildPath {@link Path} to the file to commit.
* @param dstPath {@link Path} to the file under region dir
* @return The {@link Path} of the committed file
* @throws IOException
*/
Path commitStoreFile(final Path buildPath, Path dstPath) throws IOException {
// buildPath exists, therefore not doing an exists() check. // buildPath exists, therefore not doing an exists() check.
if (!rename(buildPath, dstPath)) { if (!rename(buildPath, dstPath)) {
throw new IOException("Failed rename of " + buildPath + " to " + dstPath); throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
@ -401,7 +414,6 @@ public class HRegionFileSystem {
return dstPath; return dstPath;
} }
/** /**
* Moves multiple store files to the relative region's family store directory. * Moves multiple store files to the relative region's family store directory.
* @param storeFiles list of store files divided by family * @param storeFiles list of store files divided by family
@ -469,7 +481,7 @@ public class HRegionFileSystem {
srcPath = tmpPath; srcPath = tmpPath;
} }
return commitStoreFile(familyName, srcPath, seqNum, true); return preCommitStoreFile(familyName, srcPath, seqNum, true);
} }
// =========================================================================== // ===========================================================================

View File

@ -780,9 +780,20 @@ public class HStore implements Store {
* @param srcPathStr * @param srcPathStr
* @param seqNum sequence Id associated with the HFile * @param seqNum sequence Id associated with the HFile
*/ */
public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException { public Path preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
Path srcPath = new Path(srcPathStr); Path srcPath = new Path(srcPathStr);
Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum); return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
}
public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
Path srcPath = new Path(srcPathStr);
try {
fs.commitStoreFile(srcPath, dstPath);
} finally {
if (this.getCoprocessorHost() != null) {
this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
}
}
LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as " LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
+ dstPath + " - updating store file list."); + dstPath + " - updating store file list.");

View File

@ -2153,6 +2153,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths); bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
} }
try {
if (!bypass) { if (!bypass) {
map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null, map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
request.getCopyFile()); request.getCopyFile());
@ -2160,9 +2161,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
loaded = true; loaded = true;
} }
} }
} finally {
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded); loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded);
} }
}
} else { } else {
// secure bulk load // secure bulk load
map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request); map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);

View File

@ -1470,6 +1470,26 @@ public class RegionCoprocessorHost
}); });
} }
public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
throws IOException {
return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
@Override
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException {
oserver.preCommitStoreFile(ctx, family, pairs);
}
});
}
public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
@Override
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException {
oserver.postCommitStoreFile(ctx, family, srcPath, dstPath);
}
});
}
/** /**
* @param familyPaths pairs of { CF, file path } submitted for bulk load * @param familyPaths pairs of { CF, file path } submitted for bulk load
* @param map Map of CF to List of file paths for the final loaded files * @param map Map of CF to List of file paths for the final loaded files

View File

@ -202,6 +202,7 @@ public class SecureBulkLoadManager {
boolean loaded = false; boolean loaded = false;
Map<byte[], List<Path>> map = null; Map<byte[], List<Path>> map = null;
try {
if (!bypass) { if (!bypass) {
// Get the target fs (HBase region server fs) delegation token // Get the target fs (HBase region server fs) delegation token
// Since we have checked the permission via 'preBulkLoadHFile', now let's give // Since we have checked the permission via 'preBulkLoadHFile', now let's give
@ -246,9 +247,11 @@ public class SecureBulkLoadManager {
loaded = true; loaded = true;
} }
} }
} finally {
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded); region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded);
} }
}
return map; return map;
} }