Consolidate SecureBulkLoadEndpoint into HBase core as default for bulk load

This commit is contained in:
Jerry He 2016-07-18 20:58:28 -07:00
parent 5051ab4e70
commit 70f330dc84
30 changed files with 5221 additions and 3607 deletions

View File

@ -444,14 +444,22 @@ public class MetricsConnection implements StatisticTrackable {
// use generic implementation
break;
case 4:
assert "ExecService".equals(method.getName());
assert "PrepareBulkLoad".equals(method.getName());
// use generic implementation
break;
case 5:
assert "ExecRegionServerService".equals(method.getName());
assert "CleanupBulkLoad".equals(method.getName());
// use generic implementation
break;
case 6:
assert "ExecService".equals(method.getName());
// use generic implementation
break;
case 7:
assert "ExecRegionServerService".equals(method.getName());
// use generic implementation
break;
case 8:
assert "Multi".equals(method.getName());
multiTracker.updateRpc(stats);
return;

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.token.Token;
import com.google.protobuf.ServiceException;
/**
* Client proxy for SecureBulkLoadProtocol
*/
@InterfaceAudience.Private
public class SecureBulkLoadClient {
private Table table;
public SecureBulkLoadClient(Table table) {
this.table = table;
}
public String prepareBulkLoad(final Connection conn) throws IOException {
try {
RegionServerCallable<String> callable =
new RegionServerCallable<String>(conn, table.getName(), HConstants.EMPTY_START_ROW) {
@Override
public String call(int callTimeout) throws IOException {
byte[] regionName = getLocation().getRegionInfo().getRegionName();
RegionSpecifier region =
RequestConverter
.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
try {
PrepareBulkLoadRequest request =
PrepareBulkLoadRequest.newBuilder()
.setTableName(ProtobufUtil.toProtoTableName(table.getName()))
.setRegion(region).build();
PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request);
return response.getBulkToken();
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
};
return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
.<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
} catch (Throwable throwable) {
throw new IOException(throwable);
}
}
public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException {
try {
RegionServerCallable<Void> callable =
new RegionServerCallable<Void>(conn, table.getName(), HConstants.EMPTY_START_ROW) {
@Override
public Void call(int callTimeout) throws IOException {
byte[] regionName = getLocation().getRegionInfo().getRegionName();
RegionSpecifier region = RequestConverter.buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
try {
CleanupBulkLoadRequest request =
CleanupBulkLoadRequest.newBuilder().setRegion(region)
.setBulkToken(bulkToken).build();
getStub().cleanupBulkLoad(null, request);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
return null;
}
};
RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
.<Void> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
} catch (Throwable throwable) {
throw new IOException(throwable);
}
}
/**
* Securely bulk load a list of HFiles using client protocol.
*
* @param client
* @param familyPaths
* @param regionName
* @param assignSeqNum
* @param userToken
* @param bulkToken
* @return true if all are loaded
* @throws IOException
*/
public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
final List<Pair<byte[], String>> familyPaths,
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken) throws IOException {
BulkLoadHFileRequest request =
RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum,
userToken, bulkToken);
try {
BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
return response.getLoaded();
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
public Path getStagingPath(String bulkToken, byte[] family) throws IOException {
return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family);
}
}

View File

@ -108,8 +108,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionReques
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
@ -1578,30 +1576,6 @@ public final class ProtobufUtil {
// Start helpers for Client
/**
* A helper to bulk load a list of HFiles using client protocol.
*
* @param client
* @param familyPaths
* @param regionName
* @param assignSeqNum
* @return true if all are loaded
* @throws IOException
*/
public static boolean bulkLoadHFile(final ClientService.BlockingInterface client,
final List<Pair<byte[], String>> familyPaths,
final byte[] regionName, boolean assignSeqNum) throws IOException {
BulkLoadHFileRequest request =
RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum);
try {
BulkLoadHFileResponse response =
client.bulkLoadHFile(null, request);
return response.getLoaded();
} catch (ServiceException se) {
throw getRemoteException(se);
}
}
public static CoprocessorServiceResponse execService(final RpcController controller,
final ClientService.BlockingInterface client, final CoprocessorServiceCall call,
final byte[] regionName) throws IOException {
@ -1618,8 +1592,8 @@ public final class ProtobufUtil {
}
public static CoprocessorServiceResponse execService(final RpcController controller,
final MasterService.BlockingInterface client, final CoprocessorServiceCall call)
throws IOException {
final MasterService.BlockingInterface client, final CoprocessorServiceCall call)
throws IOException {
CoprocessorServiceRequest request = CoprocessorServiceRequest.newBuilder()
.setCall(call).setRegion(
RequestConverter.buildRegionSpecifier(REGION_NAME, HConstants.EMPTY_BYTE_ARRAY)).build();

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.protobuf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
@ -64,7 +65,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodes
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
@ -103,8 +103,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableReques
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.NormalizeRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ReleaseSplitOrMergeLockAndRollbackRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
@ -115,6 +115,7 @@ import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.token.Token;
import com.google.protobuf.ByteString;
@ -526,19 +527,41 @@ public final class RequestConverter {
*/
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
final List<Pair<byte[], String>> familyPaths,
final byte[] regionName, boolean assignSeqNum) {
BulkLoadHFileRequest.Builder builder = BulkLoadHFileRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken) {
RegionSpecifier region = RequestConverter.buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
FamilyPath.Builder familyPathBuilder = FamilyPath.newBuilder();
for (Pair<byte[], String> familyPath: familyPaths) {
familyPathBuilder.setFamily(ByteStringer.wrap(familyPath.getFirst()));
familyPathBuilder.setPath(familyPath.getSecond());
builder.addFamilyPath(familyPathBuilder.build());
ClientProtos.DelegationToken protoDT = null;
if (userToken != null) {
protoDT =
ClientProtos.DelegationToken.newBuilder()
.setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
.setPassword(ByteStringer.wrap(userToken.getPassword()))
.setKind(userToken.getKind().toString())
.setService(userToken.getService().toString()).build();
}
builder.setAssignSeqNum(assignSeqNum);
return builder.build();
List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>(familyPaths.size());
for(Pair<byte[], String> el: familyPaths) {
protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
.setFamily(ByteStringer.wrap(el.getFirst()))
.setPath(el.getSecond()).build());
}
BulkLoadHFileRequest.Builder request =
ClientProtos.BulkLoadHFileRequest.newBuilder()
.setRegion(region)
.setAssignSeqNum(assignSeqNum)
.addAllFamilyPath(protoFamilyPaths);
if (userToken != null) {
request.setFsToken(protoDT);
}
if (bulkToken != null) {
request.setBulkToken(bulkToken);
}
return request.build();
}
/**

View File

@ -53,6 +53,8 @@ import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
@ -63,6 +65,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
@ -478,6 +482,18 @@ public class TestClientNoCluster extends Configured implements Tool {
CoprocessorServiceRequest request) throws ServiceException {
throw new NotImplementedException();
}
@Override
public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
PrepareBulkLoadRequest request) throws ServiceException {
throw new NotImplementedException();
}
@Override
public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
CleanupBulkLoadRequest request) throws ServiceException {
throw new NotImplementedException();
}
}
static ScanResponse doMetaScanResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,

View File

@ -118,6 +118,7 @@ public class HBaseCommonTestingUtility {
if (deleteOnExit()) this.dataTestDir.deleteOnExit();
createSubDir("hbase.local.dir", testPath, "hbase-local-dir");
createSubDir("hbase.bulkload.staging.dir", testPath, "staging");
return testPath;
}

View File

@ -337,6 +337,8 @@ message BulkLoadHFileRequest {
required RegionSpecifier region = 1;
repeated FamilyPath family_path = 2;
optional bool assign_seq_num = 3;
optional DelegationToken fs_token = 4;
optional string bulk_token = 5;
message FamilyPath {
required bytes family = 1;
@ -348,6 +350,30 @@ message BulkLoadHFileResponse {
required bool loaded = 1;
}
message DelegationToken {
optional bytes identifier = 1;
optional bytes password = 2;
optional string kind = 3;
optional string service = 4;
}
message PrepareBulkLoadRequest {
required TableName table_name = 1;
optional RegionSpecifier region = 2;
}
message PrepareBulkLoadResponse {
required string bulk_token = 1;
}
message CleanupBulkLoadRequest {
required string bulk_token = 1;
optional RegionSpecifier region = 2;
}
message CleanupBulkLoadResponse {
}
message CoprocessorServiceCall {
required bytes row = 1;
required string service_name = 2;
@ -467,6 +493,12 @@ service ClientService {
rpc BulkLoadHFile(BulkLoadHFileRequest)
returns(BulkLoadHFileResponse);
rpc PrepareBulkLoad(PrepareBulkLoadRequest)
returns (PrepareBulkLoadResponse);
rpc CleanupBulkLoad(CleanupBulkLoadRequest)
returns (CleanupBulkLoadResponse);
rpc ExecService(CoprocessorServiceRequest)
returns(CoprocessorServiceResponse);

View File

@ -37,29 +37,6 @@ message SecureBulkLoadHFilesResponse {
required bool loaded = 1;
}
message DelegationToken {
optional bytes identifier = 1;
optional bytes password = 2;
optional string kind = 3;
optional string service = 4;
}
message PrepareBulkLoadRequest {
required TableName table_name = 1;
}
message PrepareBulkLoadResponse {
required string bulk_token = 1;
}
message CleanupBulkLoadRequest {
required string bulk_token = 1;
}
message CleanupBulkLoadResponse {
}
service SecureBulkLoadService {
rpc PrepareBulkLoad(PrepareBulkLoadRequest)
returns (PrepareBulkLoadResponse);

View File

@ -25,8 +25,8 @@ import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
/**
* Coprocessors implement this interface to observe and mediate bulk load operations.

View File

@ -333,6 +333,26 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
return ret;
}
/**
* Find list of CoprocessorEnvironment that extend/implement the given class/interface
* @param cls the class/interface to look for
* @return the list of CoprocessorEnvironment, or null if not found
*/
public List<CoprocessorEnvironment> findCoprocessorEnvironment(Class<?> cls) {
ArrayList<CoprocessorEnvironment> ret = new ArrayList<CoprocessorEnvironment>();
for (E env: coprocessors) {
Coprocessor cp = env.getInstance();
if(cp != null) {
if (cls.isAssignableFrom(cp.getClass())) {
ret.add(env);
}
}
}
return ret;
}
/**
* Find a coprocessor environment by class name
* @param className the class name

View File

@ -74,8 +74,8 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
@ -87,7 +87,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
@ -323,6 +322,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
// LQI queue does not need to be threadsafe -- all operations on this queue
// happen in this thread
Deque<LoadQueueItem> queue = new LinkedList<>();
SecureBulkLoadClient secureClient = new SecureBulkLoadClient(table);
try {
/*
* Checking hfile format is a time-consuming operation, we should have an option to skip
@ -346,13 +347,16 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
return;
}
if(isSecureBulkLoadEndpointAvailable()) {
LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
LOG.warn("Secure bulk load has been integrated into HBase core.");
}
//If using secure bulk load, get source delegation token, and
//prepare staging directory and token
// fs is the source filesystem
fsDelegationToken.acquireDelegationToken(fs);
if(isSecureBulkLoadEndpointAvailable()) {
bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
}
bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
// Assumes that region splits can happen while this occurs.
while (!queue.isEmpty()) {
@ -391,7 +395,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
} finally {
fsDelegationToken.releaseDelegationToken();
if(bulkToken != null) {
new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken);
secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
}
pool.shutdown();
if (queue != null && !queue.isEmpty()) {
@ -789,21 +793,18 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
LOG.debug("Going to connect to server " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
byte[] regionName = getLocation().getRegionInfo().getRegionName();
if (!isSecureBulkLoadEndpointAvailable()) {
success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);
} else {
try (Table table = conn.getTable(getTableName())) {
secureClient = new SecureBulkLoadClient(table);
success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(),
bulkToken, getLocation().getRegionInfo().getStartKey());
}
try (Table table = conn.getTable(getTableName())) {
secureClient = new SecureBulkLoadClient(table);
success =
secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
assignSeqIds, fsDelegationToken.getUserToken(), bulkToken);
}
return success;
} finally {
//Best effort copying of files that might not have been imported
//from the staging directory back to original location
//in user directory
if(secureClient != null && !success) {
if (secureClient != null && !success) {
FileSystem targetFs = FileSystem.get(getConf());
// fs is the source filesystem
if(fs == null) {

View File

@ -498,6 +498,8 @@ public class HRegionServer extends HasThread implements
private volatile ThroughputController flushThroughputController;
protected final SecureBulkLoadManager secureBulkLoadManager;
/**
* Starts a HRegionServer at the default location.
* @param conf
@ -618,6 +620,9 @@ public class HRegionServer extends HasThread implements
}
this.configurationManager = new ConfigurationManager();
this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf);
this.secureBulkLoadManager.start();
rpcServices.start();
putUpWebUI();
this.walRoller = new LogRoller(this, this);
@ -3431,4 +3436,9 @@ public class HRegionServer extends HasThread implements
public MetricsRegionServer getMetrics() {
return metricsRegionServer;
}
@Override
public SecureBulkLoadManager getSecureBulkLoadManager() {
return this.secureBulkLoadManager;
}
}

View File

@ -134,6 +134,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
@ -147,6 +149,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
@ -2042,21 +2046,29 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
checkOpen();
requestCount.increment();
Region region = getRegion(request.getRegion());
List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
for (FamilyPath familyPath: request.getFamilyPathList()) {
familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
familyPath.getPath()));
}
boolean bypass = false;
if (region.getCoprocessorHost() != null) {
bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
}
boolean loaded = false;
if (!bypass) {
loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
}
if (region.getCoprocessorHost() != null) {
loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
if (!request.hasBulkToken()) {
// Old style bulk load. This will not be supported in future releases
List<Pair<byte[], String>> familyPaths =
new ArrayList<Pair<byte[], String>>(request.getFamilyPathCount());
for (FamilyPath familyPath : request.getFamilyPathList()) {
familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(), familyPath
.getPath()));
}
if (region.getCoprocessorHost() != null) {
bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
}
if (!bypass) {
loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
}
if (region.getCoprocessorHost() != null) {
loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
}
} else {
// secure bulk load
loaded = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
}
BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
builder.setLoaded(loaded);
@ -2066,6 +2078,41 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
@Override
public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
PrepareBulkLoadRequest request) throws ServiceException {
try {
checkOpen();
requestCount.increment();
Region region = getRegion(request.getRegion());
String bulkToken = regionServer.secureBulkLoadManager.prepareBulkLoad(region, request);
PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
builder.setBulkToken(bulkToken);
return builder.build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
@Override
public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
CleanupBulkLoadRequest request) throws ServiceException {
try {
checkOpen();
requestCount.increment();
Region region = getRegion(request.getRegion());
regionServer.secureBulkLoadManager.cleanupBulkLoad(region, request);
CleanupBulkLoadResponse response = CleanupBulkLoadResponse.newBuilder().build();
return response;
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
@Override
public CoprocessorServiceResponse execService(final RpcController controller,
final CoprocessorServiceRequest request) throws ServiceException {
@ -2930,4 +2977,5 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
return UpdateConfigurationResponse.getDefaultInstance();
}
}

View File

@ -80,6 +80,11 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
*/
RegionServerQuotaManager getRegionServerQuotaManager();
/**
* @return RegionServer's instance of {@link SecureBulkLoadManager}
*/
SecureBulkLoadManager getSecureBulkLoadManager();
/**
* Context for postOpenDeployTasks().
*/

View File

@ -0,0 +1,419 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.regionserver.Region.BulkLoadListener;
import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.Methods;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import java.io.IOException;
import java.math.BigInteger;
import java.security.PrivilegedAction;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Bulk loads in secure mode.
*
* This service addresses two issues:
* <ol>
* <li>Moving files in a secure filesystem wherein the HBase Client
* and HBase Server are different filesystem users.</li>
* <li>Does moving in a secure manner. Assuming that the filesystem
* is POSIX compliant.</li>
* </ol>
*
* The algorithm is as follows:
* <ol>
* <li>Create an hbase owned staging directory which is
* world traversable (711): {@code /hbase/staging}</li>
* <li>A user writes out data to his secure output directory: {@code /user/foo/data}</li>
* <li>A call is made to hbase to create a secret staging directory
* which globally rwx (777): {@code /user/staging/averylongandrandomdirectoryname}</li>
* <li>The user moves the data into the random staging directory,
* then calls bulkLoadHFiles()</li>
* </ol>
*
* Like delegation tokens the strength of the security lies in the length
* and randomness of the secret directory.
*
*/
@InterfaceAudience.Private
public class SecureBulkLoadManager {
public static final long VERSION = 0L;
//320/5 = 64 characters
private static final int RANDOM_WIDTH = 320;
private static final int RANDOM_RADIX = 32;
private static final Log LOG = LogFactory.getLog(SecureBulkLoadManager.class);
private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");
private final static FsPermission PERM_HIDDEN = FsPermission.valueOf("-rwx--x--x");
private SecureRandom random;
private FileSystem fs;
private Configuration conf;
//two levels so it doesn't get deleted accidentally
//no sticky bit in Hadoop 1.0
private Path baseStagingDir;
private UserProvider userProvider;
SecureBulkLoadManager(Configuration conf) {
this.conf = conf;
}
public void start() {
random = new SecureRandom();
baseStagingDir = SecureBulkLoadUtil.getBaseStagingDir(conf);
this.userProvider = UserProvider.instantiate(conf);
try {
fs = FileSystem.get(conf);
fs.mkdirs(baseStagingDir, PERM_HIDDEN);
fs.setPermission(baseStagingDir, PERM_HIDDEN);
FileStatus status = fs.getFileStatus(baseStagingDir);
//no sticky bit in hadoop-1.0, making directory nonempty so it never gets erased
fs.mkdirs(new Path(baseStagingDir,"DONOTERASE"), PERM_HIDDEN);
if (status == null) {
throw new IllegalStateException("Failed to create staging directory "
+ baseStagingDir.toString());
}
if (!status.getPermission().equals(PERM_HIDDEN)) {
throw new IllegalStateException(
"Staging directory already exists but permissions aren't set to '-rwx--x--x' "
+ baseStagingDir.toString());
}
} catch (IOException e) {
LOG.error("Failed to create or set permission on staging directory "
+ baseStagingDir.toString(), e);
throw new IllegalStateException("Failed to create or set permission on staging directory "
+ baseStagingDir.toString(), e);
}
}
public void stop() throws IOException {
}
public String prepareBulkLoad(final Region region, final PrepareBulkLoadRequest request)
throws IOException {
List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers(region);
if (bulkLoadObservers != null && bulkLoadObservers.size() != 0) {
ObserverContext<RegionCoprocessorEnvironment> ctx =
new ObserverContext<RegionCoprocessorEnvironment>();
ctx.prepare((RegionCoprocessorEnvironment) region.getCoprocessorHost()
.findCoprocessorEnvironment(BulkLoadObserver.class).get(0));
for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
bulkLoadObserver.prePrepareBulkLoad(ctx, request);
}
}
String bulkToken =
createStagingDir(baseStagingDir, getActiveUser(), region.getTableDesc().getTableName())
.toString();
return bulkToken;
}
public void cleanupBulkLoad(final Region region, final CleanupBulkLoadRequest request)
throws IOException {
List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers(region);
if (bulkLoadObservers != null && bulkLoadObservers.size() != 0) {
ObserverContext<RegionCoprocessorEnvironment> ctx =
new ObserverContext<RegionCoprocessorEnvironment>();
ctx.prepare((RegionCoprocessorEnvironment) region.getCoprocessorHost()
.findCoprocessorEnvironment(BulkLoadObserver.class).get(0));
for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
bulkLoadObserver.preCleanupBulkLoad(ctx, request);
}
}
fs.delete(new Path(request.getBulkToken()), true);
}
public boolean secureBulkLoadHFiles(final Region region,
final BulkLoadHFileRequest request) throws IOException {
final List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>(request.getFamilyPathCount());
for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
familyPaths.add(new Pair<byte[], String>(el.getFamily().toByteArray(), el.getPath()));
}
Token userToken = null;
if (userProvider.isHadoopSecurityEnabled()) {
userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken()
.getPassword().toByteArray(), new Text(request.getFsToken().getKind()), new Text(
request.getFsToken().getService()));
}
final String bulkToken = request.getBulkToken();
User user = getActiveUser();
final UserGroupInformation ugi = user.getUGI();
if(userToken != null) {
ugi.addToken(userToken);
} else if (userProvider.isHadoopSecurityEnabled()) {
//we allow this to pass through in "simple" security mode
//for mini cluster testing
throw new DoNotRetryIOException("User token cannot be null");
}
boolean bypass = false;
if (region.getCoprocessorHost() != null) {
bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
}
boolean loaded = false;
if (!bypass) {
// Get the target fs (HBase region server fs) delegation token
// Since we have checked the permission via 'preBulkLoadHFile', now let's give
// the 'request user' necessary token to operate on the target fs.
// After this point the 'doAs' user will hold two tokens, one for the source fs
// ('request user'), another for the target fs (HBase region server principal).
if (userProvider.isHadoopSecurityEnabled()) {
FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer");
targetfsDelegationToken.acquireDelegationToken(fs);
Token<?> targetFsToken = targetfsDelegationToken.getUserToken();
if (targetFsToken != null
&& (userToken == null || !targetFsToken.getService().equals(userToken.getService()))) {
ugi.addToken(targetFsToken);
}
}
loaded = ugi.doAs(new PrivilegedAction<Boolean>() {
@Override
public Boolean run() {
FileSystem fs = null;
try {
fs = FileSystem.get(conf);
for(Pair<byte[], String> el: familyPaths) {
Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
if(!fs.exists(stageFamily)) {
fs.mkdirs(stageFamily);
fs.setPermission(stageFamily, PERM_ALL_ACCESS);
}
}
//We call bulkLoadHFiles as requesting user
//To enable access prior to staging
return region.bulkLoadHFiles(familyPaths, true,
new SecureBulkLoadListener(fs, bulkToken, conf));
} catch (Exception e) {
LOG.error("Failed to complete bulk load", e);
} finally {
if (fs != null) {
try {
if (!UserGroupInformation.getLoginUser().equals(ugi)) {
FileSystem.closeAllForUGI(ugi);
}
} catch (IOException e) {
LOG.error("Failed to close FileSystem for " + ugi.getUserName(), e);
}
}
}
return false;
}
});
}
if (region.getCoprocessorHost() != null) {
loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
}
return loaded;
}
private List<BulkLoadObserver> getBulkLoadObservers(Region region) {
List<BulkLoadObserver> coprocessorList =
region.getCoprocessorHost().findCoprocessors(BulkLoadObserver.class);
return coprocessorList;
}
private Path createStagingDir(Path baseDir,
User user,
TableName tableName) throws IOException {
String tblName = tableName.getNameAsString().replace(":", "_");
String randomDir = user.getShortName()+"__"+ tblName +"__"+
(new BigInteger(RANDOM_WIDTH, random).toString(RANDOM_RADIX));
return createStagingDir(baseDir, user, randomDir);
}
private Path createStagingDir(Path baseDir,
User user,
String randomDir) throws IOException {
Path p = new Path(baseDir, randomDir);
fs.mkdirs(p, PERM_ALL_ACCESS);
fs.setPermission(p, PERM_ALL_ACCESS);
return p;
}
private User getActiveUser() throws IOException {
User user = RpcServer.getRequestUser();
if (user == null) {
// for non-rpc handling, fallback to system user
user = userProvider.getCurrent();
}
//this is for testing
if (userProvider.isHadoopSecurityEnabled()
&& "simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY))) {
return User.createUserForTesting(conf, user.getShortName(), new String[]{});
}
return user;
}
private static class SecureBulkLoadListener implements BulkLoadListener {
// Target filesystem
private final FileSystem fs;
private final String stagingDir;
private final Configuration conf;
// Source filesystem
private FileSystem srcFs = null;
private Map<String, FsPermission> origPermissions = null;
public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {
this.fs = fs;
this.stagingDir = stagingDir;
this.conf = conf;
this.origPermissions = new HashMap<String, FsPermission>();
}
@Override
public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {
Path p = new Path(srcPath);
Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
// In case of Replication for bulk load files, hfiles are already copied in staging directory
if (p.equals(stageP)) {
LOG.debug(p.getName()
+ " is already available in staging directory. Skipping copy or rename.");
return stageP.toString();
}
if (srcFs == null) {
srcFs = FileSystem.get(p.toUri(), conf);
}
if(!isFile(p)) {
throw new IOException("Path does not reference a file: " + p);
}
// Check to see if the source and target filesystems are the same
if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " +
"the destination filesystem. Copying file over to destination staging dir.");
FileUtil.copy(srcFs, p, fs, stageP, false, conf);
} else {
LOG.debug("Moving " + p + " to " + stageP);
FileStatus origFileStatus = fs.getFileStatus(p);
origPermissions.put(srcPath, origFileStatus.getPermission());
if(!fs.rename(p, stageP)) {
throw new IOException("Failed to move HFile: " + p + " to " + stageP);
}
}
fs.setPermission(stageP, PERM_ALL_ACCESS);
return stageP.toString();
}
@Override
public void doneBulkLoad(byte[] family, String srcPath) throws IOException {
LOG.debug("Bulk Load done for: " + srcPath);
}
@Override
public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException {
if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
// files are copied so no need to move them back
return;
}
Path p = new Path(srcPath);
Path stageP = new Path(stagingDir,
new Path(Bytes.toString(family), p.getName()));
// In case of Replication for bulk load files, hfiles are not renamed by end point during
// prepare stage, so no need of rename here again
if (p.equals(stageP)) {
LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");
return;
}
LOG.debug("Moving " + stageP + " back to " + p);
if(!fs.rename(stageP, p))
throw new IOException("Failed to move HFile: " + stageP + " to " + p);
// restore original permission
if (origPermissions.containsKey(srcPath)) {
fs.setPermission(p, origPermissions.get(srcPath));
} else {
LOG.warn("Can't find previous permission for path=" + srcPath);
}
}
/**
* Check if the path is referencing a file.
* This is mainly needed to avoid symlinks.
* @param p
* @return true if the p is a file
* @throws IOException
*/
private boolean isFile(Path p) throws IOException {
FileStatus status = srcFs.getFileStatus(p);
boolean isFile = !status.isDirectory();
try {
isFile = isFile && !(Boolean)Methods.call(FileStatus.class, status, "isSymlink", null, null);
} catch (Exception e) {
}
return isFile;
}
}
}

View File

@ -95,10 +95,10 @@ import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.Region;
@ -2145,7 +2145,7 @@ public class AccessController extends BaseMasterAndRegionObserver
*/
@Override
public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
PrepareBulkLoadRequest request) throws IOException {
PrepareBulkLoadRequest request) throws IOException {
requireAccess("prePareBulkLoad",
ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
}
@ -2159,7 +2159,7 @@ public class AccessController extends BaseMasterAndRegionObserver
*/
@Override
public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
CleanupBulkLoadRequest request) throws IOException {
CleanupBulkLoadRequest request) throws IOException {
requireAccess("preCleanupBulkLoad",
ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
}

View File

@ -18,140 +18,52 @@
package org.apache.hadoop.hbase.security.access;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesRequest;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesResponse;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadService;
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadService;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadResponse;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesRequest;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesResponse;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.BulkLoadListener;
import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.Methods;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import java.io.IOException;
import java.math.BigInteger;
import java.security.PrivilegedAction;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Coprocessor service for bulk loads in secure mode.
* This coprocessor has to be installed as part of enabling
* security in HBase.
*
* This service addresses two issues:
* <ol>
* <li>Moving files in a secure filesystem wherein the HBase Client
* and HBase Server are different filesystem users.</li>
* <li>Does moving in a secure manner. Assuming that the filesystem
* is POSIX compliant.</li>
* </ol>
*
* The algorithm is as follows:
* <ol>
* <li>Create an hbase owned staging directory which is
* world traversable (711): {@code /hbase/staging}</li>
* <li>A user writes out data to his secure output directory: {@code /user/foo/data}</li>
* <li>A call is made to hbase to create a secret staging directory
* which globally rwx (777): {@code /user/staging/averylongandrandomdirectoryname}</li>
* <li>The user moves the data into the random staging directory,
* then calls bulkLoadHFiles()</li>
* </ol>
* Like delegation tokens the strength of the security lies in the length
* and randomness of the secret directory.
*
* @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0
*/
@InterfaceAudience.Private
@Deprecated
public class SecureBulkLoadEndpoint extends SecureBulkLoadService
implements CoprocessorService, Coprocessor {
public static final long VERSION = 0L;
//320/5 = 64 characters
private static final int RANDOM_WIDTH = 320;
private static final int RANDOM_RADIX = 32;
private static final Log LOG = LogFactory.getLog(SecureBulkLoadEndpoint.class);
private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");
private final static FsPermission PERM_HIDDEN = FsPermission.valueOf("-rwx--x--x");
private SecureRandom random;
private FileSystem fs;
private Configuration conf;
//two levels so it doesn't get deleted accidentally
//no sticky bit in Hadoop 1.0
private Path baseStagingDir;
private RegionCoprocessorEnvironment env;
private UserProvider userProvider;
@Override
public void start(CoprocessorEnvironment env) {
this.env = (RegionCoprocessorEnvironment)env;
random = new SecureRandom();
conf = env.getConfiguration();
baseStagingDir = SecureBulkLoadUtil.getBaseStagingDir(conf);
this.userProvider = UserProvider.instantiate(conf);
try {
fs = FileSystem.get(conf);
fs.mkdirs(baseStagingDir, PERM_HIDDEN);
fs.setPermission(baseStagingDir, PERM_HIDDEN);
//no sticky bit in hadoop-1.0, making directory nonempty so it never gets erased
fs.mkdirs(new Path(baseStagingDir,"DONOTERASE"), PERM_HIDDEN);
FileStatus status = fs.getFileStatus(baseStagingDir);
if(status == null) {
throw new IllegalStateException("Failed to create staging directory");
}
if(!status.getPermission().equals(PERM_HIDDEN)) {
throw new IllegalStateException(
"Directory already exists but permissions aren't set to '-rwx--x--x' ");
}
} catch (IOException e) {
throw new IllegalStateException("Failed to get FileSystem instance",e);
}
LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
LOG.warn("Secure bulk load has been integrated into HBase core.");
}
@Override
@ -159,24 +71,12 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
}
@Override
public void prepareBulkLoad(RpcController controller,
PrepareBulkLoadRequest request,
RpcCallback<PrepareBulkLoadResponse> done){
public void prepareBulkLoad(RpcController controller, PrepareBulkLoadRequest request,
RpcCallback<PrepareBulkLoadResponse> done) {
try {
List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers();
if(bulkLoadObservers != null) {
ObserverContext<RegionCoprocessorEnvironment> ctx =
new ObserverContext<RegionCoprocessorEnvironment>();
ctx.prepare(env);
for(BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
bulkLoadObserver.prePrepareBulkLoad(ctx, request);
}
}
String bulkToken = createStagingDir(baseStagingDir,
getActiveUser(), ProtobufUtil.toTableName(request.getTableName())).toString();
SecureBulkLoadManager secureBulkLoadManager =
this.env.getRegionServerServices().getSecureBulkLoadManager();
String bulkToken = secureBulkLoadManager.prepareBulkLoad(this.env.getRegion(), request);
done.run(PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build());
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
@ -185,23 +85,12 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
}
@Override
public void cleanupBulkLoad(RpcController controller,
CleanupBulkLoadRequest request,
RpcCallback<CleanupBulkLoadResponse> done) {
public void cleanupBulkLoad(RpcController controller, CleanupBulkLoadRequest request,
RpcCallback<CleanupBulkLoadResponse> done) {
try {
List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers();
if(bulkLoadObservers != null) {
ObserverContext<RegionCoprocessorEnvironment> ctx =
new ObserverContext<RegionCoprocessorEnvironment>();
ctx.prepare(env);
for(BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
bulkLoadObserver.preCleanupBulkLoad(ctx, request);
}
}
fs.delete(new Path(request.getBulkToken()), true);
SecureBulkLoadManager secureBulkLoadManager =
this.env.getRegionServerServices().getSecureBulkLoadManager();
secureBulkLoadManager.cleanupBulkLoad(this.env.getRegion(), request);
done.run(CleanupBulkLoadResponse.newBuilder().build());
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
@ -210,262 +99,35 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
}
@Override
public void secureBulkLoadHFiles(RpcController controller,
SecureBulkLoadHFilesRequest request,
RpcCallback<SecureBulkLoadHFilesResponse> done) {
final List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
familyPaths.add(new Pair(el.getFamily().toByteArray(),el.getPath()));
}
Token userToken = null;
if (userProvider.isHadoopSecurityEnabled()) {
userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken()
.getPassword().toByteArray(), new Text(request.getFsToken().getKind()), new Text(
request.getFsToken().getService()));
}
final String bulkToken = request.getBulkToken();
User user = getActiveUser();
final UserGroupInformation ugi = user.getUGI();
if(userToken != null) {
ugi.addToken(userToken);
} else if (userProvider.isHadoopSecurityEnabled()) {
//we allow this to pass through in "simple" security mode
//for mini cluster testing
ResponseConverter.setControllerException(controller,
new DoNotRetryIOException("User token cannot be null"));
done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
return;
}
Region region = env.getRegion();
boolean bypass = false;
if (region.getCoprocessorHost() != null) {
try {
bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
return;
}
}
public void secureBulkLoadHFiles(RpcController controller, SecureBulkLoadHFilesRequest request,
RpcCallback<SecureBulkLoadHFilesResponse> done) {
boolean loaded = false;
if (!bypass) {
// Get the target fs (HBase region server fs) delegation token
// Since we have checked the permission via 'preBulkLoadHFile', now let's give
// the 'request user' necessary token to operate on the target fs.
// After this point the 'doAs' user will hold two tokens, one for the source fs
// ('request user'), another for the target fs (HBase region server principal).
if (userProvider.isHadoopSecurityEnabled()) {
FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer");
try {
targetfsDelegationToken.acquireDelegationToken(fs);
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
return;
}
Token<?> targetFsToken = targetfsDelegationToken.getUserToken();
if (targetFsToken != null
&& (userToken == null || !targetFsToken.getService().equals(userToken.getService()))) {
ugi.addToken(targetFsToken);
}
}
loaded = ugi.doAs(new PrivilegedAction<Boolean>() {
@Override
public Boolean run() {
FileSystem fs = null;
try {
Configuration conf = env.getConfiguration();
fs = FileSystem.get(conf);
for(Pair<byte[], String> el: familyPaths) {
Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
if(!fs.exists(stageFamily)) {
fs.mkdirs(stageFamily);
fs.setPermission(stageFamily, PERM_ALL_ACCESS);
}
}
//We call bulkLoadHFiles as requesting user
//To enable access prior to staging
return env.getRegion().bulkLoadHFiles(familyPaths, true,
new SecureBulkLoadListener(fs, bulkToken, conf));
} catch (Exception e) {
LOG.error("Failed to complete bulk load", e);
} finally {
if (fs != null) {
try {
if (!UserGroupInformation.getLoginUser().equals(ugi)) {
FileSystem.closeAllForUGI(ugi);
}
} catch (IOException e) {
LOG.error("Failed to close FileSystem for " + ugi.getUserName(), e);
}
}
}
return false;
}
});
}
if (region.getCoprocessorHost() != null) {
try {
loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
return;
}
try {
SecureBulkLoadManager secureBulkLoadManager =
this.env.getRegionServerServices().getSecureBulkLoadManager();
BulkLoadHFileRequest bulkLoadHFileRequest = ConvertSecureBulkLoadHFilesRequest(request);
loaded = secureBulkLoadManager.secureBulkLoadHFiles(this.env.getRegion(), bulkLoadHFileRequest);
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
}
done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(loaded).build());
}
private List<BulkLoadObserver> getBulkLoadObservers() {
List<BulkLoadObserver> coprocessorList =
this.env.getRegion().getCoprocessorHost().findCoprocessors(BulkLoadObserver.class);
private BulkLoadHFileRequest ConvertSecureBulkLoadHFilesRequest(
SecureBulkLoadHFilesRequest request) {
BulkLoadHFileRequest.Builder bulkLoadHFileRequest = BulkLoadHFileRequest.newBuilder();
RegionSpecifier region =
RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, this.env
.getRegionInfo().getRegionName());
return coprocessorList;
}
private Path createStagingDir(Path baseDir,
User user,
TableName tableName) throws IOException {
String tblName = tableName.getNameAsString().replace(":", "_");
String randomDir = user.getShortName()+"__"+ tblName +"__"+
(new BigInteger(RANDOM_WIDTH, random).toString(RANDOM_RADIX));
return createStagingDir(baseDir, user, randomDir);
}
private Path createStagingDir(Path baseDir,
User user,
String randomDir) throws IOException {
Path p = new Path(baseDir, randomDir);
fs.mkdirs(p, PERM_ALL_ACCESS);
fs.setPermission(p, PERM_ALL_ACCESS);
return p;
}
private User getActiveUser() {
User user = RpcServer.getRequestUser();
if (user == null) {
return null;
}
//this is for testing
if (userProvider.isHadoopSecurityEnabled()
&& "simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY))) {
return User.createUserForTesting(conf, user.getShortName(), new String[]{});
}
return user;
bulkLoadHFileRequest.setRegion(region).setFsToken(request.getFsToken())
.setBulkToken(request.getBulkToken()).setAssignSeqNum(request.getAssignSeqNum())
.addAllFamilyPath(request.getFamilyPathList());
return bulkLoadHFileRequest.build();
}
@Override
public Service getService() {
return this;
}
private static class SecureBulkLoadListener implements BulkLoadListener {
// Target filesystem
private FileSystem fs;
private String stagingDir;
private Configuration conf;
// Source filesystem
private FileSystem srcFs = null;
private Map<String, FsPermission> origPermissions = null;
public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {
this.fs = fs;
this.stagingDir = stagingDir;
this.conf = conf;
this.origPermissions = new HashMap<String, FsPermission>();
}
@Override
public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {
Path p = new Path(srcPath);
Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
// In case of Replication for bulk load files, hfiles are already copied in staging directory
if (p.equals(stageP)) {
LOG.debug(p.getName()
+ " is already available in staging directory. Skipping copy or rename.");
return stageP.toString();
}
if (srcFs == null) {
srcFs = FileSystem.get(p.toUri(), conf);
}
if(!isFile(p)) {
throw new IOException("Path does not reference a file: " + p);
}
// Check to see if the source and target filesystems are the same
if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " +
"the destination filesystem. Copying file over to destination staging dir.");
FileUtil.copy(srcFs, p, fs, stageP, false, conf);
} else {
LOG.debug("Moving " + p + " to " + stageP);
FileStatus origFileStatus = fs.getFileStatus(p);
origPermissions.put(srcPath, origFileStatus.getPermission());
if(!fs.rename(p, stageP)) {
throw new IOException("Failed to move HFile: " + p + " to " + stageP);
}
}
fs.setPermission(stageP, PERM_ALL_ACCESS);
return stageP.toString();
}
@Override
public void doneBulkLoad(byte[] family, String srcPath) throws IOException {
LOG.debug("Bulk Load done for: " + srcPath);
}
@Override
public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException {
if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
// files are copied so no need to move them back
return;
}
Path p = new Path(srcPath);
Path stageP = new Path(stagingDir,
new Path(Bytes.toString(family), p.getName()));
// In case of Replication for bulk load files, hfiles are not renamed by end point during
// prepare stage, so no need of rename here again
if (p.equals(stageP)) {
LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");
return;
}
LOG.debug("Moving " + stageP + " back to " + p);
if(!fs.rename(stageP, p))
throw new IOException("Failed to move HFile: " + stageP + " to " + p);
// restore original permission
if (origPermissions.containsKey(srcPath)) {
fs.setPermission(p, origPermissions.get(srcPath));
} else {
LOG.warn("Can't find previous permission for path=" + srcPath);
}
}
/**
* Check if the path is referencing a file.
* This is mainly needed to avoid symlinks.
* @param p
* @return true if the p is a file
* @throws IOException
*/
private boolean isFile(Path p) throws IOException {
FileStatus status = srcFs.getFileStatus(p);
boolean isFile = !status.isDirectory();
try {
isFile = isFile && !(Boolean)Methods.call(FileStatus.class, status, "isSymlink", null, null);
} catch (Exception e) {
}
return isFile;
}
}
}

