HBASE-17123 Add postBulkLoadHFile variant that notifies the final paths for the hfiles

This commit is contained in:
tedyu 2016-11-20 09:26:42 -08:00
parent bb645bcfda
commit 768b4119da
10 changed files with 83 additions and 34 deletions

View File

@ -19,9 +19,12 @@
package org.apache.hadoop.hbase.security.access;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -128,12 +131,14 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
public void secureBulkLoadHFiles(RpcController controller, SecureBulkLoadHFilesRequest request,
RpcCallback<SecureBulkLoadHFilesResponse> done) {
boolean loaded = false;
Map<byte[], List<Path>> map = null;
try {
SecureBulkLoadManager secureBulkLoadManager =
this.env.getRegionServerServices().getSecureBulkLoadManager();
BulkLoadHFileRequest bulkLoadHFileRequest = ConvertSecureBulkLoadHFilesRequest(request);
loaded = secureBulkLoadManager.secureBulkLoadHFiles(this.env.getRegion(),
map = secureBulkLoadManager.secureBulkLoadHFiles(this.env.getRegion(),
convert(bulkLoadHFileRequest));
loaded = map != null && !map.isEmpty();
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import org.apache.hadoop.fs.FileSystem;
@ -511,7 +512,14 @@ public class BaseRegionObserver implements RegionObserver {
@Override
public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths,
boolean hasLoaded) throws IOException {
return hasLoaded;
}
@Override
public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> stagingFamilyPaths, boolean hasLoaded) throws IOException {
return hasLoaded;
}

View File

@ -21,16 +21,17 @@ package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@ -45,11 +46,12 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
@ -58,9 +60,9 @@ import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALKey;
import com.google.common.collect.ImmutableList;
@ -1386,13 +1388,30 @@ public interface RegionObserver extends Coprocessor {
* Called after bulkLoadHFile.
*
* @param ctx
* @param familyPaths pairs of { CF, HFile path } submitted for bulk load
* @param stagingFamilyPaths pairs of { CF, HFile path } submitted for bulk load
* @param finalPaths Map of CF to List of file paths for the final loaded files
* @param hasLoaded whether the bulkLoad was successful
* @return the new value of hasLoaded
* @throws IOException
*/
default boolean postBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths,
boolean hasLoaded) throws IOException {
return postBulkLoadHFile(ctx, stagingFamilyPaths, hasLoaded);
}
/**
* Called after bulkLoadHFile.
*
* @param ctx
* @param stagingFamilyPaths pairs of { CF, HFile path } submitted for bulk load
* @param hasLoaded whether the bulkLoad was successful
* @return the new value of hasLoaded
* @throws IOException
* @deprecated Use {@link #postBulkLoadHFile(ObserverContext, List, Map, boolean)}
*/
boolean postBulkLoadHFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException;
List<Pair<byte[], String>> stagingFamilyPaths, boolean hasLoaded) throws IOException;
/**
* Called before creation of Reader for a store file.

View File

@ -5470,14 +5470,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
@Override
public boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener) throws IOException {
return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false);
}
@Override
public boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException {
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException {
long seqId = -1;
Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
Map<String, Long> storeFilesSizes = new HashMap<String, Long>();
@ -5532,7 +5532,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// problem when validating
LOG.warn("There was a recoverable bulk load failure likely due to a" +
" split. These (family, HFile) pairs were not loaded: " + list);
return isSuccessful;
return null;
}
// We need to assign a sequential ID that's in between two memstores in order to preserve
@ -5626,7 +5626,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
closeBulkRegionOperation();
}
return isSuccessful;
return isSuccessful ? storeFiles : null;
}
@Override

View File

@ -43,6 +43,7 @@ import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ByteBufferedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScannable;
@ -2142,6 +2143,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Region region = getRegion(request.getRegion());
boolean bypass = false;
boolean loaded = false;
Map<byte[], List<Path>> map = null;
if (!request.hasBulkToken()) {
// Old style bulk load. This will not be supported in future releases
@ -2155,17 +2157,23 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
}
if (!bypass) {
loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
request.getCopyFile());
if (map != null) {
loaded = true;
}
}
if (region.getCoprocessorHost() != null) {
loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded);
}
} else {
// secure bulk load
loaded = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
}
BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
if (map != null) {
loaded = true;
}
builder.setLoaded(loaded);
return builder.build();
} catch (IOException ie) {

View File

@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@ -558,11 +559,11 @@ public interface Region extends ConfigurationObserver {
* @param bulkLoadListener Internal hooks enabling massaging/preparation of a
* file about to be bulk loaded
* @param assignSeqId
* @return true if successful, false if failed recoverably
* @return Map from family to List of store file paths if successful, null if failed recoverably
* @throws IOException if failed unrecoverably.
*/
boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener) throws IOException;
Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException;
/**
* Attempts to atomically load a group of hfiles. This is critical for loading
@ -573,11 +574,11 @@ public interface Region extends ConfigurationObserver {
* @param bulkLoadListener Internal hooks enabling massaging/preparation of a
* file about to be bulk loaded
* @param copyFile always copy hfiles if true
* @return true if successful, false if failed recoverably
* @return Map from family to List of store file paths if successful, null if failed recoverably
* @throws IOException if failed unrecoverably.
*/
boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException;
Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException;
///////////////////////////////////////////////////////////////////////////
// Coprocessors

