HDFS-15005. Backport HDFS-12300 to branch-2. Contributed by Chao Sun.
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
parent
cdb0cafff5
commit
c6ce4011d7
|
@ -36,6 +36,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYPASSWORD_
|
|||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.net.InetAddress;
|
||||
|
@ -71,6 +73,7 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
|
@ -81,6 +84,7 @@ import org.apache.hadoop.net.NetUtils;
|
|||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
@ -1643,4 +1647,21 @@ public class DFSUtil {
|
|||
return cryptoProvider;
|
||||
}
|
||||
|
||||
/**
|
||||
* Decodes an HDFS delegation token to its identifier.
|
||||
*
|
||||
* @param token the token
|
||||
* @return the decoded identifier.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static DelegationTokenIdentifier decodeDelegationToken(
|
||||
final Token<DelegationTokenIdentifier> token) throws IOException {
|
||||
final DelegationTokenIdentifier id = new DelegationTokenIdentifier();
|
||||
final ByteArrayInputStream buf =
|
||||
new ByteArrayInputStream(token.getIdentifier());
|
||||
try (DataInputStream in = new DataInputStream(buf)) {
|
||||
id.readFields(in);
|
||||
}
|
||||
return id;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -100,9 +100,7 @@ import static org.apache.hadoop.util.Time.monotonicNow;
|
|||
import static org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics.TOPMETRICS_METRICS_SOURCE_NAME;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInput;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
|
@ -5212,6 +5210,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
*/
|
||||
Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
|
||||
throws IOException {
|
||||
final String operationName = "getDelegationToken";
|
||||
final boolean success;
|
||||
final String tokenId;
|
||||
Token<DelegationTokenIdentifier> token;
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
writeLock();
|
||||
|
@ -5240,13 +5241,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
dtId, dtSecretManager);
|
||||
long expiryTime = dtSecretManager.getTokenExpiryTime(dtId);
|
||||
getEditLog().logGetDelegationToken(dtId, expiryTime);
|
||||
tokenId = makeTokenId(dtId);
|
||||
success = true;
|
||||
} finally {
|
||||
writeUnlock("getDelegationToken");
|
||||
}
|
||||
getEditLog().logSync();
|
||||
logAuditEvent(success, operationName, tokenId);
|
||||
return token;
|
||||
}
|
||||
|
||||
private String makeTokenId(DelegationTokenIdentifier dtId) {
|
||||
return dtId.getKind() +
|
||||
" token " +
|
||||
dtId.getSequenceNumber() +
|
||||
" for " +
|
||||
dtId.getUser().getShortUserName() +
|
||||
" with renewer " + dtId.getRenewer();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param token token to renew
|
||||
|
@ -5256,6 +5269,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
*/
|
||||
long renewDelegationToken(Token<DelegationTokenIdentifier> token)
|
||||
throws InvalidToken, IOException {
|
||||
final String operationName = "renewDelegationToken";
|
||||
boolean success = false;
|
||||
String tokenId;
|
||||
long expiryTime;
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
writeLock();
|
||||
|
@ -5269,15 +5285,20 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
}
|
||||
String renewer = getRemoteUser().getShortUserName();
|
||||
expiryTime = dtSecretManager.renewToken(token, renewer);
|
||||
DelegationTokenIdentifier id = new DelegationTokenIdentifier();
|
||||
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
|
||||
DataInputStream in = new DataInputStream(buf);
|
||||
id.readFields(in);
|
||||
final DelegationTokenIdentifier id = DFSUtil.decodeDelegationToken(token);
|
||||
getEditLog().logRenewDelegationToken(id, expiryTime);
|
||||
tokenId = makeTokenId(id);
|
||||
success = true;
|
||||
} catch (AccessControlException ace) {
|
||||
final DelegationTokenIdentifier id = DFSUtil.decodeDelegationToken(token);
|
||||
tokenId = makeTokenId(id);
|
||||
logAuditEvent(success, operationName, tokenId);
|
||||
throw ace;
|
||||
} finally {
|
||||
writeUnlock("renewDelegationToken");
|
||||
}
|
||||
getEditLog().logSync();
|
||||
logAuditEvent(success, operationName, tokenId);
|
||||
return expiryTime;
|
||||
}
|
||||
|
||||
|
@ -5288,6 +5309,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
*/
|
||||
void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
|
||||
throws IOException {
|
||||
final String operationName = "cancelDelegationToken";
|
||||
boolean success = false;
|
||||
String tokenId;
|
||||
checkOperation(OperationCategory.WRITE);
|
||||
writeLock();
|
||||
try {
|
||||
|
@ -5298,10 +5322,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
DelegationTokenIdentifier id = dtSecretManager
|
||||
.cancelToken(token, canceller);
|
||||
getEditLog().logCancelDelegationToken(id);
|
||||
tokenId = makeTokenId(id);
|
||||
success = true;
|
||||
} catch (AccessControlException ace) {
|
||||
final DelegationTokenIdentifier id = DFSUtil.decodeDelegationToken(token);
|
||||
tokenId = makeTokenId(id);
|
||||
logAuditEvent(success, operationName, tokenId);
|
||||
throw ace;
|
||||
} finally {
|
||||
writeUnlock("cancelDelegationToken");
|
||||
}
|
||||
getEditLog().logSync();
|
||||
logAuditEvent(success, operationName, tokenId);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,151 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BatchedRemoteIterator;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Test;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class TestAuditLoggerWithCommands {
|
||||
|
||||
static final int NUM_DATA_NODES = 2;
|
||||
static final long seed = 0xDEADBEEFL;
|
||||
static final int blockSize = 8192;
|
||||
private static MiniDFSCluster cluster = null;
|
||||
private static FileSystem fileSys = null;
|
||||
private static FileSystem fs2 = null;
|
||||
private static FileSystem fs = null;
|
||||
private static LogCapturer auditlog;
|
||||
static Configuration conf;
|
||||
static UserGroupInformation user1;
|
||||
static UserGroupInformation user2;
|
||||
private static NamenodeProtocols proto;
|
||||
|
||||
@BeforeClass
|
||||
public static void initialize() throws Exception {
|
||||
// start a cluster
|
||||
conf = new HdfsConfiguration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY,true);
|
||||
conf.setBoolean(DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
|
||||
cluster =
|
||||
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
|
||||
cluster.waitActive();
|
||||
user1 =
|
||||
UserGroupInformation.createUserForTesting("theDoctor",
|
||||
new String[]{"tardis"});
|
||||
user2 =
|
||||
UserGroupInformation.createUserForTesting("theEngineer",
|
||||
new String[]{"hadoop"});
|
||||
auditlog = LogCapturer.captureLogs(FSNamesystem.auditLog);
|
||||
proto = cluster.getNameNodeRpc();
|
||||
fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
|
||||
fs2 = DFSTestUtil.getFileSystemAs(user2, conf);
|
||||
fs = cluster.getFileSystem();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
fs.close();
|
||||
fs2.close();
|
||||
fileSys.close();
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelegationTokens() throws Exception {
|
||||
final Token dt = fs.getDelegationToken("foo");
|
||||
final String getDT =
|
||||
".*src=HDFS_DELEGATION_TOKEN token 1.*with renewer foo.*";
|
||||
verifyAuditLogs(true, ".*cmd=getDelegationToken" + getDT);
|
||||
|
||||
// renew
|
||||
final UserGroupInformation foo =
|
||||
UserGroupInformation.createUserForTesting("foo", new String[] {});
|
||||
foo.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
dt.renew(conf);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
verifyAuditLogs(true, ".*cmd=renewDelegationToken" + getDT);
|
||||
try {
|
||||
dt.renew(conf);
|
||||
fail("Renewing a token with non-renewer should fail");
|
||||
} catch (AccessControlException expected) {
|
||||
}
|
||||
verifyAuditLogs(false, ".*cmd=renewDelegationToken" + getDT);
|
||||
|
||||
// cancel
|
||||
final UserGroupInformation bar =
|
||||
UserGroupInformation.createUserForTesting("bar", new String[] {});
|
||||
try {
|
||||
bar.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
dt.cancel(conf);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
fail("Canceling a token with non-renewer should fail");
|
||||
} catch (AccessControlException expected) {
|
||||
}
|
||||
verifyAuditLogs(false, ".*cmd=cancelDelegationToken" + getDT);
|
||||
dt.cancel(conf);
|
||||
verifyAuditLogs(true, ".*cmd=cancelDelegationToken" + getDT);
|
||||
}
|
||||
|
||||
private int verifyAuditLogs(final boolean allowed, final String pattern) {
|
||||
return verifyAuditLogs(".*allowed=" + allowed + pattern);
|
||||
}
|
||||
|
||||
private int verifyAuditLogs(String pattern) {
|
||||
int length = auditlog.getOutput().split("\n").length;
|
||||
String lastAudit = auditlog.getOutput().split("\n")[length - 1];
|
||||
assertTrue("Unexpected log!", lastAudit.matches(pattern));
|
||||
return length;
|
||||
}
|
||||
|
||||
private void removeExistingCachePools(String prevPool) throws Exception {
|
||||
BatchedRemoteIterator.BatchedEntries<CachePoolEntry> entries =
|
||||
proto.listCachePools(prevPool);
|
||||
for(int i =0;i < entries.size();i++) {
|
||||
proto.removeCachePool(entries.get(i).getInfo().getPoolName());
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue