HBASE-9639 SecureBulkLoad dispatches file load requests to all Regions

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1526033 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
ndimiduk 2013-09-24 22:26:08 +00:00
parent b6ad121cb9
commit 91508fd564
2 changed files with 47 additions and 48 deletions

View File

@ -18,14 +18,17 @@
package org.apache.hadoop.hbase.client.coprocessor;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
import static org.apache.hadoop.hbase.HConstants.LAST_ROW;
import com.google.protobuf.ByteString;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@ -54,8 +57,8 @@ public class SecureBulkLoadClient {
try {
return
table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class,
HConstants.EMPTY_START_ROW,
HConstants.EMPTY_START_ROW,
EMPTY_START_ROW,
LAST_ROW,
new Batch.Call<SecureBulkLoadProtos.SecureBulkLoadService,String>() {
@Override
public String call(SecureBulkLoadProtos.SecureBulkLoadService instance) throws IOException {
@ -87,8 +90,8 @@ public class SecureBulkLoadClient {
public void cleanupBulkLoad(final String bulkToken) throws IOException {
try {
table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class,
HConstants.EMPTY_START_ROW,
HConstants.EMPTY_START_ROW,
EMPTY_START_ROW,
LAST_ROW,
new Batch.Call<SecureBulkLoadProtos.SecureBulkLoadService, String>() {
@Override
@ -121,54 +124,50 @@ public class SecureBulkLoadClient {
final Token<?> userToken,
final String bulkToken,
final byte[] startRow) throws IOException {
// we never want to send a batch of HFiles to all regions, thus cannot call
// HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
try {
return
table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class,
startRow,
startRow,
new Batch.Call<SecureBulkLoadProtos.SecureBulkLoadService,Boolean>() {
CoprocessorRpcChannel channel = table.coprocessorService(startRow);
SecureBulkLoadProtos.SecureBulkLoadService instance =
ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
@Override
public Boolean call(SecureBulkLoadProtos.SecureBulkLoadService instance) throws IOException {
SecureBulkLoadProtos.DelegationToken protoDT =
SecureBulkLoadProtos.DelegationToken.newBuilder().build();
if(userToken != null) {
protoDT =
SecureBulkLoadProtos.DelegationToken.newBuilder()
.setIdentifier(ByteString.copyFrom(userToken.getIdentifier()))
.setPassword(ByteString.copyFrom(userToken.getPassword()))
.setKind(userToken.getKind().toString())
.setService(userToken.getService().toString()).build();
}
SecureBulkLoadProtos.DelegationToken protoDT =
SecureBulkLoadProtos.DelegationToken.newBuilder().build();
if(userToken != null) {
protoDT =
SecureBulkLoadProtos.DelegationToken.newBuilder()
.setIdentifier(ByteString.copyFrom(userToken.getIdentifier()))
.setPassword(ByteString.copyFrom(userToken.getPassword()))
.setKind(userToken.getKind().toString())
.setService(userToken.getService().toString()).build();
}
List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>();
for(Pair<byte[], String> el: familyPaths) {
protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
.setFamily(ByteString.copyFrom(el.getFirst()))
.setPath(el.getSecond()).build());
}
List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>();
for(Pair<byte[], String> el: familyPaths) {
protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
.setFamily(ByteString.copyFrom(el.getFirst()))
.setPath(el.getSecond()).build());
}
SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request =
SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
.setFsToken(protoDT)
.addAllFamilyPath(protoFamilyPaths)
.setBulkToken(bulkToken).build();
SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request =
SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
.setFsToken(protoDT)
.addAllFamilyPath(protoFamilyPaths)
.setBulkToken(bulkToken).build();
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse> rpcCallback =
new BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>();
instance.secureBulkLoadHFiles(controller,
request,
rpcCallback);
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse> rpcCallback =
new BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>();
instance.secureBulkLoadHFiles(controller,
request,
rpcCallback);
SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
return response.getLoaded();
}
}).entrySet().iterator().next().getValue();
SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
return response.getLoaded();
} catch (Throwable throwable) {
throw new IOException(throwable);
}

View File

@ -557,7 +557,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try {
LOG.debug("Going to connect to server " + getLocation() + " for row "
+ Bytes.toStringBinary(getRow()));
+ Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
byte[] regionName = getLocation().getRegionInfo().getRegionName();
if(!useSecure) {
success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);