HBASE-21868 Remove legacy bulk load support
Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
353233c296
commit
78d3d5628a
|
@ -1,189 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.security.access;
|
|
||||||
|
|
||||||
import com.google.protobuf.RpcCallback;
|
|
||||||
import com.google.protobuf.RpcController;
|
|
||||||
import com.google.protobuf.Service;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
|
|
||||||
import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
|
|
||||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
|
||||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
|
||||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesRequest;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesResponse;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadService;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
|
|
||||||
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Coprocessor service for bulk loads in secure mode.
|
|
||||||
* @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0
|
|
||||||
*/
|
|
||||||
@CoreCoprocessor
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
@Deprecated
|
|
||||||
public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements RegionCoprocessor {
|
|
||||||
public static final long VERSION = 0L;
|
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(SecureBulkLoadEndpoint.class);
|
|
||||||
|
|
||||||
private RegionCoprocessorEnvironment env;
|
|
||||||
private RegionServerServices rsServices;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void start(CoprocessorEnvironment env) {
|
|
||||||
this.env = (RegionCoprocessorEnvironment)env;
|
|
||||||
rsServices = ((HasRegionServerServices)this.env).getRegionServerServices();
|
|
||||||
LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
|
|
||||||
LOG.warn("Secure bulk load has been integrated into HBase core.");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void stop(CoprocessorEnvironment env) throws IOException {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void prepareBulkLoad(RpcController controller, PrepareBulkLoadRequest request,
|
|
||||||
RpcCallback<PrepareBulkLoadResponse> done) {
|
|
||||||
try {
|
|
||||||
SecureBulkLoadManager secureBulkLoadManager = this.rsServices.getSecureBulkLoadManager();
|
|
||||||
|
|
||||||
String bulkToken = secureBulkLoadManager.prepareBulkLoad((HRegion) this.env.getRegion(),
|
|
||||||
convert(request));
|
|
||||||
done.run(PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build());
|
|
||||||
} catch (IOException e) {
|
|
||||||
CoprocessorRpcUtils.setControllerException(controller, e);
|
|
||||||
}
|
|
||||||
done.run(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Convert from CPEP protobuf 2.5 to internal protobuf 3.3.
|
|
||||||
*/
|
|
||||||
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest
|
|
||||||
convert(PrepareBulkLoadRequest request)
|
|
||||||
throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException {
|
|
||||||
byte [] bytes = request.toByteArray();
|
|
||||||
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest.Builder
|
|
||||||
builder =
|
|
||||||
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest.
|
|
||||||
newBuilder();
|
|
||||||
builder.mergeFrom(bytes);
|
|
||||||
return builder.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void cleanupBulkLoad(RpcController controller, CleanupBulkLoadRequest request,
|
|
||||||
RpcCallback<CleanupBulkLoadResponse> done) {
|
|
||||||
try {
|
|
||||||
SecureBulkLoadManager secureBulkLoadManager = this.rsServices.getSecureBulkLoadManager();
|
|
||||||
secureBulkLoadManager.cleanupBulkLoad((HRegion) this.env.getRegion(), convert(request));
|
|
||||||
done.run(CleanupBulkLoadResponse.newBuilder().build());
|
|
||||||
} catch (IOException e) {
|
|
||||||
CoprocessorRpcUtils.setControllerException(controller, e);
|
|
||||||
}
|
|
||||||
done.run(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Convert from CPEP protobuf 2.5 to internal protobuf 3.3.
|
|
||||||
*/
|
|
||||||
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest
|
|
||||||
convert(CleanupBulkLoadRequest request)
|
|
||||||
throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException {
|
|
||||||
byte [] bytes = request.toByteArray();
|
|
||||||
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest.Builder
|
|
||||||
builder =
|
|
||||||
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest.
|
|
||||||
newBuilder();
|
|
||||||
builder.mergeFrom(bytes);
|
|
||||||
return builder.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void secureBulkLoadHFiles(RpcController controller, SecureBulkLoadHFilesRequest request,
|
|
||||||
RpcCallback<SecureBulkLoadHFilesResponse> done) {
|
|
||||||
boolean loaded = false;
|
|
||||||
Map<byte[], List<Path>> map = null;
|
|
||||||
try {
|
|
||||||
SecureBulkLoadManager secureBulkLoadManager = this.rsServices.getSecureBulkLoadManager();
|
|
||||||
BulkLoadHFileRequest bulkLoadHFileRequest = ConvertSecureBulkLoadHFilesRequest(request);
|
|
||||||
map = secureBulkLoadManager.secureBulkLoadHFiles((HRegion) this.env.getRegion(),
|
|
||||||
convert(bulkLoadHFileRequest));
|
|
||||||
loaded = map != null && !map.isEmpty();
|
|
||||||
} catch (IOException e) {
|
|
||||||
CoprocessorRpcUtils.setControllerException(controller, e);
|
|
||||||
}
|
|
||||||
done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(loaded).build());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Convert from CPEP protobuf 2.5 to internal protobuf 3.3.
|
|
||||||
*/
|
|
||||||
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest
|
|
||||||
convert(BulkLoadHFileRequest request)
|
|
||||||
throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException {
|
|
||||||
byte [] bytes = request.toByteArray();
|
|
||||||
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.Builder
|
|
||||||
builder =
|
|
||||||
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.
|
|
||||||
newBuilder();
|
|
||||||
builder.mergeFrom(bytes);
|
|
||||||
return builder.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
private BulkLoadHFileRequest ConvertSecureBulkLoadHFilesRequest(
|
|
||||||
SecureBulkLoadHFilesRequest request) {
|
|
||||||
BulkLoadHFileRequest.Builder bulkLoadHFileRequest = BulkLoadHFileRequest.newBuilder();
|
|
||||||
RegionSpecifier region =
|
|
||||||
ProtobufUtil.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, this.env
|
|
||||||
.getRegionInfo().getRegionName());
|
|
||||||
bulkLoadHFileRequest.setRegion(region).setFsToken(request.getFsToken())
|
|
||||||
.setBulkToken(request.getBulkToken()).setAssignSeqNum(request.getAssignSeqNum())
|
|
||||||
.addAllFamilyPath(request.getFamilyPathList());
|
|
||||||
return bulkLoadHFileRequest.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Iterable<Service> getServices() {
|
|
||||||
return Collections.singleton(this);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,48 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package hbase.pb;
|
|
||||||
|
|
||||||
option java_package = "org.apache.hadoop.hbase.protobuf.generated";
|
|
||||||
option java_outer_classname = "SecureBulkLoadProtos";
|
|
||||||
option java_generic_services = true;
|
|
||||||
option java_generate_equals_and_hash = true;
|
|
||||||
option optimize_for = SPEED;
|
|
||||||
|
|
||||||
import 'Client.proto';
|
|
||||||
|
|
||||||
message SecureBulkLoadHFilesRequest {
|
|
||||||
repeated BulkLoadHFileRequest.FamilyPath family_path = 1;
|
|
||||||
optional bool assign_seq_num = 2;
|
|
||||||
required DelegationToken fs_token = 3;
|
|
||||||
required string bulk_token = 4;
|
|
||||||
}
|
|
||||||
|
|
||||||
message SecureBulkLoadHFilesResponse {
|
|
||||||
required bool loaded = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
service SecureBulkLoadService {
|
|
||||||
rpc PrepareBulkLoad(PrepareBulkLoadRequest)
|
|
||||||
returns (PrepareBulkLoadResponse);
|
|
||||||
|
|
||||||
rpc SecureBulkLoadHFiles(SecureBulkLoadHFilesRequest)
|
|
||||||
returns (SecureBulkLoadHFilesResponse);
|
|
||||||
|
|
||||||
rpc CleanupBulkLoad(CleanupBulkLoadRequest)
|
|
||||||
returns (CleanupBulkLoadResponse);
|
|
||||||
}
|
|
|
@ -1,163 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
|
||||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
|
||||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
|
||||||
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
|
|
||||||
import org.apache.hadoop.hbase.util.ByteStringer;
|
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
|
||||||
import org.apache.hadoop.security.token.Token;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Client proxy for SecureBulkLoadProtocol used in conjunction with SecureBulkLoadEndpoint
|
|
||||||
* @deprecated Use for backward compatibility testing only. Will be removed when
|
|
||||||
* SecureBulkLoadEndpoint is not supported.
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public class SecureBulkLoadEndpointClient {
|
|
||||||
private Table table;
|
|
||||||
|
|
||||||
public SecureBulkLoadEndpointClient(Table table) {
|
|
||||||
this.table = table;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String prepareBulkLoad(final TableName tableName) throws IOException {
|
|
||||||
try {
|
|
||||||
CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
|
|
||||||
SecureBulkLoadProtos.SecureBulkLoadService instance =
|
|
||||||
ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
|
|
||||||
|
|
||||||
ServerRpcController controller = new ServerRpcController();
|
|
||||||
|
|
||||||
CoprocessorRpcUtils.BlockingRpcCallback<PrepareBulkLoadResponse> rpcCallback =
|
|
||||||
new CoprocessorRpcUtils.BlockingRpcCallback<>();
|
|
||||||
|
|
||||||
PrepareBulkLoadRequest request =
|
|
||||||
PrepareBulkLoadRequest.newBuilder()
|
|
||||||
.setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
|
|
||||||
|
|
||||||
instance.prepareBulkLoad(controller, request, rpcCallback);
|
|
||||||
|
|
||||||
PrepareBulkLoadResponse response = rpcCallback.get();
|
|
||||||
if (controller.failedOnException()) {
|
|
||||||
throw controller.getFailedOn();
|
|
||||||
}
|
|
||||||
|
|
||||||
return response.getBulkToken();
|
|
||||||
} catch (Throwable throwable) {
|
|
||||||
throw new IOException(throwable);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void cleanupBulkLoad(final String bulkToken) throws IOException {
|
|
||||||
try {
|
|
||||||
CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
|
|
||||||
SecureBulkLoadProtos.SecureBulkLoadService instance =
|
|
||||||
ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
|
|
||||||
|
|
||||||
ServerRpcController controller = new ServerRpcController();
|
|
||||||
|
|
||||||
CoprocessorRpcUtils.BlockingRpcCallback<CleanupBulkLoadResponse> rpcCallback =
|
|
||||||
new CoprocessorRpcUtils.BlockingRpcCallback<>();
|
|
||||||
|
|
||||||
CleanupBulkLoadRequest request =
|
|
||||||
CleanupBulkLoadRequest.newBuilder()
|
|
||||||
.setBulkToken(bulkToken).build();
|
|
||||||
|
|
||||||
instance.cleanupBulkLoad(controller,
|
|
||||||
request,
|
|
||||||
rpcCallback);
|
|
||||||
|
|
||||||
if (controller.failedOnException()) {
|
|
||||||
throw controller.getFailedOn();
|
|
||||||
}
|
|
||||||
} catch (Throwable throwable) {
|
|
||||||
throw new IOException(throwable);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
|
|
||||||
final Token<?> userToken, final String bulkToken, final byte[] startRow)
|
|
||||||
throws IOException {
|
|
||||||
// we never want to send a batch of HFiles to all regions, thus cannot call
|
|
||||||
// HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
|
|
||||||
try {
|
|
||||||
CoprocessorRpcChannel channel = table.coprocessorService(startRow);
|
|
||||||
SecureBulkLoadProtos.SecureBulkLoadService instance =
|
|
||||||
ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
|
|
||||||
|
|
||||||
DelegationToken protoDT =
|
|
||||||
DelegationToken.newBuilder().build();
|
|
||||||
if(userToken != null) {
|
|
||||||
protoDT =
|
|
||||||
DelegationToken.newBuilder()
|
|
||||||
.setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
|
|
||||||
.setPassword(ByteStringer.wrap(userToken.getPassword()))
|
|
||||||
.setKind(userToken.getKind().toString())
|
|
||||||
.setService(userToken.getService().toString()).build();
|
|
||||||
}
|
|
||||||
|
|
||||||
List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
|
|
||||||
new ArrayList<>(familyPaths.size());
|
|
||||||
for(Pair<byte[], String> el: familyPaths) {
|
|
||||||
protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
|
|
||||||
.setFamily(ByteStringer.wrap(el.getFirst()))
|
|
||||||
.setPath(el.getSecond()).build());
|
|
||||||
}
|
|
||||||
|
|
||||||
SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request =
|
|
||||||
SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
|
|
||||||
.setFsToken(protoDT)
|
|
||||||
.addAllFamilyPath(protoFamilyPaths)
|
|
||||||
.setBulkToken(bulkToken).build();
|
|
||||||
|
|
||||||
ServerRpcController controller = new ServerRpcController();
|
|
||||||
CoprocessorRpcUtils.BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>
|
|
||||||
rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
|
|
||||||
instance.secureBulkLoadHFiles(controller,
|
|
||||||
request,
|
|
||||||
rpcCallback);
|
|
||||||
|
|
||||||
SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get();
|
|
||||||
if (controller.failedOnException()) {
|
|
||||||
throw controller.getFailedOn();
|
|
||||||
}
|
|
||||||
return response.getLoaded();
|
|
||||||
} catch (Throwable throwable) {
|
|
||||||
throw new IOException(throwable);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,187 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
|
|
||||||
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
|
||||||
import org.apache.hadoop.hbase.client.ClientServiceCallable;
|
|
||||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
|
||||||
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
|
|
||||||
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
|
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.ClassRule;
|
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.junit.runners.Parameterized;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Tests bulk loading of HFiles with old secure Endpoint client for backward compatibility. Will be
|
|
||||||
* removed when old non-secure client for backward compatibility is not supported.
|
|
||||||
*/
|
|
||||||
@RunWith(Parameterized.class)
|
|
||||||
@Category({RegionServerTests.class, LargeTests.class})
|
|
||||||
@Ignore // BROKEN. FIX OR REMOVE.
|
|
||||||
public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionServerBulkLoad {
|
|
||||||
@ClassRule
|
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
|
||||||
HBaseClassTestRule.forClass(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);
|
|
||||||
|
|
||||||
public TestHRegionServerBulkLoadWithOldSecureEndpoint(int duration) {
|
|
||||||
super(duration);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final Logger LOG =
|
|
||||||
LoggerFactory.getLogger(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void setUpBeforeClass() throws IOException {
|
|
||||||
conf.setInt("hbase.rpc.timeout", 10 * 1000);
|
|
||||||
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
|
||||||
"org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class AtomicHFileLoader extends RepeatingTestThread {
|
|
||||||
final AtomicLong numBulkLoads = new AtomicLong();
|
|
||||||
final AtomicLong numCompactions = new AtomicLong();
|
|
||||||
private TableName tableName;
|
|
||||||
|
|
||||||
public AtomicHFileLoader(TableName tableName, TestContext ctx, byte[][] targetFamilies)
|
|
||||||
throws IOException {
|
|
||||||
super(ctx);
|
|
||||||
this.tableName = tableName;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void doAnAction() throws Exception {
|
|
||||||
long iteration = numBulkLoads.getAndIncrement();
|
|
||||||
Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
|
|
||||||
iteration));
|
|
||||||
|
|
||||||
// create HFiles for different column families
|
|
||||||
FileSystem fs = UTIL.getTestFileSystem();
|
|
||||||
byte[] val = Bytes.toBytes(String.format("%010d", iteration));
|
|
||||||
final List<Pair<byte[], String>> famPaths = new ArrayList<>(NUM_CFS);
|
|
||||||
for (int i = 0; i < NUM_CFS; i++) {
|
|
||||||
Path hfile = new Path(dir, family(i));
|
|
||||||
byte[] fam = Bytes.toBytes(family(i));
|
|
||||||
createHFile(fs, hfile, fam, QUAL, val, 1000);
|
|
||||||
famPaths.add(new Pair<>(fam, hfile.toString()));
|
|
||||||
}
|
|
||||||
|
|
||||||
// bulk load HFiles
|
|
||||||
final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
|
|
||||||
Table table = conn.getTable(tableName);
|
|
||||||
final String bulkToken = new SecureBulkLoadEndpointClient(table).prepareBulkLoad(tableName);
|
|
||||||
RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
|
|
||||||
ClientServiceCallable<Void> callable =
|
|
||||||
new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
|
|
||||||
rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
|
|
||||||
@Override
|
|
||||||
protected Void rpcCall() throws Exception {
|
|
||||||
LOG.debug("Going to connect to server " + getLocation() + " for row " +
|
|
||||||
Bytes.toStringBinary(getRow()));
|
|
||||||
try (Table table = conn.getTable(getTableName())) {
|
|
||||||
boolean loaded = new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths,
|
|
||||||
null, bulkToken, getLocation().getRegionInfo().getStartKey());
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
|
|
||||||
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
|
|
||||||
caller.callWithRetries(callable, Integer.MAX_VALUE);
|
|
||||||
|
|
||||||
// Periodically do compaction to reduce the number of open file handles.
|
|
||||||
if (numBulkLoads.get() % 5 == 0) {
|
|
||||||
// 5 * 50 = 250 open file handles!
|
|
||||||
callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
|
|
||||||
rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
|
|
||||||
@Override
|
|
||||||
protected Void rpcCall() throws Exception {
|
|
||||||
LOG.debug("compacting " + getLocation() + " for row "
|
|
||||||
+ Bytes.toStringBinary(getRow()));
|
|
||||||
AdminProtos.AdminService.BlockingInterface server =
|
|
||||||
conn.getAdmin(getLocation().getServerName());
|
|
||||||
CompactRegionRequest request =
|
|
||||||
RequestConverter.buildCompactRegionRequest(
|
|
||||||
getLocation().getRegionInfo().getRegionName(), true, null);
|
|
||||||
server.compactRegion(null, request);
|
|
||||||
numCompactions.incrementAndGet();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
caller.callWithRetries(callable, Integer.MAX_VALUE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
|
|
||||||
throws Exception {
|
|
||||||
setupTable(tableName, 10);
|
|
||||||
|
|
||||||
TestContext ctx = new TestContext(UTIL.getConfiguration());
|
|
||||||
|
|
||||||
AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
|
|
||||||
ctx.addThread(loader);
|
|
||||||
|
|
||||||
List<AtomicScanReader> scanners = Lists.newArrayList();
|
|
||||||
for (int i = 0; i < numScanners; i++) {
|
|
||||||
AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
|
|
||||||
scanners.add(scanner);
|
|
||||||
ctx.addThread(scanner);
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx.startThreads();
|
|
||||||
ctx.waitFor(millisToRun);
|
|
||||||
ctx.stop();
|
|
||||||
|
|
||||||
LOG.info("Loaders:");
|
|
||||||
LOG.info(" loaded " + loader.numBulkLoads.get());
|
|
||||||
LOG.info(" compations " + loader.numCompactions.get());
|
|
||||||
|
|
||||||
LOG.info("Scanners:");
|
|
||||||
for (AtomicScanReader scanner : scanners) {
|
|
||||||
LOG.info(" scanned " + scanner.numScans.get());
|
|
||||||
LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
|
||||||
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
|
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
|
@ -61,12 +60,6 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
|
||||||
conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
|
conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
|
||||||
conf1.set("hbase.replication.source.fs.conf.provider",
|
conf1.set("hbase.replication.source.fs.conf.provider",
|
||||||
TestSourceFSConfigurationProvider.class.getCanonicalName());
|
TestSourceFSConfigurationProvider.class.getCanonicalName());
|
||||||
String classes = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
|
|
||||||
if (!classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")) {
|
|
||||||
classes = classes + ",org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint";
|
|
||||||
conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes);
|
|
||||||
}
|
|
||||||
|
|
||||||
TestReplicationBase.setUpBeforeClass();
|
TestReplicationBase.setUpBeforeClass();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2382,7 +2382,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
checkOpen();
|
checkOpen();
|
||||||
requestCount.increment();
|
requestCount.increment();
|
||||||
HRegion region = getRegion(request.getRegion());
|
HRegion region = getRegion(request.getRegion());
|
||||||
Map<byte[], List<Path>> map = null;
|
|
||||||
final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
|
final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration());
|
||||||
long sizeToBeLoaded = -1;
|
long sizeToBeLoaded = -1;
|
||||||
|
|
||||||
|
@ -2401,27 +2400,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths);
|
sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
|
|
||||||
for (FamilyPath familyPath : request.getFamilyPathList()) {
|
|
||||||
familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath()));
|
|
||||||
}
|
|
||||||
if (!request.hasBulkToken()) {
|
|
||||||
if (region.getCoprocessorHost() != null) {
|
|
||||||
region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
|
|
||||||
request.getCopyFile());
|
|
||||||
} finally {
|
|
||||||
if (region.getCoprocessorHost() != null) {
|
|
||||||
region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// secure bulk load
|
// secure bulk load
|
||||||
map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
|
Map<byte[], List<Path>> map =
|
||||||
}
|
regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
|
||||||
BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
|
BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
|
||||||
builder.setLoaded(map != null);
|
builder.setLoaded(map != null);
|
||||||
if (map != null) {
|
if (map != null) {
|
||||||
|
|
|
@ -70,7 +70,6 @@ import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
|
||||||
import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
|
import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
|
||||||
import org.apache.hadoop.hbase.io.HFileLink;
|
import org.apache.hadoop.hbase.io.HFileLink;
|
||||||
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
|
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
|
||||||
import org.apache.hadoop.hbase.io.Reference;
|
import org.apache.hadoop.hbase.io.Reference;
|
||||||
|
@ -394,11 +393,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
|
SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
|
||||||
int count = 0;
|
int count = 0;
|
||||||
|
|
||||||
if (isSecureBulkLoadEndpointAvailable()) {
|
|
||||||
LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
|
|
||||||
LOG.warn("Secure bulk load has been integrated into HBase core.");
|
|
||||||
}
|
|
||||||
|
|
||||||
fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf()));
|
fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf()));
|
||||||
bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
|
bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
|
||||||
Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;
|
Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;
|
||||||
|
@ -1055,11 +1049,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isSecureBulkLoadEndpointAvailable() {
|
|
||||||
String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
|
|
||||||
return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
|
* Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
|
||||||
* filters, etc.
|
* filters, etc.
|
||||||
|
|
|
@ -1,177 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
|
|
||||||
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
|
||||||
import org.apache.hadoop.hbase.client.ClientServiceCallable;
|
|
||||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
|
||||||
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
|
|
||||||
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
|
||||||
import org.junit.ClassRule;
|
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.junit.runners.Parameterized;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Tests bulk loading of HFiles with old non-secure client for backward compatibility. Will be
|
|
||||||
* removed when old non-secure client for backward compatibility is not supported.
|
|
||||||
*/
|
|
||||||
@RunWith(Parameterized.class)
|
|
||||||
@Category({RegionServerTests.class, LargeTests.class})
|
|
||||||
public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBulkLoad {
|
|
||||||
|
|
||||||
@ClassRule
|
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
|
||||||
HBaseClassTestRule.forClass(TestHRegionServerBulkLoadWithOldClient.class);
|
|
||||||
|
|
||||||
private static final Logger LOG =
|
|
||||||
LoggerFactory.getLogger(TestHRegionServerBulkLoadWithOldClient.class);
|
|
||||||
|
|
||||||
public TestHRegionServerBulkLoadWithOldClient(int duration) {
|
|
||||||
super(duration);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class AtomicHFileLoader extends RepeatingTestThread {
|
|
||||||
final AtomicLong numBulkLoads = new AtomicLong();
|
|
||||||
final AtomicLong numCompactions = new AtomicLong();
|
|
||||||
private TableName tableName;
|
|
||||||
|
|
||||||
public AtomicHFileLoader(TableName tableName, TestContext ctx,
|
|
||||||
byte targetFamilies[][]) throws IOException {
|
|
||||||
super(ctx);
|
|
||||||
this.tableName = tableName;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void doAnAction() throws Exception {
|
|
||||||
long iteration = numBulkLoads.getAndIncrement();
|
|
||||||
Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
|
|
||||||
iteration));
|
|
||||||
|
|
||||||
// create HFiles for different column families
|
|
||||||
FileSystem fs = UTIL.getTestFileSystem();
|
|
||||||
byte[] val = Bytes.toBytes(String.format("%010d", iteration));
|
|
||||||
final List<Pair<byte[], String>> famPaths = new ArrayList<>(NUM_CFS);
|
|
||||||
for (int i = 0; i < NUM_CFS; i++) {
|
|
||||||
Path hfile = new Path(dir, family(i));
|
|
||||||
byte[] fam = Bytes.toBytes(family(i));
|
|
||||||
createHFile(fs, hfile, fam, QUAL, val, 1000);
|
|
||||||
famPaths.add(new Pair<>(fam, hfile.toString()));
|
|
||||||
}
|
|
||||||
|
|
||||||
// bulk load HFiles
|
|
||||||
final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
|
|
||||||
RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
|
|
||||||
ClientServiceCallable<Void> callable =
|
|
||||||
new ClientServiceCallable<Void>(conn, tableName,
|
|
||||||
Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
|
|
||||||
@Override
|
|
||||||
protected Void rpcCall() throws Exception {
|
|
||||||
LOG.info("Non-secure old client");
|
|
||||||
byte[] regionName = getLocation().getRegionInfo().getRegionName();
|
|
||||||
BulkLoadHFileRequest request =
|
|
||||||
RequestConverter
|
|
||||||
.buildBulkLoadHFileRequest(famPaths, regionName, true, null, null);
|
|
||||||
getStub().bulkLoadHFile(null, request);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
|
|
||||||
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
|
|
||||||
caller.callWithRetries(callable, Integer.MAX_VALUE);
|
|
||||||
|
|
||||||
// Periodically do compaction to reduce the number of open file handles.
|
|
||||||
if (numBulkLoads.get() % 5 == 0) {
|
|
||||||
// 5 * 50 = 250 open file handles!
|
|
||||||
callable = new ClientServiceCallable<Void>(conn, tableName,
|
|
||||||
Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
|
|
||||||
@Override
|
|
||||||
protected Void rpcCall() throws Exception {
|
|
||||||
LOG.debug("compacting " + getLocation() + " for row "
|
|
||||||
+ Bytes.toStringBinary(getRow()));
|
|
||||||
AdminProtos.AdminService.BlockingInterface server =
|
|
||||||
conn.getAdmin(getLocation().getServerName());
|
|
||||||
CompactRegionRequest request =
|
|
||||||
RequestConverter.buildCompactRegionRequest(
|
|
||||||
getLocation().getRegionInfo().getRegionName(), true, null);
|
|
||||||
server.compactRegion(null, request);
|
|
||||||
numCompactions.incrementAndGet();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
caller.callWithRetries(callable, Integer.MAX_VALUE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
|
|
||||||
throws Exception {
|
|
||||||
setupTable(tableName, 10);
|
|
||||||
|
|
||||||
TestContext ctx = new TestContext(UTIL.getConfiguration());
|
|
||||||
|
|
||||||
AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
|
|
||||||
ctx.addThread(loader);
|
|
||||||
|
|
||||||
List<AtomicScanReader> scanners = Lists.newArrayList();
|
|
||||||
for (int i = 0; i < numScanners; i++) {
|
|
||||||
AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
|
|
||||||
scanners.add(scanner);
|
|
||||||
ctx.addThread(scanner);
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx.startThreads();
|
|
||||||
ctx.waitFor(millisToRun);
|
|
||||||
ctx.stop();
|
|
||||||
|
|
||||||
LOG.info("Loaders:");
|
|
||||||
LOG.info(" loaded " + loader.numBulkLoads.get());
|
|
||||||
LOG.info(" compations " + loader.numCompactions.get());
|
|
||||||
|
|
||||||
LOG.info("Scanners:");
|
|
||||||
for (AtomicScanReader scanner : scanners) {
|
|
||||||
LOG.info(" scanned " + scanner.numScans.get());
|
|
||||||
LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue