From 78d3d5628a6c69b1ea22a2c234c8eb558da5d45b Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Mon, 11 Feb 2019 16:05:24 +0800 Subject: [PATCH] HBASE-21868 Remove legacy bulk load support Signed-off-by: Michael Stack --- .../access/SecureBulkLoadEndpoint.java | 189 ------------------ .../src/main/protobuf/SecureBulkLoad.proto | 48 ----- .../SecureBulkLoadEndpointClient.java | 163 --------------- ...onServerBulkLoadWithOldSecureEndpoint.java | 187 ----------------- ...plicationSyncUpToolWithBulkLoadedData.java | 7 - .../hbase/regionserver/RSRpcServices.java | 25 +-- .../hbase/tool/LoadIncrementalHFiles.java | 11 - ...estHRegionServerBulkLoadWithOldClient.java | 177 ---------------- 8 files changed, 3 insertions(+), 804 deletions(-) delete mode 100644 hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java delete mode 100644 hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto delete mode 100644 hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java delete mode 100644 hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java deleted file mode 100644 index fb161d94661..00000000000 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.security.access; - -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.CoprocessorEnvironment; -import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; -import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; -import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesRequest; -import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesResponse; -import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadService; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager; - -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Coprocessor service for bulk loads in secure mode. - * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0 - */ -@CoreCoprocessor -@InterfaceAudience.Private -@Deprecated -public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements RegionCoprocessor { - public static final long VERSION = 0L; - - private static final Logger LOG = LoggerFactory.getLogger(SecureBulkLoadEndpoint.class); - - private RegionCoprocessorEnvironment env; - private RegionServerServices rsServices; - - @Override - public void start(CoprocessorEnvironment env) { - this.env = (RegionCoprocessorEnvironment)env; - rsServices = ((HasRegionServerServices)this.env).getRegionServerServices(); - LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases."); - LOG.warn("Secure bulk load has been integrated into HBase core."); - } - - @Override - public void stop(CoprocessorEnvironment env) throws IOException { - } - - @Override - public void prepareBulkLoad(RpcController controller, PrepareBulkLoadRequest request, - RpcCallback done) { - try { - SecureBulkLoadManager secureBulkLoadManager = this.rsServices.getSecureBulkLoadManager(); - - String bulkToken = secureBulkLoadManager.prepareBulkLoad((HRegion) this.env.getRegion(), - convert(request)); - done.run(PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build()); - } catch (IOException e) { - CoprocessorRpcUtils.setControllerException(controller, e); - } - done.run(null); - } - - /** - * Convert from CPEP protobuf 2.5 to internal protobuf 3.3. - */ - org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest - convert(PrepareBulkLoadRequest request) - throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException { - byte [] bytes = request.toByteArray(); - org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest.Builder - builder = - org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest. - newBuilder(); - builder.mergeFrom(bytes); - return builder.build(); - } - - @Override - public void cleanupBulkLoad(RpcController controller, CleanupBulkLoadRequest request, - RpcCallback done) { - try { - SecureBulkLoadManager secureBulkLoadManager = this.rsServices.getSecureBulkLoadManager(); - secureBulkLoadManager.cleanupBulkLoad((HRegion) this.env.getRegion(), convert(request)); - done.run(CleanupBulkLoadResponse.newBuilder().build()); - } catch (IOException e) { - CoprocessorRpcUtils.setControllerException(controller, e); - } - done.run(null); - } - - /** - * Convert from CPEP protobuf 2.5 to internal protobuf 3.3. - */ - org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest - convert(CleanupBulkLoadRequest request) - throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException { - byte [] bytes = request.toByteArray(); - org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest.Builder - builder = - org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest. - newBuilder(); - builder.mergeFrom(bytes); - return builder.build(); - } - - @Override - public void secureBulkLoadHFiles(RpcController controller, SecureBulkLoadHFilesRequest request, - RpcCallback done) { - boolean loaded = false; - Map> map = null; - try { - SecureBulkLoadManager secureBulkLoadManager = this.rsServices.getSecureBulkLoadManager(); - BulkLoadHFileRequest bulkLoadHFileRequest = ConvertSecureBulkLoadHFilesRequest(request); - map = secureBulkLoadManager.secureBulkLoadHFiles((HRegion) this.env.getRegion(), - convert(bulkLoadHFileRequest)); - loaded = map != null && !map.isEmpty(); - } catch (IOException e) { - CoprocessorRpcUtils.setControllerException(controller, e); - } - done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(loaded).build()); - } - - /** - * Convert from CPEP protobuf 2.5 to internal protobuf 3.3. - */ - org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest - convert(BulkLoadHFileRequest request) - throws org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException { - byte [] bytes = request.toByteArray(); - org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest.Builder - builder = - org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest. - newBuilder(); - builder.mergeFrom(bytes); - return builder.build(); - } - - private BulkLoadHFileRequest ConvertSecureBulkLoadHFilesRequest( - SecureBulkLoadHFilesRequest request) { - BulkLoadHFileRequest.Builder bulkLoadHFileRequest = BulkLoadHFileRequest.newBuilder(); - RegionSpecifier region = - ProtobufUtil.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, this.env - .getRegionInfo().getRegionName()); - bulkLoadHFileRequest.setRegion(region).setFsToken(request.getFsToken()) - .setBulkToken(request.getBulkToken()).setAssignSeqNum(request.getAssignSeqNum()) - .addAllFamilyPath(request.getFamilyPathList()); - return bulkLoadHFileRequest.build(); - } - - @Override - public Iterable getServices() { - return Collections.singleton(this); - } -} diff --git a/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto b/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto deleted file mode 100644 index d86d1621270..00000000000 --- a/hbase-endpoint/src/main/protobuf/SecureBulkLoad.proto +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package hbase.pb; - -option java_package = "org.apache.hadoop.hbase.protobuf.generated"; -option java_outer_classname = "SecureBulkLoadProtos"; -option java_generic_services = true; -option java_generate_equals_and_hash = true; -option optimize_for = SPEED; - -import 'Client.proto'; - -message SecureBulkLoadHFilesRequest { - repeated BulkLoadHFileRequest.FamilyPath family_path = 1; - optional bool assign_seq_num = 2; - required DelegationToken fs_token = 3; - required string bulk_token = 4; -} - -message SecureBulkLoadHFilesResponse { - required bool loaded = 1; -} - -service SecureBulkLoadService { - rpc PrepareBulkLoad(PrepareBulkLoadRequest) - returns (PrepareBulkLoadResponse); - - rpc SecureBulkLoadHFiles(SecureBulkLoadHFilesRequest) - returns (SecureBulkLoadHFilesResponse); - - rpc CleanupBulkLoad(CleanupBulkLoadRequest) - returns (CleanupBulkLoadResponse); -} diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java deleted file mode 100644 index 0d15f93d9f5..00000000000 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; -import org.apache.hadoop.hbase.ipc.ServerRpcController; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; -import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos; -import org.apache.hadoop.hbase.util.ByteStringer; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.security.token.Token; -import org.apache.yetus.audience.InterfaceAudience; - -/** - * Client proxy for SecureBulkLoadProtocol used in conjunction with SecureBulkLoadEndpoint - * @deprecated Use for backward compatibility testing only. Will be removed when - * SecureBulkLoadEndpoint is not supported. - */ -@Deprecated -@InterfaceAudience.Private -public class SecureBulkLoadEndpointClient { - private Table table; - - public SecureBulkLoadEndpointClient(Table table) { - this.table = table; - } - - public String prepareBulkLoad(final TableName tableName) throws IOException { - try { - CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW); - SecureBulkLoadProtos.SecureBulkLoadService instance = - ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); - - ServerRpcController controller = new ServerRpcController(); - - CoprocessorRpcUtils.BlockingRpcCallback rpcCallback = - new CoprocessorRpcUtils.BlockingRpcCallback<>(); - - PrepareBulkLoadRequest request = - PrepareBulkLoadRequest.newBuilder() - .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); - - instance.prepareBulkLoad(controller, request, rpcCallback); - - PrepareBulkLoadResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - - return response.getBulkToken(); - } catch (Throwable throwable) { - throw new IOException(throwable); - } - } - - public void cleanupBulkLoad(final String bulkToken) throws IOException { - try { - CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW); - SecureBulkLoadProtos.SecureBulkLoadService instance = - ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); - - ServerRpcController controller = new ServerRpcController(); - - CoprocessorRpcUtils.BlockingRpcCallback rpcCallback = - new CoprocessorRpcUtils.BlockingRpcCallback<>(); - - CleanupBulkLoadRequest request = - CleanupBulkLoadRequest.newBuilder() - .setBulkToken(bulkToken).build(); - - instance.cleanupBulkLoad(controller, - request, - rpcCallback); - - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - } catch (Throwable throwable) { - throw new IOException(throwable); - } - } - - public boolean bulkLoadHFiles(final List> familyPaths, - final Token userToken, final String bulkToken, final byte[] startRow) - throws IOException { - // we never want to send a batch of HFiles to all regions, thus cannot call - // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639 - try { - CoprocessorRpcChannel channel = table.coprocessorService(startRow); - SecureBulkLoadProtos.SecureBulkLoadService instance = - ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); - - DelegationToken protoDT = - DelegationToken.newBuilder().build(); - if(userToken != null) { - protoDT = - DelegationToken.newBuilder() - .setIdentifier(ByteStringer.wrap(userToken.getIdentifier())) - .setPassword(ByteStringer.wrap(userToken.getPassword())) - .setKind(userToken.getKind().toString()) - .setService(userToken.getService().toString()).build(); - } - - List protoFamilyPaths = - new ArrayList<>(familyPaths.size()); - for(Pair el: familyPaths) { - protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder() - .setFamily(ByteStringer.wrap(el.getFirst())) - .setPath(el.getSecond()).build()); - } - - SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request = - SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder() - .setFsToken(protoDT) - .addAllFamilyPath(protoFamilyPaths) - .setBulkToken(bulkToken).build(); - - ServerRpcController controller = new ServerRpcController(); - CoprocessorRpcUtils.BlockingRpcCallback - rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>(); - instance.secureBulkLoadHFiles(controller, - request, - rpcCallback); - - SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - return response.getLoaded(); - } catch (Throwable throwable) { - throw new IOException(throwable); - } - } -} diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java deleted file mode 100644 index 49697b83150..00000000000 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; -import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.ClientServiceCallable; -import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.RpcRetryingCaller; -import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Ignore; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; - -import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; - -/** - * Tests bulk loading of HFiles with old secure Endpoint client for backward compatibility. Will be - * removed when old non-secure client for backward compatibility is not supported. - */ -@RunWith(Parameterized.class) -@Category({RegionServerTests.class, LargeTests.class}) -@Ignore // BROKEN. FIX OR REMOVE. -public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionServerBulkLoad { - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestHRegionServerBulkLoadWithOldSecureEndpoint.class); - - public TestHRegionServerBulkLoadWithOldSecureEndpoint(int duration) { - super(duration); - } - - private static final Logger LOG = - LoggerFactory.getLogger(TestHRegionServerBulkLoadWithOldSecureEndpoint.class); - - @BeforeClass - public static void setUpBeforeClass() throws IOException { - conf.setInt("hbase.rpc.timeout", 10 * 1000); - conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, - "org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); - } - - public static class AtomicHFileLoader extends RepeatingTestThread { - final AtomicLong numBulkLoads = new AtomicLong(); - final AtomicLong numCompactions = new AtomicLong(); - private TableName tableName; - - public AtomicHFileLoader(TableName tableName, TestContext ctx, byte[][] targetFamilies) - throws IOException { - super(ctx); - this.tableName = tableName; - } - - public void doAnAction() throws Exception { - long iteration = numBulkLoads.getAndIncrement(); - Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", - iteration)); - - // create HFiles for different column families - FileSystem fs = UTIL.getTestFileSystem(); - byte[] val = Bytes.toBytes(String.format("%010d", iteration)); - final List> famPaths = new ArrayList<>(NUM_CFS); - for (int i = 0; i < NUM_CFS; i++) { - Path hfile = new Path(dir, family(i)); - byte[] fam = Bytes.toBytes(family(i)); - createHFile(fs, hfile, fam, QUAL, val, 1000); - famPaths.add(new Pair<>(fam, hfile.toString())); - } - - // bulk load HFiles - final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection(); - Table table = conn.getTable(tableName); - final String bulkToken = new SecureBulkLoadEndpointClient(table).prepareBulkLoad(tableName); - RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration()); - ClientServiceCallable callable = - new ClientServiceCallable(conn, tableName, Bytes.toBytes("aaa"), - rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { - @Override - protected Void rpcCall() throws Exception { - LOG.debug("Going to connect to server " + getLocation() + " for row " + - Bytes.toStringBinary(getRow())); - try (Table table = conn.getTable(getTableName())) { - boolean loaded = new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths, - null, bulkToken, getLocation().getRegionInfo().getStartKey()); - } - return null; - } - }; - RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf); - RpcRetryingCaller caller = factory. newCaller(); - caller.callWithRetries(callable, Integer.MAX_VALUE); - - // Periodically do compaction to reduce the number of open file handles. - if (numBulkLoads.get() % 5 == 0) { - // 5 * 50 = 250 open file handles! - callable = new ClientServiceCallable(conn, tableName, Bytes.toBytes("aaa"), - rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { - @Override - protected Void rpcCall() throws Exception { - LOG.debug("compacting " + getLocation() + " for row " - + Bytes.toStringBinary(getRow())); - AdminProtos.AdminService.BlockingInterface server = - conn.getAdmin(getLocation().getServerName()); - CompactRegionRequest request = - RequestConverter.buildCompactRegionRequest( - getLocation().getRegionInfo().getRegionName(), true, null); - server.compactRegion(null, request); - numCompactions.incrementAndGet(); - return null; - } - }; - caller.callWithRetries(callable, Integer.MAX_VALUE); - } - } - } - - void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners) - throws Exception { - setupTable(tableName, 10); - - TestContext ctx = new TestContext(UTIL.getConfiguration()); - - AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null); - ctx.addThread(loader); - - List scanners = Lists.newArrayList(); - for (int i = 0; i < numScanners; i++) { - AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families); - scanners.add(scanner); - ctx.addThread(scanner); - } - - ctx.startThreads(); - ctx.waitFor(millisToRun); - ctx.stop(); - - LOG.info("Loaders:"); - LOG.info(" loaded " + loader.numBulkLoads.get()); - LOG.info(" compations " + loader.numCompactions.get()); - - LOG.info("Scanners:"); - for (AtomicScanReader scanner : scanners) { - LOG.info(" scanned " + scanner.numScans.get()); - LOG.info(" verified " + scanner.numRowsScanned.get() + " rows"); - } - } -} diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java index 96010d92ecc..eb575c5eb6b 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -61,12 +60,6 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345"); conf1.set("hbase.replication.source.fs.conf.provider", TestSourceFSConfigurationProvider.class.getCanonicalName()); - String classes = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); - if (!classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")) { - classes = classes + ",org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"; - conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes); - } - TestReplicationBase.setUpBeforeClass(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 592f99c431b..9b99ff82d62 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2382,7 +2382,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, checkOpen(); requestCount.increment(); HRegion region = getRegion(request.getRegion()); - Map> map = null; final boolean spaceQuotaEnabled = QuotaUtil.isQuotaEnabled(getConfiguration()); long sizeToBeLoaded = -1; @@ -2401,27 +2400,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths); } } - - List> familyPaths = new ArrayList<>(request.getFamilyPathCount()); - for (FamilyPath familyPath : request.getFamilyPathList()) { - familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath())); - } - if (!request.hasBulkToken()) { - if (region.getCoprocessorHost() != null) { - region.getCoprocessorHost().preBulkLoadHFile(familyPaths); - } - try { - map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null, - request.getCopyFile()); - } finally { - if (region.getCoprocessorHost() != null) { - region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map); - } - } - } else { - // secure bulk load - map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request); - } + // secure bulk load + Map> map = + regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request); BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder(); builder.setLoaded(map != null); if (map != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java index 314f2cba97c..82f5da45e79 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java @@ -70,7 +70,6 @@ import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.client.SecureBulkLoadClient; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.HalfStoreFileReader; import org.apache.hadoop.hbase.io.Reference; @@ -394,11 +393,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool { SecureBulkLoadClient secureClient, boolean copyFile) throws IOException { int count = 0; - if (isSecureBulkLoadEndpointAvailable()) { - LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases."); - LOG.warn("Secure bulk load has been integrated into HBase core."); - } - fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf())); bulkToken = secureClient.prepareBulkLoad(admin.getConnection()); Pair, Set> pair = null; @@ -1055,11 +1049,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool { return sb.toString(); } - private boolean isSecureBulkLoadEndpointAvailable() { - String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); - return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); - } - /** * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom * filters, etc. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java deleted file mode 100644 index de01401d262..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java +++ /dev/null @@ -1,177 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; -import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.ClientServiceCallable; -import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.RpcRetryingCaller; -import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.junit.ClassRule; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; - -import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; - -/** - * Tests bulk loading of HFiles with old non-secure client for backward compatibility. Will be - * removed when old non-secure client for backward compatibility is not supported. - */ -@RunWith(Parameterized.class) -@Category({RegionServerTests.class, LargeTests.class}) -public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBulkLoad { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestHRegionServerBulkLoadWithOldClient.class); - - private static final Logger LOG = - LoggerFactory.getLogger(TestHRegionServerBulkLoadWithOldClient.class); - - public TestHRegionServerBulkLoadWithOldClient(int duration) { - super(duration); - } - - public static class AtomicHFileLoader extends RepeatingTestThread { - final AtomicLong numBulkLoads = new AtomicLong(); - final AtomicLong numCompactions = new AtomicLong(); - private TableName tableName; - - public AtomicHFileLoader(TableName tableName, TestContext ctx, - byte targetFamilies[][]) throws IOException { - super(ctx); - this.tableName = tableName; - } - - @Override - public void doAnAction() throws Exception { - long iteration = numBulkLoads.getAndIncrement(); - Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", - iteration)); - - // create HFiles for different column families - FileSystem fs = UTIL.getTestFileSystem(); - byte[] val = Bytes.toBytes(String.format("%010d", iteration)); - final List> famPaths = new ArrayList<>(NUM_CFS); - for (int i = 0; i < NUM_CFS; i++) { - Path hfile = new Path(dir, family(i)); - byte[] fam = Bytes.toBytes(family(i)); - createHFile(fs, hfile, fam, QUAL, val, 1000); - famPaths.add(new Pair<>(fam, hfile.toString())); - } - - // bulk load HFiles - final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection(); - RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration()); - ClientServiceCallable callable = - new ClientServiceCallable(conn, tableName, - Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { - @Override - protected Void rpcCall() throws Exception { - LOG.info("Non-secure old client"); - byte[] regionName = getLocation().getRegionInfo().getRegionName(); - BulkLoadHFileRequest request = - RequestConverter - .buildBulkLoadHFileRequest(famPaths, regionName, true, null, null); - getStub().bulkLoadHFile(null, request); - return null; - } - }; - RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf); - RpcRetryingCaller caller = factory. newCaller(); - caller.callWithRetries(callable, Integer.MAX_VALUE); - - // Periodically do compaction to reduce the number of open file handles. - if (numBulkLoads.get() % 5 == 0) { - // 5 * 50 = 250 open file handles! - callable = new ClientServiceCallable(conn, tableName, - Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { - @Override - protected Void rpcCall() throws Exception { - LOG.debug("compacting " + getLocation() + " for row " - + Bytes.toStringBinary(getRow())); - AdminProtos.AdminService.BlockingInterface server = - conn.getAdmin(getLocation().getServerName()); - CompactRegionRequest request = - RequestConverter.buildCompactRegionRequest( - getLocation().getRegionInfo().getRegionName(), true, null); - server.compactRegion(null, request); - numCompactions.incrementAndGet(); - return null; - } - }; - caller.callWithRetries(callable, Integer.MAX_VALUE); - } - } - } - - @Override - void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners) - throws Exception { - setupTable(tableName, 10); - - TestContext ctx = new TestContext(UTIL.getConfiguration()); - - AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null); - ctx.addThread(loader); - - List scanners = Lists.newArrayList(); - for (int i = 0; i < numScanners; i++) { - AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families); - scanners.add(scanner); - ctx.addThread(scanner); - } - - ctx.startThreads(); - ctx.waitFor(millisToRun); - ctx.stop(); - - LOG.info("Loaders:"); - LOG.info(" loaded " + loader.numBulkLoads.get()); - LOG.info(" compations " + loader.numCompactions.get()); - - LOG.info("Scanners:"); - for (AtomicScanReader scanner : scanners) { - LOG.info(" scanned " + scanner.numScans.get()); - LOG.info(" verified " + scanner.numRowsScanned.get() + " rows"); - } - } -}