HBASE-21342 FileSystem in use may get closed by other bulk load call in secure bulkLoad
Signed-off-by: Mike Drob <mdrob@apache.org>
Signed-off-by: Ted Yu <tyu@apache.org>

parent ae13a5c6ea
commit d35f65f396
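The fix keeps a reference count per caller UGI: secureBulkLoadHFiles increments the count before doing any work and decrements it in its finally block, and the clean-up path only calls FileSystem.closeAllForUGI for a UGI that is no longer referenced. Below is a minimal standalone sketch of that pattern, not the patch itself; the class and method names (UgiRefCountSketch, bulkLoad, a plain String user key) are illustrative assumptions.

import java.util.concurrent.ConcurrentHashMap;

public class UgiRefCountSketch {
  // Keyed by user identity; the real code keys on UserGroupInformation.
  private final ConcurrentHashMap<String, Integer> refs = new ConcurrentHashMap<>();

  void increment(String user) {
    refs.merge(user, 1, Integer::sum); // atomically create at 1 or add 1
  }

  void decrement(String user) {
    // Atomically decrement; returning null removes the entry once it reaches zero.
    refs.computeIfPresent(user, (u, c) -> c > 1 ? Integer.valueOf(c - 1) : null);
  }

  boolean isReferenced(String user) {
    Integer count = refs.get(user);
    return count != null && count > 0;
  }

  // Shape of one bulk load call: shared per-user resources are released
  // only when no other call for the same user is still in flight.
  void bulkLoad(String user, Runnable load) {
    increment(user);
    try {
      load.run();
    } finally {
      decrement(user);
      if (!isReferenced(user)) {
        // In the patch, this is the guarded FileSystem.closeAllForUGI(ugi) call
        // made during clean-up.
        System.out.println("closing FileSystems for " + user);
      }
    }
  }

  public static void main(String[] args) {
    UgiRefCountSketch sketch = new UgiRefCountSketch();
    sketch.bulkLoad("alice", () -> System.out.println("bulk load running"));
  }
}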
SecureBulkLoadManager.java

@@ -25,6 +25,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -53,6 +56,7 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
@@ -106,6 +110,7 @@ public class SecureBulkLoadManager {
   private Path baseStagingDir;
 
   private UserProvider userProvider;
+  private ConcurrentHashMap<UserGroupInformation, Integer> ugiReferenceCounter;
   private Connection conn;
 
   SecureBulkLoadManager(Configuration conf, Connection conn) {
@@ -116,6 +121,7 @@ public class SecureBulkLoadManager {
   public void start() throws IOException {
     random = new SecureRandom();
     userProvider = UserProvider.instantiate(conf);
+    ugiReferenceCounter = new ConcurrentHashMap<>();
     fs = FileSystem.get(conf);
     baseStagingDir = new Path(FSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
 
@@ -158,7 +164,7 @@ public class SecureBulkLoadManager {
     } finally {
       UserGroupInformation ugi = getActiveUser().getUGI();
       try {
-        if (!UserGroupInformation.getLoginUser().equals(ugi)) {
+        if (!UserGroupInformation.getLoginUser().equals(ugi) && !isUserReferenced(ugi)) {
           FileSystem.closeAllForUGI(ugi);
         }
       } catch (IOException e) {
@@ -167,6 +173,38 @@ public class SecureBulkLoadManager {
     }
   }
 
+  private Consumer<HRegion> fsCreatedListener;
+
+  @VisibleForTesting
+  void setFsCreatedListener(Consumer<HRegion> fsCreatedListener) {
+    this.fsCreatedListener = fsCreatedListener;
+  }
+
+
+  private void incrementUgiReference(UserGroupInformation ugi) {
+    ugiReferenceCounter.merge(ugi, 1, new BiFunction<Integer, Integer, Integer>() {
+      @Override
+      public Integer apply(Integer oldvalue, Integer value) {
+        return ++oldvalue;
+      }
+    });
+  }
+
+  private void decrementUgiReference(UserGroupInformation ugi) {
+    ugiReferenceCounter.computeIfPresent(ugi,
+      new BiFunction<UserGroupInformation, Integer, Integer>() {
+        @Override
+        public Integer apply(UserGroupInformation key, Integer value) {
+          return value > 1 ? --value : null;
+        }
+      });
+  }
+
+  private boolean isUserReferenced(UserGroupInformation ugi) {
+    Integer count = ugiReferenceCounter.get(ugi);
+    return count != null && count > 0;
+  }
+
   public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
       final BulkLoadHFileRequest request) throws IOException {
     final List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
@@ -208,6 +246,7 @@ public class SecureBulkLoadManager {
     Map<byte[], List<Path>> map = null;
 
     try {
+      incrementUgiReference(ugi);
      // Get the target fs (HBase region server fs) delegation token
      // Since we have checked the permission via 'preBulkLoadHFile', now let's give
      // the 'request user' necessary token to operate on the target fs.
@@ -237,6 +276,9 @@ public class SecureBulkLoadManager {
             fs.setPermission(stageFamily, PERM_ALL_ACCESS);
           }
         }
+        if (fsCreatedListener != null) {
+          fsCreatedListener.accept(region);
+        }
         //We call bulkLoadHFiles as requesting user
         //To enable access prior to staging
         return region.bulkLoadHFiles(familyPaths, true,
@@ -248,6 +290,7 @@ public class SecureBulkLoadManager {
         }
       });
     } finally {
+      decrementUgiReference(ugi);
       if (region.getCoprocessorHost() != null) {
         region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
       }
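The increment and decrement above rely on ConcurrentHashMap.merge and computeIfPresent, whose remapping functions are applied atomically per key; returning null from computeIfPresent removes the mapping, so the counter entry disappears once it drops to zero and isUserReferenced then reports the UGI as unreferenced. A small self-contained illustration of those semantics follows (the map name and key are assumptions, not taken from the patch).

import java.util.concurrent.ConcurrentHashMap;

public class RefCountSemanticsDemo {
  public static void main(String[] args) {
    ConcurrentHashMap<String, Integer> counter = new ConcurrentHashMap<>();

    counter.merge("ugi-1", 1, (oldValue, one) -> oldValue + 1); // absent -> 1
    counter.merge("ugi-1", 1, (oldValue, one) -> oldValue + 1); // 1 -> 2

    counter.computeIfPresent("ugi-1", (k, v) -> v > 1 ? Integer.valueOf(v - 1) : null); // 2 -> 1
    counter.computeIfPresent("ugi-1", (k, v) -> v > 1 ? Integer.valueOf(v - 1) : null); // 1 -> removed

    System.out.println(counter.containsKey("ugi-1")); // prints false
  }
}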
TestSecureBulkLoadManager.java (new file)

@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Deque;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
+
+@Category({RegionServerTests.class, MediumTests.class})
+public class TestSecureBulkLoadManager {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSecureBulkLoadManager.class);
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSecureBulkLoadManager.class);
+
+  private static TableName TABLE = TableName.valueOf(Bytes.toBytes("TestSecureBulkLoadManager"));
+  private static byte[] FAMILY = Bytes.toBytes("family");
+  private static byte[] COLUMN = Bytes.toBytes("column");
+  private static byte[] key1 = Bytes.toBytes("row1");
+  private static byte[] key2 = Bytes.toBytes("row2");
+  private static byte[] key3 = Bytes.toBytes("row3");
+  private static byte[] value1 = Bytes.toBytes("t1");
+  private static byte[] value3 = Bytes.toBytes("t3");
+  private static byte[] SPLIT_ROWKEY = key2;
+
+  private Thread ealierBulkload;
+  private Thread laterBulkload;
+
+  protected final static HBaseTestingUtility testUtil = new HBaseTestingUtility();
+  private static Configuration conf = testUtil.getConfiguration();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    testUtil.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    testUtil.shutdownMiniCluster();
+    testUtil.cleanupTestDir();
+  }
+
+  /**
+   * After a secure bulkload finishes, the FileSystems used in the bulkload are cleaned up.
+   * However, a FileSystem used by the finished bulkload may still be in use by another
+   * bulkload call, or there may be other FileSystems created by the same user; all of these
+   * would be closed by a FileSystem.closeAllForUGI call. So during the clean-up, FileSystems
+   * that are still needed later must not get closed, or else a race condition occurs.
+   *
+   * testForRaceCondition covers the case where two secure bulkload calls from the same UGI go
+   * into two different regions and one bulkload finishes earlier while the other bulkload
+   * still needs its FileSystems, and checks that both bulkloads succeed.
+   */
+  @Test
+  public void testForRaceCondition() throws Exception {
+    Consumer<HRegion> fsCreatedListener = new Consumer<HRegion>() {
+      @Override
+      public void accept(HRegion hRegion) {
+        if (hRegion.getRegionInfo().containsRow(key3)) {
+          Threads.shutdown(ealierBulkload); /// wait until the other bulkload finishes
+        }
+      }
+    };
+    testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
+        .secureBulkLoadManager.setFsCreatedListener(fsCreatedListener);
+    /// create table
+    testUtil.createTable(TABLE, FAMILY, Bytes.toByteArrays(SPLIT_ROWKEY));
+
+    /// prepare files
+    Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0)
+        .getRegionServer().getRootDir();
+    Path dir1 = new Path(rootdir, "dir1");
+    prepareHFile(dir1, key1, value1);
+    Path dir2 = new Path(rootdir, "dir2");
+    prepareHFile(dir2, key3, value3);
+
+    /// do bulkload
+    final AtomicReference<Throwable> t1Exception = new AtomicReference<>();
+    final AtomicReference<Throwable> t2Exception = new AtomicReference<>();
+    ealierBulkload = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          doBulkloadWithoutRetry(dir1);
+        } catch (Exception e) {
+          LOG.error("bulk load failed .", e);
+          t1Exception.set(e);
+        }
+      }
+    });
+    laterBulkload = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          doBulkloadWithoutRetry(dir2);
+        } catch (Exception e) {
+          LOG.error("bulk load failed .", e);
+          t2Exception.set(e);
+        }
+      }
+    });
+    ealierBulkload.start();
+    laterBulkload.start();
+    Threads.shutdown(ealierBulkload);
+    Threads.shutdown(laterBulkload);
+    Assert.assertNull(t1Exception.get());
+    Assert.assertNull(t2Exception.get());
+
+    /// check bulkload ok
+    Get get1 = new Get(key1);
+    Get get3 = new Get(key3);
+    Table t = testUtil.getConnection().getTable(TABLE);
+    Result r = t.get(get1);
+    Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value1);
+    r = t.get(get3);
+    Assert.assertArrayEquals(r.getValue(FAMILY, COLUMN), value3);
+  }
+
+  /**
+   * A trick is used to make sure server-side failures (if any) are not covered up by a client
+   * retry. Since LoadIncrementalHFiles.doBulkLoad keeps performing bulkload calls as long as
+   * the HFile queue is not empty, while server-side exceptions in the doAs block do not lead
+   * to a client exception, a bulkload would always succeed in that case by default and the
+   * client would never be aware that failures have happened. To avoid this kind of retry,
+   * a MyExceptionToAvoidRetry exception is thrown after bulkLoadPhase finishes and caught
+   * silently outside the doBulkLoad call, so that bulkLoadPhase is called exactly once and
+   * server-side failures, if any, can be checked via the loaded data.
+   */
+  class MyExceptionToAvoidRetry extends DoNotRetryIOException {
+  }
+
+  private void doBulkloadWithoutRetry(Path dir) throws Exception {
+    Connection connection = testUtil.getConnection();
+    LoadIncrementalHFiles h = new LoadIncrementalHFiles(conf) {
+      @Override
+      protected void bulkLoadPhase(final Table htable, final Connection conn,
+          ExecutorService pool, Deque<LoadQueueItem> queue,
+          final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
+          Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
+        super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
+        throw new MyExceptionToAvoidRetry(); // throw exception to avoid retry
+      }
+    };
+    try {
+      h.doBulkLoad(dir, testUtil.getAdmin(), connection.getTable(TABLE),
+          connection.getRegionLocator(TABLE));
+      Assert.fail("MyExceptionToAvoidRetry is expected");
+    } catch (MyExceptionToAvoidRetry e) { // expected
+    }
+  }
+
+  private void prepareHFile(Path dir, byte[] key, byte[] value) throws Exception {
+    TableDescriptor desc = testUtil.getAdmin().getDescriptor(TABLE);
+    ColumnFamilyDescriptor family = desc.getColumnFamily(FAMILY);
+    Compression.Algorithm compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
+
+    CacheConfig writerCacheConf = new CacheConfig(conf, family);
+    writerCacheConf.setCacheDataOnWrite(false);
+    HFileContext hFileContext = new HFileContextBuilder()
+        .withIncludesMvcc(false)
+        .withIncludesTags(true)
+        .withCompression(compression)
+        .withCompressTags(family.isCompressTags())
+        .withChecksumType(HStore.getChecksumType(conf))
+        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+        .withBlockSize(family.getBlocksize())
+        .withHBaseCheckSum(true)
+        .withDataBlockEncoding(family.getDataBlockEncoding())
+        .withEncryptionContext(Encryption.Context.NONE)
+        .withCreateTime(EnvironmentEdgeManager.currentTime())
+        .build();
+    StoreFileWriter.Builder builder =
+        new StoreFileWriter.Builder(conf, writerCacheConf, dir.getFileSystem(conf))
+            .withOutputDir(new Path(dir, family.getNameAsString()))
+            .withBloomType(family.getBloomFilterType())
+            .withMaxKeyCount(Integer.MAX_VALUE)
+            .withFileContext(hFileContext);
+    StoreFileWriter writer = builder.build();
+
+    Put put = new Put(key);
+    put.addColumn(FAMILY, COLUMN, value);
+    for (Cell c : put.get(FAMILY, COLUMN)) {
+      writer.append(c);
+    }
+
+    writer.close();
+  }
+}