From 390c3227e23ee04ca9797fdf1e3bdd93ce9a1882 Mon Sep 17 00:00:00 2001 From: mazhenlin Date: Fri, 19 Oct 2018 14:51:00 +0800 Subject: [PATCH] HBASE-21342 FileSystem in use may get closed by other bulk load call in secure bulkLoad Signed-off-by: Mike Drob Signed-off-by: Ted Yu --- .../regionserver/SecureBulkLoadManager.java | 45 +++- .../TestSecureBulkLoadManager.java | 248 ++++++++++++++++++ 2 files changed, 292 insertions(+), 1 deletion(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java index a4ee517fd69..566a6b688f3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java @@ -25,6 +25,9 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiFunction; +import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -53,6 +56,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; @@ -106,6 +110,7 @@ public class SecureBulkLoadManager { private Path baseStagingDir; private UserProvider userProvider; + private ConcurrentHashMap ugiReferenceCounter; private Connection conn; SecureBulkLoadManager(Configuration conf, Connection conn) { @@ -116,6 +121,7 @@ public class SecureBulkLoadManager { public void start() throws IOException { random = new SecureRandom(); userProvider = UserProvider.instantiate(conf); + ugiReferenceCounter = new ConcurrentHashMap<>(); fs = FileSystem.get(conf); baseStagingDir = new Path(FSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME); @@ -158,7 +164,7 @@ public class SecureBulkLoadManager { } finally { UserGroupInformation ugi = getActiveUser().getUGI(); try { - if (!UserGroupInformation.getLoginUser().equals(ugi)) { + if (!UserGroupInformation.getLoginUser().equals(ugi) && !isUserReferenced(ugi)) { FileSystem.closeAllForUGI(ugi); } } catch (IOException e) { @@ -167,6 +173,38 @@ public class SecureBulkLoadManager { } } + private Consumer fsCreatedListener; + + @VisibleForTesting + void setFsCreatedListener(Consumer fsCreatedListener) { + this.fsCreatedListener = fsCreatedListener; + } + + + private void incrementUgiReference(UserGroupInformation ugi) { + ugiReferenceCounter.merge(ugi, 1, new BiFunction() { + @Override + public Integer apply(Integer oldvalue, Integer value) { + return ++oldvalue; + } + }); + } + + private void decrementUgiReference(UserGroupInformation ugi) { + ugiReferenceCounter.computeIfPresent(ugi, + new BiFunction() { + @Override + public Integer apply(UserGroupInformation key, Integer value) { + return value > 1 ? --value : null; + } + }); + } + + private boolean isUserReferenced(UserGroupInformation ugi) { + Integer count = ugiReferenceCounter.get(ugi); + return count != null && count > 0; + } + public Map> secureBulkLoadHFiles(final HRegion region, final BulkLoadHFileRequest request) throws IOException { final List> familyPaths = new ArrayList<>(request.getFamilyPathCount()); @@ -208,6 +246,7 @@ public class SecureBulkLoadManager { Map> map = null; try { + incrementUgiReference(ugi); // Get the target fs (HBase region server fs) delegation token // Since we have checked the permission via 'preBulkLoadHFile', now let's give // the 'request user' necessary token to operate on the target fs. @@ -237,6 +276,9 @@ public class SecureBulkLoadManager { fs.setPermission(stageFamily, PERM_ALL_ACCESS); } } + if (fsCreatedListener != null) { + fsCreatedListener.accept(region); + } //We call bulkLoadHFiles as requesting user //To enable access prior to staging return region.bulkLoadHFiles(familyPaths, true, @@ -248,6 +290,7 @@ public class SecureBulkLoadManager { } }); } finally { + decrementUgiReference(ugi); if (region.getCoprocessorHost() != null) { region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java new file mode 100644 index 00000000000..75ebfd3acfc --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Deque; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.crypto.Encryption; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; + + +@Category({RegionServerTests.class, MediumTests.class}) +public class TestSecureBulkLoadManager { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSecureBulkLoadManager.class); + + private static final Logger LOG = + LoggerFactory.getLogger(TestSecureBulkLoadManager.class); + + private static TableName TABLE = TableName.valueOf(Bytes.toBytes("TestSecureBulkLoadManager")); + private static byte[] FAMILY = Bytes.toBytes("family"); + private static byte[] COLUMN = Bytes.toBytes("column"); + private static byte[] key1 = Bytes.toBytes("row1"); + private static byte[] key2 = Bytes.toBytes("row2"); + private static byte[] key3 = Bytes.toBytes("row3"); + private static byte[] value1 = Bytes.toBytes("t1"); + private static byte[] value3 = Bytes.toBytes("t3"); + private static byte[] SPLIT_ROWKEY = key2; + + private Thread ealierBulkload; + private Thread laterBulkload; + + protected final static HBaseTestingUtility testUtil = new HBaseTestingUtility(); + private static Configuration conf = testUtil.getConfiguration(); + + @BeforeClass + public static void setUp() throws Exception { + testUtil.startMiniCluster(); + } + + @AfterClass + public static void tearDown() throws Exception { + testUtil.shutdownMiniCluster(); + testUtil.cleanupTestDir(); + } + + /** + * After a secure bulkload finished , there is a clean-up for FileSystems used in the bulkload. + * Sometimes, FileSystems used in the finished bulkload might also be used in other bulkload + * calls, or there are other FileSystems created by the same user, they could be closed by a + * FileSystem.closeAllForUGI call. So during the clean-up, those FileSystems need to be used + * later can not get closed ,or else a race condition occurs. + * + * testForRaceCondition tests the case that two secure bulkload calls from the same UGI go + * into two different regions and one bulkload finishes earlier when the other bulkload still + * needs its FileSystems, checks that both bulkloads succeed. + */ + @Test + public void testForRaceCondition() throws Exception { + Consumer fsCreatedListener = new Consumer() { + @Override + public void accept(HRegion hRegion) { + if (hRegion.getRegionInfo().containsRow(key3)) { + Threads.shutdown(ealierBulkload);/// wait util the other bulkload finished + } + } + } ; + testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer() + .secureBulkLoadManager.setFsCreatedListener(fsCreatedListener); + /// create table + testUtil.createTable(TABLE,FAMILY,Bytes.toByteArrays(SPLIT_ROWKEY)); + + /// prepare files + Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0) + .getRegionServer().getRootDir(); + Path dir1 = new Path(rootdir, "dir1"); + prepareHFile(dir1, key1, value1); + Path dir2 = new Path(rootdir, "dir2"); + prepareHFile(dir2, key3, value3); + + /// do bulkload + final AtomicReference t1Exception = new AtomicReference<>(); + final AtomicReference t2Exception = new AtomicReference<>(); + ealierBulkload = new Thread(new Runnable() { + @Override + public void run() { + try { + doBulkloadWithoutRetry(dir1); + } catch (Exception e) { + LOG.error("bulk load failed .",e); + t1Exception.set(e); + } + } + }); + laterBulkload = new Thread(new Runnable() { + @Override + public void run() { + try { + doBulkloadWithoutRetry(dir2); + } catch (Exception e) { + LOG.error("bulk load failed .",e); + t2Exception.set(e); + } + } + }); + ealierBulkload.start(); + laterBulkload.start(); + Threads.shutdown(ealierBulkload); + Threads.shutdown(laterBulkload); + Assert.assertNull(t1Exception.get()); + Assert.assertNull(t2Exception.get()); + + /// check bulkload ok + Get get1 = new Get(key1); + Get get3 = new Get(key3); + Table t = testUtil.getConnection().getTable(TABLE); + Result r = t.get(get1); + Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value1); + r = t.get(get3); + Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value3); + + } + + /** + * A trick is used to make sure server-side failures( if any ) not being covered up by a client + * retry. Since LoadIncrementalHFiles.doBulkLoad keeps performing bulkload calls as long as the + * HFile queue is not empty, while server-side exceptions in the doAs block do not lead + * to a client exception, a bulkload will always succeed in this case by default, thus client + * will never be aware that failures have ever happened . To avoid this kind of retry , + * a MyExceptionToAvoidRetry exception is thrown after bulkLoadPhase finished and caught + * silently outside the doBulkLoad call, so that the bulkLoadPhase would be called exactly + * once, and server-side failures, if any ,can be checked via data. + */ + class MyExceptionToAvoidRetry extends DoNotRetryIOException { + } + + private void doBulkloadWithoutRetry(Path dir) throws Exception { + Connection connection = testUtil.getConnection(); + LoadIncrementalHFiles h = new LoadIncrementalHFiles(conf) { + @Override + protected void bulkLoadPhase(final Table htable, final Connection conn, + ExecutorService pool, Deque queue, + final Multimap regionGroups, boolean copyFile, + Map item2RegionMap) throws IOException { + super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap); + throw new MyExceptionToAvoidRetry(); // throw exception to avoid retry + } + }; + try { + h.doBulkLoad(dir, testUtil.getAdmin(), connection.getTable(TABLE), + connection.getRegionLocator(TABLE)); + Assert.fail("MyExceptionToAvoidRetry is expected"); + } catch (MyExceptionToAvoidRetry e) { //expected + } + } + + private void prepareHFile(Path dir, byte[] key, byte[] value) throws Exception { + TableDescriptor desc = testUtil.getAdmin().getDescriptor(TABLE); + ColumnFamilyDescriptor family = desc.getColumnFamily(FAMILY); + Compression.Algorithm compression = HFile.DEFAULT_COMPRESSION_ALGORITHM; + + CacheConfig writerCacheConf = new CacheConfig(conf, family); + writerCacheConf.setCacheDataOnWrite(false); + HFileContext hFileContext = new HFileContextBuilder() + .withIncludesMvcc(false) + .withIncludesTags(true) + .withCompression(compression) + .withCompressTags(family.isCompressTags()) + .withChecksumType(HStore.getChecksumType(conf)) + .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) + .withBlockSize(family.getBlocksize()) + .withHBaseCheckSum(true) + .withDataBlockEncoding(family.getDataBlockEncoding()) + .withEncryptionContext(Encryption.Context.NONE) + .withCreateTime(EnvironmentEdgeManager.currentTime()) + .build(); + StoreFileWriter.Builder builder = + new StoreFileWriter.Builder(conf, writerCacheConf, dir.getFileSystem(conf)) + .withOutputDir(new Path(dir, family.getNameAsString())) + .withBloomType(family.getBloomFilterType()) + .withMaxKeyCount(Integer.MAX_VALUE) + .withFileContext(hFileContext); + StoreFileWriter writer = builder.build(); + + Put put = new Put(key); + put.addColumn(FAMILY, COLUMN, value); + for (Cell c : put.get(FAMILY, COLUMN)) { + writer.append(c); + } + + writer.close(); + } +}