View File

@ -1080,6 +1080,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
// Now do the mini hbase cluster. Set the hbase.rootdir in config.
createRootDir(create);
// Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
// for tests that do not read hbase-defaults.xml
setHBaseFsTmpDir();
// These settings will make the server waits until this exact number of
// regions servers are connected.
if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
@ -1104,10 +1108,6 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
getAdmin(); // create immediately the hbaseAdmin
LOG.info("Minicluster is up");
// Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
// for tests that do not read hbase-defaults.xml
setHBaseFsTmpDir();
return (MiniHBaseCluster)this.hbaseCluster;
}
@ -1278,6 +1278,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
} else {
LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
}
this.conf.set("hbase.bulkload.staging.dir", this.conf.get("hbase.fs.tmp.dir"));
}
/**

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
@ -316,7 +317,6 @@ public class MockRegionServerServices implements RegionServerServices {
@Override
public ClusterConnection getClusterConnection() {
// TODO Auto-generated method stub
return null;
}
@ -334,4 +334,9 @@ public class MockRegionServerServices implements RegionServerServices {
public MetricsRegionServer getMetrics() {
return null;
}
@Override
public SecureBulkLoadManager getSecureBulkLoadManager() {
return null;
}
}

View File

@ -35,8 +35,6 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@ -336,16 +334,21 @@ public class TestReplicaWithCluster {
LOG.debug("Loading test data");
@SuppressWarnings("deprecation")
final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection();
table = conn.getTable(hdt.getTableName());
final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn);
RegionServerCallable<Void> callable = new RegionServerCallable<Void>(
conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) {
@Override
public Void call(int timeout) throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()));
SecureBulkLoadClient secureClient = null;
byte[] regionName = getLocation().getRegionInfo().getRegionName();
BulkLoadHFileRequest request =
RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
getStub().bulkLoadHFile(null, request);
try (Table table = conn.getTable(getTableName())) {
secureClient = new SecureBulkLoadClient(table);
secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
true, null, bulkToken);
}
return null;
}
};

View File

@ -1,45 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
@Category(LargeTests.class)
public class TestLoadIncrementalHFilesUseSecurityEndPoint extends TestLoadIncrementalHFiles {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
MAX_FILES_PER_REGION_PER_FAMILY);
util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
"org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
// change default behavior so that tag values are returned with normal rpcs
util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
KeyValueCodecWithTags.class.getCanonicalName());
util.startMiniCluster();
setupNamespace();
}
}

View File

@ -83,6 +83,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodes
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
@ -90,6 +92,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
@ -103,6 +107,7 @@ import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
@ -661,7 +666,6 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
@Override
public ClusterConnection getClusterConnection() {
// TODO Auto-generated method stub
return null;
}
@ -679,4 +683,21 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
public MetricsRegionServer getMetrics() {
return null;
}
}
@Override
public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
PrepareBulkLoadRequest request) throws ServiceException {
return null;
}
@Override
public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
CleanupBulkLoadRequest request) throws ServiceException {
return null;
}
@Override
public SecureBulkLoadManager getSecureBulkLoadManager() {
return null;
}
}

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.coprocessor;
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
@ -32,6 +32,11 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
import org.apache.hadoop.hbase.util.ByteStringer;
@ -39,14 +44,15 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.token.Token;
/**
* Client proxy for SecureBulkLoadProtocol
* used in conjunction with SecureBulkLoadEndpoint
* Client proxy for SecureBulkLoadProtocol used in conjunction with SecureBulkLoadEndpoint
* @deprecated Use for backward compatibility testing only. Will be removed when
* SecureBulkLoadEndpoint is not supported.
*/
@InterfaceAudience.Private
public class SecureBulkLoadClient {
public class SecureBulkLoadEndpointClient {
private Table table;
public SecureBulkLoadClient(Table table) {
public SecureBulkLoadEndpointClient(Table table) {
this.table = table;
}
@ -58,22 +64,22 @@ public class SecureBulkLoadClient {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse> rpcCallback =
new BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse>();
BlockingRpcCallback<PrepareBulkLoadResponse> rpcCallback =
new BlockingRpcCallback<PrepareBulkLoadResponse>();
SecureBulkLoadProtos.PrepareBulkLoadRequest request =
SecureBulkLoadProtos.PrepareBulkLoadRequest.newBuilder()
PrepareBulkLoadRequest request =
PrepareBulkLoadRequest.newBuilder()
.setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
instance.prepareBulkLoad(controller,
request,
rpcCallback);
SecureBulkLoadProtos.PrepareBulkLoadResponse response = rpcCallback.get();
PrepareBulkLoadResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
return response.getBulkToken();
} catch (Throwable throwable) {
throw new IOException(throwable);
@ -88,11 +94,11 @@ public class SecureBulkLoadClient {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse> rpcCallback =
new BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse>();
BlockingRpcCallback<CleanupBulkLoadResponse> rpcCallback =
new BlockingRpcCallback<CleanupBulkLoadResponse>();
SecureBulkLoadProtos.CleanupBulkLoadRequest request =
SecureBulkLoadProtos.CleanupBulkLoadRequest.newBuilder()
CleanupBulkLoadRequest request =
CleanupBulkLoadRequest.newBuilder()
.setBulkToken(bulkToken).build();
instance.cleanupBulkLoad(controller,
@ -118,11 +124,11 @@ public class SecureBulkLoadClient {
SecureBulkLoadProtos.SecureBulkLoadService instance =
ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
SecureBulkLoadProtos.DelegationToken protoDT =
SecureBulkLoadProtos.DelegationToken.newBuilder().build();
DelegationToken protoDT =
DelegationToken.newBuilder().build();
if(userToken != null) {
protoDT =
SecureBulkLoadProtos.DelegationToken.newBuilder()
DelegationToken.newBuilder()
.setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
.setPassword(ByteStringer.wrap(userToken.getPassword()))
.setKind(userToken.getKind().toString())

View File

@ -26,7 +26,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@ -52,6 +51,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@ -91,15 +90,15 @@ import com.google.common.collect.Lists;
@Category({RegionServerTests.class, LargeTests.class})
public class TestHRegionServerBulkLoad {
private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
private final static Configuration conf = UTIL.getConfiguration();
private final static byte[] QUAL = Bytes.toBytes("qual");
private final static int NUM_CFS = 10;
protected static HBaseTestingUtility UTIL = new HBaseTestingUtility();
protected final static Configuration conf = UTIL.getConfiguration();
protected final static byte[] QUAL = Bytes.toBytes("qual");
protected final static int NUM_CFS = 10;
private int sleepDuration;
public static int BLOCKSIZE = 64 * 1024;
public static Algorithm COMPRESSION = Compression.Algorithm.NONE;
private final static byte[][] families = new byte[NUM_CFS][];
protected final static byte[][] families = new byte[NUM_CFS][];
static {
for (int i = 0; i < NUM_CFS; i++) {
families[i] = Bytes.toBytes(family(i));
@ -200,16 +199,21 @@ public class TestHRegionServerBulkLoad {
// bulk load HFiles
final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
Table table = conn.getTable(tableName);
final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn);
RegionServerCallable<Void> callable =
new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
@Override
public Void call(int callTimeout) throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()));
SecureBulkLoadClient secureClient = null;
byte[] regionName = getLocation().getRegionInfo().getRegionName();
BulkLoadHFileRequest request =
RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
getStub().bulkLoadHFile(null, request);
try (Table table = conn.getTable(getTableName())) {
secureClient = new SecureBulkLoadClient(table);
secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
true, null, bulkToken);
}
return null;
}
};
@ -320,7 +324,7 @@ public class TestHRegionServerBulkLoad {
* Creates a table with given table name and specified number of column
* families if the table does not already exist.
*/
private void setupTable(TableName table, int cfs) throws IOException {
public void setupTable(TableName table, int cfs) throws IOException {
try {
LOG.info("Creating table " + table);
HTableDescriptor htd = new HTableDescriptor(table);

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import com.google.common.collect.Lists;
/**
* Tests bulk loading of HFiles with old non-secure client for backward compatibility. Will be
* removed when old non-secure client for backward compatibility is not supported.
*/
@RunWith(Parameterized.class)
@Category({RegionServerTests.class, LargeTests.class})
public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBulkLoad {
public TestHRegionServerBulkLoadWithOldClient(int duration) {
super(duration);
}
private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoadWithOldClient.class);
public static class AtomicHFileLoader extends RepeatingTestThread {
final AtomicLong numBulkLoads = new AtomicLong();
final AtomicLong numCompactions = new AtomicLong();
private TableName tableName;
public AtomicHFileLoader(TableName tableName, TestContext ctx,
byte targetFamilies[][]) throws IOException {
super(ctx);
this.tableName = tableName;
}
public void doAnAction() throws Exception {
long iteration = numBulkLoads.getAndIncrement();
Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
iteration));
// create HFiles for different column families
FileSystem fs = UTIL.getTestFileSystem();
byte[] val = Bytes.toBytes(String.format("%010d", iteration));
final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>(
NUM_CFS);
for (int i = 0; i < NUM_CFS; i++) {
Path hfile = new Path(dir, family(i));
byte[] fam = Bytes.toBytes(family(i));
createHFile(fs, hfile, fam, QUAL, val, 1000);
famPaths.add(new Pair<>(fam, hfile.toString()));
}
// bulk load HFiles
final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
RegionServerCallable<Void> callable =
new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
@Override
public Void call(int callTimeout) throws Exception {
LOG.info("Non-secure old client");
byte[] regionName = getLocation().getRegionInfo().getRegionName();
BulkLoadHFileRequest request =
RequestConverter
.buildBulkLoadHFileRequest(famPaths, regionName, true, null, null);
getStub().bulkLoadHFile(null, request);
return null;
}
};
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
caller.callWithRetries(callable, Integer.MAX_VALUE);
// Periodically do compaction to reduce the number of open file handles.
if (numBulkLoads.get() % 5 == 0) {
// 5 * 50 = 250 open file handles!
callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
@Override
public Void call(int callTimeout) throws Exception {
LOG.debug("compacting " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()));
AdminProtos.AdminService.BlockingInterface server =
conn.getAdmin(getLocation().getServerName());
CompactRegionRequest request =
RequestConverter.buildCompactRegionRequest(
getLocation().getRegionInfo().getRegionName(), true, null);
server.compactRegion(null, request);
numCompactions.incrementAndGet();
return null;
}
};
caller.callWithRetries(callable, Integer.MAX_VALUE);
}
}
}
void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
throws Exception {
setupTable(tableName, 10);
TestContext ctx = new TestContext(UTIL.getConfiguration());
AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
ctx.addThread(loader);
List<AtomicScanReader> scanners = Lists.newArrayList();
for (int i = 0; i < numScanners; i++) {
AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
scanners.add(scanner);
ctx.addThread(scanner);
}
ctx.startThreads();
ctx.waitFor(millisToRun);
ctx.stop();
LOG.info("Loaders:");
LOG.info(" loaded " + loader.numBulkLoads.get());
LOG.info(" compations " + loader.numCompactions.get());
LOG.info("Scanners:");
for (AtomicScanReader scanner : scanners) {
LOG.info(" scanned " + scanner.numScans.get());
LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
}
}
}

View File

@ -0,0 +1,177 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import com.google.common.collect.Lists;
/**
* Tests bulk loading of HFiles with old secure Endpoint client for backward compatibility. Will be
* removed when old non-secure client for backward compatibility is not supported.
*/
@RunWith(Parameterized.class)
@Category({RegionServerTests.class, LargeTests.class})
public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionServerBulkLoad {
public TestHRegionServerBulkLoadWithOldSecureEndpoint(int duration) {
super(duration);
}
private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);
@BeforeClass
public static void setUpBeforeClass() throws IOException {
conf.setInt("hbase.rpc.timeout", 10 * 1000);
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
"org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
}
public static class AtomicHFileLoader extends RepeatingTestThread {
final AtomicLong numBulkLoads = new AtomicLong();
final AtomicLong numCompactions = new AtomicLong();
private TableName tableName;
public AtomicHFileLoader(TableName tableName, TestContext ctx,
byte targetFamilies[][]) throws IOException {
super(ctx);
this.tableName = tableName;
}
public void doAnAction() throws Exception {
long iteration = numBulkLoads.getAndIncrement();
Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
iteration));
// create HFiles for different column families
FileSystem fs = UTIL.getTestFileSystem();
byte[] val = Bytes.toBytes(String.format("%010d", iteration));
final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>(
NUM_CFS);
for (int i = 0; i < NUM_CFS; i++) {
Path hfile = new Path(dir, family(i));
byte[] fam = Bytes.toBytes(family(i));
createHFile(fs, hfile, fam, QUAL, val, 1000);
famPaths.add(new Pair<>(fam, hfile.toString()));
}
// bulk load HFiles
final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
Table table = conn.getTable(tableName);
final String bulkToken = new SecureBulkLoadEndpointClient(table).prepareBulkLoad(tableName);
RegionServerCallable<Void> callable =
new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
@Override
public Void call(int callTimeout) throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()));
try (Table table = conn.getTable(getTableName())) {
boolean loaded =
new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths, null,
bulkToken, getLocation().getRegionInfo().getStartKey());
}
return null;
}
};
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
caller.callWithRetries(callable, Integer.MAX_VALUE);
// Periodically do compaction to reduce the number of open file handles.
if (numBulkLoads.get() % 5 == 0) {
// 5 * 50 = 250 open file handles!
callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
@Override
public Void call(int callTimeout) throws Exception {
LOG.debug("compacting " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()));
AdminProtos.AdminService.BlockingInterface server =
conn.getAdmin(getLocation().getServerName());
CompactRegionRequest request =
RequestConverter.buildCompactRegionRequest(
getLocation().getRegionInfo().getRegionName(), true, null);
server.compactRegion(null, request);
numCompactions.incrementAndGet();
return null;
}
};
caller.callWithRetries(callable, Integer.MAX_VALUE);
}
}
}
void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
throws Exception {
setupTable(tableName, 10);
TestContext ctx = new TestContext(UTIL.getConfiguration());
AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
ctx.addThread(loader);
List<AtomicScanReader> scanners = Lists.newArrayList();
for (int i = 0; i < numScanners; i++) {
AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
scanners.add(scanner);
ctx.addThread(scanner);
}
ctx.startThreads();
ctx.waitFor(millisToRun);
ctx.stop();
LOG.info("Loaders:");
LOG.info(" loaded " + loader.numBulkLoads.get());
LOG.info(" compations " + loader.numCompactions.get());
LOG.info("Scanners:");
for (AtomicScanReader scanner : scanners) {
LOG.info(" scanned " + scanner.numScans.get());
LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
}
}
}

View File

@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@ -32,6 +31,7 @@ import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.CoordinatedStateManager;
@ -61,6 +61,8 @@ public class TestPriorityRpc {
public void setup() {
Configuration conf = HBaseConfiguration.create();
conf.setBoolean("hbase.testing.nocluster", true); // No need to do ZK
final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
TEST_UTIL.getDataTestDir(this.getClass().getName());
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
regionServer = HRegionServer.constructRegionServer(HRegionServer.class, conf, cp);
priority = regionServer.rpcServices.getPriority();

View File

@ -99,8 +99,7 @@ public class SecureTestUtil {
conf.set("hadoop.security.authentication", "simple");
conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, AccessController.class.getName() +
"," + MasterSyncObserver.class.getName());
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName() +
"," + SecureBulkLoadEndpoint.class.getName());
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName());
conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, AccessController.class.getName());
// Need HFile V3 for tags for security features
conf.setInt(HFile.FORMAT_VERSION_KEY, 3);