diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java index 774e015b373..d8ab16f41d3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java @@ -414,7 +414,14 @@ public class LocalDirAllocator { //build the "roulette wheel" for(int i =0; i < ctx.dirDF.length; ++i) { - availableOnDisk[i] = ctx.dirDF[i].getAvailable(); + final DF target = ctx.dirDF[i]; + // attempt to recreate the dir so that getAvailable() is valid + // if it fails, getAvailable() will return 0, so the dir will + // be declared unavailable. + // return value is logged at debug to keep spotbugs quiet. + final boolean b = new File(target.getDirPath()).mkdirs(); + LOG.debug("mkdirs of {}={}", target, b); + availableOnDisk[i] = target.getAvailable(); totalAvailable += availableOnDisk[i]; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java index b6e6f0c57a8..fec2e36f85b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java @@ -49,6 +49,7 @@ public final class CallerContext { public static final String CLIENT_PORT_STR = "clientPort"; public static final String CLIENT_ID_STR = "clientId"; public static final String CLIENT_CALL_ID_STR = "clientCallId"; + public static final String REAL_USER_STR = "realUser"; /** The caller context. * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java index 8aaf9bbd8de..cde4cf48413 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java @@ -19,7 +19,9 @@ package org.apache.hadoop.security.token.delegation; import java.io.ByteArrayInputStream; +import java.io.DataInput; import java.io.DataInputStream; +import java.io.DataOutput; import java.io.IOException; import java.security.MessageDigest; import java.util.ArrayList; @@ -41,6 +43,8 @@ import org.apache.hadoop.fs.statistics.DurationTrackerFactory; import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -441,8 +445,9 @@ extends AbstractDelegationTokenIdentifier> /** * Update the current master key for generating delegation tokens * It should be called only by tokenRemoverThread. + * @throws IOException raised on errors performing I/O. 
*/ - void rollMasterKey() throws IOException { + protected void rollMasterKey() throws IOException { synchronized (this) { removeExpiredKeys(); /* set final expiry date for retiring currentKey */ @@ -677,11 +682,15 @@ extends AbstractDelegationTokenIdentifier> /** Class to encapsulate a token's renew date and password. */ @InterfaceStability.Evolving - public static class DelegationTokenInformation { + public static class DelegationTokenInformation implements Writable { long renewDate; byte[] password; String trackingId; + public DelegationTokenInformation() { + this(0, null); + } + public DelegationTokenInformation(long renewDate, byte[] password) { this(renewDate, password, null); } @@ -711,6 +720,29 @@ extends AbstractDelegationTokenIdentifier> public String getTrackingId() { return trackingId; } + + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeVLong(out, renewDate); + if (password == null) { + WritableUtils.writeVInt(out, -1); + } else { + WritableUtils.writeVInt(out, password.length); + out.write(password); + } + WritableUtils.writeString(out, trackingId); + } + + @Override + public void readFields(DataInput in) throws IOException { + renewDate = WritableUtils.readVLong(in); + int len = WritableUtils.readVInt(in); + if (len > -1) { + password = new byte[len]; + in.readFully(password); + } + trackingId = WritableUtils.readString(in); + } } /** Remove expired delegation tokens from cache */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java new file mode 100644 index 00000000000..4b6ae21d7a9 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.security.token.delegation; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.sql.SQLException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * An implementation of {@link AbstractDelegationTokenSecretManager} that + * persists TokenIdentifiers and DelegationKeys in an existing SQL database. 
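+ * Subclasses are expected to supply the concrete SQL statements through the
+ * abstract select/insert/update/delete methods declared at the bottom of
+ * this class; this base class handles Writable serialization and keeps the
+ * caches of {@link AbstractDelegationTokenSecretManager} in sync.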
+ */
+public abstract class SQLDelegationTokenSecretManager<TokenIdent
+    extends AbstractDelegationTokenIdentifier>
+    extends AbstractDelegationTokenSecretManager<TokenIdent> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SQLDelegationTokenSecretManager.class);
+
+  public static final String SQL_DTSM_CONF_PREFIX = "sql-dt-secret-manager.";
+  private static final String SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE = SQL_DTSM_CONF_PREFIX
+      + "token.seqnum.batch.size";
+  public static final int DEFAULT_SEQ_NUM_BATCH_SIZE = 10;
+
+  // Batch of sequence numbers that will be requested by the sequenceNumCounter.
+  // A new batch is requested once the sequenceNums available to a secret manager are
+  // exhausted, including during initialization.
+  private final int seqNumBatchSize;
+
+  // Last sequenceNum in the current batch that has been allocated to a token.
+  private int currentSeqNum;
+
+  // Max sequenceNum in the current batch that can be allocated to a token.
+  // Unused sequenceNums in the current batch cannot be reused by other routers.
+  private int currentMaxSeqNum;
+
+  public SQLDelegationTokenSecretManager(Configuration conf) {
+    super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL,
+        DelegationTokenManager.UPDATE_INTERVAL_DEFAULT) * 1000,
+        conf.getLong(DelegationTokenManager.MAX_LIFETIME,
+            DelegationTokenManager.MAX_LIFETIME_DEFAULT) * 1000,
+        conf.getLong(DelegationTokenManager.RENEW_INTERVAL,
+            DelegationTokenManager.RENEW_INTERVAL_DEFAULT) * 1000,
+        conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL,
+            DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000);
+
+    this.seqNumBatchSize = conf.getInt(SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
+        DEFAULT_SEQ_NUM_BATCH_SIZE);
+  }
+
+  /**
+   * Persists a TokenIdentifier and its corresponding TokenInformation into
+   * the SQL database. The TokenIdentifier is expected to be unique and any
+   * duplicate token attempts will result in an IOException.
+   * @param ident TokenIdentifier to persist.
+   * @param tokenInfo DelegationTokenInformation associated with the TokenIdentifier.
+   */
+  @Override
+  protected void storeToken(TokenIdent ident,
+      DelegationTokenInformation tokenInfo) throws IOException {
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos)) {
+      tokenInfo.write(dos);
+      // Add token to SQL database
+      insertToken(ident.getSequenceNumber(), ident.getBytes(), bos.toByteArray());
+      // Add token to local cache
+      super.storeToken(ident, tokenInfo);
+    } catch (SQLException e) {
+      throw new IOException("Failed to store token in SQL secret manager", e);
+    }
+  }
+
+  /**
+   * Updates the TokenInformation of an existing TokenIdentifier in
+   * the SQL database.
+   * @param ident Existing TokenIdentifier in the SQL database.
+   * @param tokenInfo Updated DelegationTokenInformation associated with the TokenIdentifier.
+   */
+  @Override
+  protected void updateToken(TokenIdent ident,
+      DelegationTokenInformation tokenInfo) throws IOException {
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+      try (DataOutputStream dos = new DataOutputStream(bos)) {
+        tokenInfo.write(dos);
+        // Update token in SQL database
+        updateToken(ident.getSequenceNumber(), ident.getBytes(), bos.toByteArray());
+        // Update token in local cache
+        super.updateToken(ident, tokenInfo);
+      }
+    } catch (SQLException e) {
+      throw new IOException("Failed to update token in SQL secret manager", e);
+    }
+  }
+
+  /**
+   * Cancels a token by removing it from the SQL database.
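+   * The identifier is first loaded into the local cache through
+   * getTokenInfo(), since the parent implementation validates tokens
+   * against cached entries.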
This will + * call the corresponding method in {@link AbstractDelegationTokenSecretManager} + * to perform validation and remove the token from the cache. + * @return Identifier of the canceled token + */ + @Override + public synchronized TokenIdent cancelToken(Token token, + String canceller) throws IOException { + try (ByteArrayInputStream bis = new ByteArrayInputStream(token.getIdentifier()); + DataInputStream din = new DataInputStream(bis)) { + TokenIdent id = createIdentifier(); + id.readFields(din); + + // Calling getTokenInfo to load token into local cache if not present. + // super.cancelToken() requires token to be present in local cache. + getTokenInfo(id); + } + + return super.cancelToken(token, canceller); + } + + /** + * Removes the existing TokenInformation from the SQL database to + * invalidate it. + * @param ident TokenInformation to remove from the SQL database. + */ + @Override + protected void removeStoredToken(TokenIdent ident) throws IOException { + try { + deleteToken(ident.getSequenceNumber(), ident.getBytes()); + } catch (SQLException e) { + LOG.warn("Failed to remove token in SQL secret manager", e); + } + } + + /** + * Obtains the DelegationTokenInformation associated with the given + * TokenIdentifier in the SQL database. + * @param ident Existing TokenIdentifier in the SQL database. + * @return DelegationTokenInformation that matches the given TokenIdentifier or + * null if it doesn't exist in the database. + */ + @Override + protected DelegationTokenInformation getTokenInfo(TokenIdent ident) { + // Look for token in local cache + DelegationTokenInformation tokenInfo = super.getTokenInfo(ident); + + if (tokenInfo == null) { + try { + // Look for token in SQL database + byte[] tokenInfoBytes = selectTokenInfo(ident.getSequenceNumber(), ident.getBytes()); + + if (tokenInfoBytes != null) { + tokenInfo = new DelegationTokenInformation(); + try (ByteArrayInputStream bis = new ByteArrayInputStream(tokenInfoBytes)) { + try (DataInputStream dis = new DataInputStream(bis)) { + tokenInfo.readFields(dis); + } + } + + // Update token in local cache + currentTokens.put(ident, tokenInfo); + } + } catch (IOException | SQLException e) { + LOG.error("Failed to get token in SQL secret manager", e); + } + } + + return tokenInfo; + } + + /** + * Obtains the value of the last reserved sequence number. + * @return Last reserved sequence number. + */ + @Override + public int getDelegationTokenSeqNum() { + try { + return selectSequenceNum(); + } catch (SQLException e) { + throw new RuntimeException( + "Failed to get token sequence number in SQL secret manager", e); + } + } + + /** + * Updates the value of the last reserved sequence number. + * @param seqNum Value to update the sequence number to. + */ + @Override + public void setDelegationTokenSeqNum(int seqNum) { + try { + updateSequenceNum(seqNum); + } catch (SQLException e) { + throw new RuntimeException( + "Failed to update token sequence number in SQL secret manager", e); + } + } + + /** + * Obtains the next available sequence number that can be allocated to a Token. + * Sequence numbers need to be reserved using the shared sequenceNumberCounter once + * the local batch has been exhausted, which handles sequenceNumber allocation + * concurrently with other secret managers. + * This method ensures that sequence numbers are incremental in a single secret manager, + * but not across secret managers. + * @return Next available sequence number. 
+ */ + @Override + public synchronized int incrementDelegationTokenSeqNum() { + if (currentSeqNum >= currentMaxSeqNum) { + try { + // Request a new batch of sequence numbers and use the + // lowest one available. + currentSeqNum = incrementSequenceNum(seqNumBatchSize); + currentMaxSeqNum = currentSeqNum + seqNumBatchSize; + } catch (SQLException e) { + throw new RuntimeException( + "Failed to increment token sequence number in SQL secret manager", e); + } + } + + return ++currentSeqNum; + } + + /** + * Persists a DelegationKey into the SQL database. The delegation keyId + * is expected to be unique and any duplicate key attempts will result + * in an IOException. + * @param key DelegationKey to persist into the SQL database. + */ + @Override + protected void storeDelegationKey(DelegationKey key) throws IOException { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos)) { + key.write(dos); + // Add delegation key to SQL database + insertDelegationKey(key.getKeyId(), bos.toByteArray()); + // Add delegation key to local cache + super.storeDelegationKey(key); + } catch (SQLException e) { + throw new IOException("Failed to store delegation key in SQL secret manager", e); + } + } + + /** + * Updates an existing DelegationKey in the SQL database. + * @param key Updated DelegationKey. + */ + @Override + protected void updateDelegationKey(DelegationKey key) throws IOException { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos)) { + key.write(dos); + // Update delegation key in SQL database + updateDelegationKey(key.getKeyId(), bos.toByteArray()); + // Update delegation key in local cache + super.updateDelegationKey(key); + } catch (SQLException e) { + throw new IOException("Failed to update delegation key in SQL secret manager", e); + } + } + + /** + * Removes the existing DelegationKey from the SQL database to + * invalidate it. + * @param key DelegationKey to remove from the SQL database. + */ + @Override + protected void removeStoredMasterKey(DelegationKey key) { + try { + deleteDelegationKey(key.getKeyId()); + } catch (SQLException e) { + LOG.warn("Failed to remove delegation key in SQL secret manager", e); + } + } + + /** + * Obtains the DelegationKey from the SQL database. + * @param keyId KeyId of the DelegationKey to obtain. + * @return DelegationKey that matches the given keyId or null + * if it doesn't exist in the database. + */ + @Override + protected DelegationKey getDelegationKey(int keyId) { + // Look for delegation key in local cache + DelegationKey delegationKey = super.getDelegationKey(keyId); + + if (delegationKey == null) { + try { + // Look for delegation key in SQL database + byte[] delegationKeyBytes = selectDelegationKey(keyId); + + if (delegationKeyBytes != null) { + delegationKey = new DelegationKey(); + try (ByteArrayInputStream bis = new ByteArrayInputStream(delegationKeyBytes)) { + try (DataInputStream dis = new DataInputStream(bis)) { + delegationKey.readFields(dis); + } + } + + // Update delegation key in local cache + allKeys.put(keyId, delegationKey); + } + } catch (IOException | SQLException e) { + LOG.error("Failed to get delegation key in SQL secret manager", e); + } + } + + return delegationKey; + } + + /** + * Obtains the value of the last delegation key id. + * @return Last delegation key id. 
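+   *         The value is read from the shared SQL counter, so a secret
+   *         manager also observes key ids reserved by its peers.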
+   */
+  @Override
+  public int getCurrentKeyId() {
+    try {
+      return selectKeyId();
+    } catch (SQLException e) {
+      throw new RuntimeException(
+          "Failed to get delegation key id in SQL secret manager", e);
+    }
+  }
+
+  /**
+   * Updates the value of the last delegation key id.
+   * @param keyId Value to update the delegation key id to.
+   */
+  @Override
+  public void setCurrentKeyId(int keyId) {
+    try {
+      updateKeyId(keyId);
+    } catch (SQLException e) {
+      throw new RuntimeException(
+          "Failed to set delegation key id in SQL secret manager", e);
+    }
+  }
+
+  /**
+   * Obtains the next available delegation key id that can be allocated to a DelegationKey.
+   * Delegation key ids need to be reserved using the shared delegationKeyIdCounter,
+   * which handles keyId allocation concurrently with other secret managers.
+   * @return Next available delegation key id.
+   */
+  @Override
+  public int incrementCurrentKeyId() {
+    try {
+      return incrementKeyId(1) + 1;
+    } catch (SQLException e) {
+      throw new RuntimeException(
+          "Failed to increment delegation key id in SQL secret manager", e);
+    }
+  }
+
+  // Token operations in SQL database
+  protected abstract byte[] selectTokenInfo(int sequenceNum, byte[] tokenIdentifier)
+      throws SQLException;
+  protected abstract void insertToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
+      throws SQLException;
+  protected abstract void updateToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
+      throws SQLException;
+  protected abstract void deleteToken(int sequenceNum, byte[] tokenIdentifier)
+      throws SQLException;
+  // Delegation key operations in SQL database
+  protected abstract byte[] selectDelegationKey(int keyId) throws SQLException;
+  protected abstract void insertDelegationKey(int keyId, byte[] delegationKey)
+      throws SQLException;
+  protected abstract void updateDelegationKey(int keyId, byte[] delegationKey)
+      throws SQLException;
+  protected abstract void deleteDelegationKey(int keyId) throws SQLException;
+  // Counter operations in SQL database
+  protected abstract int selectSequenceNum() throws SQLException;
+  protected abstract void updateSequenceNum(int value) throws SQLException;
+  protected abstract int incrementSequenceNum(int amount) throws SQLException;
+  protected abstract int selectKeyId() throws SQLException;
+  protected abstract void updateKeyId(int value) throws SQLException;
+  protected abstract int incrementKeyId(int amount) throws SQLException;
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java
index 939881f39df..3693b4f0acd 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalDirAllocator.java
@@ -548,5 +548,24 @@ public class TestLocalDirAllocator {
         "p1/x", Long.MAX_VALUE - 1), "Expect a DiskErrorException.",
         () -> dirAllocator.getLocalPathForWrite("p1/x", Long.MAX_VALUE - 1, conf));
   }
+
+  /**
+   * Test for HADOOP-18636 LocalDirAllocator cannot recover from directory tree deletion.
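+   * The allocator should recreate a buffer directory tree that was deleted
+   * underneath it, rather than failing until restart.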
+   */
+  @Test(timeout = 30000)
+  public void testDirectoryRecovery() throws Throwable {
+    String dir0 = buildBufferDir(ROOT, 0);
+    String subdir = dir0 + "/subdir1/subdir2";
+
+    conf.set(CONTEXT, subdir);
+    // get local path and an ancestor
+    final Path pathForWrite = dirAllocator.getLocalPathForWrite("file", -1, conf);
+    final Path ancestor = pathForWrite.getParent().getParent();
+
+    // delete that ancestor
+    localFs.delete(ancestor, true);
+    // and expect to get a new file back
+    dirAllocator.getLocalPathForWrite("file2", -1, conf);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
index 9fb868f79f3..9e6f12ba8a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
@@ -117,6 +117,15 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.zaxxer</groupId>
+      <artifactId>HikariCP</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
@@ -153,6 +162,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>curator-test</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derby</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
@@ -170,6 +184,14 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemProperties>
+            <property>
+              <name>derby.stream.error.file</name>
+              <value>${project.build.directory}/derby.log</value>
+            </property>
+          </systemProperties>
+        </configuration>
       </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreDatabase.sql b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreDatabase.sql
new file mode 100644
index 00000000000..07fea4c24bc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreDatabase.sql
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Script to create a new Database in MySQL for the TokenStore
+
+CREATE DATABASE IF NOT EXISTS TokenStore;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreTables.sql b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreTables.sql
new file mode 100644
index 00000000000..d377c4e15f2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreTables.sql
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Script to generate all the tables for the TokenStore in MySQL
+
+USE TokenStore;
+
+CREATE TABLE IF NOT EXISTS Tokens(
+    sequenceNum int NOT NULL,
+    tokenIdentifier varbinary(255) NOT NULL,
+    tokenInfo varbinary(255) NOT NULL,
+    modifiedTime timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY(sequenceNum, tokenIdentifier)
+);
+
+CREATE TABLE IF NOT EXISTS DelegationKeys(
+    keyId int NOT NULL,
+    delegationKey varbinary(255) NOT NULL,
+    modifiedTime timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY(keyId)
+);
+
+CREATE TABLE IF NOT EXISTS LastSequenceNum(
+    sequenceNum int NOT NULL
+);
+
+-- Initialize the LastSequenceNum table with a single entry
+INSERT INTO LastSequenceNum (sequenceNum)
+SELECT 0 WHERE NOT EXISTS (SELECT * FROM LastSequenceNum);
+
+CREATE TABLE IF NOT EXISTS LastDelegationKeyId(
+    keyId int NOT NULL
+);
+
+-- Initialize the LastDelegationKeyId table with a single entry
+INSERT INTO LastDelegationKeyId (keyId)
+SELECT 0 WHERE NOT EXISTS (SELECT * FROM LastDelegationKeyId);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreUser.sql b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreUser.sql
new file mode 100644
index 00000000000..844d7a2f944
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreUser.sql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Script to create a new User in MySQL for the TokenStore
+
+-- Update TokenStore user and password on this script
+CREATE USER IF NOT EXISTS 'TokenStoreUser'@'%' IDENTIFIED BY 'TokenStorePassword';
+
+GRANT ALL PRIVILEGES ON TokenStore.* TO 'TokenStoreUser'@'%';
+
+FLUSH PRIVILEGES;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/README b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/README
new file mode 100644
index 00000000000..72425315319
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/README
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +These scripts must be executed to create the TokenStore database, tables and users needed to use the +SQLDelegationTokenSecretManagerImpl as the delegation token secret manager: +1. TokenStoreDatabase.sql +2. TokenStoreTables.sql +3. TokenStoreUser.sql + +Note: The TokenStoreUser.sql defines a default user/password. You are highly encouraged to set +this to a proper strong password. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 62ae4b0b95d..06e64439011 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -491,7 +491,7 @@ public class RouterRpcClient { + router.getRouterId()); } - addClientInfoToCallerContext(); + addClientInfoToCallerContext(ugi); Object ret = null; if (rpcMonitor != null) { @@ -627,14 +627,18 @@ public class RouterRpcClient { /** * For tracking some information about the actual client. * It adds trace info "clientIp:ip", "clientPort:port", - * "clientId:id" and "clientCallId:callId" + * "clientId:id", "clientCallId:callId" and "realUser:userName" * in the caller context, removing the old values if they were * already present. */ - private void addClientInfoToCallerContext() { + private void addClientInfoToCallerContext(UserGroupInformation ugi) { CallerContext ctx = CallerContext.getCurrent(); String origContext = ctx == null ? null : ctx.getContext(); byte[] origSignature = ctx == null ? 
null : ctx.getSignature(); + String realUser = null; + if (ugi.getRealUser() != null) { + realUser = ugi.getRealUser().getUserName(); + } CallerContext.Builder builder = new CallerContext.Builder("", contextFieldSeparator) .append(CallerContext.CLIENT_IP_STR, Server.getRemoteAddress()) @@ -644,6 +648,7 @@ public class RouterRpcClient { StringUtils.byteToHexString(Server.getClientId())) .append(CallerContext.CLIENT_CALL_ID_STR, Integer.toString(Server.getCallId())) + .append(CallerContext.REAL_USER_STR, realUser) .setSignature(origSignature); // Append the original caller context if (origContext != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/DistributedSQLCounter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/DistributedSQLCounter.java new file mode 100644 index 00000000000..14b232783f5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/DistributedSQLCounter.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router.security.token; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Distributed counter that relies on a SQL database to synchronize + * between multiple clients. This expects a table with a single int field + * to exist in the database. One record must exist on the table at all times, + * representing the last used value reserved by a client. + */ +public class DistributedSQLCounter { + private static final Logger LOG = + LoggerFactory.getLogger(DistributedSQLCounter.class); + + private final String field; + private final String table; + private final SQLConnectionFactory connectionFactory; + + public DistributedSQLCounter(String field, String table, + SQLConnectionFactory connectionFactory) { + this.field = field; + this.table = table; + this.connectionFactory = connectionFactory; + } + + /** + * Obtains the value of the counter. + * @return counter value. + */ + public int selectCounterValue() throws SQLException { + try (Connection connection = connectionFactory.getConnection()) { + return selectCounterValue(false, connection); + } + } + + private int selectCounterValue(boolean forUpdate, Connection connection) throws SQLException { + String query = String.format("SELECT %s FROM %s %s", field, table, + forUpdate ? 
"FOR UPDATE" : ""); + LOG.debug("Select counter statement: " + query); + try (Statement statement = connection.createStatement(); + ResultSet result = statement.executeQuery(query)) { + if (result.next()) { + return result.getInt(field); + } else { + throw new IllegalStateException("Counter table not initialized: " + table); + } + } + } + + /** + * Sets the counter to the given value. + * @param value Value to assign to counter. + */ + public void updateCounterValue(int value) throws SQLException { + try (Connection connection = connectionFactory.getConnection(true)) { + updateCounterValue(value, connection); + } + } + + /** + * Sets the counter to the given value. + * @param connection Connection to database hosting the counter table. + * @param value Value to assign to counter. + */ + public void updateCounterValue(int value, Connection connection) throws SQLException { + String queryText = String.format("UPDATE %s SET %s = ?", table, field); + LOG.debug("Update counter statement: " + queryText + ". Value: " + value); + try (PreparedStatement statement = connection.prepareStatement(queryText)) { + statement.setInt(1, value); + statement.execute(); + } + } + + /** + * Increments the counter by the given amount and + * returns the previous counter value. + * @param amount Amount to increase the counter. + * @return Previous counter value. + */ + public int incrementCounterValue(int amount) throws SQLException { + // Disabling auto-commit to ensure that all statements on this transaction + // are committed at once. + try (Connection connection = connectionFactory.getConnection(false)) { + // Preventing dirty reads and non-repeatable reads to ensure that the + // value read will not be updated by a different connection. + if (connection.getTransactionIsolation() < Connection.TRANSACTION_REPEATABLE_READ) { + connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ); + } + + try { + // Reading the counter value "FOR UPDATE" to lock the value record, + // forcing other connections to wait until this transaction is committed. + int lastValue = selectCounterValue(true, connection); + + // Calculate the new counter value and handling overflow by + // resetting the counter to 0. + int newValue = lastValue + amount; + if (newValue < 0) { + lastValue = 0; + newValue = amount; + } + + updateCounterValue(newValue, connection); + connection.commit(); + return lastValue; + } catch (Exception e) { + // Rollback transaction to release table locks + connection.rollback(); + throw e; + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/HikariDataSourceConnectionFactory.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/HikariDataSourceConnectionFactory.java new file mode 100644 index 00000000000..5510e9f54b9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/HikariDataSourceConnectionFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router.security.token; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Properties; + +/** + * Class that relies on a HikariDataSource to provide SQL connections. + */ +class HikariDataSourceConnectionFactory implements SQLConnectionFactory { + protected final static String HIKARI_PROPS = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + + "connection.hikari."; + private final HikariDataSource dataSource; + + HikariDataSourceConnectionFactory(Configuration conf) { + Properties properties = new Properties(); + properties.setProperty("jdbcUrl", conf.get(CONNECTION_URL)); + properties.setProperty("username", conf.get(CONNECTION_USERNAME)); + properties.setProperty("password", conf.get(CONNECTION_PASSWORD)); + properties.setProperty("driverClassName", conf.get(CONNECTION_DRIVER)); + + // Include hikari connection properties + properties.putAll(conf.getPropsWithPrefix(HIKARI_PROPS)); + + HikariConfig hikariConfig = new HikariConfig(properties); + this.dataSource = new HikariDataSource(hikariConfig); + } + + @Override + public Connection getConnection() throws SQLException { + return dataSource.getConnection(); + } + + @Override + public void shutdown() { + // Close database connections + dataSource.close(); + } + + @VisibleForTesting + HikariDataSource getDataSource() { + return dataSource; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLConnectionFactory.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLConnectionFactory.java new file mode 100644 index 00000000000..a464cc81968 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLConnectionFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.federation.router.security.token; + +import com.mysql.cj.jdbc.MysqlDataSource; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Properties; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager; + + +/** + * Interface to provide SQL connections to the {@link SQLDelegationTokenSecretManagerImpl}. + */ +public interface SQLConnectionFactory { + String CONNECTION_URL = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + + "connection.url"; + String CONNECTION_USERNAME = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + + "connection.username"; + String CONNECTION_PASSWORD = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + + "connection.password"; + String CONNECTION_DRIVER = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + + "connection.driver"; + + Connection getConnection() throws SQLException; + void shutdown(); + + default Connection getConnection(boolean autocommit) throws SQLException { + Connection connection = getConnection(); + connection.setAutoCommit(autocommit); + return connection; + } +} + diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java new file mode 100644 index 00000000000..7da54778f31 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router.security.token; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * An implementation of {@link SQLDelegationTokenSecretManager} that + * persists TokenIdentifiers and DelegationKeys in a SQL database. 
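+ * Connections are provided through a {@link SQLConnectionFactory} (HikariCP
+ * is the default pool). A minimal configuration sketch, with illustrative
+ * values only, looks like:
+ *
+ *   sql-dt-secret-manager.connection.url = jdbc:mysql://localhost/TokenStore
+ *   sql-dt-secret-manager.connection.username = TokenStoreUser
+ *   sql-dt-secret-manager.connection.password = TokenStorePassword
+ *   sql-dt-secret-manager.connection.driver = com.mysql.cj.jdbc.Driver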
+ */
+public class SQLDelegationTokenSecretManagerImpl
+    extends SQLDelegationTokenSecretManager<DelegationTokenIdentifier> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SQLDelegationTokenSecretManagerImpl.class);
+  private static final String SEQ_NUM_COUNTER_FIELD = "sequenceNum";
+  private static final String SEQ_NUM_COUNTER_TABLE = "LastSequenceNum";
+  private static final String KEY_ID_COUNTER_FIELD = "keyId";
+  private static final String KEY_ID_COUNTER_TABLE = "LastDelegationKeyId";
+
+  private final SQLConnectionFactory connectionFactory;
+  private final DistributedSQLCounter sequenceNumCounter;
+  private final DistributedSQLCounter delegationKeyIdCounter;
+  private final SQLSecretManagerRetriableHandler retryHandler;
+
+  public SQLDelegationTokenSecretManagerImpl(Configuration conf) {
+    this(conf, new HikariDataSourceConnectionFactory(conf),
+        SQLSecretManagerRetriableHandlerImpl.getInstance(conf));
+  }
+
+  public SQLDelegationTokenSecretManagerImpl(Configuration conf,
+      SQLConnectionFactory connectionFactory, SQLSecretManagerRetriableHandler retryHandler) {
+    super(conf);
+
+    this.connectionFactory = connectionFactory;
+    this.sequenceNumCounter = new DistributedSQLCounter(SEQ_NUM_COUNTER_FIELD,
+        SEQ_NUM_COUNTER_TABLE, connectionFactory);
+    this.delegationKeyIdCounter = new DistributedSQLCounter(KEY_ID_COUNTER_FIELD,
+        KEY_ID_COUNTER_TABLE, connectionFactory);
+    this.retryHandler = retryHandler;
+
+    try {
+      super.startThreads();
+    } catch (IOException e) {
+      throw new RuntimeException("Error starting threads for SQL secret manager", e);
+    }
+
+    LOG.info("SQL delegation token secret manager instantiated");
+  }
+
+  @Override
+  public DelegationTokenIdentifier createIdentifier() {
+    return new DelegationTokenIdentifier();
+  }
+
+  @Override
+  public void stopThreads() {
+    super.stopThreads();
+    connectionFactory.shutdown();
+  }
+
+  @Override
+  protected void insertToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
+      throws SQLException {
+    retryHandler.execute(() -> {
+      try (Connection connection = connectionFactory.getConnection(true);
+          PreparedStatement statement = connection.prepareStatement(
+              "INSERT INTO Tokens (sequenceNum, tokenIdentifier, tokenInfo) VALUES (?, ?, ?)")) {
+        statement.setInt(1, sequenceNum);
+        statement.setBytes(2, tokenIdentifier);
+        statement.setBytes(3, tokenInfo);
+        statement.execute();
+      }
+    });
+  }
+
+  @Override
+  protected void updateToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
+      throws SQLException {
+    retryHandler.execute(() -> {
+      try (Connection connection = connectionFactory.getConnection(true);
+          PreparedStatement statement = connection.prepareStatement(
+              "UPDATE Tokens SET tokenInfo = ? WHERE sequenceNum = ? AND tokenIdentifier = ?")) {
+        statement.setBytes(1, tokenInfo);
+        statement.setInt(2, sequenceNum);
+        statement.setBytes(3, tokenIdentifier);
+        statement.execute();
+      }
+    });
+  }
+
+  @Override
+  protected void deleteToken(int sequenceNum, byte[] tokenIdentifier) throws SQLException {
+    retryHandler.execute(() -> {
+      try (Connection connection = connectionFactory.getConnection(true);
+          PreparedStatement statement = connection.prepareStatement(
+              "DELETE FROM Tokens WHERE sequenceNum = ?
AND tokenIdentifier = ?")) { + statement.setInt(1, sequenceNum); + statement.setBytes(2, tokenIdentifier); + statement.execute(); + } + }); + } + + @Override + protected byte[] selectTokenInfo(int sequenceNum, byte[] tokenIdentifier) throws SQLException { + return retryHandler.execute(() -> { + try (Connection connection = connectionFactory.getConnection(); + PreparedStatement statement = connection.prepareStatement( + "SELECT tokenInfo FROM Tokens WHERE sequenceNum = ? AND tokenIdentifier = ?")) { + statement.setInt(1, sequenceNum); + statement.setBytes(2, tokenIdentifier); + try (ResultSet result = statement.executeQuery()) { + if (result.next()) { + return result.getBytes("tokenInfo"); + } + } + } + return null; + }); + } + + @Override + protected void insertDelegationKey(int keyId, byte[] delegationKey) throws SQLException { + retryHandler.execute(() -> { + try (Connection connection = connectionFactory.getConnection(true); + PreparedStatement statement = connection.prepareStatement( + "INSERT INTO DelegationKeys (keyId, delegationKey) VALUES (?, ?)")) { + statement.setInt(1, keyId); + statement.setBytes(2, delegationKey); + statement.execute(); + } + }); + } + + @Override + protected void updateDelegationKey(int keyId, byte[] delegationKey) throws SQLException { + retryHandler.execute(() -> { + try (Connection connection = connectionFactory.getConnection(true); + PreparedStatement statement = connection.prepareStatement( + "UPDATE DelegationKeys SET delegationKey = ? WHERE keyId = ?")) { + statement.setBytes(1, delegationKey); + statement.setInt(2, keyId); + statement.execute(); + } + }); + } + + @Override + protected void deleteDelegationKey(int keyId) throws SQLException { + retryHandler.execute(() -> { + try (Connection connection = connectionFactory.getConnection(true); + PreparedStatement statement = connection.prepareStatement( + "DELETE FROM DelegationKeys WHERE keyId = ?")) { + statement.setInt(1, keyId); + statement.execute(); + } + }); + } + + @Override + protected byte[] selectDelegationKey(int keyId) throws SQLException { + return retryHandler.execute(() -> { + try (Connection connection = connectionFactory.getConnection(); + PreparedStatement statement = connection.prepareStatement( + "SELECT delegationKey FROM DelegationKeys WHERE keyId = ?")) { + statement.setInt(1, keyId); + try (ResultSet result = statement.executeQuery()) { + if (result.next()) { + return result.getBytes("delegationKey"); + } + } + } + return null; + }); + } + + @Override + protected int selectSequenceNum() throws SQLException { + return retryHandler.execute(() -> sequenceNumCounter.selectCounterValue()); + } + + @Override + protected void updateSequenceNum(int value) throws SQLException { + retryHandler.execute(() -> sequenceNumCounter.updateCounterValue(value)); + } + + @Override + protected int incrementSequenceNum(int amount) throws SQLException { + return retryHandler.execute(() -> sequenceNumCounter.incrementCounterValue(amount)); + } + + @Override + protected int selectKeyId() throws SQLException { + return retryHandler.execute(delegationKeyIdCounter::selectCounterValue); + } + + @Override + protected void updateKeyId(int value) throws SQLException { + retryHandler.execute(() -> delegationKeyIdCounter.updateCounterValue(value)); + } + + @Override + protected int incrementKeyId(int amount) throws SQLException { + return retryHandler.execute(() -> delegationKeyIdCounter.incrementCounterValue(amount)); + } + + @VisibleForTesting + protected SQLConnectionFactory getConnectionFactory() { + return 
connectionFactory;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLSecretManagerRetriableHandler.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLSecretManagerRetriableHandler.java
new file mode 100644
index 00000000000..16151226217
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLSecretManagerRetriableHandler.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router.security.token;
+
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Interface to handle retries when {@link SQLDelegationTokenSecretManagerImpl}
+ * throws expected errors.
+ */
+public interface SQLSecretManagerRetriableHandler {
+  void execute(SQLCommandVoid command) throws SQLException;
+  <T> T execute(SQLCommand<T> command) throws SQLException;
+
+  @FunctionalInterface
+  interface SQLCommandVoid {
+    void doCall() throws SQLException;
+  }
+
+  @FunctionalInterface
+  interface SQLCommand<T> {
+    T doCall() throws SQLException;
+  }
+}
+
+/**
+ * Implementation of {@link SQLSecretManagerRetriableHandler} that uses a
+ * {@link RetryProxy} to simplify the retryable operations.
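+ * Only {@link SQLSecretManagerRetriableException}s are retried, using
+ * exponential backoff bounded by the sql-dt-secret-manager.max-retries and
+ * sql-dt-secret-manager.retry-sleep-time-ms settings; any other exception
+ * fails after a single attempt.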
+ */
+class SQLSecretManagerRetriableHandlerImpl implements SQLSecretManagerRetriableHandler {
+  public final static String MAX_RETRIES =
+      SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + "max-retries";
+  public final static int MAX_RETRIES_DEFAULT = 0;
+  public final static String RETRY_SLEEP_TIME_MS =
+      SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + "retry-sleep-time-ms";
+  public final static long RETRY_SLEEP_TIME_MS_DEFAULT = 100;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SQLSecretManagerRetriableHandlerImpl.class);
+
+  static SQLSecretManagerRetriableHandler getInstance(Configuration conf) {
+    return getInstance(conf, new SQLSecretManagerRetriableHandlerImpl());
+  }
+
+  static SQLSecretManagerRetriableHandler getInstance(Configuration conf,
+      SQLSecretManagerRetriableHandlerImpl retryHandler) {
+    RetryPolicy basePolicy = RetryPolicies.exponentialBackoffRetry(
+        conf.getInt(MAX_RETRIES, MAX_RETRIES_DEFAULT),
+        conf.getLong(RETRY_SLEEP_TIME_MS, RETRY_SLEEP_TIME_MS_DEFAULT),
+        TimeUnit.MILLISECONDS);
+
+    // Configure SQLSecretManagerRetriableException to retry with exponential backoff
+    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = new HashMap<>();
+    exceptionToPolicyMap.put(SQLSecretManagerRetriableException.class, basePolicy);
+
+    // Configure all other exceptions to fail after one attempt
+    RetryPolicy retryPolicy = RetryPolicies.retryByException(
+        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+
+    return (SQLSecretManagerRetriableHandler) RetryProxy.create(
+        SQLSecretManagerRetriableHandler.class, retryHandler, retryPolicy);
+  }
+
+  /**
+   * Executes a SQL command and raises retryable errors as
+   * {@link SQLSecretManagerRetriableException}s so they are recognized by the
+   * {@link RetryProxy}.
+   * @param command SQL command to execute
+   * @throws SQLException When SQL connection errors occur
+   */
+  @Override
+  public void execute(SQLCommandVoid command) throws SQLException {
+    try {
+      command.doCall();
+    } catch (SQLException e) {
+      LOG.warn("Failed to execute SQL command", e);
+      throw new SQLSecretManagerRetriableException(e);
+    }
+  }
+
+  /**
+   * Executes a SQL command and raises retryable errors as
+   * {@link SQLSecretManagerRetriableException}s so they are recognized by the
+   * {@link RetryProxy}.
+   * @param command SQL command to execute
+   * @throws SQLException When SQL connection errors occur
+   */
+  @Override
+  public <T> T execute(SQLCommand<T> command) throws SQLException {
+    try {
+      return command.doCall();
+    } catch (SQLException e) {
+      LOG.warn("Failed to execute SQL command", e);
+      throw new SQLSecretManagerRetriableException(e);
+    }
+  }
+
+  /**
+   * Class used to identify errors that can be retried.
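+   * The original {@link SQLException} is kept as the cause so callers above
+   * the retry proxy can still inspect the underlying database error.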
+   */
+  static class SQLSecretManagerRetriableException extends SQLException {
+    SQLSecretManagerRetriableException(Throwable cause) {
+      super(cause);
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index cd98b635b50..d5e7f7173fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -39,6 +39,7 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -218,6 +219,14 @@ public class TestRouterRpc {
     cluster.setIndependentDNs();
 
     Configuration conf = new Configuration();
+    // Setup proxy users.
+    conf.set("hadoop.proxyuser.testRealUser.groups", "*");
+    conf.set("hadoop.proxyuser.testRealUser.hosts", "*");
+    String loginUser = UserGroupInformation.getLoginUser().getUserName();
+    conf.set(String.format("hadoop.proxyuser.%s.groups", loginUser), "*");
+    conf.set(String.format("hadoop.proxyuser.%s.hosts", loginUser), "*");
+    // Enable IP proxy users.
+    conf.set(DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS, "placeholder");
     conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 5);
     cluster.addNamenodeOverrides(conf);
     // Start NNs and DNs and wait until ready
@@ -2077,6 +2086,38 @@ public class TestRouterRpc {
     assertTrue(verifyFileExists(routerFS, dirPath));
   }
 
+  @Test
+  public void testRealUserPropagationInCallerContext()
+      throws IOException, InterruptedException {
+    GenericTestUtils.LogCapturer auditlog =
+        GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.AUDIT_LOG);
+
+    // Current callerContext is null
+    assertNull(CallerContext.getCurrent());
+
+    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+    UserGroupInformation realUser = UserGroupInformation
+        .createUserForTesting("testRealUser", new String[]{"group"});
+    UserGroupInformation proxyUser = UserGroupInformation
+        .createProxyUser("testProxyUser", realUser);
+    FileSystem proxyFs = proxyUser.doAs(
+        (PrivilegedExceptionAction<FileSystem>) () -> router.getFileSystem());
+    proxyFs.listStatus(new Path("/"));
+
+    final String logOutput = auditlog.getOutput();
+    // Login user, which is used as the router's user, is different from the realUser.
+    assertNotEquals(loginUser.getUserName(), realUser.getUserName());
+    // Login user is used in the audit log's ugi field.
+    assertTrue("The audit log should show the proxy user via the login user in the ugi field",
+        logOutput.contains(String.format("ugi=%s (auth:PROXY) via %s (auth:SIMPLE)",
+            proxyUser.getUserName(),
+            loginUser.getUserName())));
+    // Real user is added to the caller context.
+ assertTrue("The audit log should contain the real user.", + logOutput.contains(String.format("realUser:%s", realUser.getUserName()))); + } + @Test public void testSetBalancerBandwidth() throws Exception { long defaultBandwidth = diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java new file mode 100644 index 00000000000..569a274042b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router.security.token; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + + +public class TestSQLDelegationTokenSecretManagerImpl { + private static final String CONNECTION_URL = "jdbc:derby:memory:TokenStore"; + private static final int TEST_MAX_RETRIES = 3; + private static Configuration conf; + + @Before + public void init() throws SQLException { + createTestDBTables(); + } + + @After + public void cleanup() throws SQLException { + dropTestDBTables(); + } + + @BeforeClass + public static void initDatabase() throws SQLException { + DriverManager.getConnection(CONNECTION_URL + ";create=true"); + + conf = new Configuration(); + conf.set(SQLConnectionFactory.CONNECTION_URL, CONNECTION_URL); + conf.set(SQLConnectionFactory.CONNECTION_USERNAME, "testuser"); + conf.set(SQLConnectionFactory.CONNECTION_PASSWORD, "testpassword"); + conf.set(SQLConnectionFactory.CONNECTION_DRIVER, "org.apache.derby.jdbc.EmbeddedDriver"); + conf.setInt(SQLSecretManagerRetriableHandlerImpl.MAX_RETRIES, TEST_MAX_RETRIES); + conf.setInt(SQLSecretManagerRetriableHandlerImpl.RETRY_SLEEP_TIME_MS, 10); + } + + @AfterClass + public static void cleanupDatabase() 
{ + try { + DriverManager.getConnection(CONNECTION_URL + ";drop=true"); + } catch (SQLException e) { + // SQLException expected when database is dropped + if (!e.getMessage().contains("dropped")) { + throw new RuntimeException(e); + } + } + } + + @Test + public void testSingleSecretManager() throws Exception { + DelegationTokenManager tokenManager = createTokenManager(); + try { + Token<? extends AbstractDelegationTokenIdentifier> token = + tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo"); + validateToken(tokenManager, token); + } finally { + stopTokenManager(tokenManager); + } + } + + @Test + public void testMultipleSecretManagers() throws Exception { + DelegationTokenManager tokenManager1 = createTokenManager(); + DelegationTokenManager tokenManager2 = createTokenManager(); + + try { + Token<? extends AbstractDelegationTokenIdentifier> token1 = + tokenManager1.createToken(UserGroupInformation.getCurrentUser(), "foo"); + Token<? extends AbstractDelegationTokenIdentifier> token2 = + tokenManager2.createToken(UserGroupInformation.getCurrentUser(), "foo"); + + validateToken(tokenManager1, token2); + validateToken(tokenManager2, token1); + } finally { + stopTokenManager(tokenManager1); + stopTokenManager(tokenManager2); + } + } + + @Test + public void testSequenceNumAllocation() throws Exception { + int tokensPerManager = SQLDelegationTokenSecretManagerImpl.DEFAULT_SEQ_NUM_BATCH_SIZE * 5; + Set<Integer> sequenceNums1 = new HashSet<>(); + Set<Integer> sequenceNums2 = new HashSet<>(); + Set<Integer> sequenceNums3 = new HashSet<>(); + Set<Integer> sequenceNums = new HashSet<>(); + DelegationTokenManager tokenManager1 = createTokenManager(); + DelegationTokenManager tokenManager2 = createTokenManager(); + DelegationTokenManager tokenManager3 = createTokenManager(); + + try { + for (int i = 0; i < tokensPerManager; i++) { + allocateSequenceNum(tokenManager1, sequenceNums1); + allocateSequenceNum(tokenManager2, sequenceNums2); + allocateSequenceNum(tokenManager3, sequenceNums3); + sequenceNums.addAll(sequenceNums1); + sequenceNums.addAll(sequenceNums2); + sequenceNums.addAll(sequenceNums3); + } + + Assert.assertEquals("Verify that all tokens were created with unique sequence numbers", + tokensPerManager * 3, sequenceNums.size()); + Assert.assertEquals("Verify that tokenManager1 generated unique sequence numbers", + tokensPerManager, sequenceNums1.size()); + Assert.assertEquals("Verify that tokenManager2 generated unique sequence numbers", + tokensPerManager, sequenceNums2.size()); + Assert.assertEquals("Verify that tokenManager3 generated unique sequence numbers", + tokensPerManager, sequenceNums3.size()); + + // Validate that sequence number batches are allocated in order to each token manager + int batchSize = SQLDelegationTokenSecretManagerImpl.DEFAULT_SEQ_NUM_BATCH_SIZE; + for (int seqNum = 1; seqNum < tokensPerManager;) { + // First batch is allocated to tokenManager1 + for (int i = 0; i < batchSize; i++, seqNum++) { + Assert.assertTrue(sequenceNums1.contains(seqNum)); + } + // Second batch is allocated to tokenManager2 + for (int i = 0; i < batchSize; i++, seqNum++) { + Assert.assertTrue(sequenceNums2.contains(seqNum)); + } + // Third batch is allocated to tokenManager3 + for (int i = 0; i < batchSize; i++, seqNum++) { + Assert.assertTrue(sequenceNums3.contains(seqNum)); + } + } + + SQLDelegationTokenSecretManagerImpl secretManager = + (SQLDelegationTokenSecretManagerImpl) tokenManager1.getDelegationTokenSecretManager(); + Assert.assertEquals("Verify that the counter is set to the highest sequence number", + tokensPerManager * 3, secretManager.getDelegationTokenSeqNum()); + } finally { + stopTokenManager(tokenManager1); + stopTokenManager(tokenManager2); + 
stopTokenManager(tokenManager3); + } + } + + @Test + public void testSequenceNumRollover() throws Exception { + int tokenBatch = SQLDelegationTokenSecretManagerImpl.DEFAULT_SEQ_NUM_BATCH_SIZE; + Set<Integer> sequenceNums = new HashSet<>(); + + DelegationTokenManager tokenManager = createTokenManager(); + + try { + SQLDelegationTokenSecretManagerImpl secretManager = + (SQLDelegationTokenSecretManagerImpl) tokenManager.getDelegationTokenSecretManager(); + secretManager.setDelegationTokenSeqNum(Integer.MAX_VALUE - tokenBatch); + + // Allocate sequence numbers before they are rolled over + for (int seqNum = Integer.MAX_VALUE - tokenBatch; seqNum < Integer.MAX_VALUE; seqNum++) { + allocateSequenceNum(tokenManager, sequenceNums); + Assert.assertTrue(sequenceNums.contains(seqNum + 1)); + } + + // Allocate sequence numbers after they are rolled over + for (int seqNum = 0; seqNum < tokenBatch; seqNum++) { + allocateSequenceNum(tokenManager, sequenceNums); + Assert.assertTrue(sequenceNums.contains(seqNum + 1)); + } + } finally { + stopTokenManager(tokenManager); + } + } + + @Test + public void testDelegationKeyAllocation() throws Exception { + DelegationTokenManager tokenManager1 = createTokenManager(); + + try { + SQLDelegationTokenSecretManagerImpl secretManager1 = + (SQLDelegationTokenSecretManagerImpl) tokenManager1.getDelegationTokenSecretManager(); + // Prevent delegation keys from rolling for the rest of the test to avoid race conditions + // between the keys generated and the active keys in the database. + ((TestDelegationTokenSecretManager) secretManager1).lockKeyRoll(); + int keyId1 = secretManager1.getCurrentKeyId(); + + // Validate that latest key1 is assigned to tokenManager1 tokens + Token<? extends AbstractDelegationTokenIdentifier> token1 = + tokenManager1.createToken(UserGroupInformation.getCurrentUser(), "foo"); + validateKeyId(token1, keyId1); + + DelegationTokenManager tokenManager2 = createTokenManager(); + try { + SQLDelegationTokenSecretManagerImpl secretManager2 = + (SQLDelegationTokenSecretManagerImpl) tokenManager2.getDelegationTokenSecretManager(); + // Prevent delegation keys from rolling for the rest of the test + ((TestDelegationTokenSecretManager) secretManager2).lockKeyRoll(); + int keyId2 = secretManager2.getCurrentKeyId(); + + Assert.assertNotEquals("Each secret manager has its own key", keyId1, keyId2); + + // Validate that latest key2 is assigned to tokenManager2 tokens + Token<? extends AbstractDelegationTokenIdentifier> token2 = + tokenManager2.createToken(UserGroupInformation.getCurrentUser(), "foo"); + validateKeyId(token2, keyId2); + + // Validate that key1 is still assigned to tokenManager1 tokens + token1 = tokenManager1.createToken(UserGroupInformation.getCurrentUser(), "foo"); + validateKeyId(token1, keyId1); + + // Validate that key2 is still assigned to tokenManager2 tokens + token2 = tokenManager2.createToken(UserGroupInformation.getCurrentUser(), "foo"); + validateKeyId(token2, keyId2); + } finally { + stopTokenManager(tokenManager2); + } + } finally { + stopTokenManager(tokenManager1); + } + } + + @Test + public void testHikariConfigs() { + HikariDataSourceConnectionFactory factory1 = new HikariDataSourceConnectionFactory(conf); + int defaultMaximumPoolSize = factory1.getDataSource().getMaximumPoolSize(); + factory1.shutdown(); + + // Changing default maximumPoolSize + Configuration hikariConf = new Configuration(conf); + hikariConf.setInt(HikariDataSourceConnectionFactory.HIKARI_PROPS + "maximumPoolSize", + defaultMaximumPoolSize + 1); + + // Verifying property is correctly set in datasource + HikariDataSourceConnectionFactory factory2 = new
HikariDataSourceConnectionFactory(hikariConf); + Assert.assertEquals(defaultMaximumPoolSize + 1, + factory2.getDataSource().getMaximumPoolSize()); + factory2.shutdown(); + } + + @Test + public void testRetries() throws Exception { + DelegationTokenManager tokenManager = createTokenManager(); + TestDelegationTokenSecretManager secretManager = + (TestDelegationTokenSecretManager) tokenManager.getDelegationTokenSecretManager(); + + try { + // Prevent delegation keys from rolling for the rest of the test + secretManager.lockKeyRoll(); + + // Reset counter and expect a single request when inserting a token + TestRetryHandler.resetExecutionAttemptCounter(); + tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertEquals(1, TestRetryHandler.getExecutionAttempts()); + + // Breaking database connections to cause retries + secretManager.setReadOnly(true); + + // Reset counter and expect multiple retries when failing to insert a token + TestRetryHandler.resetExecutionAttemptCounter(); + tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertEquals(TEST_MAX_RETRIES + 1, TestRetryHandler.getExecutionAttempts()); + } finally { + // Fix database connections + secretManager.setReadOnly(false); + stopTokenManager(tokenManager); + } + } + + private DelegationTokenManager createTokenManager() { + DelegationTokenManager tokenManager = new DelegationTokenManager(new Configuration(), null); + tokenManager.setExternalDelegationTokenSecretManager(new TestDelegationTokenSecretManager()); + return tokenManager; + } + + private void allocateSequenceNum(DelegationTokenManager tokenManager, Set<Integer> sequenceNums) + throws IOException { + Token<? extends AbstractDelegationTokenIdentifier> token = + tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo"); + AbstractDelegationTokenIdentifier tokenIdentifier = token.decodeIdentifier(); + Assert.assertFalse("Verify sequence number is unique", + sequenceNums.contains(tokenIdentifier.getSequenceNumber())); + + sequenceNums.add(tokenIdentifier.getSequenceNumber()); + } + + private void validateToken(DelegationTokenManager tokenManager, + Token<? extends AbstractDelegationTokenIdentifier> token) + throws Exception { + SQLDelegationTokenSecretManagerImpl secretManager = + (SQLDelegationTokenSecretManagerImpl) tokenManager.getDelegationTokenSecretManager(); + AbstractDelegationTokenIdentifier tokenIdentifier = token.decodeIdentifier(); + + // Verify token using token manager + tokenManager.verifyToken(token); + + byte[] tokenInfo1 = secretManager.selectTokenInfo(tokenIdentifier.getSequenceNumber(), + tokenIdentifier.getBytes()); + Assert.assertNotNull("Verify token exists in database", tokenInfo1); + + // Renew token using token manager + tokenManager.renewToken(token, "foo"); + + byte[] tokenInfo2 = secretManager.selectTokenInfo(tokenIdentifier.getSequenceNumber(), + tokenIdentifier.getBytes()); + Assert.assertNotNull("Verify token exists in database", tokenInfo2); + Assert.assertFalse("Verify token has been updated in database", + Arrays.equals(tokenInfo1, tokenInfo2)); + + // Cancel token using token manager + tokenManager.cancelToken(token, "foo"); + byte[] tokenInfo3 = secretManager.selectTokenInfo(tokenIdentifier.getSequenceNumber(), + tokenIdentifier.getBytes()); + Assert.assertNull("Verify token was removed from database", tokenInfo3); + } + + private void validateKeyId(Token<? extends AbstractDelegationTokenIdentifier> token, + int expectedKeyId) throws IOException { + AbstractDelegationTokenIdentifier tokenIdentifier = token.decodeIdentifier(); + Assert.assertEquals("Verify that keyId is assigned to token", + 
expectedKeyId, tokenIdentifier.getMasterKeyId()); + } + + private static Connection getTestDBConnection() { + try { + return DriverManager.getConnection(CONNECTION_URL); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static void createTestDBTables() throws SQLException { + execute("CREATE TABLE Tokens (sequenceNum INT NOT NULL, " + + "tokenIdentifier VARCHAR (255) FOR BIT DATA NOT NULL, " + + "tokenInfo VARCHAR (255) FOR BIT DATA NOT NULL, " + + "modifiedTime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, " + + "PRIMARY KEY(sequenceNum))"); + execute("CREATE TABLE DelegationKeys (keyId INT NOT NULL, " + + "delegationKey VARCHAR (255) FOR BIT DATA NOT NULL, " + + "modifiedTime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, " + + "PRIMARY KEY(keyId))"); + execute("CREATE TABLE LastSequenceNum (sequenceNum INT NOT NULL)"); + execute("INSERT INTO LastSequenceNum VALUES (0)"); + execute("CREATE TABLE LastDelegationKeyId (keyId INT NOT NULL)"); + execute("INSERT INTO LastDelegationKeyId VALUES (0)"); + } + + private static void dropTestDBTables() throws SQLException { + execute("DROP TABLE Tokens"); + execute("DROP TABLE DelegationKeys"); + execute("DROP TABLE LastSequenceNum"); + execute("DROP TABLE LastDelegationKeyId"); + } + + private static void execute(String statement) throws SQLException { + try (Connection connection = getTestDBConnection()) { + connection.createStatement().execute(statement); + } + } + + private void stopTokenManager(DelegationTokenManager tokenManager) { + TestDelegationTokenSecretManager secretManager = + (TestDelegationTokenSecretManager) tokenManager.getDelegationTokenSecretManager(); + // Release any locks on tables + secretManager.unlockKeyRoll(); + // Stop threads to close database connections + secretManager.stopThreads(); + } + + static class TestDelegationTokenSecretManager extends SQLDelegationTokenSecretManagerImpl { + private ReentrantLock keyRollLock; + + private synchronized ReentrantLock getKeyRollLock() { + if (keyRollLock == null) { + keyRollLock = new ReentrantLock(); + } + return keyRollLock; + } + + TestDelegationTokenSecretManager() { + super(conf, new TestConnectionFactory(conf), + SQLSecretManagerRetriableHandlerImpl.getInstance(conf, new TestRetryHandler())); + } + + // Tests can call this method to prevent delegation keys from + // being rolled in the middle of a test, avoiding race conditions + public void lockKeyRoll() { + getKeyRollLock().lock(); + } + + public void unlockKeyRoll() { + if (getKeyRollLock().isHeldByCurrentThread()) { + getKeyRollLock().unlock(); + } + } + + @Override + protected void rollMasterKey() throws IOException { + try { + lockKeyRoll(); + super.rollMasterKey(); + } finally { + unlockKeyRoll(); + } + } + + public void setReadOnly(boolean readOnly) { + ((TestConnectionFactory) getConnectionFactory()).readOnly = readOnly; + } + } + + static class TestConnectionFactory extends HikariDataSourceConnectionFactory { + private boolean readOnly = false; + TestConnectionFactory(Configuration conf) { + super(conf); + } + + @Override + public Connection getConnection() throws SQLException { + Connection connection = super.getConnection(); + // Change to the default schema, as the Derby driver looks for the user schema + connection.setSchema("APP"); + connection.setReadOnly(readOnly); + return connection; + } + } + + static class TestRetryHandler extends SQLSecretManagerRetriableHandlerImpl { + // Tracks the number of times that a SQL command is executed, regardless of + // whether it completed successfully or
not. + private static AtomicInteger executionAttemptCounter = new AtomicInteger(); + + static void resetExecutionAttemptCounter() { + executionAttemptCounter = new AtomicInteger(); + } + + static int getExecutionAttempts() { + return executionAttemptCounter.get(); + } + + @Override + public void execute(SQLCommandVoid command) throws SQLException { + executionAttemptCounter.incrementAndGet(); + super.execute(command); + } + } +} diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 9e12edaf55a..550c716d485 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -129,6 +129,8 @@ 1.0-alpha-1 3.3.1 4.0.3 + <derby.version>10.10.2.0</derby.version> + <mysql-connector-java.version>8.0.29</mysql-connector-java.version> 6.2.1.jre7 4.10.0 3.2.0 @@ -1809,6 +1811,16 @@ <artifactId>HikariCP</artifactId> <version>${hikari.version}</version> </dependency> + <dependency> + <groupId>org.apache.derby</groupId> + <artifactId>derby</artifactId> + <version>${derby.version}</version> + </dependency> + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <version>${mysql-connector-java.version}</version> + </dependency> <dependency> <groupId>com.microsoft.sqlserver</groupId> <artifactId>mssql-jdbc</artifactId> diff --git a/hadoop-project/src/site/markdown/index.md.vm b/hadoop-project/src/site/markdown/index.md.vm index e7ed0fe8066..33c86bbc06e 100644 --- a/hadoop-project/src/site/markdown/index.md.vm +++ b/hadoop-project/src/site/markdown/index.md.vm @@ -62,7 +62,10 @@ possibly in parallel, with results potentially coming in out-of-order. Benchmarking of enhanced Apache ORC and Apache Parquet clients through `file://` and `s3a://` show significant improvements in query performance. -Further Reading: [FsDataInputStream](./hadoop-project-dist/hadoop-common/filesystem/fsdatainputstream.html). +Further Reading: +* [FsDataInputStream](./hadoop-project-dist/hadoop-common/filesystem/fsdatainputstream.html). +* [Hadoop Vectored IO: Your Data Just Got Faster!](https://apachecon.com/acasia2022/sessions/bigdata-1148.html) + Apachecon 2022 talk. Mapreduce: Manifest Committer for Azure ABFS and google GCS ---------------------------------------------------------- @@ -88,14 +91,6 @@ More details are available in the documentation. -HDFS: Router Based Federation ------------------------------ - -A lot of effort has been invested into stabilizing/improving the HDFS Router Based Federation feature. - -1. HDFS-13522, HDFS-16767 & Related Jiras: Allow Observer Reads in HDFS Router Based Federation. -2.
HDFS-13248: RBF supports Client Locality - HDFS: Dynamic Datanode Reconfiguration -------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java index 4ba05794a09..14eb8cfb8a6 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java @@ -156,10 +156,7 @@ public class CopyCommitter extends FileOutputCommitter { final boolean directWrite = conf.getBoolean( DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false); - final boolean append = conf.getBoolean( - DistCpOptionSwitch.APPEND.getConfigLabel(), false); - final boolean useTempTarget = !append && !directWrite; - if (!useTempTarget) { + if (directWrite) { return; } diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index f2dd246db5a..6a537dc6e7d 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -586,13 +586,11 @@ public class TestCopyCommitter { @Test public void testCommitWithCleanupTempFiles() throws IOException { - testCommitWithCleanup(true, false); - testCommitWithCleanup(false, true); - testCommitWithCleanup(true, true); - testCommitWithCleanup(false, false); + testCommitWithCleanup(true); + testCommitWithCleanup(false); } - private void testCommitWithCleanup(boolean append, boolean directWrite)throws IOException { + private void testCommitWithCleanup(boolean directWrite) throws IOException { TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); JobID jobID = taskAttemptContext.getTaskAttemptID().getJobID(); JobContext jobContext = new JobContextImpl( @@ -611,7 +609,7 @@ public class TestCopyCommitter { DistCpOptions options = new DistCpOptions.Builder( Collections.singletonList(new Path(sourceBase)), new Path("/out")) - .withAppend(append) + .withAppend(true) .withSyncFolder(true) .withDirectWrite(directWrite) .build(); @@ -631,7 +629,7 @@ public class TestCopyCommitter { null, taskAttemptContext); committer.commitJob(jobContext); - if (append || directWrite) { + if (directWrite) { ContractTestUtils.assertPathExists(fs, "Temp files should not be cleanup with append or direct option", tempFilePath); } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationStateStore.java index 3ca8ccc2bfb..e65bdf42e2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationStateStore.java @@ -60,7 +60,21 @@ public interface FederationStateStore extends * Load the version information from the federation state store. 
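+ * Implementations that do not support versioning may simply throw + * {@code NotImplementedException}.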
* * @return the {@link Version} of the federation state store + * @throws Exception if the version information cannot be loaded. */ - Version loadVersion(); + Version loadVersion() throws Exception; + /** + * Store the version information in the federation state store. + * + * @throws Exception if the version information cannot be stored. + */ + void storeVersion() throws Exception; + + /** + * Check the version of the federation state store. + * + * @throws Exception if the version check fails. + */ + void checkVersion() throws Exception; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index b91de3ae808..273e736e887 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.Comparator; +import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.delegation.DelegationKey; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; @@ -107,7 +109,7 @@ import static org.apache.hadoop.yarn.server.federation.store.utils.FederationSta public class MemoryFederationStateStore implements FederationStateStore { private Map<SubClusterId, SubClusterInfo> membership; - private Map<ApplicationId, SubClusterId> applications; + private Map<ApplicationId, ApplicationHomeSubCluster> applications; private Map<ReservationId, SubClusterId> reservations; private Map<String, SubClusterPolicyConfiguration> policies; private RouterRMDTSecretManagerState routerRMSecretManagerState; @@ -122,10 +124,10 @@ public class MemoryFederationStateStore implements FederationStateStore { @Override public void init(Configuration conf) { - membership = new ConcurrentHashMap<SubClusterId, SubClusterInfo>(); - applications = new ConcurrentHashMap<ApplicationId, SubClusterId>(); - reservations = new ConcurrentHashMap<ReservationId, SubClusterId>(); - policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>(); + membership = new ConcurrentHashMap<>(); + applications = new ConcurrentHashMap<>(); + reservations = new ConcurrentHashMap<>(); + policies = new ConcurrentHashMap<>(); routerRMSecretManagerState = new RouterRMDTSecretManagerState(); maxAppsInStateStore = conf.getInt( YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, @@ -143,14 +145,15 @@ public class MemoryFederationStateStore implements FederationStateStore { } @Override - public SubClusterRegisterResponse registerSubCluster( - SubClusterRegisterRequest request) throws YarnException { + public
SubClusterRegisterResponse registerSubCluster(SubClusterRegisterRequest request) + throws YarnException { + + long startTime = clock.getTime(); + FederationMembershipStateStoreInputValidator.validate(request); SubClusterInfo subClusterInfo = request.getSubClusterInfo(); long currentTime = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis(); - SubClusterInfo subClusterInfoToSave = SubClusterInfo.newInstance(subClusterInfo.getSubClusterId(), subClusterInfo.getAMRMServiceAddress(), @@ -161,18 +164,21 @@ public class MemoryFederationStateStore implements FederationStateStore { subClusterInfo.getCapability()); membership.put(subClusterInfo.getSubClusterId(), subClusterInfoToSave); + long stopTime = clock.getTime(); + + FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime); return SubClusterRegisterResponse.newInstance(); } @Override - public SubClusterDeregisterResponse deregisterSubCluster( - SubClusterDeregisterRequest request) throws YarnException { + public SubClusterDeregisterResponse deregisterSubCluster(SubClusterDeregisterRequest request) + throws YarnException { + FederationMembershipStateStoreInputValidator.validate(request); SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId()); if (subClusterInfo == null) { - String errMsg = - "SubCluster " + request.getSubClusterId().toString() + " not found"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + FederationStateStoreUtils.logAndThrowStoreException( + LOG, "SubCluster %s not found", request.getSubClusterId()); } else { subClusterInfo.setState(request.getState()); } @@ -181,17 +187,16 @@ public class MemoryFederationStateStore implements FederationStateStore { } @Override - public SubClusterHeartbeatResponse subClusterHeartbeat( - SubClusterHeartbeatRequest request) throws YarnException { + public SubClusterHeartbeatResponse subClusterHeartbeat(SubClusterHeartbeatRequest request) + throws YarnException { FederationMembershipStateStoreInputValidator.validate(request); SubClusterId subClusterId = request.getSubClusterId(); SubClusterInfo subClusterInfo = membership.get(subClusterId); if (subClusterInfo == null) { - String errMsg = "SubCluster " + subClusterId.toString() - + " does not exist; cannot heartbeat"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + FederationStateStoreUtils.logAndThrowStoreException( + LOG, "SubCluster %s does not exist; cannot heartbeat.", request.getSubClusterId()); } long currentTime = @@ -205,11 +210,12 @@ public class MemoryFederationStateStore implements FederationStateStore { } @Override - public GetSubClusterInfoResponse getSubCluster( - GetSubClusterInfoRequest request) throws YarnException { + public GetSubClusterInfoResponse getSubCluster(GetSubClusterInfoRequest request) + throws YarnException { FederationMembershipStateStoreInputValidator.validate(request); SubClusterId subClusterId = request.getSubClusterId(); + if (!membership.containsKey(subClusterId)) { LOG.warn("The queried SubCluster: {} does not exist.", subClusterId); return null; @@ -219,16 +225,17 @@ public class MemoryFederationStateStore implements FederationStateStore { } @Override - public GetSubClustersInfoResponse getSubClusters( - GetSubClustersInfoRequest request) throws YarnException { - List<SubClusterInfo> result = new ArrayList<SubClusterInfo>(); + public GetSubClustersInfoResponse getSubClusters(GetSubClustersInfoRequest request) + throws YarnException { + + List<SubClusterInfo> result = new ArrayList<>(); for (SubClusterInfo info : membership.values()) { - if
(!request.getFilterInactiveSubClusters() - || info.getState().isActive()) { + if (!request.getFilterInactiveSubClusters() || info.getState().isActive()) { result.add(info); } } + return GetSubClustersInfoResponse.newInstance(result); } @@ -239,16 +246,16 @@ public class MemoryFederationStateStore implements FederationStateStore { AddApplicationHomeSubClusterRequest request) throws YarnException { FederationApplicationHomeSubClusterStoreInputValidator.validate(request); - ApplicationId appId = - request.getApplicationHomeSubCluster().getApplicationId(); + ApplicationHomeSubCluster homeSubCluster = request.getApplicationHomeSubCluster(); + + ApplicationId appId = homeSubCluster.getApplicationId(); if (!applications.containsKey(appId)) { - applications.put(appId, - request.getApplicationHomeSubCluster().getHomeSubCluster()); + applications.put(appId, homeSubCluster); } - return AddApplicationHomeSubClusterResponse - .newInstance(applications.get(appId)); + ApplicationHomeSubCluster respHomeSubCluster = applications.get(appId); + return AddApplicationHomeSubClusterResponse.newInstance(respHomeSubCluster.getHomeSubCluster()); } @Override @@ -256,15 +263,16 @@ public class MemoryFederationStateStore implements FederationStateStore { UpdateApplicationHomeSubClusterRequest request) throws YarnException { FederationApplicationHomeSubClusterStoreInputValidator.validate(request); + ApplicationId appId = request.getApplicationHomeSubCluster().getApplicationId(); + if (!applications.containsKey(appId)) { - String errMsg = "Application " + appId + " does not exist"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Application %s does not exist.", appId); } - applications.put(appId, - request.getApplicationHomeSubCluster().getHomeSubCluster()); + applications.put(appId, request.getApplicationHomeSubCluster()); return UpdateApplicationHomeSubClusterResponse.newInstance(); } @@ -275,11 +283,12 @@ public class MemoryFederationStateStore implements FederationStateStore { FederationApplicationHomeSubClusterStoreInputValidator.validate(request); ApplicationId appId = request.getApplicationId(); if (!applications.containsKey(appId)) { - String errMsg = "Application " + appId + " does not exist"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Application %s does not exist.", appId); } - return GetApplicationHomeSubClusterResponse.newInstance(appId, applications.get(appId)); + return GetApplicationHomeSubClusterResponse.newInstance(appId, + applications.get(appId).getHomeSubCluster()); } @Override @@ -303,7 +312,7 @@ public class MemoryFederationStateStore implements FederationStateStore { } private ApplicationHomeSubCluster generateAppHomeSC(ApplicationId applicationId) { - SubClusterId subClusterId = applications.get(applicationId); + SubClusterId subClusterId = applications.get(applicationId).getHomeSubCluster(); return ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); } @@ -314,8 +323,8 @@ public class MemoryFederationStateStore implements FederationStateStore { FederationApplicationHomeSubClusterStoreInputValidator.validate(request); ApplicationId appId = request.getApplicationId(); if (!applications.containsKey(appId)) { - String errMsg = "Application " + appId + " does not exist"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Application %s does 
not exist.", appId); } applications.remove(appId); @@ -329,12 +338,11 @@ public class MemoryFederationStateStore implements FederationStateStore { FederationPolicyStoreInputValidator.validate(request); String queue = request.getQueue(); if (!policies.containsKey(queue)) { - LOG.warn("Policy for queue: {} does not exist.", queue); + LOG.warn("Policy for queue : {} does not exist.", queue); return null; } - return GetSubClusterPolicyConfigurationResponse - .newInstance(policies.get(queue)); + return GetSubClusterPolicyConfigurationResponse.newInstance(policies.get(queue)); } @Override @@ -350,8 +358,7 @@ public class MemoryFederationStateStore implements FederationStateStore { @Override public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( GetSubClusterPoliciesConfigurationsRequest request) throws YarnException { - ArrayList result = - new ArrayList(); + ArrayList result = new ArrayList<>(); for (SubClusterPolicyConfiguration policy : policies.values()) { result.add(policy); } @@ -360,12 +367,22 @@ public class MemoryFederationStateStore implements FederationStateStore { @Override public Version getCurrentVersion() { - return null; + throw new NotImplementedException("Code is not implemented"); } @Override - public Version loadVersion() { - return null; + public Version loadVersion() throws Exception { + throw new NotImplementedException("Code is not implemented"); + } + + @Override + public void storeVersion() throws Exception { + throw new NotImplementedException("Code is not implemented"); + } + + @Override + public void checkVersion() throws Exception { + throw new NotImplementedException("Code is not implemented"); } @Override @@ -386,7 +403,8 @@ public class MemoryFederationStateStore implements FederationStateStore { FederationReservationHomeSubClusterStoreInputValidator.validate(request); ReservationId reservationId = request.getReservationId(); if (!reservations.containsKey(reservationId)) { - throw new YarnException("Reservation " + reservationId + " does not exist"); + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Reservation %s does not exist.", reservationId); } SubClusterId subClusterId = reservations.get(reservationId); ReservationHomeSubCluster homeSubCluster = @@ -417,7 +435,8 @@ public class MemoryFederationStateStore implements FederationStateStore { ReservationId reservationId = request.getReservationHomeSubCluster().getReservationId(); if (!reservations.containsKey(reservationId)) { - throw new YarnException("Reservation " + reservationId + " does not exist."); + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Reservation %s does not exist.", reservationId); } SubClusterId subClusterId = request.getReservationHomeSubCluster().getHomeSubCluster(); @@ -431,7 +450,8 @@ public class MemoryFederationStateStore implements FederationStateStore { FederationReservationHomeSubClusterStoreInputValidator.validate(request); ReservationId reservationId = request.getReservationId(); if (!reservations.containsKey(reservationId)) { - throw new YarnException("Reservation " + reservationId + " does not exist"); + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Reservation %s does not exist.", reservationId); } reservations.remove(reservationId); return DeleteReservationHomeSubClusterResponse.newInstance(); @@ -446,9 +466,8 @@ public class MemoryFederationStateStore implements FederationStateStore { Set rmDTMasterKeyState = routerRMSecretManagerState.getMasterKeyState(); if (rmDTMasterKeyState.contains(delegationKey)) { - 
LOG.info("Error storing info for RMDTMasterKey with keyID: {}.", delegationKey.getKeyId()); - throw new IOException("RMDTMasterKey with keyID: " + delegationKey.getKeyId() + - " is already stored"); + FederationStateStoreUtils.logAndThrowStoreException(LOG, + "Error storing info for RMDTMasterKey with keyID: %s.", delegationKey.getKeyId()); } routerRMSecretManagerState.getMasterKeyState().add(delegationKey); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java index f16fe673ce3..e1fc3f2a47e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java @@ -997,7 +997,17 @@ public class SQLFederationStateStore implements FederationStateStore { } @Override - public Version loadVersion() { + public Version loadVersion() throws Exception { + throw new NotImplementedException("Code is not implemented"); + } + + @Override + public void storeVersion() throws Exception { + throw new NotImplementedException("Code is not implemented"); + } + + @Override + public void checkVersion() throws Exception { throw new NotImplementedException("Code is not implemented"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java index 95903b81d18..536faa31dca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java @@ -30,6 +30,7 @@ import java.util.TimeZone; import java.util.Comparator; import java.util.stream.Collectors; +import org.apache.commons.lang3.NotImplementedException; import org.apache.curator.framework.recipes.shared.SharedCount; import org.apache.curator.framework.recipes.shared.VersionedValue; import org.apache.hadoop.classification.VisibleForTesting; @@ -642,12 +643,22 @@ public class ZookeeperFederationStateStore implements FederationStateStore { @Override public Version getCurrentVersion() { - return null; + throw new NotImplementedException("Code is not implemented"); } @Override - public Version loadVersion() { - return null; + public Version loadVersion() throws Exception { + throw new NotImplementedException("Code is not implemented"); + } + + @Override + public void storeVersion() throws Exception { + throw new NotImplementedException("Code is not implemented"); + } + + @Override + public void checkVersion() throws Exception { + throw new NotImplementedException("Code is not implemented"); } /** diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index c93115ccfd3..b93763583d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.HashSet; import java.util.TimeZone; +import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.delegation.DelegationKey; @@ -164,12 +165,9 @@ public abstract class FederationStateStoreBaseTest { SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED); - try { - stateStore.deregisterSubCluster(deregisterRequest); - Assert.fail(); - } catch (FederationStateStoreException e) { - Assert.assertTrue(e.getMessage().startsWith("SubCluster SC not found")); - } + + LambdaTestUtils.intercept(YarnException.class, + "SubCluster SC not found", () -> stateStore.deregisterSubCluster(deregisterRequest)); } @Test @@ -266,13 +264,9 @@ public abstract class FederationStateStoreBaseTest { SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest .newInstance(subClusterId, SubClusterState.SC_RUNNING, "capability"); - try { - stateStore.subClusterHeartbeat(heartbeatRequest); - Assert.fail(); - } catch (FederationStateStoreException e) { - Assert.assertTrue(e.getMessage() - .startsWith("SubCluster SC does not exist; cannot heartbeat")); - } + LambdaTestUtils.intercept(YarnException.class, + "SubCluster SC does not exist; cannot heartbeat", + () -> stateStore.subClusterHeartbeat(heartbeatRequest)); } // Test FederationApplicationHomeSubClusterStore @@ -1050,4 +1044,24 @@ public abstract class FederationStateStoreBaseTest { checkRouterStoreToken(identifier, getStoreTokenResp); } + + @Test(expected = NotImplementedException.class) + public void testGetCurrentVersion() { + stateStore.getCurrentVersion(); + } + + @Test(expected = NotImplementedException.class) + public void testStoreVersion() throws Exception { + stateStore.storeVersion(); + } + + @Test(expected = NotImplementedException.class) + public void testLoadVersion() throws Exception { + stateStore.loadVersion(); + } + + @Test(expected = NotImplementedException.class) + public void testCheckVersion() throws Exception { + stateStore.checkVersion(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index 92376872919..90dcadb721e 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -266,10 +266,20 @@ public class FederationStateStoreService extends AbstractService } @Override - public Version loadVersion() { + public Version loadVersion() throws Exception { return stateStoreClient.getCurrentVersion(); } + @Override + public void storeVersion() throws Exception { + stateStoreClient.storeVersion(); + } + + @Override + public void checkVersion() throws Exception { + stateStoreClient.checkVersion(); + } + @Override public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( GetSubClusterPolicyConfigurationRequest request) throws YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSQueueConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSQueueConverter.java index c29d020b10c..34400d229d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSQueueConverter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSQueueConverter.java @@ -16,6 +16,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AUTO_QUEUE_CREATION_V2_ENABLED; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DEFAULT_AUTO_QUEUE_CREATION_ENABLED; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.USER_LIMIT_FACTOR; @@ -82,13 +84,13 @@ public class FSQueueConverter { emitMaxParallelApps(queueName, queue); emitMaxAllocations(queueName, queue); emitPreemptionDisabled(queueName, queue); - emitDefaultUserLimitFactor(queueName, children); emitChildCapacity(queue); emitMaximumCapacity(queueName, queue); emitSizeBasedWeight(queueName); emitOrderingPolicy(queueName, queue); checkMaxChildCapacitySetting(queue); + emitDefaultUserLimitFactor(queueName, children); for (FSQueue childQueue : children) { convertQueueHierarchy(childQueue); @@ -220,7 +222,7 @@ public class FSQueueConverter { } public void emitDefaultUserLimitFactor(String queueName, List children) { - if (children.isEmpty()) { + if (children.isEmpty() && checkAutoQueueCreationV2Disabled(queueName)) { capacitySchedulerConfig.setFloat( CapacitySchedulerConfiguration. 
PREFIX + queueName + DOT + USER_LIMIT_FACTOR, @@ -309,6 +311,12 @@ public class FSQueueConverter { } } + private boolean checkAutoQueueCreationV2Disabled(String queueName) { + return !capacitySchedulerConfig.getBoolean( + PREFIX + queueName + DOT + AUTO_QUEUE_CREATION_V2_ENABLED, + DEFAULT_AUTO_QUEUE_CREATION_ENABLED); + } + private String getQueueShortName(String queueName) { int lastDot = queueName.lastIndexOf("."); return queueName.substring(lastDot + 1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSConfigToCSConfigConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSConfigToCSConfigConverter.java index 86bf113d64e..beb8f2f1de9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSConfigToCSConfigConverter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSConfigToCSConfigConverter.java @@ -194,6 +194,8 @@ public class TestFSConfigToCSConfigConverter { assertNull("root.users user-limit-factor should be null", conf.get(PREFIX + "root.users." + USER_LIMIT_FACTOR)); + assertEquals("root.users auto-queue-creation-v2.enabled", "true", + conf.get(PREFIX + "root.users.auto-queue-creation-v2.enabled")); assertEquals("root.default user-limit-factor", "-1.0", conf.get(PREFIX + "root.default.user-limit-factor")); @@ -203,6 +205,8 @@ public class TestFSConfigToCSConfigConverter { assertEquals("root.admins.bob user-limit-factor", "-1.0", conf.get(PREFIX + "root.admins.bob.user-limit-factor")); + assertNull("root.admin.bob auto-queue-creation-v2.enabled should be null", + conf.get(PREFIX + "root.admin.bob.auto-queue-creation-v2.enabled")); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index 47396371ff4..8806cbb9aea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -147,6 +147,10 @@ public final class RouterMetrics { private MutableGaugeInt numRefreshSuperUserGroupsConfigurationFailedRetrieved; @Metric("# of refreshUserToGroupsMappings failed to be retrieved") private MutableGaugeInt numRefreshUserToGroupsMappingsFailedRetrieved; + @Metric("# of addToClusterNodeLabels failed to be retrieved") + private MutableGaugeInt numAddToClusterNodeLabelsFailedRetrieved; + @Metric("# of removeFromClusterNodeLabels failed to be retrieved") + private MutableGaugeInt numRemoveFromClusterNodeLabelsFailedRetrieved; // Aggregate metrics are shared, and don't have to be looked up per call @Metric("Total number of successful Submitted apps and latency(ms)") @@ -253,9 +257,12 @@ public final class RouterMetrics { private MutableRate 
totalSucceededRefreshSuperUserGroupsConfigurationRetrieved; @Metric("Total number of successful Retrieved RefreshUserToGroupsMappings and latency(ms)") private MutableRate totalSucceededRefreshUserToGroupsMappingsRetrieved; - @Metric("Total number of successful Retrieved GetSchedulerInfo and latency(ms)") private MutableRate totalSucceededGetSchedulerInfoRetrieved; + @Metric("Total number of successful Retrieved AddToClusterNodeLabels and latency(ms)") + private MutableRate totalSucceededAddToClusterNodeLabelsRetrieved; + @Metric("Total number of successful Retrieved RemoveFromClusterNodeLabels and latency(ms)") + private MutableRate totalSucceededRemoveFromClusterNodeLabelsRetrieved; /** * Provide quantile counters for all latencies. @@ -313,6 +320,8 @@ public final class RouterMetrics { private MutableQuantiles getSchedulerInfoRetrievedLatency; private MutableQuantiles refreshSuperUserGroupsConfLatency; private MutableQuantiles refreshUserToGroupsMappingsLatency; + private MutableQuantiles addToClusterNodeLabelsLatency; + private MutableQuantiles removeFromClusterNodeLabelsLatency; private static volatile RouterMetrics instance = null; private static MetricsRegistry registry; @@ -504,6 +513,12 @@ public final class RouterMetrics { refreshUserToGroupsMappingsLatency = registry.newQuantiles("refreshUserToGroupsMappingsLatency", "latency of refresh user to groups mappings timeouts", "ops", "latency", 10); + + addToClusterNodeLabelsLatency = registry.newQuantiles("addToClusterNodeLabelsLatency", + "latency of add cluster nodelabels timeouts", "ops", "latency", 10); + + removeFromClusterNodeLabelsLatency = registry.newQuantiles("removeFromClusterNodeLabelsLatency", + "latency of remove cluster nodelabels timeouts", "ops", "latency", 10); } public static RouterMetrics getMetrics() { @@ -780,6 +795,16 @@ public final class RouterMetrics { return totalSucceededGetSchedulerInfoRetrieved.lastStat().numSamples(); } + @VisibleForTesting + public long getNumSucceededAddToClusterNodeLabelsRetrieved() { + return totalSucceededAddToClusterNodeLabelsRetrieved.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumSucceededRemoveFromClusterNodeLabelsRetrieved() { + return totalSucceededRemoveFromClusterNodeLabelsRetrieved.lastStat().numSamples(); + } + @VisibleForTesting public long getNumSucceededRefreshSuperUserGroupsConfigurationRetrieved() { return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().numSamples(); @@ -1040,6 +1065,16 @@ public final class RouterMetrics { return totalSucceededGetSchedulerInfoRetrieved.lastStat().mean(); } + @VisibleForTesting + public double getLatencySucceededAddToClusterNodeLabelsRetrieved() { + return totalSucceededAddToClusterNodeLabelsRetrieved.lastStat().mean(); + } + + @VisibleForTesting + public double getLatencySucceededRemoveFromClusterNodeLabelsRetrieved() { + return totalSucceededRemoveFromClusterNodeLabelsRetrieved.lastStat().mean(); + } + @VisibleForTesting public double getLatencySucceededRefreshSuperUserGroupsConfigurationRetrieved() { return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().mean(); @@ -1251,6 +1286,14 @@ public final class RouterMetrics { return numRefreshUserToGroupsMappingsFailedRetrieved.value(); } + public int getNumAddToClusterNodeLabelsFailedRetrieved() { + return numAddToClusterNodeLabelsFailedRetrieved.value(); + } + + public int getNumRemoveFromClusterNodeLabelsFailedRetrieved() { + return numRemoveFromClusterNodeLabelsFailedRetrieved.value(); + } + public int 
getDelegationTokenFailedRetrieved() { return numGetDelegationTokenFailedRetrieved.value(); } @@ -1534,6 +1577,16 @@ public final class RouterMetrics { getSchedulerInfoRetrievedLatency.add(duration); } + public void succeededAddToClusterNodeLabelsRetrieved(long duration) { + totalSucceededAddToClusterNodeLabelsRetrieved.add(duration); + addToClusterNodeLabelsLatency.add(duration); + } + + public void succeededRemoveFromClusterNodeLabelsRetrieved(long duration) { + totalSucceededRemoveFromClusterNodeLabelsRetrieved.add(duration); + removeFromClusterNodeLabelsLatency.add(duration); + } + public void succeededRefreshSuperUserGroupsConfRetrieved(long duration) { totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.add(duration); refreshSuperUserGroupsConfLatency.add(duration); @@ -1728,6 +1781,14 @@ public final class RouterMetrics { numRefreshUserToGroupsMappingsFailedRetrieved.incr(); } + public void incrAddToClusterNodeLabelsFailedRetrieved() { + numAddToClusterNodeLabelsFailedRetrieved.incr(); + } + + public void incrRemoveFromClusterNodeLabelsFailedRetrieved() { + numRemoveFromClusterNodeLabelsFailedRetrieved.incr(); + } + public void incrGetDelegationTokenFailedRetrieved() { numGetDelegationTokenFailedRetrieved.incr(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 7cc403a492a..1c7af645855 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -43,6 +43,7 @@ import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -72,6 +73,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; @@ -1580,16 +1582,126 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { throw new RuntimeException("getClusterNodeLabels Failed."); } + /** + * This method adds node labels to the cluster, and it is + * reachable by using {@link RMWSConsts#ADD_NODE_LABELS}. + * + * @see ResourceManagerAdministrationProtocol#addToClusterNodeLabels + * @param newNodeLabels the node labels to add. It is a content param.
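+ * The same labels are added on every active subcluster.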
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index 7cc403a492a..1c7af645855 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -43,6 +43,7 @@
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -72,6 +73,7 @@
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
@@ -1580,16 +1582,126 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     throw new RuntimeException("getClusterNodeLabels Failed.");
   }
 
+  /**
+   * This method adds specific node labels for specific nodes, and it is
+   * reachable by using {@link RMWSConsts#ADD_NODE_LABELS}.
+   *
+   * @see ResourceManagerAdministrationProtocol#addToClusterNodeLabels
+   * @param newNodeLabels the node labels to add. It is a content param.
+   * @param hsr the servlet request
+   * @return Response containing the status code
+   * @throws Exception in case of bad request
+   */
   @Override
   public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels,
       HttpServletRequest hsr) throws Exception {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (newNodeLabels == null) {
+      routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
+      throw new IllegalArgumentException("Parameter error, the newNodeLabels is null.");
+    }
+
+    List<NodeLabelInfo> nodeLabelInfos = newNodeLabels.getNodeLabelsInfo();
+    if (CollectionUtils.isEmpty(nodeLabelInfos)) {
+      routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
+      throw new IllegalArgumentException("Parameter error, the nodeLabelsInfo is null or empty.");
+    }
+
+    try {
+      long startTime = clock.getTime();
+      Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
+      final HttpServletRequest hsrCopy = clone(hsr);
+      Class[] argsClasses = new Class[]{NodeLabelsInfo.class, HttpServletRequest.class};
+      Object[] args = new Object[]{newNodeLabels, hsrCopy};
+      ClientMethod remoteMethod = new ClientMethod("addToClusterNodeLabels", argsClasses, args);
+      Map<SubClusterInfo, Response> responseInfoMap =
+          invokeConcurrent(subClustersActive.values(), remoteMethod, Response.class);
+      StringBuffer buffer = new StringBuffer();
+      // SubCluster-0:SUCCESS,SubCluster-1:SUCCESS
+      responseInfoMap.forEach((subClusterInfo, response) -> {
+        buildAppendMsg(subClusterInfo, buffer, response);
+      });
+      long stopTime = clock.getTime();
+      routerMetrics.succeededAddToClusterNodeLabelsRetrieved((stopTime - startTime));
+      return Response.status(Status.OK).entity(buffer.toString()).build();
+    } catch (NotFoundException e) {
+      routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
+      RouterServerUtil.logAndThrowIOException("get all active sub cluster(s) error.", e);
+    } catch (YarnException e) {
+      routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
+      RouterServerUtil.logAndThrowIOException("addToClusterNodeLabels with yarn error.", e);
+    }
+
+    routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
+    throw new RuntimeException("addToClusterNodeLabels Failed.");
   }
 
+  /**
+   * This method removes all the node labels for specific nodes, and it is
+   * reachable by using {@link RMWSConsts#REMOVE_NODE_LABELS}.
+   *
+   * @see ResourceManagerAdministrationProtocol#removeFromClusterNodeLabels
+   * @param oldNodeLabels the node labels to remove. It is a QueryParam.
+   * @param hsr the servlet request
+   * @return Response containing the status code
+   * @throws Exception in case of bad request
+   */
   @Override
   public Response removeFromClusterNodeLabels(Set<String> oldNodeLabels,
       HttpServletRequest hsr) throws Exception {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (CollectionUtils.isEmpty(oldNodeLabels)) {
+      routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
+      throw new IllegalArgumentException("Parameter error, the oldNodeLabels is null or empty.");
+    }
+
+    try {
+      long startTime = clock.getTime();
+      Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
+      final HttpServletRequest hsrCopy = clone(hsr);
+      Class[] argsClasses = new Class[]{Set.class, HttpServletRequest.class};
+      Object[] args = new Object[]{oldNodeLabels, hsrCopy};
+      ClientMethod remoteMethod =
+          new ClientMethod("removeFromClusterNodeLabels", argsClasses, args);
+      Map<SubClusterInfo, Response> responseInfoMap =
+          invokeConcurrent(subClustersActive.values(), remoteMethod, Response.class);
+      StringBuffer buffer = new StringBuffer();
+      // SubCluster-0:SUCCESS,SubCluster-1:SUCCESS
+      responseInfoMap.forEach((subClusterInfo, response) -> {
+        buildAppendMsg(subClusterInfo, buffer, response);
+      });
+      long stopTime = clock.getTime();
+      routerMetrics.succeededRemoveFromClusterNodeLabelsRetrieved(stopTime - startTime);
+      return Response.status(Status.OK).entity(buffer.toString()).build();
+    } catch (NotFoundException e) {
+      routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
+      RouterServerUtil.logAndThrowIOException("get all active sub cluster(s) error.", e);
+    } catch (YarnException e) {
+      routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
+      RouterServerUtil.logAndThrowIOException("removeFromClusterNodeLabels with yarn error.", e);
+    }
+
+    routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
+    throw new RuntimeException("removeFromClusterNodeLabels Failed.");
+  }
+
+  /**
+   * Build append information.
+   *
+   * @param subClusterInfo subCluster information.
+   * @param buffer StringBuffer.
+   * @param response response message.
+   */
+  private void buildAppendMsg(SubClusterInfo subClusterInfo, StringBuffer buffer,
+      Response response) {
+    SubClusterId subClusterId = subClusterInfo.getSubClusterId();
+    String state = response != null &&
+        (response.getStatus() == Status.OK.getStatusCode()) ? "SUCCESS" : "FAILED";
+    buffer.append("SubCluster-")
+        .append(subClusterId.getId())
+        .append(":")
+        .append(state)
+        .append(",");
+  }
 
   @Override
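Both endpoints above fan the call out to every active sub-cluster via invokeConcurrent and collapse the per-cluster outcomes into a single 200 response whose entity is the buildAppendMsg string, e.g. SubCluster-0:SUCCESS,SubCluster-1:FAILED,. A caller therefore has to parse the entity to learn per-cluster results. A minimal sketch, assuming that format; parseEntity is a hypothetical helper, not part of this patch:

// Sketch only: consuming the aggregated entity built by buildAppendMsg.
import java.util.HashMap;
import java.util.Map;

final class NodeLabelsEntityParser {
  static Map<String, Boolean> parseEntity(String entity) {
    Map<String, Boolean> results = new HashMap<>();
    // split(",") drops the empty token left by the trailing append(",")
    for (String token : entity.split(",")) {
      String[] parts = token.split(":");
      results.put(parts[0], "SUCCESS".equals(parts[1]));
    }
    return results;
  }

  public static void main(String[] args) {
    // prints {SubCluster-0=true, SubCluster-1=false} (map order may vary)
    System.out.println(parseEntity("SubCluster-0:SUCCESS,SubCluster-1:FAILED,"));
  }
}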
"SUCCESS" : "FAILED"; + buffer.append("SubCluster-") + .append(subClusterId.getId()) + .append(":") + .append(state) + .append(","); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java index b86d85a94fd..3e451627968 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java @@ -793,6 +793,11 @@ public class TestRouterMetrics { LOG.info("Mocked: successful GetBulkActivities call with duration {}", duration); metrics.succeededGetBulkActivitiesRetrieved(duration); } + + public void addToClusterNodeLabelsRetrieved(long duration) { + LOG.info("Mocked: successful AddToClusterNodeLabels call with duration {}", duration); + metrics.succeededAddToClusterNodeLabelsRetrieved(duration); + } } @Test @@ -1696,4 +1701,19 @@ public class TestRouterMetrics { Assert.assertEquals(totalBadBefore + 1, metrics.getBulkActivitiesFailedRetrieved()); } + + @Test + public void testAddToClusterNodeLabelsRetrieved() { + long totalGoodBefore = metrics.getNumSucceededAddToClusterNodeLabelsRetrieved(); + goodSubCluster.addToClusterNodeLabelsRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededAddToClusterNodeLabelsRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededAddToClusterNodeLabelsRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.addToClusterNodeLabelsRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededAddToClusterNodeLabelsRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededAddToClusterNodeLabelsRetrieved(), ASSERT_DOUBLE_DELTA); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java index 91f3a7d4cea..9d3223f9095 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java @@ -1307,4 +1307,44 @@ public class MockDefaultRequestInterceptorREST throw new RuntimeException(e); } } + + @Override + public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels, HttpServletRequest hsr) + throws Exception { + List nodeLabelInfoList = newNodeLabels.getNodeLabelsInfo(); + NodeLabelInfo nodeLabelInfo = nodeLabelInfoList.get(0); + String nodeLabelName = nodeLabelInfo.getName(); + + // If nodeLabelName is ALL, we let all subclusters pass + if (StringUtils.equals("ALL", nodeLabelName)) { + return Response.status(Status.OK).build(); + } else if (StringUtils.equals("A0", nodeLabelName)) { + SubClusterId subClusterId = getSubClusterId(); + String id = 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
index 91f3a7d4cea..9d3223f9095 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
@@ -1307,4 +1307,44 @@ public class MockDefaultRequestInterceptorREST
       throw new RuntimeException(e);
     }
   }
+
+  @Override
+  public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels, HttpServletRequest hsr)
+      throws Exception {
+    List<NodeLabelInfo> nodeLabelInfoList = newNodeLabels.getNodeLabelsInfo();
+    NodeLabelInfo nodeLabelInfo = nodeLabelInfoList.get(0);
+    String nodeLabelName = nodeLabelInfo.getName();
+
+    // If nodeLabelName is ALL, we let all subclusters pass
+    if (StringUtils.equals("ALL", nodeLabelName)) {
+      return Response.status(Status.OK).build();
+    } else if (StringUtils.equals("A0", nodeLabelName)) {
+      SubClusterId subClusterId = getSubClusterId();
+      String id = subClusterId.getId();
+      if (StringUtils.contains("A0", id)) {
+        return Response.status(Status.OK).build();
+      } else {
+        return Response.status(Status.BAD_REQUEST).entity(null).build();
+      }
+    }
+    throw new YarnException("addToClusterNodeLabels Error");
+  }
+
+  @Override
+  public Response removeFromClusterNodeLabels(Set<String> oldNodeLabels, HttpServletRequest hsr)
+      throws Exception {
+    // If oldNodeLabels contains ALL, we let all subclusters pass
+    if (oldNodeLabels.contains("ALL")) {
+      return Response.status(Status.OK).build();
+    } else if (oldNodeLabels.contains("A0")) {
+      SubClusterId subClusterId = getSubClusterId();
+      String id = subClusterId.getId();
+      if (StringUtils.contains("A0", id)) {
+        return Response.status(Status.OK).build();
+      } else {
+        return Response.status(Status.BAD_REQUEST).entity(null).build();
+      }
+    }
+    throw new YarnException("removeFromClusterNodeLabels Error");
+  }
 }
\ No newline at end of file
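The mock's A0 branch is easy to misread: StringUtils.contains("A0", id) asks whether the sub-cluster id is a substring of "A0", so with the ids "0" through "3" that the federation tests below exercise, only SubCluster-0 passes. A small sketch of that check:

// Sketch only: the substring check the mock uses to single out SubCluster-0.
import org.apache.commons.lang3.StringUtils;

public class MockRoutingSketch {
  public static void main(String[] args) {
    for (String id : new String[]{"0", "1", "2", "3"}) {
      // true only for "0", because only "0" is a substring of "A0"
      System.out.println(id + " -> " + StringUtils.contains("A0", id));
    }
  }
}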
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
index b7ddda7d30b..5ec53a63e20 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Collections;
 import java.util.stream.Collectors;
@@ -40,6 +41,7 @@ import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.util.Lists;
+import org.apache.hadoop.util.Sets;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -1899,4 +1901,132 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
         "'groupBy' must not be empty.", () -> interceptor.getBulkActivities(null, "", 1));
   }
+
+  @Test
+  public void testAddToClusterNodeLabels1() throws Exception {
+    // In this test, we try to add the ALL label; all subClusters will return success.
+    NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo();
+    NodeLabelInfo nodeLabelInfo = new NodeLabelInfo("ALL", true);
+    nodeLabelsInfo.getNodeLabelsInfo().add(nodeLabelInfo);
+
+    Response response = interceptor.addToClusterNodeLabels(nodeLabelsInfo, null);
+    Assert.assertNotNull(response);
+
+    Object entityObj = response.getEntity();
+    Assert.assertNotNull(entityObj);
+
+    String entity = String.valueOf(entityObj);
+    String[] entities = StringUtils.split(entity, ",");
+    Assert.assertNotNull(entities);
+    Assert.assertEquals(4, entities.length);
+
+    // The order in which the subClusters return messages is uncertain,
+    // so we confirm the result by contains.
+    String expectedMsg =
+        "SubCluster-0:SUCCESS,SubCluster-1:SUCCESS,SubCluster-2:SUCCESS,SubCluster-3:SUCCESS";
+    Arrays.stream(entities).forEach(item -> {
+      Assert.assertTrue(expectedMsg.contains(item));
+    });
+  }
+
+  @Test
+  public void testAddToClusterNodeLabels2() throws Exception {
+    // In this test, we try to add the A0 label;
+    // subCluster0 will return success, and the other subClusters will return null.
+    NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo();
+    NodeLabelInfo nodeLabelInfo = new NodeLabelInfo("A0", true);
+    nodeLabelsInfo.getNodeLabelsInfo().add(nodeLabelInfo);
+
+    Response response = interceptor.addToClusterNodeLabels(nodeLabelsInfo, null);
+    Assert.assertNotNull(response);
+
+    Object entityObj = response.getEntity();
+    Assert.assertNotNull(entityObj);
+
+    String expectedValue = "SubCluster-0:SUCCESS,";
+    String entity = String.valueOf(entityObj);
+    Assert.assertTrue(entity.contains(expectedValue));
+  }
+
+  @Test
+  public void testAddToClusterNodeLabelsError() throws Exception {
+    // the newNodeLabels is null
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "Parameter error, the newNodeLabels is null.",
+        () -> interceptor.addToClusterNodeLabels(null, null));
+
+    // the nodeLabelsInfo is null
+    NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo();
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "Parameter error, the nodeLabelsInfo is null or empty.",
+        () -> interceptor.addToClusterNodeLabels(nodeLabelsInfo, null));
+
+    // error nodeLabelsInfo
+    NodeLabelsInfo nodeLabelsInfo1 = new NodeLabelsInfo();
+    NodeLabelInfo nodeLabelInfo1 = new NodeLabelInfo("A", true);
+    nodeLabelsInfo1.getNodeLabelsInfo().add(nodeLabelInfo1);
+    LambdaTestUtils.intercept(YarnRuntimeException.class, "addToClusterNodeLabels Error",
+        () -> interceptor.addToClusterNodeLabels(nodeLabelsInfo1, null));
+  }
+
+  @Test
+  public void testRemoveFromClusterNodeLabels1() throws Exception {
+    Set<String> oldNodeLabels = Sets.newHashSet();
+    oldNodeLabels.add("ALL");
+
+    Response response = interceptor.removeFromClusterNodeLabels(oldNodeLabels, null);
+    Assert.assertNotNull(response);
+
+    Object entityObj = response.getEntity();
+    Assert.assertNotNull(entityObj);
+
+    String entity = String.valueOf(entityObj);
+    String[] entities = StringUtils.split(entity, ",");
+    Assert.assertNotNull(entities);
+    Assert.assertEquals(4, entities.length);
+
+    // The order in which the subClusters return messages is uncertain,
+    // so we confirm the result by contains.
+    String expectedMsg =
+        "SubCluster-0:SUCCESS,SubCluster-1:SUCCESS,SubCluster-2:SUCCESS,SubCluster-3:SUCCESS";
+    Arrays.stream(entities).forEach(item -> {
+      Assert.assertTrue(expectedMsg.contains(item));
+    });
+  }
+
+  @Test
+  public void testRemoveFromClusterNodeLabels2() throws Exception {
+    Set<String> oldNodeLabels = Sets.newHashSet();
+    oldNodeLabels.add("A0");
+
+    Response response = interceptor.removeFromClusterNodeLabels(oldNodeLabels, null);
+    Assert.assertNotNull(response);
+
+    Object entityObj = response.getEntity();
+    Assert.assertNotNull(entityObj);
+
+    String expectedValue = "SubCluster-0:SUCCESS,";
+    String entity = String.valueOf(entityObj);
+    Assert.assertTrue(entity.contains(expectedValue));
+  }
+
+  @Test
+  public void testRemoveFromClusterNodeLabelsError() throws Exception {
+    // the oldNodeLabels is null
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "Parameter error, the oldNodeLabels is null or empty.",
+        () -> interceptor.removeFromClusterNodeLabels(null, null));
+
+    // the oldNodeLabels is empty
+    Set<String> oldNodeLabels = Sets.newHashSet();
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "Parameter error, the oldNodeLabels is null or empty.",
+        () -> interceptor.removeFromClusterNodeLabels(oldNodeLabels, null));
+
+    // error oldNodeLabels
+    Set<String> oldNodeLabels1 = Sets.newHashSet();
+    oldNodeLabels1.add("A1");
+    LambdaTestUtils.intercept(YarnRuntimeException.class, "removeFromClusterNodeLabels Error",
+        () -> interceptor.removeFromClusterNodeLabels(oldNodeLabels1, null));
+  }
 }
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 51a80c76341..fa768296e37 100644
--- a/pom.xml
+++ b/pom.xml
@@ -118,7 +118,6 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/x
     4.2.0
     1.1.1
     <maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
-    <cyclonedx.version>2.7.3</cyclonedx.version>
 
     <shell-executable>bash</shell-executable>
@@ -500,19 +499,6 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/x
          <artifactId>maven-compiler-plugin</artifactId>
          <version>${maven-compiler-plugin.version}</version>
        </plugin>
-       <plugin>
-         <groupId>org.cyclonedx</groupId>
-         <artifactId>cyclonedx-maven-plugin</artifactId>
-         <version>${cyclonedx.version}</version>
-         <executions>
-           <execution>
-             <phase>package</phase>
-             <goals>
-               <goal>makeBom</goal>
-             </goals>
-           </execution>
-         </executions>
-       </plugin>
@@ -621,10 +607,6 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/x
         <groupId>com.github.spotbugs</groupId>
         <artifactId>spotbugs-maven-plugin</artifactId>
       </plugin>
-      <plugin>
-        <groupId>org.cyclonedx</groupId>
-        <artifactId>cyclonedx-maven-plugin</artifactId>
-      </plugin>