View File

@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -1549,18 +1550,19 @@ public class RegionCoprocessorHost
/**
* @param familyPaths pairs of { CF, file path } submitted for bulk load
* @param map Map of CF to List of file paths for the final loaded files
* @param hasLoaded whether load was successful or not
* @return the possibly modified value of hasLoaded
* @throws IOException
*/
public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
boolean hasLoaded) throws IOException {
Map<byte[], List<Path>> map, boolean hasLoaded) throws IOException {
return execOperationWithResult(hasLoaded,
coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
@Override
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException {
setResult(oserver.postBulkLoadHFile(ctx, familyPaths, getResult()));
setResult(oserver.postBulkLoadHFile(ctx, familyPaths, map, getResult()));
}
});
}

View File

@ -171,7 +171,7 @@ public class SecureBulkLoadManager {
fs.delete(new Path(request.getBulkToken()), true);
}
public boolean secureBulkLoadHFiles(final Region region,
public Map<byte[], List<Path>> secureBulkLoadHFiles(final Region region,
final BulkLoadHFileRequest request) throws IOException {
final List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>(request.getFamilyPathCount());
for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
@ -200,6 +200,8 @@ public class SecureBulkLoadManager {
bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
}
boolean loaded = false;
Map<byte[], List<Path>> map = null;
if (!bypass) {
// Get the target fs (HBase region server fs) delegation token
// Since we have checked the permission via 'preBulkLoadHFile', now let's give
@ -217,9 +219,9 @@ public class SecureBulkLoadManager {
}
}
loaded = ugi.doAs(new PrivilegedAction<Boolean>() {
map = ugi.doAs(new PrivilegedAction<Map<byte[], List<Path>>>() {
@Override
public Boolean run() {
public Map<byte[], List<Path>> run() {
FileSystem fs = null;
try {
fs = FileSystem.get(conf);
@ -237,14 +239,17 @@ public class SecureBulkLoadManager {
} catch (Exception e) {
LOG.error("Failed to complete bulk load", e);
}
return false;
return null;
}
});
if (map != null) {
loaded = true;
}
}
if (region.getCoprocessorHost() != null) {
loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded);
}
return loaded;
return map;
}
private List<BulkLoadObserver> getBulkLoadObservers(Region region) {

View File

@ -635,7 +635,8 @@ public class SimpleRegionObserver extends BaseRegionObserver {
@Override
public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
List<Pair<byte[], String>> familyPaths, Map<byte[], List<Path>> map, boolean hasLoaded)
throws IOException {
RegionCoprocessorEnvironment e = ctx.getEnvironment();
assertNotNull(e);
assertNotNull(e.getRegion());

View File

@ -196,8 +196,8 @@ public class TestMobStoreCompaction {
// The following will bulk load the above generated store files and compact, with 600(fileSize)
// > 300(threshold)
boolean result = region.bulkLoadHFiles(hfiles, true, null);
assertTrue("Bulkload result:", result);
Map<byte[], List<Path>> map = region.bulkLoadHFiles(hfiles, true, null);
assertTrue("Bulkload result:", !map.isEmpty());
assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
assertEquals("Before compaction: mob file count", 0, countMobFiles());
assertEquals("Before compaction: rows", compactionThreshold, UTIL.countRows(region));