HBASE-21342 FileSystem in use may get closed by other bulk load call in secure bulkLoad

Signed-off-by: Mike Drob <mdrob@apache.org>
Signed-off-by: Ted Yu <tyu@apache.org>
This commit is contained in:
mazhenlin 2018-10-19 14:51:00 +08:00 committed by Mike Drob
parent 807736fcf1
commit 1e9d998727
No known key found for this signature in database
GPG Key ID: 3E48C0C6EF362B9E
2 changed files with 292 additions and 1 deletions

View File

@ -25,6 +25,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@ -53,6 +56,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
@ -106,6 +110,7 @@ public class SecureBulkLoadManager {
private Path baseStagingDir;
private UserProvider userProvider;
private ConcurrentHashMap<UserGroupInformation, Integer> ugiReferenceCounter;
private Connection conn;
SecureBulkLoadManager(Configuration conf, Connection conn) {
@ -116,6 +121,7 @@ public class SecureBulkLoadManager {
public void start() throws IOException {
random = new SecureRandom();
userProvider = UserProvider.instantiate(conf);
ugiReferenceCounter = new ConcurrentHashMap<>();
fs = FileSystem.get(conf);
baseStagingDir = new Path(FSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
@ -158,7 +164,7 @@ public class SecureBulkLoadManager {
} finally {
UserGroupInformation ugi = getActiveUser().getUGI();
try {
if (!UserGroupInformation.getLoginUser().equals(ugi)) {
if (!UserGroupInformation.getLoginUser().equals(ugi) && !isUserReferenced(ugi)) {
FileSystem.closeAllForUGI(ugi);
}
} catch (IOException e) {
@ -167,6 +173,38 @@ public class SecureBulkLoadManager {
}
}
private Consumer<HRegion> fsCreatedListener;
@VisibleForTesting
void setFsCreatedListener(Consumer<HRegion> fsCreatedListener) {
this.fsCreatedListener = fsCreatedListener;
}
private void incrementUgiReference(UserGroupInformation ugi) {
ugiReferenceCounter.merge(ugi, 1, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer oldvalue, Integer value) {
return ++oldvalue;
}
});
}
private void decrementUgiReference(UserGroupInformation ugi) {
ugiReferenceCounter.computeIfPresent(ugi,
new BiFunction<UserGroupInformation, Integer, Integer>() {
@Override
public Integer apply(UserGroupInformation key, Integer value) {
return value > 1 ? --value : null;
}
});
}
private boolean isUserReferenced(UserGroupInformation ugi) {
Integer count = ugiReferenceCounter.get(ugi);
return count != null && count > 0;
}
public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
final BulkLoadHFileRequest request) throws IOException {
final List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
@ -208,6 +246,7 @@ public class SecureBulkLoadManager {
Map<byte[], List<Path>> map = null;
try {
incrementUgiReference(ugi);
// Get the target fs (HBase region server fs) delegation token
// Since we have checked the permission via 'preBulkLoadHFile', now let's give
// the 'request user' necessary token to operate on the target fs.
@ -237,6 +276,9 @@ public class SecureBulkLoadManager {
fs.setPermission(stageFamily, PERM_ALL_ACCESS);
}
}
if (fsCreatedListener != null) {
fsCreatedListener.accept(region);
}
//We call bulkLoadHFiles as requesting user
//To enable access prior to staging
return region.bulkLoadHFiles(familyPaths, true,
@ -248,6 +290,7 @@ public class SecureBulkLoadManager {
}
});
} finally {
decrementUgiReference(ugi);
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
}

View File

@ -0,0 +1,248 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
@Category({RegionServerTests.class, MediumTests.class})
public class TestSecureBulkLoadManager {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSecureBulkLoadManager.class);
private static final Logger LOG =
LoggerFactory.getLogger(TestSecureBulkLoadManager.class);
private static TableName TABLE = TableName.valueOf(Bytes.toBytes("TestSecureBulkLoadManager"));
private static byte[] FAMILY = Bytes.toBytes("family");
private static byte[] COLUMN = Bytes.toBytes("column");
private static byte[] key1 = Bytes.toBytes("row1");
private static byte[] key2 = Bytes.toBytes("row2");
private static byte[] key3 = Bytes.toBytes("row3");
private static byte[] value1 = Bytes.toBytes("t1");
private static byte[] value3 = Bytes.toBytes("t3");
private static byte[] SPLIT_ROWKEY = key2;
private Thread ealierBulkload;
private Thread laterBulkload;
protected final static HBaseTestingUtility testUtil = new HBaseTestingUtility();
private static Configuration conf = testUtil.getConfiguration();
@BeforeClass
public static void setUp() throws Exception {
testUtil.startMiniCluster();
}
@AfterClass
public static void tearDown() throws Exception {
testUtil.shutdownMiniCluster();
testUtil.cleanupTestDir();
}
/**
* After a secure bulkload finished , there is a clean-up for FileSystems used in the bulkload.
* Sometimes, FileSystems used in the finished bulkload might also be used in other bulkload
* calls, or there are other FileSystems created by the same user, they could be closed by a
* FileSystem.closeAllForUGI call. So during the clean-up, those FileSystems need to be used
* later can not get closed ,or else a race condition occurs.
*
* testForRaceCondition tests the case that two secure bulkload calls from the same UGI go
* into two different regions and one bulkload finishes earlier when the other bulkload still
* needs its FileSystems, checks that both bulkloads succeed.
*/
@Test
public void testForRaceCondition() throws Exception {
Consumer<HRegion> fsCreatedListener = new Consumer<HRegion>() {
@Override
public void accept(HRegion hRegion) {
if (hRegion.getRegionInfo().containsRow(key3)) {
Threads.shutdown(ealierBulkload);/// wait util the other bulkload finished
}
}
} ;
testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
.secureBulkLoadManager.setFsCreatedListener(fsCreatedListener);
/// create table
testUtil.createTable(TABLE,FAMILY,Bytes.toByteArrays(SPLIT_ROWKEY));
/// prepare files
Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0)
.getRegionServer().getRootDir();
Path dir1 = new Path(rootdir, "dir1");
prepareHFile(dir1, key1, value1);
Path dir2 = new Path(rootdir, "dir2");
prepareHFile(dir2, key3, value3);
/// do bulkload
final AtomicReference<Throwable> t1Exception = new AtomicReference<>();
final AtomicReference<Throwable> t2Exception = new AtomicReference<>();
ealierBulkload = new Thread(new Runnable() {
@Override
public void run() {
try {
doBulkloadWithoutRetry(dir1);
} catch (Exception e) {
LOG.error("bulk load failed .",e);
t1Exception.set(e);
}
}
});
laterBulkload = new Thread(new Runnable() {
@Override
public void run() {
try {
doBulkloadWithoutRetry(dir2);
} catch (Exception e) {
LOG.error("bulk load failed .",e);
t2Exception.set(e);
}
}
});
ealierBulkload.start();
laterBulkload.start();
Threads.shutdown(ealierBulkload);
Threads.shutdown(laterBulkload);
Assert.assertNull(t1Exception.get());
Assert.assertNull(t2Exception.get());
/// check bulkload ok
Get get1 = new Get(key1);
Get get3 = new Get(key3);
Table t = testUtil.getConnection().getTable(TABLE);
Result r = t.get(get1);
Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value1);
r = t.get(get3);
Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value3);
}
/**
* A trick is used to make sure server-side failures( if any ) not being covered up by a client
* retry. Since LoadIncrementalHFiles.doBulkLoad keeps performing bulkload calls as long as the
* HFile queue is not empty, while server-side exceptions in the doAs block do not lead
* to a client exception, a bulkload will always succeed in this case by default, thus client
* will never be aware that failures have ever happened . To avoid this kind of retry ,
* a MyExceptionToAvoidRetry exception is thrown after bulkLoadPhase finished and caught
* silently outside the doBulkLoad call, so that the bulkLoadPhase would be called exactly
* once, and server-side failures, if any ,can be checked via data.
*/
class MyExceptionToAvoidRetry extends DoNotRetryIOException {
}
private void doBulkloadWithoutRetry(Path dir) throws Exception {
Connection connection = testUtil.getConnection();
LoadIncrementalHFiles h = new LoadIncrementalHFiles(conf) {
@Override
protected void bulkLoadPhase(final Table htable, final Connection conn,
ExecutorService pool, Deque<LoadQueueItem> queue,
final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
throw new MyExceptionToAvoidRetry(); // throw exception to avoid retry
}
};
try {
h.doBulkLoad(dir, testUtil.getAdmin(), connection.getTable(TABLE),
connection.getRegionLocator(TABLE));
Assert.fail("MyExceptionToAvoidRetry is expected");
} catch (MyExceptionToAvoidRetry e) { //expected
}
}
private void prepareHFile(Path dir, byte[] key, byte[] value) throws Exception {
TableDescriptor desc = testUtil.getAdmin().getDescriptor(TABLE);
ColumnFamilyDescriptor family = desc.getColumnFamily(FAMILY);
Compression.Algorithm compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
CacheConfig writerCacheConf = new CacheConfig(conf, family);
writerCacheConf.setCacheDataOnWrite(false);
HFileContext hFileContext = new HFileContextBuilder()
.withIncludesMvcc(false)
.withIncludesTags(true)
.withCompression(compression)
.withCompressTags(family.isCompressTags())
.withChecksumType(HStore.getChecksumType(conf))
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
.withBlockSize(family.getBlocksize())
.withHBaseCheckSum(true)
.withDataBlockEncoding(family.getDataBlockEncoding())
.withEncryptionContext(Encryption.Context.NONE)
.withCreateTime(EnvironmentEdgeManager.currentTime())
.build();
StoreFileWriter.Builder builder =
new StoreFileWriter.Builder(conf, writerCacheConf, dir.getFileSystem(conf))
.withOutputDir(new Path(dir, family.getNameAsString()))
.withBloomType(family.getBloomFilterType())
.withMaxKeyCount(Integer.MAX_VALUE)
.withFileContext(hFileContext);
StoreFileWriter writer = builder.build();
Put put = new Put(key);
put.addColumn(FAMILY, COLUMN, value);
for (Cell c : put.get(FAMILY, COLUMN)) {
writer.append(c);
}
writer.close();
}
}