Merge branch 'apache:trunk' into YARN-11444

slfan1989 2023-02-25 18:52:12 +08:00 committed by GitHub
commit 4083fcab27
36 changed files with 2316 additions and 123 deletions

View File

@@ -414,7 +414,14 @@ public class LocalDirAllocator {
//build the "roulette wheel"
for(int i =0; i < ctx.dirDF.length; ++i) {
final DF target = ctx.dirDF[i];
// attempt to recreate the dir so that getAvailable() is valid
// if it fails, getAvailable() will return 0, so the dir will
// be declared unavailable.
// return value is logged at debug to keep spotbugs quiet.
final boolean b = new File(target.getDirPath()).mkdirs();
LOG.debug("mkdirs of {}={}", target, b);
availableOnDisk[i] = target.getAvailable();
totalAvailable += availableOnDisk[i];
}

View File

@@ -49,6 +49,7 @@ public final class CallerContext {
public static final String CLIENT_PORT_STR = "clientPort";
public static final String CLIENT_ID_STR = "clientId";
public static final String CLIENT_CALL_ID_STR = "clientCallId";
public static final String REAL_USER_STR = "realUser";
/** The caller context.
*

View File

@@ -19,7 +19,9 @@
package org.apache.hadoop.security.token.delegation;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.security.MessageDigest;
import java.util.ArrayList;
@@ -41,6 +43,8 @@ import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -441,8 +445,9 @@ extends AbstractDelegationTokenIdentifier>
/**
* Update the current master key for generating delegation tokens.
* It should be called only by tokenRemoverThread.
* @throws IOException raised on errors performing I/O.
*/
protected void rollMasterKey() throws IOException {
synchronized (this) {
removeExpiredKeys();
/* set final expiry date for retiring currentKey */
@@ -677,11 +682,15 @@ extends AbstractDelegationTokenIdentifier>
/** Class to encapsulate a token's renew date and password. */
@InterfaceStability.Evolving
public static class DelegationTokenInformation implements Writable {
long renewDate;
byte[] password;
String trackingId;
public DelegationTokenInformation() {
this(0, null);
}
public DelegationTokenInformation(long renewDate, byte[] password) {
this(renewDate, password, null);
}
@@ -711,6 +720,29 @@ extends AbstractDelegationTokenIdentifier>
public String getTrackingId() {
return trackingId;
}
@Override
public void write(DataOutput out) throws IOException {
WritableUtils.writeVLong(out, renewDate);
if (password == null) {
WritableUtils.writeVInt(out, -1);
} else {
WritableUtils.writeVInt(out, password.length);
out.write(password);
}
WritableUtils.writeString(out, trackingId);
}
@Override
public void readFields(DataInput in) throws IOException {
renewDate = WritableUtils.readVLong(in);
int len = WritableUtils.readVInt(in);
if (len > -1) {
password = new byte[len];
in.readFully(password);
}
trackingId = WritableUtils.readString(in);
}
}
/** Remove expired delegation tokens from cache */

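Since DelegationTokenInformation now implements Writable, its renew date, password and tracking id can be serialized for SQL storage and read back. A minimal round-trip sketch, not part of the patch, using only the constructors and methods shown in the hunk above:

static byte[] roundTrip() throws IOException {
  DelegationTokenInformation info = new DelegationTokenInformation(
      System.currentTimeMillis(), "secret".getBytes(), "track-1");
  ByteArrayOutputStream bos = new ByteArrayOutputStream();
  // encodes a VLong renewDate, a VInt password length (-1 marks null),
  // the password bytes, then the trackingId
  info.write(new DataOutputStream(bos));
  DelegationTokenInformation copy = new DelegationTokenInformation();
  copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
  return bos.toByteArray(); // the same bytes the SQL secret manager stores
}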
View File

@@ -0,0 +1,400 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security.token.delegation;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An implementation of {@link AbstractDelegationTokenSecretManager} that
* persists TokenIdentifiers and DelegationKeys in an existing SQL database.
*/
public abstract class SQLDelegationTokenSecretManager<TokenIdent
extends AbstractDelegationTokenIdentifier>
extends AbstractDelegationTokenSecretManager<TokenIdent> {
private static final Logger LOG =
LoggerFactory.getLogger(SQLDelegationTokenSecretManager.class);
public static final String SQL_DTSM_CONF_PREFIX = "sql-dt-secret-manager.";
private static final String SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE = SQL_DTSM_CONF_PREFIX
+ "token.seqnum.batch.size";
public static final int DEFAULT_SEQ_NUM_BATCH_SIZE = 10;
// Batch of sequence numbers that will be requested by the sequenceNumCounter.
// A new batch is requested once the sequenceNums available to a secret manager are
// exhausted, including during initialization.
private final int seqNumBatchSize;
// Last sequenceNum in the current batch that has been allocated to a token.
private int currentSeqNum;
// Max sequenceNum in the current batch that can be allocated to a token.
// Unused sequenceNums in the current batch cannot be reused by other routers.
private int currentMaxSeqNum;
public SQLDelegationTokenSecretManager(Configuration conf) {
super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL,
DelegationTokenManager.UPDATE_INTERVAL_DEFAULT) * 1000,
conf.getLong(DelegationTokenManager.MAX_LIFETIME,
DelegationTokenManager.MAX_LIFETIME_DEFAULT) * 1000,
conf.getLong(DelegationTokenManager.RENEW_INTERVAL,
DelegationTokenManager.RENEW_INTERVAL_DEFAULT) * 1000,
conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL,
DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000);
this.seqNumBatchSize = conf.getInt(SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
DEFAULT_SEQ_NUM_BATCH_SIZE);
}
/**
* Persists a TokenIdentifier and its corresponding TokenInformation into
* the SQL database. The TokenIdentifier is expected to be unique and any
* duplicate token attempts will result in an IOException.
* @param ident TokenIdentifier to persist.
* @param tokenInfo DelegationTokenInformation associated with the TokenIdentifier.
*/
@Override
protected void storeToken(TokenIdent ident,
DelegationTokenInformation tokenInfo) throws IOException {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos)) {
tokenInfo.write(dos);
// Add token to SQL database
insertToken(ident.getSequenceNumber(), ident.getBytes(), bos.toByteArray());
// Add token to local cache
super.storeToken(ident, tokenInfo);
} catch (SQLException e) {
throw new IOException("Failed to store token in SQL secret manager", e);
}
}
/**
* Updates the TokenInformation of an existing TokenIdentifier in
* the SQL database.
* @param ident Existing TokenIdentifier in the SQL database.
* @param tokenInfo Updated DelegationTokenInformation associated with the TokenIdentifier.
*/
@Override
protected void updateToken(TokenIdent ident,
DelegationTokenInformation tokenInfo) throws IOException {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
try (DataOutputStream dos = new DataOutputStream(bos)) {
tokenInfo.write(dos);
// Update token in SQL database
updateToken(ident.getSequenceNumber(), ident.getBytes(), bos.toByteArray());
// Update token in local cache
super.updateToken(ident, tokenInfo);
}
} catch (SQLException e) {
throw new IOException("Failed to update token in SQL secret manager", e);
}
}
/**
* Cancels a token by removing it from the SQL database. This will
* call the corresponding method in {@link AbstractDelegationTokenSecretManager}
* to perform validation and remove the token from the cache.
* @return Identifier of the canceled token
*/
@Override
public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
String canceller) throws IOException {
try (ByteArrayInputStream bis = new ByteArrayInputStream(token.getIdentifier());
DataInputStream din = new DataInputStream(bis)) {
TokenIdent id = createIdentifier();
id.readFields(din);
// Calling getTokenInfo to load token into local cache if not present.
// super.cancelToken() requires token to be present in local cache.
getTokenInfo(id);
}
return super.cancelToken(token, canceller);
}
/**
* Removes the existing TokenInformation from the SQL database to
* invalidate it.
* @param ident TokenInformation to remove from the SQL database.
*/
@Override
protected void removeStoredToken(TokenIdent ident) throws IOException {
try {
deleteToken(ident.getSequenceNumber(), ident.getBytes());
} catch (SQLException e) {
LOG.warn("Failed to remove token in SQL secret manager", e);
}
}
/**
* Obtains the DelegationTokenInformation associated with the given
* TokenIdentifier in the SQL database.
* @param ident Existing TokenIdentifier in the SQL database.
* @return DelegationTokenInformation that matches the given TokenIdentifier or
* null if it doesn't exist in the database.
*/
@Override
protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
// Look for token in local cache
DelegationTokenInformation tokenInfo = super.getTokenInfo(ident);
if (tokenInfo == null) {
try {
// Look for token in SQL database
byte[] tokenInfoBytes = selectTokenInfo(ident.getSequenceNumber(), ident.getBytes());
if (tokenInfoBytes != null) {
tokenInfo = new DelegationTokenInformation();
try (ByteArrayInputStream bis = new ByteArrayInputStream(tokenInfoBytes)) {
try (DataInputStream dis = new DataInputStream(bis)) {
tokenInfo.readFields(dis);
}
}
// Update token in local cache
currentTokens.put(ident, tokenInfo);
}
} catch (IOException | SQLException e) {
LOG.error("Failed to get token in SQL secret manager", e);
}
}
return tokenInfo;
}
/**
* Obtains the value of the last reserved sequence number.
* @return Last reserved sequence number.
*/
@Override
public int getDelegationTokenSeqNum() {
try {
return selectSequenceNum();
} catch (SQLException e) {
throw new RuntimeException(
"Failed to get token sequence number in SQL secret manager", e);
}
}
/**
* Updates the value of the last reserved sequence number.
* @param seqNum Value to update the sequence number to.
*/
@Override
public void setDelegationTokenSeqNum(int seqNum) {
try {
updateSequenceNum(seqNum);
} catch (SQLException e) {
throw new RuntimeException(
"Failed to update token sequence number in SQL secret manager", e);
}
}
/**
* Obtains the next available sequence number that can be allocated to a Token.
* Once the local batch is exhausted, a new batch is reserved through the shared
* sequence number counter, which coordinates allocation with the other
* secret managers.
* This method ensures that sequence numbers are incremental in a single secret manager,
* but not across secret managers.
* @return Next available sequence number.
*/
@Override
public synchronized int incrementDelegationTokenSeqNum() {
if (currentSeqNum >= currentMaxSeqNum) {
try {
// Request a new batch of sequence numbers and use the
// lowest one available.
currentSeqNum = incrementSequenceNum(seqNumBatchSize);
currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
} catch (SQLException e) {
throw new RuntimeException(
"Failed to increment token sequence number in SQL secret manager", e);
}
}
return ++currentSeqNum;
}
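// Worked example (comment only, not part of the patch), assuming the default
// batch size of 10 and a counter starting at 0: the first call on manager A
// reserves sequence numbers 1..10 and returns 1, the first call on manager B
// then reserves 11..20 and returns 11, and A's next nine calls return 2..10
// without touching the database.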
/**
* Persists a DelegationKey into the SQL database. The delegation keyId
* is expected to be unique and any duplicate key attempts will result
* in an IOException.
* @param key DelegationKey to persist into the SQL database.
*/
@Override
protected void storeDelegationKey(DelegationKey key) throws IOException {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos)) {
key.write(dos);
// Add delegation key to SQL database
insertDelegationKey(key.getKeyId(), bos.toByteArray());
// Add delegation key to local cache
super.storeDelegationKey(key);
} catch (SQLException e) {
throw new IOException("Failed to store delegation key in SQL secret manager", e);
}
}
/**
* Updates an existing DelegationKey in the SQL database.
* @param key Updated DelegationKey.
*/
@Override
protected void updateDelegationKey(DelegationKey key) throws IOException {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos)) {
key.write(dos);
// Update delegation key in SQL database
updateDelegationKey(key.getKeyId(), bos.toByteArray());
// Update delegation key in local cache
super.updateDelegationKey(key);
} catch (SQLException e) {
throw new IOException("Failed to update delegation key in SQL secret manager", e);
}
}
/**
* Removes the existing DelegationKey from the SQL database to
* invalidate it.
* @param key DelegationKey to remove from the SQL database.
*/
@Override
protected void removeStoredMasterKey(DelegationKey key) {
try {
deleteDelegationKey(key.getKeyId());
} catch (SQLException e) {
LOG.warn("Failed to remove delegation key in SQL secret manager", e);
}
}
/**
* Obtains the DelegationKey from the SQL database.
* @param keyId KeyId of the DelegationKey to obtain.
* @return DelegationKey that matches the given keyId or null
* if it doesn't exist in the database.
*/
@Override
protected DelegationKey getDelegationKey(int keyId) {
// Look for delegation key in local cache
DelegationKey delegationKey = super.getDelegationKey(keyId);
if (delegationKey == null) {
try {
// Look for delegation key in SQL database
byte[] delegationKeyBytes = selectDelegationKey(keyId);
if (delegationKeyBytes != null) {
delegationKey = new DelegationKey();
try (ByteArrayInputStream bis = new ByteArrayInputStream(delegationKeyBytes)) {
try (DataInputStream dis = new DataInputStream(bis)) {
delegationKey.readFields(dis);
}
}
// Update delegation key in local cache
allKeys.put(keyId, delegationKey);
}
} catch (IOException | SQLException e) {
LOG.error("Failed to get delegation key in SQL secret manager", e);
}
}
return delegationKey;
}
/**
* Obtains the value of the last delegation key id.
* @return Last delegation key id.
*/
@Override
public int getCurrentKeyId() {
try {
return selectKeyId();
} catch (SQLException e) {
throw new RuntimeException(
"Failed to get delegation key id in SQL secret manager", e);
}
}
/**
* Updates the value of the last delegation key id.
* @param keyId Value to update the delegation key id to.
*/
@Override
public void setCurrentKeyId(int keyId) {
try {
updateKeyId(keyId);
} catch (SQLException e) {
throw new RuntimeException(
"Failed to set delegation key id in SQL secret manager", e);
}
}
/**
* Obtains the next available delegation key id that can be allocated to a DelegationKey.
* Delegation key ids need to be reserved using the shared delegationKeyIdCounter,
* which handles keyId allocation concurrently with other secret managers.
* @return Next available delegation key id.
*/
@Override
public int incrementCurrentKeyId() {
try {
return incrementKeyId(1) + 1;
} catch (SQLException e) {
throw new RuntimeException(
"Failed to increment delegation key id in SQL secret manager", e);
}
}
// Token operations in SQL database
protected abstract byte[] selectTokenInfo(int sequenceNum, byte[] tokenIdentifier)
throws SQLException;
protected abstract void insertToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
throws SQLException;
protected abstract void updateToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
throws SQLException;
protected abstract void deleteToken(int sequenceNum, byte[] tokenIdentifier)
throws SQLException;
// Delegation key operations in SQL database
protected abstract byte[] selectDelegationKey(int keyId) throws SQLException;
protected abstract void insertDelegationKey(int keyId, byte[] delegationKey)
throws SQLException;
protected abstract void updateDelegationKey(int keyId, byte[] delegationKey)
throws SQLException;
protected abstract void deleteDelegationKey(int keyId) throws SQLException;
// Counter operations in SQL database
protected abstract int selectSequenceNum() throws SQLException;
protected abstract void updateSequenceNum(int value) throws SQLException;
protected abstract int incrementSequenceNum(int amount) throws SQLException;
protected abstract int selectKeyId() throws SQLException;
protected abstract void updateKeyId(int value) throws SQLException;
protected abstract int incrementKeyId(int amount) throws SQLException;
}

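The constructor above reads its intervals in seconds from the standard DelegationTokenManager keys and converts them to milliseconds. A hedged configuration sketch, with key names taken from this file and illustrative values:

Configuration conf = new Configuration();
conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, 24 * 60 * 60);   // roll the master key daily
conf.setLong(DelegationTokenManager.MAX_LIFETIME, 7 * 24 * 60 * 60);  // tokens live at most a week
conf.setLong(DelegationTokenManager.RENEW_INTERVAL, 24 * 60 * 60);
// SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE is private, so the key is spelled out
conf.setInt("sql-dt-secret-manager.token.seqnum.batch.size", 10);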
View File

@@ -548,5 +548,24 @@ public class TestLocalDirAllocator {
"p1/x", Long.MAX_VALUE - 1), "Expect a DiskErrorException.",
() -> dirAllocator.getLocalPathForWrite("p1/x", Long.MAX_VALUE - 1, conf));
}
/**
* Test for HADOOP-18636: LocalDirAllocator cannot recover from directory tree deletion.
*/
@Test(timeout = 30000)
public void testDirectoryRecovery() throws Throwable {
String dir0 = buildBufferDir(ROOT, 0);
String subdir = dir0 + "/subdir1/subdir2";
conf.set(CONTEXT, subdir);
// get local path and an ancestor
final Path pathForWrite = dirAllocator.getLocalPathForWrite("file", -1, conf);
final Path ancestor = pathForWrite.getParent().getParent();
// delete that ancestor
localFs.delete(ancestor, true);
// and expect to get a new file back
dirAllocator.getLocalPathForWrite("file2", -1, conf);
}
}

View File

@@ -117,6 +117,15 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -153,6 +162,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
@@ -170,6 +184,14 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemProperties>
<property>
<name>derby.stream.error.file</name>
<value>${project.build.directory}/derby.log</value>
</property>
</systemProperties>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>

View File

@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Script to create a new Database in MySQL for the TokenStore
CREATE DATABASE IF NOT EXISTS TokenStore;

View File

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Script to generate all the tables for the TokenStore in MySQL
USE TokenStore;
CREATE TABLE IF NOT EXISTS Tokens(
sequenceNum int NOT NULL,
tokenIdentifier varbinary(255) NOT NULL,
tokenInfo varbinary(255) NOT NULL,
modifiedTime timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY(sequenceNum, tokenIdentifier)
);
CREATE TABLE IF NOT EXISTS DelegationKeys(
keyId int NOT NULL,
delegationKey varbinary(255) NOT NULL,
modifiedTime timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY(keyId)
);
CREATE TABLE IF NOT EXISTS LastSequenceNum(
sequenceNum int NOT NULL
);
-- Initialize the LastSequenceNum table with a single entry
INSERT INTO LastSequenceNum (sequenceNum)
SELECT 0 WHERE NOT EXISTS (SELECT * FROM LastSequenceNum);
CREATE TABLE IF NOT EXISTS LastDelegationKeyId(
keyId int NOT NULL
);
-- Initialize the LastDelegationKeyId table with a single entry
INSERT INTO LastDelegationKeyId (keyId)
SELECT 0 WHERE NOT EXISTS (SELECT * FROM LastDelegationKeyId);
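-- Both INSERT ... SELECT ... WHERE NOT EXISTS statements above insert nothing
-- when a row is already present, so this script can safely be re-run.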

View File

@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Script to create a new User in MySQL for the TokenStore
-- Update TokenStore user and password on this script
CREATE USER IF NOT EXISTS 'TokenStoreUser'@'%' IDENTIFIED BY 'TokenStorePassword';
GRANT ALL PRIVILEGES ON TokenStore.* TO 'TokenStoreUser'@'%';
FLUSH PRIVILEGES;

View File

@@ -0,0 +1,24 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
These scripts must be executed, in the order listed below, to create the TokenStore database,
tables and user needed to use the SQLDelegationTokenSecretManagerImpl as the delegation token
secret manager:
1. TokenStoreDatabase.sql
2. TokenStoreTables.sql
3. TokenStoreUser.sql
Note: TokenStoreUser.sql defines a default user and password. You are strongly encouraged to
replace these with proper, strong credentials before running the script.

View File

@@ -491,7 +491,7 @@ public class RouterRpcClient {
+ router.getRouterId());
}
addClientInfoToCallerContext(ugi);
Object ret = null;
if (rpcMonitor != null) {
@@ -627,14 +627,18 @@ public class RouterRpcClient {
/**
* For tracking some information about the actual client.
* It adds trace info "clientIp:ip", "clientPort:port",
* "clientId:id" and "clientCallId:callId"
* "clientId:id", "clientCallId:callId" and "realUser:userName"
* in the caller context, removing the old values if they were
* already present.
*/
private void addClientInfoToCallerContext(UserGroupInformation ugi) {
CallerContext ctx = CallerContext.getCurrent();
String origContext = ctx == null ? null : ctx.getContext();
byte[] origSignature = ctx == null ? null : ctx.getSignature();
String realUser = null;
if (ugi.getRealUser() != null) {
realUser = ugi.getRealUser().getUserName();
}
CallerContext.Builder builder =
new CallerContext.Builder("", contextFieldSeparator)
.append(CallerContext.CLIENT_IP_STR, Server.getRemoteAddress())
@@ -644,6 +648,7 @@
StringUtils.byteToHexString(Server.getClientId()))
.append(CallerContext.CLIENT_CALL_ID_STR,
Integer.toString(Server.getCallId()))
.append(CallerContext.REAL_USER_STR, realUser)
.setSignature(origSignature);
// Append the original caller context
if (origContext != null) {

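For reference, a hedged sketch of the context this method builds; the Builder calls and field constants come from the hunks above, while the "," separator and the sample values are illustrative:

CallerContext ctx = new CallerContext.Builder("", ",")
    .append(CallerContext.CLIENT_IP_STR, "10.0.0.1")
    .append(CallerContext.CLIENT_PORT_STR, "39410")
    .append(CallerContext.CLIENT_ID_STR, "0a1b2c")
    .append(CallerContext.CLIENT_CALL_ID_STR, "27")
    .append(CallerContext.REAL_USER_STR, "testRealUser")
    .build();
// context: clientIp:10.0.0.1,clientPort:39410,clientId:0a1b2c,clientCallId:27,realUser:testRealUser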
View File

@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router.security.token;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Distributed counter that relies on a SQL database to synchronize
* between multiple clients. This expects a table with a single int field
* to exist in the database. One record must exist in the table at all times,
* representing the last used value reserved by a client.
*/
public class DistributedSQLCounter {
private static final Logger LOG =
LoggerFactory.getLogger(DistributedSQLCounter.class);
private final String field;
private final String table;
private final SQLConnectionFactory connectionFactory;
public DistributedSQLCounter(String field, String table,
SQLConnectionFactory connectionFactory) {
this.field = field;
this.table = table;
this.connectionFactory = connectionFactory;
}
/**
* Obtains the value of the counter.
* @return counter value.
*/
public int selectCounterValue() throws SQLException {
try (Connection connection = connectionFactory.getConnection()) {
return selectCounterValue(false, connection);
}
}
private int selectCounterValue(boolean forUpdate, Connection connection) throws SQLException {
String query = String.format("SELECT %s FROM %s %s", field, table,
forUpdate ? "FOR UPDATE" : "");
LOG.debug("Select counter statement: " + query);
try (Statement statement = connection.createStatement();
ResultSet result = statement.executeQuery(query)) {
if (result.next()) {
return result.getInt(field);
} else {
throw new IllegalStateException("Counter table not initialized: " + table);
}
}
}
/**
* Sets the counter to the given value.
* @param value Value to assign to counter.
*/
public void updateCounterValue(int value) throws SQLException {
try (Connection connection = connectionFactory.getConnection(true)) {
updateCounterValue(value, connection);
}
}
/**
* Sets the counter to the given value.
* @param connection Connection to database hosting the counter table.
* @param value Value to assign to counter.
*/
public void updateCounterValue(int value, Connection connection) throws SQLException {
String queryText = String.format("UPDATE %s SET %s = ?", table, field);
LOG.debug("Update counter statement: " + queryText + ". Value: " + value);
try (PreparedStatement statement = connection.prepareStatement(queryText)) {
statement.setInt(1, value);
statement.execute();
}
}
/**
* Increments the counter by the given amount and
* returns the previous counter value.
* @param amount Amount to increase the counter.
* @return Previous counter value.
*/
public int incrementCounterValue(int amount) throws SQLException {
// Disabling auto-commit to ensure that all statements on this transaction
// are committed at once.
try (Connection connection = connectionFactory.getConnection(false)) {
// Preventing dirty reads and non-repeatable reads to ensure that the
// value read will not be updated by a different connection.
if (connection.getTransactionIsolation() < Connection.TRANSACTION_REPEATABLE_READ) {
connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
}
try {
// Reading the counter value "FOR UPDATE" to lock the value record,
// forcing other connections to wait until this transaction is committed.
int lastValue = selectCounterValue(true, connection);
// Calculate the new counter value, handling overflow by
// resetting the counter to 0.
int newValue = lastValue + amount;
if (newValue < 0) {
lastValue = 0;
newValue = amount;
}
updateCounterValue(newValue, connection);
connection.commit();
return lastValue;
} catch (Exception e) {
// Rollback transaction to release table locks
connection.rollback();
throw e;
}
}
}
}

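A hedged usage sketch wiring the counter to an in-memory Derby TokenStore; the inline SQLConnectionFactory and the pre-created LastSequenceNum table are assumptions of the sketch, not part of the patch:

SQLConnectionFactory factory = new SQLConnectionFactory() {
  @Override
  public Connection getConnection() throws SQLException {
    return DriverManager.getConnection("jdbc:derby:memory:TokenStore;create=true");
  }
  @Override
  public void shutdown() {
    // nothing pooled to close in this sketch
  }
};
DistributedSQLCounter seqNumCounter =
    new DistributedSQLCounter("sequenceNum", "LastSequenceNum", factory);
int previous = seqNumCounter.incrementCounterValue(10); // reserve a batch of 10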
View File

@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router.security.token;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;
/**
* Class that relies on a HikariDataSource to provide SQL connections.
*/
class HikariDataSourceConnectionFactory implements SQLConnectionFactory {
protected final static String HIKARI_PROPS = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX
+ "connection.hikari.";
private final HikariDataSource dataSource;
HikariDataSourceConnectionFactory(Configuration conf) {
Properties properties = new Properties();
properties.setProperty("jdbcUrl", conf.get(CONNECTION_URL));
properties.setProperty("username", conf.get(CONNECTION_USERNAME));
properties.setProperty("password", conf.get(CONNECTION_PASSWORD));
properties.setProperty("driverClassName", conf.get(CONNECTION_DRIVER));
// Include hikari connection properties
properties.putAll(conf.getPropsWithPrefix(HIKARI_PROPS));
HikariConfig hikariConfig = new HikariConfig(properties);
this.dataSource = new HikariDataSource(hikariConfig);
}
@Override
public Connection getConnection() throws SQLException {
return dataSource.getConnection();
}
@Override
public void shutdown() {
// Close database connections
dataSource.close();
}
@VisibleForTesting
HikariDataSource getDataSource() {
return dataSource;
}
}

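A hedged configuration sketch for this factory; any key under the "sql-dt-secret-manager.connection.hikari." prefix is forwarded to HikariConfig, and the values below are illustrative:

Configuration conf = new Configuration();
conf.set(SQLConnectionFactory.CONNECTION_URL, "jdbc:mysql://localhost:3306/TokenStore");
conf.set(SQLConnectionFactory.CONNECTION_USERNAME, "TokenStoreUser");
conf.set(SQLConnectionFactory.CONNECTION_PASSWORD, "TokenStorePassword");
conf.set(SQLConnectionFactory.CONNECTION_DRIVER, "com.mysql.cj.jdbc.Driver");
// forwarded to HikariConfig as maximumPoolSize
conf.set("sql-dt-secret-manager.connection.hikari.maximumPoolSize", "20");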
View File

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router.security.token;
import com.mysql.cj.jdbc.MysqlDataSource;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
/**
* Interface to provide SQL connections to the {@link SQLDelegationTokenSecretManagerImpl}.
*/
public interface SQLConnectionFactory {
String CONNECTION_URL = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX
+ "connection.url";
String CONNECTION_USERNAME = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX
+ "connection.username";
String CONNECTION_PASSWORD = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX
+ "connection.password";
String CONNECTION_DRIVER = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX
+ "connection.driver";
Connection getConnection() throws SQLException;
void shutdown();
default Connection getConnection(boolean autocommit) throws SQLException {
Connection connection = getConnection();
connection.setAutoCommit(autocommit);
return connection;
}
}

View File

@@ -0,0 +1,242 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router.security.token;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An implementation of {@link SQLDelegationTokenSecretManager} that
* persists TokenIdentifiers and DelegationKeys in a SQL database.
* Connections are obtained through a {@link SQLConnectionFactory}, which is
* backed by a HikariCP connection pool by default.
*/
public class SQLDelegationTokenSecretManagerImpl
extends SQLDelegationTokenSecretManager<AbstractDelegationTokenIdentifier> {
private static final Logger LOG =
LoggerFactory.getLogger(SQLDelegationTokenSecretManagerImpl.class);
private static final String SEQ_NUM_COUNTER_FIELD = "sequenceNum";
private static final String SEQ_NUM_COUNTER_TABLE = "LastSequenceNum";
private static final String KEY_ID_COUNTER_FIELD = "keyId";
private static final String KEY_ID_COUNTER_TABLE = "LastDelegationKeyId";
private final SQLConnectionFactory connectionFactory;
private final DistributedSQLCounter sequenceNumCounter;
private final DistributedSQLCounter delegationKeyIdCounter;
private final SQLSecretManagerRetriableHandler retryHandler;
public SQLDelegationTokenSecretManagerImpl(Configuration conf) {
this(conf, new HikariDataSourceConnectionFactory(conf),
SQLSecretManagerRetriableHandlerImpl.getInstance(conf));
}
public SQLDelegationTokenSecretManagerImpl(Configuration conf,
SQLConnectionFactory connectionFactory, SQLSecretManagerRetriableHandler retryHandler) {
super(conf);
this.connectionFactory = connectionFactory;
this.sequenceNumCounter = new DistributedSQLCounter(SEQ_NUM_COUNTER_FIELD,
SEQ_NUM_COUNTER_TABLE, connectionFactory);
this.delegationKeyIdCounter = new DistributedSQLCounter(KEY_ID_COUNTER_FIELD,
KEY_ID_COUNTER_TABLE, connectionFactory);
this.retryHandler = retryHandler;
try {
super.startThreads();
} catch (IOException e) {
throw new RuntimeException("Error starting threads for MySQL secret manager", e);
}
LOG.info("MySQL delegation token secret manager instantiated");
}
@Override
public DelegationTokenIdentifier createIdentifier() {
return new DelegationTokenIdentifier();
}
@Override
public void stopThreads() {
super.stopThreads();
connectionFactory.shutdown();
}
@Override
protected void insertToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
throws SQLException {
retryHandler.execute(() -> {
try (Connection connection = connectionFactory.getConnection(true);
PreparedStatement statement = connection.prepareStatement(
"INSERT INTO Tokens (sequenceNum, tokenIdentifier, tokenInfo) VALUES (?, ?, ?)")) {
statement.setInt(1, sequenceNum);
statement.setBytes(2, tokenIdentifier);
statement.setBytes(3, tokenInfo);
statement.execute();
}
});
}
@Override
protected void updateToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
throws SQLException {
retryHandler.execute(() -> {
try (Connection connection = connectionFactory.getConnection(true);
PreparedStatement statement = connection.prepareStatement(
"UPDATE Tokens SET tokenInfo = ? WHERE sequenceNum = ? AND tokenIdentifier = ?")) {
statement.setBytes(1, tokenInfo);
statement.setInt(2, sequenceNum);
statement.setBytes(3, tokenIdentifier);
statement.execute();
}
});
}
@Override
protected void deleteToken(int sequenceNum, byte[] tokenIdentifier) throws SQLException {
retryHandler.execute(() -> {
try (Connection connection = connectionFactory.getConnection(true);
PreparedStatement statement = connection.prepareStatement(
"DELETE FROM Tokens WHERE sequenceNum = ? AND tokenIdentifier = ?")) {
statement.setInt(1, sequenceNum);
statement.setBytes(2, tokenIdentifier);
statement.execute();
}
});
}
@Override
protected byte[] selectTokenInfo(int sequenceNum, byte[] tokenIdentifier) throws SQLException {
return retryHandler.execute(() -> {
try (Connection connection = connectionFactory.getConnection();
PreparedStatement statement = connection.prepareStatement(
"SELECT tokenInfo FROM Tokens WHERE sequenceNum = ? AND tokenIdentifier = ?")) {
statement.setInt(1, sequenceNum);
statement.setBytes(2, tokenIdentifier);
try (ResultSet result = statement.executeQuery()) {
if (result.next()) {
return result.getBytes("tokenInfo");
}
}
}
return null;
});
}
@Override
protected void insertDelegationKey(int keyId, byte[] delegationKey) throws SQLException {
retryHandler.execute(() -> {
try (Connection connection = connectionFactory.getConnection(true);
PreparedStatement statement = connection.prepareStatement(
"INSERT INTO DelegationKeys (keyId, delegationKey) VALUES (?, ?)")) {
statement.setInt(1, keyId);
statement.setBytes(2, delegationKey);
statement.execute();
}
});
}
@Override
protected void updateDelegationKey(int keyId, byte[] delegationKey) throws SQLException {
retryHandler.execute(() -> {
try (Connection connection = connectionFactory.getConnection(true);
PreparedStatement statement = connection.prepareStatement(
"UPDATE DelegationKeys SET delegationKey = ? WHERE keyId = ?")) {
statement.setBytes(1, delegationKey);
statement.setInt(2, keyId);
statement.execute();
}
});
}
@Override
protected void deleteDelegationKey(int keyId) throws SQLException {
retryHandler.execute(() -> {
try (Connection connection = connectionFactory.getConnection(true);
PreparedStatement statement = connection.prepareStatement(
"DELETE FROM DelegationKeys WHERE keyId = ?")) {
statement.setInt(1, keyId);
statement.execute();
}
});
}
@Override
protected byte[] selectDelegationKey(int keyId) throws SQLException {
return retryHandler.execute(() -> {
try (Connection connection = connectionFactory.getConnection();
PreparedStatement statement = connection.prepareStatement(
"SELECT delegationKey FROM DelegationKeys WHERE keyId = ?")) {
statement.setInt(1, keyId);
try (ResultSet result = statement.executeQuery()) {
if (result.next()) {
return result.getBytes("delegationKey");
}
}
}
return null;
});
}
@Override
protected int selectSequenceNum() throws SQLException {
return retryHandler.execute(() -> sequenceNumCounter.selectCounterValue());
}
@Override
protected void updateSequenceNum(int value) throws SQLException {
retryHandler.execute(() -> sequenceNumCounter.updateCounterValue(value));
}
@Override
protected int incrementSequenceNum(int amount) throws SQLException {
return retryHandler.execute(() -> sequenceNumCounter.incrementCounterValue(amount));
}
@Override
protected int selectKeyId() throws SQLException {
return retryHandler.execute(delegationKeyIdCounter::selectCounterValue);
}
@Override
protected void updateKeyId(int value) throws SQLException {
retryHandler.execute(() -> delegationKeyIdCounter.updateCounterValue(value));
}
@Override
protected int incrementKeyId(int amount) throws SQLException {
return retryHandler.execute(() -> delegationKeyIdCounter.incrementCounterValue(amount));
}
@VisibleForTesting
protected SQLConnectionFactory getConnectionFactory() {
return connectionFactory;
}
}

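A hedged bootstrap sketch mirroring the Derby setup used by the tests later in this commit; the constructor already calls startThreads(), so it is paired with stopThreads() on shutdown:

Configuration conf = new Configuration();
conf.set(SQLConnectionFactory.CONNECTION_URL, "jdbc:derby:memory:TokenStore;create=true");
conf.set(SQLConnectionFactory.CONNECTION_USERNAME, "testuser");
conf.set(SQLConnectionFactory.CONNECTION_PASSWORD, "testpassword");
conf.set(SQLConnectionFactory.CONNECTION_DRIVER, "org.apache.derby.jdbc.EmbeddedDriver");
SQLDelegationTokenSecretManagerImpl secretManager =
    new SQLDelegationTokenSecretManagerImpl(conf);
try {
  // sequence numbers and key ids are now coordinated through the database
  int nextSeqNum = secretManager.incrementDelegationTokenSeqNum();
} finally {
  secretManager.stopThreads(); // also shuts down the connection pool
}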
View File

@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router.security.token;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Interface to handle retries when {@link SQLDelegationTokenSecretManagerImpl}
* throws expected errors.
*/
public interface SQLSecretManagerRetriableHandler {
void execute(SQLCommandVoid command) throws SQLException;
<T> T execute(SQLCommand<T> command) throws SQLException;
@FunctionalInterface
interface SQLCommandVoid {
void doCall() throws SQLException;
}
@FunctionalInterface
interface SQLCommand<T> {
T doCall() throws SQLException;
}
}
/**
* Implementation of {@link SQLSecretManagerRetriableHandler} that uses a
* {@link RetryProxy} to simplify the retryable operations.
*/
class SQLSecretManagerRetriableHandlerImpl implements SQLSecretManagerRetriableHandler {
public final static String MAX_RETRIES =
SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + "max-retries";
public final static int MAX_RETRIES_DEFAULT = 0;
public final static String RETRY_SLEEP_TIME_MS =
SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + "retry-sleep-time-ms";
public final static long RETRY_SLEEP_TIME_MS_DEFAULT = 100;
private static final Logger LOG =
LoggerFactory.getLogger(SQLSecretManagerRetriableHandlerImpl.class);
static SQLSecretManagerRetriableHandler getInstance(Configuration conf) {
return getInstance(conf, new SQLSecretManagerRetriableHandlerImpl());
}
static SQLSecretManagerRetriableHandler getInstance(Configuration conf,
SQLSecretManagerRetriableHandlerImpl retryHandler) {
RetryPolicy basePolicy = RetryPolicies.exponentialBackoffRetry(
conf.getInt(MAX_RETRIES, MAX_RETRIES_DEFAULT),
conf.getLong(RETRY_SLEEP_TIME_MS, RETRY_SLEEP_TIME_MS_DEFAULT),
TimeUnit.MILLISECONDS);
// Configure SQLSecretManagerRetriableException to retry with exponential backoff
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = new HashMap<>();
exceptionToPolicyMap.put(SQLSecretManagerRetriableException.class, basePolicy);
// Configure all other exceptions to fail after one attempt
RetryPolicy retryPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
return (SQLSecretManagerRetriableHandler) RetryProxy.create(
SQLSecretManagerRetriableHandler.class, retryHandler, retryPolicy);
}
/**
* Executes a SQL command and raises retryable errors as
* {@link SQLSecretManagerRetriableException}s so they are recognized by the
* {@link RetryProxy}.
* @param command SQL command to execute
* @throws SQLException When SQL connection errors occur
*/
@Override
public void execute(SQLCommandVoid command) throws SQLException {
try {
command.doCall();
} catch (SQLException e) {
LOG.warn("Failed to execute SQL command", e);
throw new SQLSecretManagerRetriableException(e);
}
}
/**
* Executes a SQL command and raises retryable errors as
* {@link SQLSecretManagerRetriableException}s so they are recognized by the
* {@link RetryProxy}.
* @param command SQL command to execute
* @throws SQLException When SQL connection errors occur
*/
@Override
public <T> T execute(SQLCommand<T> command) throws SQLException {
try {
return command.doCall();
} catch (SQLException e) {
LOG.warn("Failed to execute SQL command", e);
throw new SQLSecretManagerRetriableException(e);
}
}
/**
* Class used to identify errors that can be retried.
*/
static class SQLSecretManagerRetriableException extends SQLException {
SQLSecretManagerRetriableException(Throwable cause) {
super(cause);
}
}
}

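A hedged sketch of the retry wiring; getInstance is package-private, so this assumes same-package access, and the connection URL is illustrative:

Configuration conf = new Configuration();
conf.setInt(SQLSecretManagerRetriableHandlerImpl.MAX_RETRIES, 3);
conf.setLong(SQLSecretManagerRetriableHandlerImpl.RETRY_SLEEP_TIME_MS, 100);
SQLSecretManagerRetriableHandler handler =
    SQLSecretManagerRetriableHandlerImpl.getInstance(conf);
// a SQLException thrown inside doCall() is rethrown as
// SQLSecretManagerRetriableException and retried with exponential backoff
handler.execute(() -> {
  try (java.sql.Connection connection =
      java.sql.DriverManager.getConnection("jdbc:derby:memory:TokenStore")) {
    // JDBC work goes here
  }
});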
View File

@@ -39,6 +39,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
@@ -218,6 +219,14 @@ public class TestRouterRpc {
cluster.setIndependentDNs();
Configuration conf = new Configuration();
// Setup proxy users.
conf.set("hadoop.proxyuser.testRealUser.groups", "*");
conf.set("hadoop.proxyuser.testRealUser.hosts", "*");
String loginUser = UserGroupInformation.getLoginUser().getUserName();
conf.set(String.format("hadoop.proxyuser.%s.groups", loginUser), "*");
conf.set(String.format("hadoop.proxyuser.%s.hosts", loginUser), "*");
// Enable IP proxy users.
conf.set(DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS, "placeholder");
conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 5);
cluster.addNamenodeOverrides(conf);
// Start NNs and DNs and wait until ready
@@ -2077,6 +2086,38 @@
assertTrue(verifyFileExists(routerFS, dirPath));
}
@Test
public void testRealUserPropagationInCallerContext()
throws IOException, InterruptedException {
GenericTestUtils.LogCapturer auditlog =
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.AUDIT_LOG);
// Current callerContext is null
assertNull(CallerContext.getCurrent());
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
UserGroupInformation realUser = UserGroupInformation
.createUserForTesting("testRealUser", new String[]{"group"});
UserGroupInformation proxyUser = UserGroupInformation
.createProxyUser("testProxyUser", realUser);
FileSystem proxyFs = proxyUser.doAs(
(PrivilegedExceptionAction<FileSystem>) () -> router.getFileSystem());
proxyFs.listStatus(new Path("/"));
final String logOutput = auditlog.getOutput();
// Login user, which is used as the router's user, is different from the realUser.
assertNotEquals(loginUser.getUserName(), realUser.getUserName());
// Login user is used in the audit log's ugi field.
assertTrue("The login user is the proxyUser in the UGI field",
logOutput.contains(String.format("ugi=%s (auth:PROXY) via %s (auth:SIMPLE)",
proxyUser.getUserName(),
loginUser.getUserName())));
// Real user is added to the caller context.
assertTrue("The audit log should contain the real user.",
logOutput.contains(String.format("realUser:%s", realUser.getUserName())));
}
@Test
public void testSetBalancerBandwidth() throws Exception {
long defaultBandwidth =

View File

@@ -0,0 +1,471 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router.security.token;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestSQLDelegationTokenSecretManagerImpl {
private static final String CONNECTION_URL = "jdbc:derby:memory:TokenStore";
private static final int TEST_MAX_RETRIES = 3;
private static Configuration conf;
@Before
public void init() throws SQLException {
createTestDBTables();
}
@After
public void cleanup() throws SQLException {
dropTestDBTables();
}
@BeforeClass
public static void initDatabase() throws SQLException {
DriverManager.getConnection(CONNECTION_URL + ";create=true");
conf = new Configuration();
conf.set(SQLConnectionFactory.CONNECTION_URL, CONNECTION_URL);
conf.set(SQLConnectionFactory.CONNECTION_USERNAME, "testuser");
conf.set(SQLConnectionFactory.CONNECTION_PASSWORD, "testpassword");
conf.set(SQLConnectionFactory.CONNECTION_DRIVER, "org.apache.derby.jdbc.EmbeddedDriver");
conf.setInt(SQLSecretManagerRetriableHandlerImpl.MAX_RETRIES, TEST_MAX_RETRIES);
conf.setInt(SQLSecretManagerRetriableHandlerImpl.RETRY_SLEEP_TIME_MS, 10);
}
@AfterClass
public static void cleanupDatabase() {
try {
DriverManager.getConnection(CONNECTION_URL + ";drop=true");
} catch (SQLException e) {
// SQLException expected when database is dropped
if (!e.getMessage().contains("dropped")) {
throw new RuntimeException(e);
}
}
}
@Test
public void testSingleSecretManager() throws Exception {
DelegationTokenManager tokenManager = createTokenManager();
try {
Token<? extends AbstractDelegationTokenIdentifier> token =
tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo");
validateToken(tokenManager, token);
} finally {
stopTokenManager(tokenManager);
}
}
@Test
public void testMultipleSecretManagers() throws Exception {
DelegationTokenManager tokenManager1 = createTokenManager();
DelegationTokenManager tokenManager2 = createTokenManager();
try {
Token<? extends AbstractDelegationTokenIdentifier> token1 =
tokenManager1.createToken(UserGroupInformation.getCurrentUser(), "foo");
Token<? extends AbstractDelegationTokenIdentifier> token2 =
tokenManager2.createToken(UserGroupInformation.getCurrentUser(), "foo");
validateToken(tokenManager1, token2);
validateToken(tokenManager2, token1);
} finally {
stopTokenManager(tokenManager1);
stopTokenManager(tokenManager2);
}
}
@Test
public void testSequenceNumAllocation() throws Exception {
int tokensPerManager = SQLDelegationTokenSecretManagerImpl.DEFAULT_SEQ_NUM_BATCH_SIZE * 5;
Set<Integer> sequenceNums1 = new HashSet<>();
Set<Integer> sequenceNums2 = new HashSet<>();
Set<Integer> sequenceNums3 = new HashSet<>();
Set<Integer> sequenceNums = new HashSet<>();
DelegationTokenManager tokenManager1 = createTokenManager();
DelegationTokenManager tokenManager2 = createTokenManager();
DelegationTokenManager tokenManager3 = createTokenManager();
try {
for (int i = 0; i < tokensPerManager; i++) {
allocateSequenceNum(tokenManager1, sequenceNums1);
allocateSequenceNum(tokenManager2, sequenceNums2);
allocateSequenceNum(tokenManager3, sequenceNums3);
sequenceNums.addAll(sequenceNums1);
sequenceNums.addAll(sequenceNums2);
sequenceNums.addAll(sequenceNums3);
}
Assert.assertEquals("Verify that all tokens were created with unique sequence numbers",
tokensPerManager * 3, sequenceNums.size());
Assert.assertEquals("Verify that tokenManager1 generated unique sequence numbers",
tokensPerManager, sequenceNums1.size());
Assert.assertEquals("Verify that tokenManager2 generated unique sequence number",
tokensPerManager, sequenceNums2.size());
Assert.assertEquals("Verify that tokenManager3 generated unique sequence numbers",
tokensPerManager, sequenceNums3.size());
// Validate sequence number batches allocated in order to each token manager
int batchSize = SQLDelegationTokenSecretManagerImpl.DEFAULT_SEQ_NUM_BATCH_SIZE;
for (int seqNum = 1; seqNum < tokensPerManager;) {
// First batch allocated to tokenManager1
for (int i = 0; i < batchSize; i++, seqNum++) {
Assert.assertTrue(sequenceNums1.contains(seqNum));
}
// Second batch allocated to tokenManager2
for (int i = 0; i < batchSize; i++, seqNum++) {
Assert.assertTrue(sequenceNums2.contains(seqNum));
}
// Third batch allocated to tokenManager3
for (int i = 0; i < batchSize; i++, seqNum++) {
Assert.assertTrue(sequenceNums3.contains(seqNum));
}
}
SQLDelegationTokenSecretManagerImpl secretManager =
(SQLDelegationTokenSecretManagerImpl) tokenManager1.getDelegationTokenSecretManager();
Assert.assertEquals("Verify that the counter is set to the highest sequence number",
tokensPerManager * 3, secretManager.getDelegationTokenSeqNum());
} finally {
stopTokenManager(tokenManager1);
stopTokenManager(tokenManager2);
stopTokenManager(tokenManager3);
}
}
@Test
public void testSequenceNumRollover() throws Exception {
int tokenBatch = SQLDelegationTokenSecretManagerImpl.DEFAULT_SEQ_NUM_BATCH_SIZE;
Set<Integer> sequenceNums = new HashSet<>();
DelegationTokenManager tokenManager = createTokenManager();
try {
SQLDelegationTokenSecretManagerImpl secretManager =
(SQLDelegationTokenSecretManagerImpl) tokenManager.getDelegationTokenSecretManager();
secretManager.setDelegationTokenSeqNum(Integer.MAX_VALUE - tokenBatch);
// Allocate sequence numbers before they are rolled over
for (int seqNum = Integer.MAX_VALUE - tokenBatch; seqNum < Integer.MAX_VALUE; seqNum++) {
allocateSequenceNum(tokenManager, sequenceNums);
Assert.assertTrue(sequenceNums.contains(seqNum + 1));
}
// Allocate sequence numbers after they are rolled over
for (int seqNum = 0; seqNum < tokenBatch; seqNum++) {
allocateSequenceNum(tokenManager, sequenceNums);
Assert.assertTrue(sequenceNums.contains(seqNum + 1));
}
} finally {
stopTokenManager(tokenManager);
}
}
@Test
public void testDelegationKeyAllocation() throws Exception {
DelegationTokenManager tokenManager1 = createTokenManager();
try {
SQLDelegationTokenSecretManagerImpl secretManager1 =
(SQLDelegationTokenSecretManagerImpl) tokenManager1.getDelegationTokenSecretManager();
// Prevent delegation keys from rolling for the rest of the test to avoid race conditions
// between the generated keys and the active keys in the database.
((TestDelegationTokenSecretManager) secretManager1).lockKeyRoll();
int keyId1 = secretManager1.getCurrentKeyId();
// Validate that latest key1 is assigned to tokenManager1 tokens
Token<? extends AbstractDelegationTokenIdentifier> token1 =
tokenManager1.createToken(UserGroupInformation.getCurrentUser(), "foo");
validateKeyId(token1, keyId1);
DelegationTokenManager tokenManager2 = createTokenManager();
try {
SQLDelegationTokenSecretManagerImpl secretManager2 =
(SQLDelegationTokenSecretManagerImpl) tokenManager2.getDelegationTokenSecretManager();
// Prevent delegation keys from rolling for the rest of the test
((TestDelegationTokenSecretManager) secretManager2).lockKeyRoll();
int keyId2 = secretManager2.getCurrentKeyId();
Assert.assertNotEquals("Each secret manager has its own key", keyId1, keyId2);
// Validate that latest key2 is assigned to tokenManager2 tokens
Token<? extends AbstractDelegationTokenIdentifier> token2 =
tokenManager2.createToken(UserGroupInformation.getCurrentUser(), "foo");
validateKeyId(token2, keyId2);
// Validate that key1 is still assigned to tokenManager1 tokens
token1 = tokenManager1.createToken(UserGroupInformation.getCurrentUser(), "foo");
validateKeyId(token1, keyId1);
// Validate that key2 is still assigned to tokenManager2 tokens
token2 = tokenManager2.createToken(UserGroupInformation.getCurrentUser(), "foo");
validateKeyId(token2, keyId2);
} finally {
stopTokenManager(tokenManager2);
}
} finally {
stopTokenManager(tokenManager1);
}
}
@Test
public void testHikariConfigs() {
HikariDataSourceConnectionFactory factory1 = new HikariDataSourceConnectionFactory(conf);
int defaultMaximumPoolSize = factory1.getDataSource().getMaximumPoolSize();
factory1.shutdown();
// Changing default maximumPoolSize
Configuration hikariConf = new Configuration(conf);
hikariConf.setInt(HikariDataSourceConnectionFactory.HIKARI_PROPS + "maximumPoolSize",
defaultMaximumPoolSize + 1);
// Verifying property is correctly set in datasource
HikariDataSourceConnectionFactory factory2 = new HikariDataSourceConnectionFactory(hikariConf);
Assert.assertEquals(factory2.getDataSource().getMaximumPoolSize(),
defaultMaximumPoolSize + 1);
factory2.shutdown();
}
@Test
public void testRetries() throws Exception {
DelegationTokenManager tokenManager = createTokenManager();
TestDelegationTokenSecretManager secretManager =
(TestDelegationTokenSecretManager) tokenManager.getDelegationTokenSecretManager();
try {
// Prevent delegation keys from rolling for the rest of the test
secretManager.lockKeyRoll();
// Reset counter and expect a single request when inserting a token
TestRetryHandler.resetExecutionAttemptCounter();
tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo");
Assert.assertEquals(1, TestRetryHandler.getExecutionAttempts());
// Breaking database connections to cause retries
secretManager.setReadOnly(true);
// Reset counter and expect multiple retries when failing to insert a token
TestRetryHandler.resetExecutionAttemptCounter();
tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo");
Assert.assertEquals(TEST_MAX_RETRIES + 1, TestRetryHandler.getExecutionAttempts());
} finally {
// Fix database connections
secretManager.setReadOnly(false);
stopTokenManager(tokenManager);
}
}
private DelegationTokenManager createTokenManager() {
DelegationTokenManager tokenManager = new DelegationTokenManager(new Configuration(), null);
tokenManager.setExternalDelegationTokenSecretManager(new TestDelegationTokenSecretManager());
return tokenManager;
}
private void allocateSequenceNum(DelegationTokenManager tokenManager, Set<Integer> sequenceNums)
throws IOException {
Token<? extends AbstractDelegationTokenIdentifier> token =
tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo");
AbstractDelegationTokenIdentifier tokenIdentifier = token.decodeIdentifier();
Assert.assertFalse("Verify sequence number is unique",
sequenceNums.contains(tokenIdentifier.getSequenceNumber()));
sequenceNums.add(tokenIdentifier.getSequenceNumber());
}
private void validateToken(DelegationTokenManager tokenManager,
Token<? extends AbstractDelegationTokenIdentifier> token)
throws Exception {
SQLDelegationTokenSecretManagerImpl secretManager =
(SQLDelegationTokenSecretManagerImpl) tokenManager.getDelegationTokenSecretManager();
AbstractDelegationTokenIdentifier tokenIdentifier = token.decodeIdentifier();
// Verify token using token manager
tokenManager.verifyToken(token);
byte[] tokenInfo1 = secretManager.selectTokenInfo(tokenIdentifier.getSequenceNumber(),
tokenIdentifier.getBytes());
Assert.assertNotNull("Verify token exists in database", tokenInfo1);
// Renew token using token manager
tokenManager.renewToken(token, "foo");
byte[] tokenInfo2 = secretManager.selectTokenInfo(tokenIdentifier.getSequenceNumber(),
tokenIdentifier.getBytes());
Assert.assertNotNull("Verify token exists in database", tokenInfo2);
Assert.assertFalse("Verify token has been updated in database",
Arrays.equals(tokenInfo1, tokenInfo2));
// Cancel token using token manager
tokenManager.cancelToken(token, "foo");
byte[] tokenInfo3 = secretManager.selectTokenInfo(tokenIdentifier.getSequenceNumber(),
tokenIdentifier.getBytes());
Assert.assertNull("Verify token was removed from database", tokenInfo3);
}
private void validateKeyId(Token<? extends AbstractDelegationTokenIdentifier> token,
int expectedKeyId) throws IOException {
AbstractDelegationTokenIdentifier tokenIdentifier = token.decodeIdentifier();
Assert.assertEquals("Verify that the expected keyId is assigned to the token",
expectedKeyId, tokenIdentifier.getMasterKeyId());
}
private static Connection getTestDBConnection() {
try {
return DriverManager.getConnection(CONNECTION_URL);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static void createTestDBTables() throws SQLException {
execute("CREATE TABLE Tokens (sequenceNum INT NOT NULL, "
+ "tokenIdentifier VARCHAR (255) FOR BIT DATA NOT NULL, "
+ "tokenInfo VARCHAR (255) FOR BIT DATA NOT NULL, "
+ "modifiedTime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
+ "PRIMARY KEY(sequenceNum))");
execute("CREATE TABLE DelegationKeys (keyId INT NOT NULL, "
+ "delegationKey VARCHAR (255) FOR BIT DATA NOT NULL, "
+ "modifiedTime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
+ "PRIMARY KEY(keyId))");
execute("CREATE TABLE LastSequenceNum (sequenceNum INT NOT NULL)");
execute("INSERT INTO LastSequenceNum VALUES (0)");
execute("CREATE TABLE LastDelegationKeyId (keyId INT NOT NULL)");
execute("INSERT INTO LastDelegationKeyId VALUES (0)");
}
private static void dropTestDBTables() throws SQLException {
execute("DROP TABLE Tokens");
execute("DROP TABLE DelegationKeys");
execute("DROP TABLE LastSequenceNum");
execute("DROP TABLE LastDelegationKeyId");
}
private static void execute(String statement) throws SQLException {
try (Connection connection = getTestDBConnection()) {
connection.createStatement().execute(statement);
}
}
private void stopTokenManager(DelegationTokenManager tokenManager) {
TestDelegationTokenSecretManager secretManager =
(TestDelegationTokenSecretManager) tokenManager.getDelegationTokenSecretManager();
// Release any locks on tables
secretManager.unlockKeyRoll();
// Stop threads to close database connections
secretManager.stopThreads();
}
static class TestDelegationTokenSecretManager extends SQLDelegationTokenSecretManagerImpl {
private ReentrantLock keyRollLock;
private synchronized ReentrantLock getKeyRollLock() {
if (keyRollLock == null) {
keyRollLock = new ReentrantLock();
}
return keyRollLock;
}
TestDelegationTokenSecretManager() {
super(conf, new TestConnectionFactory(conf),
SQLSecretManagerRetriableHandlerImpl.getInstance(conf, new TestRetryHandler()));
}
// Tests can call this method to prevent delegation keys from being
// rolled in the middle of a test, avoiding race conditions
public void lockKeyRoll() {
getKeyRollLock().lock();
}
public void unlockKeyRoll() {
if (getKeyRollLock().isHeldByCurrentThread()) {
getKeyRollLock().unlock();
}
}
@Override
protected void rollMasterKey() throws IOException {
try {
lockKeyRoll();
super.rollMasterKey();
} finally {
unlockKeyRoll();
}
}
public void setReadOnly(boolean readOnly) {
((TestConnectionFactory) getConnectionFactory()).readOnly = readOnly;
}
}
static class TestConnectionFactory extends HikariDataSourceConnectionFactory {
private boolean readOnly = false;
TestConnectionFactory(Configuration conf) {
super(conf);
}
@Override
public Connection getConnection() throws SQLException {
Connection connection = super.getConnection();
// Change to the default schema, as the Derby driver looks for the user schema
connection.setSchema("APP");
connection.setReadOnly(readOnly);
return connection;
}
}
static class TestRetryHandler extends SQLSecretManagerRetriableHandlerImpl {
// Tracks the number of times that a SQL command is executed, regardless of
// whether it completed successfully or not.
private static AtomicInteger executionAttemptCounter = new AtomicInteger();
static void resetExecutionAttemptCounter() {
executionAttemptCounter = new AtomicInteger();
}
static int getExecutionAttempts() {
return executionAttemptCounter.get();
}
@Override
public void execute(SQLCommandVoid command) throws SQLException {
executionAttemptCounter.incrementAndGet();
super.execute(command);
}
}
}

View File

@ -129,6 +129,8 @@
<jcache.version>1.0-alpha-1</jcache.version>
<ehcache.version>3.3.1</ehcache.version>
<hikari.version>4.0.3</hikari.version>
<derby.version>10.10.2.0</derby.version>
<mysql-connector-java.version>8.0.29</mysql-connector-java.version>
<mssql.version>6.2.1.jre7</mssql.version>
<okhttp3.version>4.10.0</okhttp3.version>
<okio.version>3.2.0</okio.version>
@ -1809,6 +1811,16 @@
<artifactId>HikariCP</artifactId>
<version>${hikari.version}</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>${derby.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql-connector-java.version}</version>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>

View File

@ -62,7 +62,10 @@ possibly in parallel, with results potentially coming in out-of-order.
Benchmarking of enhanced Apache ORC and Apache Parquet clients through `file://` and `s3a://`
shows significant improvements in query performance.
Further Reading: [FsDataInputStream](./hadoop-project-dist/hadoop-common/filesystem/fsdatainputstream.html).
Further Reading:
* [FsDataInputStream](./hadoop-project-dist/hadoop-common/filesystem/fsdatainputstream.html).
* [Hadoop Vectored IO: Your Data Just Got Faster!](https://apachecon.com/acasia2022/sessions/bigdata-1148.html)
ApacheCon 2022 talk.
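
A minimal client-side sketch of the API is shown below. The path, range
offsets, and buffer allocator are illustrative placeholders, and a 3.3.5+
client is assumed:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class VectoredReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("s3a://bucket/data.orc"); // placeholder path
    try (FileSystem fs = path.getFileSystem(conf);
        FSDataInputStream in = fs.open(path)) {
      // Request two non-contiguous ranges in one call; the stream may
      // fetch them in parallel and complete them in any order.
      List<FileRange> ranges = Arrays.asList(
          FileRange.createFileRange(0, 1024),
          FileRange.createFileRange(8192, 4096));
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange range : ranges) {
        ByteBuffer data = range.getData().join(); // block until this range lands
        // ... decode the buffer ...
      }
    }
  }
}
```

Since each range carries its own future, a caller can begin decoding the
first completed buffer while later ranges are still in flight.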
MapReduce: Manifest Committer for Azure ABFS and Google GCS
----------------------------------------------------------
@ -88,14 +91,6 @@ More details are available in the
documentation.
HDFS: Router Based Federation
-----------------------------
A lot of effort has been invested into stabilizing/improving the HDFS Router Based Federation feature.
1. HDFS-13522, HDFS-16767 & Related Jiras: Allow Observer Reads in HDFS Router Based Federation.
2. HDFS-13248: RBF supports Client Locality
HDFS: Dynamic Datanode Reconfiguration
--------------------------------------

View File

@ -156,10 +156,7 @@ public class CopyCommitter extends FileOutputCommitter {
final boolean directWrite = conf.getBoolean(
DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false);
final boolean append = conf.getBoolean(
DistCpOptionSwitch.APPEND.getConfigLabel(), false);
final boolean useTempTarget = !append && !directWrite;
if (!useTempTarget) {
if (directWrite) {
return;
}

View File

@ -586,13 +586,11 @@ public class TestCopyCommitter {
@Test
public void testCommitWithCleanupTempFiles() throws IOException {
testCommitWithCleanup(true, false);
testCommitWithCleanup(false, true);
testCommitWithCleanup(true, true);
testCommitWithCleanup(false, false);
testCommitWithCleanup(true);
testCommitWithCleanup(false);
}
private void testCommitWithCleanup(boolean append, boolean directWrite)throws IOException {
private void testCommitWithCleanup(boolean directWrite) throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
JobID jobID = taskAttemptContext.getTaskAttemptID().getJobID();
JobContext jobContext = new JobContextImpl(
@ -611,7 +609,7 @@ public class TestCopyCommitter {
DistCpOptions options = new DistCpOptions.Builder(
Collections.singletonList(new Path(sourceBase)),
new Path("/out"))
.withAppend(append)
.withAppend(true)
.withSyncFolder(true)
.withDirectWrite(directWrite)
.build();
@ -631,7 +629,7 @@ public class TestCopyCommitter {
null, taskAttemptContext);
committer.commitJob(jobContext);
if (append || directWrite) {
if (directWrite) {
ContractTestUtils.assertPathExists(fs, "Temp files should not be cleaned up with the direct write option",
tempFilePath);
} else {

View File

@ -60,7 +60,21 @@ public interface FederationStateStore extends
* Load the version information from the federation state store.
*
* @return the {@link Version} of the federation state store
* @throws Exception if an exception occurred while loading the version.
*/
Version loadVersion();
Version loadVersion() throws Exception;
/**
* Store the version information in the federation state store.
*
* @throws Exception if an exception occurred while storing the version.
*/
void storeVersion() throws Exception;
/**
* Check the version of the federation state store.
*
* @throws Exception if an exception occurred while checking the version.
*/
void checkVersion() throws Exception;
}

View File

@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.Comparator;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
@ -107,7 +109,7 @@ import static org.apache.hadoop.yarn.server.federation.store.utils.FederationSta
public class MemoryFederationStateStore implements FederationStateStore {
private Map<SubClusterId, SubClusterInfo> membership;
private Map<ApplicationId, SubClusterId> applications;
private Map<ApplicationId, ApplicationHomeSubCluster> applications;
private Map<ReservationId, SubClusterId> reservations;
private Map<String, SubClusterPolicyConfiguration> policies;
private RouterRMDTSecretManagerState routerRMSecretManagerState;
@ -122,10 +124,10 @@ public class MemoryFederationStateStore implements FederationStateStore {
@Override
public void init(Configuration conf) {
membership = new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
applications = new ConcurrentHashMap<ApplicationId, SubClusterId>();
reservations = new ConcurrentHashMap<ReservationId, SubClusterId>();
policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>();
membership = new ConcurrentHashMap<>();
applications = new ConcurrentHashMap<>();
reservations = new ConcurrentHashMap<>();
policies = new ConcurrentHashMap<>();
routerRMSecretManagerState = new RouterRMDTSecretManagerState();
maxAppsInStateStore = conf.getInt(
YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
@ -143,14 +145,15 @@ public class MemoryFederationStateStore implements FederationStateStore {
}
@Override
public SubClusterRegisterResponse registerSubCluster(
SubClusterRegisterRequest request) throws YarnException {
public SubClusterRegisterResponse registerSubCluster(SubClusterRegisterRequest request)
throws YarnException {
long startTime = clock.getTime();
FederationMembershipStateStoreInputValidator.validate(request);
SubClusterInfo subClusterInfo = request.getSubClusterInfo();
long currentTime =
Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
SubClusterInfo subClusterInfoToSave =
SubClusterInfo.newInstance(subClusterInfo.getSubClusterId(),
subClusterInfo.getAMRMServiceAddress(),
@ -161,18 +164,21 @@ public class MemoryFederationStateStore implements FederationStateStore {
subClusterInfo.getCapability());
membership.put(subClusterInfo.getSubClusterId(), subClusterInfoToSave);
long stopTime = clock.getTime();
FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
return SubClusterRegisterResponse.newInstance();
}
@Override
public SubClusterDeregisterResponse deregisterSubCluster(
SubClusterDeregisterRequest request) throws YarnException {
public SubClusterDeregisterResponse deregisterSubCluster(SubClusterDeregisterRequest request)
throws YarnException {
FederationMembershipStateStoreInputValidator.validate(request);
SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId());
if (subClusterInfo == null) {
String errMsg =
"SubCluster " + request.getSubClusterId().toString() + " not found";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
FederationStateStoreUtils.logAndThrowStoreException(
LOG, "SubCluster %s not found", request.getSubClusterId());
} else {
subClusterInfo.setState(request.getState());
}
@ -181,17 +187,16 @@ public class MemoryFederationStateStore implements FederationStateStore {
}
@Override
public SubClusterHeartbeatResponse subClusterHeartbeat(
SubClusterHeartbeatRequest request) throws YarnException {
public SubClusterHeartbeatResponse subClusterHeartbeat(SubClusterHeartbeatRequest request)
throws YarnException {
FederationMembershipStateStoreInputValidator.validate(request);
SubClusterId subClusterId = request.getSubClusterId();
SubClusterInfo subClusterInfo = membership.get(subClusterId);
if (subClusterInfo == null) {
String errMsg = "SubCluster " + subClusterId.toString()
+ " does not exist; cannot heartbeat";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
FederationStateStoreUtils.logAndThrowStoreException(
LOG, "SubCluster %s does not exist; cannot heartbeat.", request.getSubClusterId());
}
long currentTime =
@ -205,11 +210,12 @@ public class MemoryFederationStateStore implements FederationStateStore {
}
@Override
public GetSubClusterInfoResponse getSubCluster(
GetSubClusterInfoRequest request) throws YarnException {
public GetSubClusterInfoResponse getSubCluster(GetSubClusterInfoRequest request)
throws YarnException {
FederationMembershipStateStoreInputValidator.validate(request);
SubClusterId subClusterId = request.getSubClusterId();
if (!membership.containsKey(subClusterId)) {
LOG.warn("The queried SubCluster: {} does not exist.", subClusterId);
return null;
@ -219,16 +225,17 @@ public class MemoryFederationStateStore implements FederationStateStore {
}
@Override
public GetSubClustersInfoResponse getSubClusters(
GetSubClustersInfoRequest request) throws YarnException {
List<SubClusterInfo> result = new ArrayList<SubClusterInfo>();
public GetSubClustersInfoResponse getSubClusters(GetSubClustersInfoRequest request)
throws YarnException {
List<SubClusterInfo> result = new ArrayList<>();
for (SubClusterInfo info : membership.values()) {
if (!request.getFilterInactiveSubClusters()
|| info.getState().isActive()) {
if (!request.getFilterInactiveSubClusters() || info.getState().isActive()) {
result.add(info);
}
}
return GetSubClustersInfoResponse.newInstance(result);
}
@ -239,16 +246,16 @@ public class MemoryFederationStateStore implements FederationStateStore {
AddApplicationHomeSubClusterRequest request) throws YarnException {
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
ApplicationId appId =
request.getApplicationHomeSubCluster().getApplicationId();
ApplicationHomeSubCluster homeSubCluster = request.getApplicationHomeSubCluster();
ApplicationId appId = homeSubCluster.getApplicationId();
if (!applications.containsKey(appId)) {
applications.put(appId,
request.getApplicationHomeSubCluster().getHomeSubCluster());
applications.put(appId, homeSubCluster);
}
return AddApplicationHomeSubClusterResponse
.newInstance(applications.get(appId));
ApplicationHomeSubCluster respHomeSubCluster = applications.get(appId);
return AddApplicationHomeSubClusterResponse.newInstance(respHomeSubCluster.getHomeSubCluster());
}
@Override
@ -256,15 +263,16 @@ public class MemoryFederationStateStore implements FederationStateStore {
UpdateApplicationHomeSubClusterRequest request) throws YarnException {
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
ApplicationId appId =
request.getApplicationHomeSubCluster().getApplicationId();
if (!applications.containsKey(appId)) {
String errMsg = "Application " + appId + " does not exist";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Application %s does not exist.", appId);
}
applications.put(appId,
request.getApplicationHomeSubCluster().getHomeSubCluster());
applications.put(appId, request.getApplicationHomeSubCluster());
return UpdateApplicationHomeSubClusterResponse.newInstance();
}
@ -275,11 +283,12 @@ public class MemoryFederationStateStore implements FederationStateStore {
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
ApplicationId appId = request.getApplicationId();
if (!applications.containsKey(appId)) {
String errMsg = "Application " + appId + " does not exist";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Application %s does not exist.", appId);
}
return GetApplicationHomeSubClusterResponse.newInstance(appId, applications.get(appId));
return GetApplicationHomeSubClusterResponse.newInstance(appId,
applications.get(appId).getHomeSubCluster());
}
@Override
@ -303,7 +312,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
}
private ApplicationHomeSubCluster generateAppHomeSC(ApplicationId applicationId) {
SubClusterId subClusterId = applications.get(applicationId);
SubClusterId subClusterId = applications.get(applicationId).getHomeSubCluster();
return ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
}
@ -314,8 +323,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
ApplicationId appId = request.getApplicationId();
if (!applications.containsKey(appId)) {
String errMsg = "Application " + appId + " does not exist";
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Application %s does not exist.", appId);
}
applications.remove(appId);
@ -329,12 +338,11 @@ public class MemoryFederationStateStore implements FederationStateStore {
FederationPolicyStoreInputValidator.validate(request);
String queue = request.getQueue();
if (!policies.containsKey(queue)) {
LOG.warn("Policy for queue: {} does not exist.", queue);
LOG.warn("Policy for queue : {} does not exist.", queue);
return null;
}
return GetSubClusterPolicyConfigurationResponse
.newInstance(policies.get(queue));
return GetSubClusterPolicyConfigurationResponse.newInstance(policies.get(queue));
}
@Override
@ -350,8 +358,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
@Override
public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
ArrayList<SubClusterPolicyConfiguration> result =
new ArrayList<SubClusterPolicyConfiguration>();
ArrayList<SubClusterPolicyConfiguration> result = new ArrayList<>();
for (SubClusterPolicyConfiguration policy : policies.values()) {
result.add(policy);
}
@ -360,12 +367,22 @@ public class MemoryFederationStateStore implements FederationStateStore {
@Override
public Version getCurrentVersion() {
return null;
throw new NotImplementedException("Code is not implemented");
}
@Override
public Version loadVersion() {
return null;
public Version loadVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
}
@Override
public void storeVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
}
@Override
public void checkVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
}
@Override
@ -386,7 +403,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
ReservationId reservationId = request.getReservationId();
if (!reservations.containsKey(reservationId)) {
throw new YarnException("Reservation " + reservationId + " does not exist");
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Reservation %s does not exist.", reservationId);
}
SubClusterId subClusterId = reservations.get(reservationId);
ReservationHomeSubCluster homeSubCluster =
@ -417,7 +435,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
ReservationId reservationId = request.getReservationHomeSubCluster().getReservationId();
if (!reservations.containsKey(reservationId)) {
throw new YarnException("Reservation " + reservationId + " does not exist.");
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Reservation %s does not exist.", reservationId);
}
SubClusterId subClusterId = request.getReservationHomeSubCluster().getHomeSubCluster();
@ -431,7 +450,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
ReservationId reservationId = request.getReservationId();
if (!reservations.containsKey(reservationId)) {
throw new YarnException("Reservation " + reservationId + " does not exist");
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Reservation %s does not exist.", reservationId);
}
reservations.remove(reservationId);
return DeleteReservationHomeSubClusterResponse.newInstance();
@ -446,9 +466,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
Set<DelegationKey> rmDTMasterKeyState = routerRMSecretManagerState.getMasterKeyState();
if (rmDTMasterKeyState.contains(delegationKey)) {
LOG.info("Error storing info for RMDTMasterKey with keyID: {}.", delegationKey.getKeyId());
throw new IOException("RMDTMasterKey with keyID: " + delegationKey.getKeyId() +
" is already stored");
FederationStateStoreUtils.logAndThrowStoreException(LOG,
"Error storing info for RMDTMasterKey with keyID: %s.", delegationKey.getKeyId());
}
routerRMSecretManagerState.getMasterKeyState().add(delegationKey);

View File

@ -997,7 +997,17 @@ public class SQLFederationStateStore implements FederationStateStore {
}
@Override
public Version loadVersion() {
public Version loadVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
}
@Override
public void storeVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
}
@Override
public void checkVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
}

View File

@ -30,6 +30,7 @@ import java.util.TimeZone;
import java.util.Comparator;
import java.util.stream.Collectors;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.hadoop.classification.VisibleForTesting;
@ -642,12 +643,22 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
@Override
public Version getCurrentVersion() {
return null;
throw new NotImplementedException("Code is not implemented");
}
@Override
public Version loadVersion() {
return null;
public Version loadVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
}
@Override
public void storeVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
}
@Override
public void checkVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
}
/**

View File

@ -26,6 +26,7 @@ import java.util.Set;
import java.util.HashSet;
import java.util.TimeZone;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@ -164,12 +165,9 @@ public abstract class FederationStateStoreBaseTest {
SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest
.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED);
try {
stateStore.deregisterSubCluster(deregisterRequest);
Assert.fail();
} catch (FederationStateStoreException e) {
Assert.assertTrue(e.getMessage().startsWith("SubCluster SC not found"));
}
LambdaTestUtils.intercept(YarnException.class,
"SubCluster SC not found", () -> stateStore.deregisterSubCluster(deregisterRequest));
}
@Test
@ -266,13 +264,9 @@ public abstract class FederationStateStoreBaseTest {
SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest
.newInstance(subClusterId, SubClusterState.SC_RUNNING, "capability");
try {
stateStore.subClusterHeartbeat(heartbeatRequest);
Assert.fail();
} catch (FederationStateStoreException e) {
Assert.assertTrue(e.getMessage()
.startsWith("SubCluster SC does not exist; cannot heartbeat"));
}
LambdaTestUtils.intercept(YarnException.class,
"SubCluster SC does not exist; cannot heartbeat",
() -> stateStore.subClusterHeartbeat(heartbeatRequest));
}
// Test FederationApplicationHomeSubClusterStore
@ -1050,4 +1044,24 @@ public abstract class FederationStateStoreBaseTest {
checkRouterStoreToken(identifier, getStoreTokenResp);
}
@Test(expected = NotImplementedException.class)
public void testGetCurrentVersion() {
stateStore.getCurrentVersion();
}
@Test(expected = NotImplementedException.class)
public void testStoreVersion() throws Exception {
stateStore.storeVersion();
}
@Test(expected = NotImplementedException.class)
public void testLoadVersion() throws Exception {
stateStore.loadVersion();
}
@Test(expected = NotImplementedException.class)
public void testCheckVersion() throws Exception {
stateStore.checkVersion();
}
}

View File

@ -266,10 +266,20 @@ public class FederationStateStoreService extends AbstractService
}
@Override
public Version loadVersion() {
public Version loadVersion() throws Exception {
return stateStoreClient.getCurrentVersion();
}
@Override
public void storeVersion() throws Exception {
stateStoreClient.storeVersion();
}
@Override
public void checkVersion() throws Exception {
stateStoreClient.checkVersion();
}
@Override
public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
GetSubClusterPolicyConfigurationRequest request) throws YarnException {

View File

@ -16,6 +16,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AUTO_QUEUE_CREATION_V2_ENABLED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DEFAULT_AUTO_QUEUE_CREATION_ENABLED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.USER_LIMIT_FACTOR;
@ -82,13 +84,13 @@ public class FSQueueConverter {
emitMaxParallelApps(queueName, queue);
emitMaxAllocations(queueName, queue);
emitPreemptionDisabled(queueName, queue);
emitDefaultUserLimitFactor(queueName, children);
emitChildCapacity(queue);
emitMaximumCapacity(queueName, queue);
emitSizeBasedWeight(queueName);
emitOrderingPolicy(queueName, queue);
checkMaxChildCapacitySetting(queue);
emitDefaultUserLimitFactor(queueName, children);
for (FSQueue childQueue : children) {
convertQueueHierarchy(childQueue);
@ -220,7 +222,7 @@ public class FSQueueConverter {
}
public void emitDefaultUserLimitFactor(String queueName, List<FSQueue> children) {
if (children.isEmpty()) {
if (children.isEmpty() && checkAutoQueueCreationV2Disabled(queueName)) {
capacitySchedulerConfig.setFloat(
CapacitySchedulerConfiguration.
PREFIX + queueName + DOT + USER_LIMIT_FACTOR,
@ -309,6 +311,12 @@ public class FSQueueConverter {
}
}
private boolean checkAutoQueueCreationV2Disabled(String queueName) {
return !capacitySchedulerConfig.getBoolean(
PREFIX + queueName + DOT + AUTO_QUEUE_CREATION_V2_ENABLED,
DEFAULT_AUTO_QUEUE_CREATION_ENABLED);
}
private String getQueueShortName(String queueName) {
int lastDot = queueName.lastIndexOf(".");
return queueName.substring(lastDot + 1);

View File

@ -194,6 +194,8 @@ public class TestFSConfigToCSConfigConverter {
assertNull("root.users user-limit-factor should be null",
conf.get(PREFIX + "root.users." + USER_LIMIT_FACTOR));
assertEquals("root.users auto-queue-creation-v2.enabled", "true",
conf.get(PREFIX + "root.users.auto-queue-creation-v2.enabled"));
assertEquals("root.default user-limit-factor", "-1.0",
conf.get(PREFIX + "root.default.user-limit-factor"));
@ -203,6 +205,8 @@ public class TestFSConfigToCSConfigConverter {
assertEquals("root.admins.bob user-limit-factor", "-1.0",
conf.get(PREFIX + "root.admins.bob.user-limit-factor"));
assertNull("root.admin.bob auto-queue-creation-v2.enabled should be null",
conf.get(PREFIX + "root.admin.bob.auto-queue-creation-v2.enabled"));
}
@Test

View File

@ -147,6 +147,10 @@ public final class RouterMetrics {
private MutableGaugeInt numRefreshSuperUserGroupsConfigurationFailedRetrieved;
@Metric("# of refreshUserToGroupsMappings failed to be retrieved")
private MutableGaugeInt numRefreshUserToGroupsMappingsFailedRetrieved;
@Metric("# of addToClusterNodeLabels failed to be retrieved")
private MutableGaugeInt numAddToClusterNodeLabelsFailedRetrieved;
@Metric("# of removeFromClusterNodeLabels failed to be retrieved")
private MutableGaugeInt numRemoveFromClusterNodeLabelsFailedRetrieved;
// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
@ -253,9 +257,12 @@ public final class RouterMetrics {
private MutableRate totalSucceededRefreshSuperUserGroupsConfigurationRetrieved;
@Metric("Total number of successful Retrieved RefreshUserToGroupsMappings and latency(ms)")
private MutableRate totalSucceededRefreshUserToGroupsMappingsRetrieved;
@Metric("Total number of successful Retrieved GetSchedulerInfo and latency(ms)")
private MutableRate totalSucceededGetSchedulerInfoRetrieved;
@Metric("Total number of successful Retrieved AddToClusterNodeLabels and latency(ms)")
private MutableRate totalSucceededAddToClusterNodeLabelsRetrieved;
@Metric("Total number of successful Retrieved RemoveFromClusterNodeLabels and latency(ms)")
private MutableRate totalSucceededRemoveFromClusterNodeLabelsRetrieved;
/**
* Provide quantile counters for all latencies.
@ -313,6 +320,8 @@ public final class RouterMetrics {
private MutableQuantiles getSchedulerInfoRetrievedLatency;
private MutableQuantiles refreshSuperUserGroupsConfLatency;
private MutableQuantiles refreshUserToGroupsMappingsLatency;
private MutableQuantiles addToClusterNodeLabelsLatency;
private MutableQuantiles removeFromClusterNodeLabelsLatency;
private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;
@ -504,6 +513,12 @@ public final class RouterMetrics {
refreshUserToGroupsMappingsLatency = registry.newQuantiles("refreshUserToGroupsMappingsLatency",
"latency of refresh user to groups mappings timeouts", "ops", "latency", 10);
addToClusterNodeLabelsLatency = registry.newQuantiles("addToClusterNodeLabelsLatency",
"latency of add cluster nodelabels timeouts", "ops", "latency", 10);
removeFromClusterNodeLabelsLatency = registry.newQuantiles("removeFromClusterNodeLabelsLatency",
"latency of remove cluster nodelabels timeouts", "ops", "latency", 10);
}
public static RouterMetrics getMetrics() {
@ -780,6 +795,16 @@ public final class RouterMetrics {
return totalSucceededGetSchedulerInfoRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededAddToClusterNodeLabelsRetrieved() {
return totalSucceededAddToClusterNodeLabelsRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededRemoveFromClusterNodeLabelsRetrieved() {
return totalSucceededRemoveFromClusterNodeLabelsRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededRefreshSuperUserGroupsConfigurationRetrieved() {
return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().numSamples();
@ -1040,6 +1065,16 @@ public final class RouterMetrics {
return totalSucceededGetSchedulerInfoRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededAddToClusterNodeLabelsRetrieved() {
return totalSucceededAddToClusterNodeLabelsRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededRemoveFromClusterNodeLabelsRetrieved() {
return totalSucceededRemoveFromClusterNodeLabelsRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededRefreshSuperUserGroupsConfigurationRetrieved() {
return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().mean();
@ -1251,6 +1286,14 @@ public final class RouterMetrics {
return numRefreshUserToGroupsMappingsFailedRetrieved.value();
}
public int getNumAddToClusterNodeLabelsFailedRetrieved() {
return numAddToClusterNodeLabelsFailedRetrieved.value();
}
public int getNumRemoveFromClusterNodeLabelsFailedRetrieved() {
return numRemoveFromClusterNodeLabelsFailedRetrieved.value();
}
public int getDelegationTokenFailedRetrieved() {
return numGetDelegationTokenFailedRetrieved.value();
}
@ -1534,6 +1577,16 @@ public final class RouterMetrics {
getSchedulerInfoRetrievedLatency.add(duration);
}
public void succeededAddToClusterNodeLabelsRetrieved(long duration) {
totalSucceededAddToClusterNodeLabelsRetrieved.add(duration);
addToClusterNodeLabelsLatency.add(duration);
}
public void succeededRemoveFromClusterNodeLabelsRetrieved(long duration) {
totalSucceededRemoveFromClusterNodeLabelsRetrieved.add(duration);
removeFromClusterNodeLabelsLatency.add(duration);
}
public void succeededRefreshSuperUserGroupsConfRetrieved(long duration) {
totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.add(duration);
refreshSuperUserGroupsConfLatency.add(duration);
@ -1728,6 +1781,14 @@ public final class RouterMetrics {
numRefreshUserToGroupsMappingsFailedRetrieved.incr();
}
public void incrAddToClusterNodeLabelsFailedRetrieved() {
numAddToClusterNodeLabelsFailedRetrieved.incr();
}
public void incrRemoveFromClusterNodeLabelsFailedRetrieved() {
numRemoveFromClusterNodeLabelsFailedRetrieved.incr();
}
public void incrGetDelegationTokenFailedRetrieved() {
numGetDelegationTokenFailedRetrieved.incr();
}

View File

@ -43,6 +43,7 @@ import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
@ -72,6 +73,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
@ -1580,16 +1582,126 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
throw new RuntimeException("getClusterNodeLabels Failed.");
}
/**
* This method adds the given node labels to the cluster, and it is
* reachable by using {@link RMWSConsts#ADD_NODE_LABELS}.
*
* @see ResourceManagerAdministrationProtocol#addToClusterNodeLabels
* @param newNodeLabels the node labels to add. It is a content param.
* @param hsr the servlet request
* @return Response containing the status code
* @throws Exception in case of bad request
*/
@Override
public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels,
HttpServletRequest hsr) throws Exception {
throw new NotImplementedException("Code is not implemented");
if (newNodeLabels == null) {
routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
throw new IllegalArgumentException("Parameter error, the newNodeLabels is null.");
}
List<NodeLabelInfo> nodeLabelInfos = newNodeLabels.getNodeLabelsInfo();
if (CollectionUtils.isEmpty(nodeLabelInfos)) {
routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
throw new IllegalArgumentException("Parameter error, the nodeLabelsInfo is null or empty.");
}
try {
long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
final HttpServletRequest hsrCopy = clone(hsr);
Class[] argsClasses = new Class[]{NodeLabelsInfo.class, HttpServletRequest.class};
Object[] args = new Object[]{newNodeLabels, hsrCopy};
ClientMethod remoteMethod = new ClientMethod("addToClusterNodeLabels", argsClasses, args);
Map<SubClusterInfo, Response> responseInfoMap =
invokeConcurrent(subClustersActive.values(), remoteMethod, Response.class);
StringBuffer buffer = new StringBuffer();
// SubCluster-0:SUCCESS,SubCluster-1:SUCCESS
responseInfoMap.forEach((subClusterInfo, response) -> {
buildAppendMsg(subClusterInfo, buffer, response);
});
long stopTime = clock.getTime();
routerMetrics.succeededAddToClusterNodeLabelsRetrieved((stopTime - startTime));
return Response.status(Status.OK).entity(buffer.toString()).build();
} catch (NotFoundException e) {
routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
RouterServerUtil.logAndThrowIOException("get all active sub cluster(s) error.", e);
} catch (YarnException e) {
routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
RouterServerUtil.logAndThrowIOException("addToClusterNodeLabels with yarn error.", e);
}
routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
throw new RuntimeException("addToClusterNodeLabels Failed.");
}
/**
* This method removes the specified node labels from the cluster, and it is
* reachable by using {@link RMWSConsts#REMOVE_NODE_LABELS}.
*
* @see ResourceManagerAdministrationProtocol#removeFromClusterNodeLabels
* @param oldNodeLabels the node labels to remove. It is a QueryParam.
* @param hsr the servlet request
* @return Response containing the status code
* @throws Exception in case of bad request
*/
@Override
public Response removeFromClusterNodeLabels(Set<String> oldNodeLabels,
HttpServletRequest hsr) throws Exception {
throw new NotImplementedException("Code is not implemented");
if (CollectionUtils.isEmpty(oldNodeLabels)) {
routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
throw new IllegalArgumentException("Parameter error, the oldNodeLabels is null or empty.");
}
try {
long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
final HttpServletRequest hsrCopy = clone(hsr);
Class[] argsClasses = new Class[]{Set.class, HttpServletRequest.class};
Object[] args = new Object[]{oldNodeLabels, hsrCopy};
ClientMethod remoteMethod =
new ClientMethod("removeFromClusterNodeLabels", argsClasses, args);
Map<SubClusterInfo, Response> responseInfoMap =
invokeConcurrent(subClustersActive.values(), remoteMethod, Response.class);
StringBuffer buffer = new StringBuffer();
// SubCluster-0:SUCCESS,SubCluster-1:SUCCESS
responseInfoMap.forEach((subClusterInfo, response) -> {
buildAppendMsg(subClusterInfo, buffer, response);
});
long stopTime = clock.getTime();
routerMetrics.succeededRemoveFromClusterNodeLabelsRetrieved(stopTime - startTime);
return Response.status(Status.OK).entity(buffer.toString()).build();
} catch (NotFoundException e) {
routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
RouterServerUtil.logAndThrowIOException("get all active sub cluster(s) error.", e);
} catch (YarnException e) {
routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
RouterServerUtil.logAndThrowIOException("removeFromClusterNodeLabels with yarn error.", e);
}
routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
throw new RuntimeException("removeFromClusterNodeLabels Failed.");
}
/**
* Build the per-subCluster status message and append it to the buffer.
*
* @param subClusterInfo subCluster information.
* @param buffer StringBuffer.
* @param response response message.
*/
private void buildAppendMsg(SubClusterInfo subClusterInfo, StringBuffer buffer,
Response response) {
SubClusterId subClusterId = subClusterInfo.getSubClusterId();
String state = response != null &&
(response.getStatus() == Status.OK.getStatusCode()) ? "SUCCESS" : "FAILED";
buffer.append("SubCluster-")
.append(subClusterId.getId())
.append(":")
.append(state)
.append(",");
}
@Override

View File

@ -793,6 +793,11 @@ public class TestRouterMetrics {
LOG.info("Mocked: successful GetBulkActivities call with duration {}", duration);
metrics.succeededGetBulkActivitiesRetrieved(duration);
}
public void addToClusterNodeLabelsRetrieved(long duration) {
LOG.info("Mocked: successful AddToClusterNodeLabels call with duration {}", duration);
metrics.succeededAddToClusterNodeLabelsRetrieved(duration);
}
}
@Test
@ -1696,4 +1701,19 @@ public class TestRouterMetrics {
Assert.assertEquals(totalBadBefore + 1,
metrics.getBulkActivitiesFailedRetrieved());
}
@Test
public void testAddToClusterNodeLabelsRetrieved() {
long totalGoodBefore = metrics.getNumSucceededAddToClusterNodeLabelsRetrieved();
goodSubCluster.addToClusterNodeLabelsRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededAddToClusterNodeLabelsRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededAddToClusterNodeLabelsRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.addToClusterNodeLabelsRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededAddToClusterNodeLabelsRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededAddToClusterNodeLabelsRetrieved(), ASSERT_DOUBLE_DELTA);
}
}

View File

@ -1307,4 +1307,44 @@ public class MockDefaultRequestInterceptorREST
throw new RuntimeException(e);
}
}
@Override
public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels, HttpServletRequest hsr)
throws Exception {
List<NodeLabelInfo> nodeLabelInfoList = newNodeLabels.getNodeLabelsInfo();
NodeLabelInfo nodeLabelInfo = nodeLabelInfoList.get(0);
String nodeLabelName = nodeLabelInfo.getName();
// If nodeLabelName is ALL, we let all subclusters pass
if (StringUtils.equals("ALL", nodeLabelName)) {
return Response.status(Status.OK).build();
} else if (StringUtils.equals("A0", nodeLabelName)) {
SubClusterId subClusterId = getSubClusterId();
String id = subClusterId.getId();
if (StringUtils.contains("A0", id)) {
return Response.status(Status.OK).build();
} else {
return Response.status(Status.BAD_REQUEST).entity(null).build();
}
}
throw new YarnException("addToClusterNodeLabels Error");
}
@Override
public Response removeFromClusterNodeLabels(Set<String> oldNodeLabels, HttpServletRequest hsr)
throws Exception {
// If oldNodeLabels contains ALL, we let all subclusters pass
if (oldNodeLabels.contains("ALL")) {
return Response.status(Status.OK).build();
} else if (oldNodeLabels.contains("A0")) {
SubClusterId subClusterId = getSubClusterId();
String id = subClusterId.getId();
if (StringUtils.contains("A0", id)) {
return Response.status(Status.OK).build();
} else {
return Response.status(Status.BAD_REQUEST).entity(null).build();
}
}
throw new YarnException("removeFromClusterNodeLabels Error");
}
}

View File

@ -25,6 +25,7 @@ import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Collections;
import java.util.stream.Collectors;
@ -40,6 +41,7 @@ import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -1899,4 +1901,132 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
"'groupBy' must not be empty.",
() -> interceptor.getBulkActivities(null, "", 1));
}
@Test
public void testAddToClusterNodeLabels1() throws Exception {
// In this test, we try to add ALL label, all subClusters will return success.
NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo();
NodeLabelInfo nodeLabelInfo = new NodeLabelInfo("ALL", true);
nodeLabelsInfo.getNodeLabelsInfo().add(nodeLabelInfo);
Response response = interceptor.addToClusterNodeLabels(nodeLabelsInfo, null);
Assert.assertNotNull(response);
Object entityObj = response.getEntity();
Assert.assertNotNull(entityObj);
String entity = String.valueOf(entityObj);
String[] entities = StringUtils.split(entity, ",");
Assert.assertNotNull(entities);
Assert.assertEquals(4, entities.length);
// The order in which the subClusters return messages is uncertain,
// so we verify the result with a contains check
String expectedMsg =
"SubCluster-0:SUCCESS,SubCluster-1:SUCCESS,SubCluster-2:SUCCESS,SubCluster-3:SUCCESS";
Arrays.stream(entities).forEach(item -> {
Assert.assertTrue(expectedMsg.contains(item));
});
}
@Test
public void testAddToClusterNodeLabels2() throws Exception {
// In this test, we try to add A0 label,
// subCluster0 will return success, and other subClusters will return null
NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo();
NodeLabelInfo nodeLabelInfo = new NodeLabelInfo("A0", true);
nodeLabelsInfo.getNodeLabelsInfo().add(nodeLabelInfo);
Response response = interceptor.addToClusterNodeLabels(nodeLabelsInfo, null);
Assert.assertNotNull(response);
Object entityObj = response.getEntity();
Assert.assertNotNull(entityObj);
String expectedValue = "SubCluster-0:SUCCESS,";
String entity = String.valueOf(entityObj);
Assert.assertTrue(entity.contains(expectedValue));
}
@Test
public void testAddToClusterNodeLabelsError() throws Exception {
// the newNodeLabels is null
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, the newNodeLabels is null.",
() -> interceptor.addToClusterNodeLabels(null, null));
// the nodeLabelsInfo is null
NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo();
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, the nodeLabelsInfo is null or empty.",
() -> interceptor.addToClusterNodeLabels(nodeLabelsInfo, null));
// error nodeLabelsInfo
NodeLabelsInfo nodeLabelsInfo1 = new NodeLabelsInfo();
NodeLabelInfo nodeLabelInfo1 = new NodeLabelInfo("A", true);
nodeLabelsInfo1.getNodeLabelsInfo().add(nodeLabelInfo1);
LambdaTestUtils.intercept(YarnRuntimeException.class, "addToClusterNodeLabels Error",
() -> interceptor.addToClusterNodeLabels(nodeLabelsInfo1, null));
}
@Test
public void testRemoveFromClusterNodeLabels1() throws Exception {
Set<String> oldNodeLabels = Sets.newHashSet();
oldNodeLabels.add("ALL");
Response response = interceptor.removeFromClusterNodeLabels(oldNodeLabels, null);
Assert.assertNotNull(response);
Object entityObj = response.getEntity();
Assert.assertNotNull(entityObj);
String entity = String.valueOf(entityObj);
String[] entities = StringUtils.split(entity, ",");
Assert.assertNotNull(entities);
Assert.assertEquals(4, entities.length);
// The order in which the subClusters return messages is uncertain,
// so we verify the result with a contains check
String expectedMsg =
"SubCluster-0:SUCCESS,SubCluster-1:SUCCESS,SubCluster-2:SUCCESS,SubCluster-3:SUCCESS";
Arrays.stream(entities).forEach(item -> {
Assert.assertTrue(expectedMsg.contains(item));
});
}
@Test
public void testRemoveFromClusterNodeLabels2() throws Exception {
Set<String> oldNodeLabels = Sets.newHashSet();
oldNodeLabels.add("A0");
Response response = interceptor.removeFromClusterNodeLabels(oldNodeLabels, null);
Assert.assertNotNull(response);
Object entityObj = response.getEntity();
Assert.assertNotNull(entityObj);
String expectedValue = "SubCluster-0:SUCCESS,";
String entity = String.valueOf(entityObj);
Assert.assertTrue(entity.contains(expectedValue));
}
@Test
public void testRemoveFromClusterNodeLabelsError() throws Exception {
// the oldNodeLabels is null
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, the oldNodeLabels is null or empty.",
() -> interceptor.removeFromClusterNodeLabels(null, null));
// the oldNodeLabels is empty
Set<String> oldNodeLabels = Sets.newHashSet();
LambdaTestUtils.intercept(IllegalArgumentException.class,
"Parameter error, the oldNodeLabels is null or empty.",
() -> interceptor.removeFromClusterNodeLabels(oldNodeLabels, null));
// error oldNodeLabels
Set<String> oldNodeLabels1 = Sets.newHashSet();
oldNodeLabels1.add("A1");
LambdaTestUtils.intercept(YarnRuntimeException.class, "removeFromClusterNodeLabels Error",
() -> interceptor.removeFromClusterNodeLabels(oldNodeLabels1, null));
}
}

View File

@ -118,7 +118,6 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/x
<spotbugs-maven-plugin.version>4.2.0</spotbugs-maven-plugin.version>
<jsonschema2pojo-maven-plugin.version>1.1.1</jsonschema2pojo-maven-plugin.version>
<maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
<cyclonedx.version>2.7.3</cyclonedx.version>
<shell-executable>bash</shell-executable>
@ -500,19 +499,6 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/x
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
</plugin>
<plugin>
<groupId>org.cyclonedx</groupId>
<artifactId>cyclonedx-maven-plugin</artifactId>
<version>${cyclonedx.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>makeBom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
@ -621,10 +607,6 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/x
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.cyclonedx</groupId>
<artifactId>cyclonedx-maven-plugin</artifactId>
</plugin>
</plugins>
</build>