From 31b8b2d92b524a025ddc7dbf81f2d716a701d73c Mon Sep 17 00:00:00 2001 From: mbertozzi Date: Wed, 26 Feb 2014 01:03:50 +0000 Subject: [PATCH] HBASE-10608 Acquire the FS Delegation Token for Secure ExportSnapshot git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1571890 13f79535-47bb-0310-9956-ffa450edef68 --- .../mapreduce/LoadIncrementalHFiles.java | 34 ++--- .../security/token/FsDelegationToken.java | 117 ++++++++++++++++++ .../hadoop/hbase/snapshot/ExportSnapshot.java | 22 +++- .../hbase/snapshot/TestExportSnapshot.java | 39 ++++-- .../snapshot/TestSecureExportSnapshot.java | 53 ++++++++ 5 files changed, 226 insertions(+), 39 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index e666e5b2aa5..43e167f0aaf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -79,10 +79,10 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.security.FsDelegationToken; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -110,8 +110,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { private int maxFilesPerRegionPerFamily; private boolean assignSeqIds; - private boolean hasForwardedToken; - private Token userToken; + private FsDelegationToken fsDelegationToken; private String bulkToken; private UserProvider userProvider; @@ -123,6 +122,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { getConf().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0); this.hbAdmin = new HBaseAdmin(conf); this.userProvider = UserProvider.instantiate(conf); + this.fsDelegationToken = new FsDelegationToken(userProvider, "renewer"); assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true); maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32); } @@ -261,19 +261,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { //prepare staging directory and token if (userProvider.isHBaseSecurityEnabled()) { FileSystem fs = FileSystem.get(getConf()); - //This condition is here for unit testing - //Since delegation token doesn't work in mini cluster - if (userProvider.isHadoopSecurityEnabled()) { - userToken = userProvider.getCurrent().getToken("HDFS_DELEGATION_TOKEN", - fs.getCanonicalServiceName()); - if (userToken == null) { - hasForwardedToken = false; - userToken = fs.getDelegationToken("renewer"); - } else { - hasForwardedToken = true; - LOG.info("Use the existing token: " + userToken); - } - } + fsDelegationToken.acquireDelegationToken(fs); + bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName()); } @@ -312,13 +301,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } finally { if (userProvider.isHBaseSecurityEnabled()) { - if (userToken != null && !hasForwardedToken) { - try { - userToken.cancel(getConf()); - } catch (Exception e) { - LOG.warn("Failed to cancel HDFS delegation token.", e); - } - } + fsDelegationToken.releaseDelegationToken(); + if(bulkToken != null) { new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken); } @@ -609,8 +593,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } else { HTable table = new HTable(conn.getConfiguration(), getTableName()); secureClient = new SecureBulkLoadClient(table); - success = secureClient.bulkLoadHFiles(famPaths, userToken, bulkToken, - getLocation().getRegionInfo().getStartKey()); + success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(), + bulkToken, getLocation().getRegionInfo().getStartKey()); } return success; } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java new file mode 100644 index 00000000000..628e836683b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/FsDelegationToken.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.security; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.security.token.Token; + +/** + * Helper class to obtain a filesystem delegation token. + * Mainly used by Map-Reduce jobs that requires to read/write data to + * a remote file-system (e.g. BulkLoad, ExportSnapshot). + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class FsDelegationToken { + private static final Log LOG = LogFactory.getLog(FsDelegationToken.class); + + private final UserProvider userProvider; + private final String renewer; + + private boolean hasForwardedToken = false; + private Token userToken = null; + private FileSystem fs = null; + + /* + * @param renewer the account name that is allowed to renew the token. + */ + public FsDelegationToken(final UserProvider userProvider, final String renewer) { + this.userProvider = userProvider; + this.renewer = renewer; + } + + /** + * Acquire the delegation token for the specified filesytem. + * Before requesting a new delegation token, tries to find one already available. + * + * @param fs the filesystem that requires the delegation token + * @throws IOException on fs.getDelegationToken() failure + */ + public void acquireDelegationToken(final FileSystem fs) + throws IOException { + if (userProvider.isHadoopSecurityEnabled()) { + this.fs = fs; + userToken = userProvider.getCurrent().getToken("HDFS_DELEGATION_TOKEN", + fs.getCanonicalServiceName()); + if (userToken == null) { + hasForwardedToken = false; + userToken = fs.getDelegationToken(renewer); + } else { + hasForwardedToken = true; + LOG.info("Use the existing token: " + userToken); + } + } + } + + /** + * Releases a previously acquired delegation token. + */ + public void releaseDelegationToken() { + if (userProvider.isHBaseSecurityEnabled()) { + if (userToken != null && !hasForwardedToken) { + try { + userToken.cancel(this.fs.getConf()); + } catch (Exception e) { + LOG.warn("Failed to cancel HDFS delegation token: " + userToken, e); + } + } + this.userToken = null; + this.fs = null; + } + } + + public UserProvider getUserProvider() { + return userProvider; + } + + /** + * @return the account name that is allowed to renew the token. + */ + public String getRenewer() { + return renewer; + } + + /** + * @return the delegation token acquired, or null in case it was not acquired + */ + public Token getUserToken() { + return userToken; + } + + public FileSystem getFileSystem() { + return fs; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java index feae1c42a94..73f35423431 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java @@ -50,6 +50,8 @@ import org.apache.hadoop.hbase.mapreduce.JobUtil; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.security.FsDelegationToken; +import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; @@ -549,7 +551,8 @@ public final class ExportSnapshot extends Configured implements Tool { /** * Run Map-Reduce Job to perform the files copy. */ - private boolean runCopyJob(final Path inputRoot, final Path outputRoot, + private boolean runCopyJob(final FileSystem inputFs, final Path inputRoot, + final FileSystem outputFs, final Path outputRoot, final List> snapshotFiles, final boolean verifyChecksum, final String filesUser, final String filesGroup, final int filesMode, final int mappers) throws IOException, InterruptedException, ClassNotFoundException { @@ -576,7 +579,20 @@ public final class ExportSnapshot extends Configured implements Tool { SequenceFileInputFormat.addInputPath(job, path); } - return job.waitForCompletion(true); + UserProvider userProvider = UserProvider.instantiate(job.getConfiguration()); + FsDelegationToken inputFsToken = new FsDelegationToken(userProvider, "irenewer"); + FsDelegationToken outputFsToken = new FsDelegationToken(userProvider, "orenewer"); + try { + // Acquire the delegation Tokens + inputFsToken.acquireDelegationToken(inputFs); + outputFsToken.acquireDelegationToken(outputFs); + + // Run the MR Job + return job.waitForCompletion(true); + } finally { + inputFsToken.releaseDelegationToken(); + outputFsToken.releaseDelegationToken(); + } } /** @@ -689,7 +705,7 @@ public final class ExportSnapshot extends Configured implements Tool { if (files.size() == 0) { LOG.warn("There are 0 store file to be copied. There may be no data in the table."); } else { - if (!runCopyJob(inputRoot, outputRoot, files, verifyChecksum, + if (!runCopyJob(inputFs, inputRoot, outputFs, outputRoot, files, verifyChecksum, filesUser, filesGroup, filesMode, mappers)) { throw new ExportSnapshotException("Snapshot export failed!"); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java index 86223d43fef..d29ec8c54d4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java @@ -66,7 +66,7 @@ import org.junit.experimental.categories.Category; public class TestExportSnapshot { private final Log LOG = LogFactory.getLog(getClass()); - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final static byte[] FAMILY = Bytes.toBytes("cf"); @@ -75,15 +75,19 @@ public class TestExportSnapshot { private TableName tableName; private HBaseAdmin admin; + public static void setUpBaseConf(Configuration conf) { + conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true); + conf.setInt("hbase.regionserver.msginterval", 100); + conf.setInt("hbase.client.pause", 250); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); + conf.setBoolean("hbase.master.enabletable.roundrobin", true); + conf.setInt("mapreduce.map.max.attempts", 10); + conf.setInt("mapred.map.max.attempts", 10); + } + @BeforeClass public static void setUpBeforeClass() throws Exception { - TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true); - TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100); - TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250); - TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); - TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true); - TEST_UTIL.getConfiguration().setInt("mapreduce.map.max.attempts", 10); - TEST_UTIL.getConfiguration().setInt("mapred.map.max.attempts", 10); + setUpBaseConf(TEST_UTIL.getConfiguration()); TEST_UTIL.startMiniCluster(3); TEST_UTIL.startMiniMapReduceCluster(); } @@ -180,7 +184,7 @@ public class TestExportSnapshot { @Test public void testConsecutiveExports() throws Exception { - Path copyDir = TEST_UTIL.getDataTestDir("export-" + System.currentTimeMillis()); + Path copyDir = getLocalDestinationDir(); testExportFileSystemState(tableName, snapshotName, 2, copyDir, false); testExportFileSystemState(tableName, snapshotName, 2, copyDir, true); removeExportDir(copyDir); @@ -239,7 +243,7 @@ public class TestExportSnapshot { private void testExportFileSystemState(final TableName tableName, final byte[] snapshotName, int filesExpected) throws Exception { - Path copyDir = TEST_UTIL.getDataTestDir("export-" + System.currentTimeMillis()); + Path copyDir = getHdfsDestinationDir(); testExportFileSystemState(tableName, snapshotName, filesExpected, copyDir, false); removeExportDir(copyDir); } @@ -305,7 +309,7 @@ public class TestExportSnapshot { */ private int runExportAndInjectFailures(final byte[] snapshotName, boolean retry) throws Exception { - Path copyDir = TEST_UTIL.getDataTestDir("export-" + System.currentTimeMillis()); + Path copyDir = getLocalDestinationDir(); URI hdfsUri = FileSystem.get(TEST_UTIL.getConfiguration()).getUri(); FileSystem fs = FileSystem.get(copyDir.toUri(), new Configuration()); copyDir = copyDir.makeQualified(fs); @@ -385,6 +389,19 @@ public class TestExportSnapshot { return files; } + private Path getHdfsDestinationDir() { + Path rootDir = TEST_UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir(); + Path path = new Path(new Path(rootDir, "export-test"), "export-" + System.currentTimeMillis()); + LOG.info("HDFS export destination path: " + path); + return path; + } + + private Path getLocalDestinationDir() { + Path path = TEST_UTIL.getDataTestDir("local-export-" + System.currentTimeMillis()); + LOG.info("Local export destination path: " + path); + return path; + } + private void removeExportDir(final Path path) throws IOException { FileSystem fs = FileSystem.get(path.toUri(), new Configuration()); FSUtils.logFileSystemState(fs, path, LOG); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java new file mode 100644 index 00000000000..b9f76e6862e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java @@ -0,0 +1,53 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.snapshot; + +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.access.AccessControlLists; +import org.apache.hadoop.hbase.security.access.SecureTestUtil; + +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +/** + * Reruns TestExportSnapshot using ExportSnapshot in secure mode. + */ +@Category(LargeTests.class) +public class TestSecureExportSnapshot extends TestExportSnapshot { + @BeforeClass + public static void setUpBeforeClass() throws Exception { + setUpBaseConf(TEST_UTIL.getConfiguration()); + + // set the always on security provider + UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(), + HadoopSecurityEnabledUserProviderForTesting.class); + + // setup configuration + SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration()); + + TEST_UTIL.startMiniCluster(3); + TEST_UTIL.startMiniMapReduceCluster(); + + // Wait for the ACL table to become available + TEST_UTIL.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME.getName()); + } +}