HBASE-19417 Remove boolean return value from postBulkLoadHFile hook

This commit is contained in:
tedyu 2017-12-06 07:06:28 -08:00
parent ebd8841e0e
commit 27ed4d8add
6 changed files with 29 additions and 47 deletions

View File

@ -54,17 +54,17 @@ public class BackupObserver implements RegionCoprocessor, RegionObserver {
}
@Override
public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths,
boolean hasLoaded) throws IOException {
public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
throws IOException {
Configuration cfg = ctx.getEnvironment().getConfiguration();
if (!hasLoaded) {
if (finalPaths == null) {
// there is no need to record state
return hasLoaded;
return;
}
if (finalPaths == null || !BackupManager.isBackupEnabled(cfg)) {
if (!BackupManager.isBackupEnabled(cfg)) {
LOG.debug("skipping recording bulk load in postBulkLoadHFile since backup is disabled");
return hasLoaded;
return;
}
try (Connection connection = ConnectionFactory.createConnection(cfg);
BackupSystemTable tbl = new BackupSystemTable(connection)) {
@ -75,13 +75,11 @@ public class BackupObserver implements RegionCoprocessor, RegionObserver {
if (LOG.isTraceEnabled()) {
LOG.trace(tableName + " has not gone thru full backup");
}
return hasLoaded;
return;
}
tbl.writePathsPostBulkLoad(tableName, info.getEncodedNameAsBytes(), finalPaths);
return hasLoaded;
} catch (IOException ioe) {
LOG.error("Failed to get tables which have been fully backed up", ioe);
return false;
}
}
@Override

View File

@ -974,13 +974,12 @@ public interface RegionObserver {
* @param ctx the environment provided by the region server
* @param stagingFamilyPaths pairs of { CF, HFile path } submitted for bulk load
* @param finalPaths Map of CF to List of file paths for the loaded files
* @param hasLoaded whether the bulkLoad was successful
* @return the new value of hasLoaded
* if the Map is not null, the bulkLoad was successful. Otherwise the bulk load failed.
* bulkload is done by the time this hook is called.
*/
default boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths,
boolean hasLoaded) throws IOException {
return hasLoaded;
default void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
throws IOException {
}
/**

View File

@ -2214,7 +2214,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
checkOpen();
requestCount.increment();
HRegion region = getRegion(request.getRegion());
boolean loaded = false;
Map<byte[], List<Path>> map = null;
// Check to see if this bulk load would exceed the space quota for this table
@ -2233,24 +2232,20 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
for (FamilyPath familyPath : request.getFamilyPathList()) {
familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath()));
}
if (!request.hasBulkToken()) {
// Old style bulk load. This will not be supported in future releases
List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
for (FamilyPath familyPath : request.getFamilyPathList()) {
familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath()));
}
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
}
try {
map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
request.getCopyFile());
if (map != null) {
loaded = true;
}
} finally {
if (region.getCoprocessorHost() != null) {
loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded);
region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
}
}
} else {
@ -2258,10 +2253,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
}
BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
if (map != null) {
loaded = true;
}
builder.setLoaded(loaded);
builder.setLoaded(map != null);
return builder.build();
} catch (IOException ie) {
throw new ServiceException(ie);

View File

@ -1624,20 +1624,18 @@ public class RegionCoprocessorHost
/**
* @param familyPaths pairs of { CF, file path } submitted for bulk load
* @param map Map of CF to List of file paths for the final loaded files
* @param result whether load was successful or not
* @return the possibly modified value of hasLoaded
* @throws IOException
*/
public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
Map<byte[], List<Path>> map, boolean result) throws IOException {
public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
Map<byte[], List<Path>> map) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return result;
return;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
execOperation(coprocEnvironments.isEmpty()? null:
new RegionObserverOperationWithoutResult() {
@Override
public Boolean call(RegionObserver observer) throws IOException {
return observer.postBulkLoadHFile(this, familyPaths, map, getResult());
public void call(RegionObserver observer) throws IOException {
observer.postBulkLoadHFile(this, familyPaths, map);
}
});
}

View File

@ -195,7 +195,6 @@ public class SecureBulkLoadManager {
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
}
boolean loaded = false;
Map<byte[], List<Path>> map = null;
try {
@ -238,12 +237,9 @@ public class SecureBulkLoadManager {
return null;
}
});
if (map != null) {
loaded = true;
}
} finally {
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded);
region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
}
}
return map;

View File

@ -567,8 +567,8 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
}
@Override
public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> familyPaths, Map<byte[], List<Path>> map, boolean hasLoaded)
public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> familyPaths, Map<byte[], List<Path>> map)
throws IOException {
RegionCoprocessorEnvironment e = ctx.getEnvironment();
assertNotNull(e);
@ -583,7 +583,6 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
assertEquals(familyPath.substring(familyPath.length()-familyName.length()-1),"/"+familyName);
}
ctPostBulkLoadHFile.incrementAndGet();
return hasLoaded;
}
@Override