HADOOP-18535. Implement token storage solution based on MySQL

Fixes #1240

Signed-off-by: Owen O'Malley <oomalley@linkedin.com>
This commit is contained in:
hchaverr 2023-01-24 10:43:36 -08:00 committed by Owen O'Malley
parent 84e999b35c
commit eab7215354
No known key found for this signature in database
GPG Key ID: D19EB09DAD1C5877
14 changed files with 1697 additions and 2 deletions

View File

@ -19,7 +19,9 @@
package org.apache.hadoop.security.token.delegation;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.security.MessageDigest;
import java.util.Collection;
@ -38,6 +40,8 @@ import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -428,8 +432,9 @@ public abstract class AbstractDelegationTokenSecretManager<TokenIdent
/**
* Update the current master key for generating delegation tokens
* It should be called only by tokenRemoverThread.
* @throws IOException raised on errors performing I/O.
*/
void rollMasterKey() throws IOException {
protected void rollMasterKey() throws IOException {
synchronized (this) {
removeExpiredKeys();
/* set final expiry date for retiring currentKey */
@ -664,11 +669,15 @@ public abstract class AbstractDelegationTokenSecretManager<TokenIdent
/** Class to encapsulate a token's renew date and password. */
@InterfaceStability.Evolving
public static class DelegationTokenInformation {
public static class DelegationTokenInformation implements Writable {
long renewDate;
byte[] password;
String trackingId;
public DelegationTokenInformation() {
this(0, null);
}
public DelegationTokenInformation(long renewDate, byte[] password) {
this(renewDate, password, null);
}
@ -698,6 +707,29 @@ public abstract class AbstractDelegationTokenSecretManager<TokenIdent
public String getTrackingId() {
return trackingId;
}
@Override
public void write(DataOutput out) throws IOException {
WritableUtils.writeVLong(out, renewDate);
if (password == null) {
WritableUtils.writeVInt(out, -1);
} else {
WritableUtils.writeVInt(out, password.length);
out.write(password);
}
WritableUtils.writeString(out, trackingId);
}
@Override
public void readFields(DataInput in) throws IOException {
renewDate = WritableUtils.readVLong(in);
int len = WritableUtils.readVInt(in);
if (len > -1) {
password = new byte[len];
in.readFully(password);
}
trackingId = WritableUtils.readString(in);
}
}
/** Remove expired delegation tokens from cache */

View File

@ -0,0 +1,400 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security.token.delegation;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An implementation of {@link AbstractDelegationTokenSecretManager} that
* persists TokenIdentifiers and DelegationKeys in an existing SQL database.
*/
public abstract class SQLDelegationTokenSecretManager<TokenIdent
extends AbstractDelegationTokenIdentifier>
extends AbstractDelegationTokenSecretManager<TokenIdent> {
private static final Logger LOG =
LoggerFactory.getLogger(SQLDelegationTokenSecretManager.class);
public static final String SQL_DTSM_CONF_PREFIX = "sql-dt-secret-manager.";
private static final String SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE = SQL_DTSM_CONF_PREFIX
+ "token.seqnum.batch.size";
public static final int DEFAULT_SEQ_NUM_BATCH_SIZE = 10;
// Batch of sequence numbers that will be requested by the sequenceNumCounter.
// A new batch is requested once the sequenceNums available to a secret manager are
// exhausted, including during initialization.
private final int seqNumBatchSize;
// Last sequenceNum in the current batch that has been allocated to a token.
private int currentSeqNum;
// Max sequenceNum in the current batch that can be allocated to a token.
// Unused sequenceNums in the current batch cannot be reused by other routers.
private int currentMaxSeqNum;
public SQLDelegationTokenSecretManager(Configuration conf) {
super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL,
DelegationTokenManager.UPDATE_INTERVAL_DEFAULT) * 1000,
conf.getLong(DelegationTokenManager.MAX_LIFETIME,
DelegationTokenManager.MAX_LIFETIME_DEFAULT) * 1000,
conf.getLong(DelegationTokenManager.RENEW_INTERVAL,
DelegationTokenManager.RENEW_INTERVAL_DEFAULT) * 1000,
conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL,
DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000);
this.seqNumBatchSize = conf.getInt(SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
DEFAULT_SEQ_NUM_BATCH_SIZE);
}
/**
* Persists a TokenIdentifier and its corresponding TokenInformation into
* the SQL database. The TokenIdentifier is expected to be unique and any
* duplicate token attempts will result in an IOException.
* @param ident TokenIdentifier to persist.
* @param tokenInfo DelegationTokenInformation associated with the TokenIdentifier.
*/
@Override
protected void storeToken(TokenIdent ident,
DelegationTokenInformation tokenInfo) throws IOException {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos)) {
tokenInfo.write(dos);
// Add token to SQL database
insertToken(ident.getSequenceNumber(), ident.getBytes(), bos.toByteArray());
// Add token to local cache
super.storeToken(ident, tokenInfo);
} catch (SQLException e) {
throw new IOException("Failed to store token in SQL secret manager", e);
}
}
/**
* Updates the TokenInformation of an existing TokenIdentifier in
* the SQL database.
* @param ident Existing TokenIdentifier in the SQL database.
* @param tokenInfo Updated DelegationTokenInformation associated with the TokenIdentifier.
*/
@Override
protected void updateToken(TokenIdent ident,
DelegationTokenInformation tokenInfo) throws IOException {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
try (DataOutputStream dos = new DataOutputStream(bos)) {
tokenInfo.write(dos);
// Update token in SQL database
updateToken(ident.getSequenceNumber(), ident.getBytes(), bos.toByteArray());
// Update token in local cache
super.updateToken(ident, tokenInfo);
}
} catch (SQLException e) {
throw new IOException("Failed to update token in SQL secret manager", e);
}
}
/**
* Cancels a token by removing it from the SQL database. This will
* call the corresponding method in {@link AbstractDelegationTokenSecretManager}
* to perform validation and remove the token from the cache.
* @return Identifier of the canceled token
*/
@Override
public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
String canceller) throws IOException {
try (ByteArrayInputStream bis = new ByteArrayInputStream(token.getIdentifier());
DataInputStream din = new DataInputStream(bis)) {
TokenIdent id = createIdentifier();
id.readFields(din);
// Calling getTokenInfo to load token into local cache if not present.
// super.cancelToken() requires token to be present in local cache.
getTokenInfo(id);
}
return super.cancelToken(token, canceller);
}
/**
* Removes the existing TokenInformation from the SQL database to
* invalidate it.
* @param ident TokenInformation to remove from the SQL database.
*/
@Override
protected void removeStoredToken(TokenIdent ident) throws IOException {
try {
deleteToken(ident.getSequenceNumber(), ident.getBytes());
} catch (SQLException e) {
LOG.warn("Failed to remove token in SQL secret manager", e);
}
}
/**
* Obtains the DelegationTokenInformation associated with the given
* TokenIdentifier in the SQL database.
* @param ident Existing TokenIdentifier in the SQL database.
* @return DelegationTokenInformation that matches the given TokenIdentifier or
* null if it doesn't exist in the database.
*/
@Override
protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
// Look for token in local cache
DelegationTokenInformation tokenInfo = super.getTokenInfo(ident);
if (tokenInfo == null) {
try {
// Look for token in SQL database
byte[] tokenInfoBytes = selectTokenInfo(ident.getSequenceNumber(), ident.getBytes());
if (tokenInfoBytes != null) {
tokenInfo = new DelegationTokenInformation();
try (ByteArrayInputStream bis = new ByteArrayInputStream(tokenInfoBytes)) {
try (DataInputStream dis = new DataInputStream(bis)) {
tokenInfo.readFields(dis);
}
}
// Update token in local cache
currentTokens.put(ident, tokenInfo);
}
} catch (IOException | SQLException e) {
LOG.error("Failed to get token in SQL secret manager", e);
}
}
return tokenInfo;
}
/**
* Obtains the value of the last reserved sequence number.
* @return Last reserved sequence number.
*/
@Override
public int getDelegationTokenSeqNum() {
try {
return selectSequenceNum();
} catch (SQLException e) {
throw new RuntimeException(
"Failed to get token sequence number in SQL secret manager", e);
}
}
/**
* Updates the value of the last reserved sequence number.
* @param seqNum Value to update the sequence number to.
*/
@Override
public void setDelegationTokenSeqNum(int seqNum) {
try {
updateSequenceNum(seqNum);
} catch (SQLException e) {
throw new RuntimeException(
"Failed to update token sequence number in SQL secret manager", e);
}
}
/**
* Obtains the next available sequence number that can be allocated to a Token.
* Sequence numbers need to be reserved using the shared sequenceNumberCounter once
* the local batch has been exhausted, which handles sequenceNumber allocation
* concurrently with other secret managers.
* This method ensures that sequence numbers are incremental in a single secret manager,
* but not across secret managers.
* @return Next available sequence number.
*/
@Override
public synchronized int incrementDelegationTokenSeqNum() {
if (currentSeqNum >= currentMaxSeqNum) {
try {
// Request a new batch of sequence numbers and use the
// lowest one available.
currentSeqNum = incrementSequenceNum(seqNumBatchSize);
currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
} catch (SQLException e) {
throw new RuntimeException(
"Failed to increment token sequence number in SQL secret manager", e);
}
}
return ++currentSeqNum;
}
/**
* Persists a DelegationKey into the SQL database. The delegation keyId
* is expected to be unique and any duplicate key attempts will result
* in an IOException.
* @param key DelegationKey to persist into the SQL database.
*/
@Override
protected void storeDelegationKey(DelegationKey key) throws IOException {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos)) {
key.write(dos);
// Add delegation key to SQL database
insertDelegationKey(key.getKeyId(), bos.toByteArray());
// Add delegation key to local cache
super.storeDelegationKey(key);
} catch (SQLException e) {
throw new IOException("Failed to store delegation key in SQL secret manager", e);
}
}
/**
* Updates an existing DelegationKey in the SQL database.
* @param key Updated DelegationKey.
*/
@Override
protected void updateDelegationKey(DelegationKey key) throws IOException {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos)) {
key.write(dos);
// Update delegation key in SQL database
updateDelegationKey(key.getKeyId(), bos.toByteArray());
// Update delegation key in local cache
super.updateDelegationKey(key);
} catch (SQLException e) {
throw new IOException("Failed to update delegation key in SQL secret manager", e);
}
}
/**
* Removes the existing DelegationKey from the SQL database to
* invalidate it.
* @param key DelegationKey to remove from the SQL database.
*/
@Override
protected void removeStoredMasterKey(DelegationKey key) {
try {
deleteDelegationKey(key.getKeyId());
} catch (SQLException e) {
LOG.warn("Failed to remove delegation key in SQL secret manager", e);
}
}
/**
* Obtains the DelegationKey from the SQL database.
* @param keyId KeyId of the DelegationKey to obtain.
* @return DelegationKey that matches the given keyId or null
* if it doesn't exist in the database.
*/
@Override
protected DelegationKey getDelegationKey(int keyId) {
// Look for delegation key in local cache
DelegationKey delegationKey = super.getDelegationKey(keyId);
if (delegationKey == null) {
try {
// Look for delegation key in SQL database
byte[] delegationKeyBytes = selectDelegationKey(keyId);
if (delegationKeyBytes != null) {
delegationKey = new DelegationKey();
try (ByteArrayInputStream bis = new ByteArrayInputStream(delegationKeyBytes)) {
try (DataInputStream dis = new DataInputStream(bis)) {
delegationKey.readFields(dis);
}
}
// Update delegation key in local cache
allKeys.put(keyId, delegationKey);
}
} catch (IOException | SQLException e) {
LOG.error("Failed to get delegation key in SQL secret manager", e);
}
}
return delegationKey;
}
/**
* Obtains the value of the last delegation key id.
* @return Last delegation key id.
*/
@Override
public int getCurrentKeyId() {
try {
return selectKeyId();
} catch (SQLException e) {
throw new RuntimeException(
"Failed to get delegation key id in SQL secret manager", e);
}
}
/**
* Updates the value of the last delegation key id.
* @param keyId Value to update the delegation key id to.
*/
@Override
public void setCurrentKeyId(int keyId) {
try {
updateKeyId(keyId);
} catch (SQLException e) {
throw new RuntimeException(
"Failed to set delegation key id in SQL secret manager", e);
}
}
/**
* Obtains the next available delegation key id that can be allocated to a DelegationKey.
* Delegation key id need to be reserved using the shared delegationKeyIdCounter,
* which handles keyId allocation concurrently with other secret managers.
* @return Next available delegation key id.
*/
@Override
public int incrementCurrentKeyId() {
try {
return incrementKeyId(1) + 1;
} catch (SQLException e) {
throw new RuntimeException(
"Failed to increment delegation key id in SQL secret manager", e);
}
}
// Token operations in SQL database
protected abstract byte[] selectTokenInfo(int sequenceNum, byte[] tokenIdentifier)
throws SQLException;
protected abstract void insertToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
throws SQLException;
protected abstract void updateToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
throws SQLException;
protected abstract void deleteToken(int sequenceNum, byte[] tokenIdentifier)
throws SQLException;
// Delegation key operations in SQL database
protected abstract byte[] selectDelegationKey(int keyId) throws SQLException;
protected abstract void insertDelegationKey(int keyId, byte[] delegationKey)
throws SQLException;
protected abstract void updateDelegationKey(int keyId, byte[] delegationKey)
throws SQLException;
protected abstract void deleteDelegationKey(int keyId) throws SQLException;
// Counter operations in SQL database
protected abstract int selectSequenceNum() throws SQLException;
protected abstract void updateSequenceNum(int value) throws SQLException;
protected abstract int incrementSequenceNum(int amount) throws SQLException;
protected abstract int selectKeyId() throws SQLException;
protected abstract void updateKeyId(int value) throws SQLException;
protected abstract int incrementKeyId(int amount) throws SQLException;
}

View File

@ -93,6 +93,15 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP-java7</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@ -109,6 +118,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
@ -126,6 +140,14 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemProperties>
<property>
<name>derby.stream.error.file</name>
<value>${project.build.directory}/derby.log</value>
</property>
</systemProperties>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Script to create a new Database in MySQL for the TokenStore
CREATE DATABASE IF NOT EXISTS TokenStore;

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Script to generate all the tables for the TokenStore in MySQL
USE TokenStore
CREATE TABLE IF NOT EXISTS Tokens(
sequenceNum int NOT NULL,
tokenIdentifier varbinary(255) NOT NULL,
tokenInfo varbinary(255) NOT NULL,
modifiedTime timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY(sequenceNum, tokenIdentifier)
);
CREATE TABLE IF NOT EXISTS DelegationKeys(
keyId int NOT NULL,
delegationKey varbinary(255) NOT NULL,
modifiedTime timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY(keyId)
);
CREATE TABLE IF NOT EXISTS LastSequenceNum(
sequenceNum int NOT NULL
);
-- Initialize the LastSequenceNum table with a single entry
INSERT INTO LastSequenceNum (sequenceNum)
SELECT 0 WHERE NOT EXISTS (SELECT * FROM LastSequenceNum);
CREATE TABLE IF NOT EXISTS LastDelegationKeyId(
keyId int NOT NULL
);
-- Initialize the LastDelegationKeyId table with a single entry
INSERT INTO LastDelegationKeyId (keyId)
SELECT 0 WHERE NOT EXISTS (SELECT * FROM LastDelegationKeyId);

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Script to create a new User in MySQL for the TokenStore
-- Update TokenStore user and password on this script
CREATE USER IF NOT EXISTS 'TokenStoreUser'@'%' IDENTIFIED BY 'TokenStorePassword';
GRANT ALL PRIVILEGES ON TokenStore.* TO 'TokenStoreUser'@'%';
FLUSH PRIVILEGES;

View File

@ -0,0 +1,24 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
These scripts must be executed to create the TokenStore database, tables and users needed to use the
SQLDelegationTokenSecretManagerImpl as the delegation token secret manager:
1. TokenStoreDatabase.sql
2. TokenStoreTables.sql
3. TokenStoreUser.sql
Note: The TokenStoreUser.sql defines a default user/password. You are highly encouraged to set
this to a proper strong password.

View File

@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router.security.token;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Distributed counter that relies on a SQL database to synchronize
* between multiple clients. This expects a table with a single int field
* to exist in the database. One record must exist on the table at all times,
* representing the last used value reserved by a client.
*/
public class DistributedSQLCounter {
private static final Logger LOG =
LoggerFactory.getLogger(DistributedSQLCounter.class);
private final String field;
private final String table;
private final SQLConnectionFactory connectionFactory;
public DistributedSQLCounter(String field, String table,
SQLConnectionFactory connectionFactory) {
this.field = field;
this.table = table;
this.connectionFactory = connectionFactory;
}
/**
* Obtains the value of the counter.
* @return counter value.
*/
public int selectCounterValue() throws SQLException {
try (Connection connection = connectionFactory.getConnection()) {
return selectCounterValue(false, connection);
}
}
private int selectCounterValue(boolean forUpdate, Connection connection) throws SQLException {
String query = String.format("SELECT %s FROM %s %s", field, table,
forUpdate ? "FOR UPDATE" : "");
LOG.debug("Select counter statement: " + query);
try (Statement statement = connection.createStatement();
ResultSet result = statement.executeQuery(query)) {
if (result.next()) {
return result.getInt(field);
} else {
throw new IllegalStateException("Counter table not initialized: " + table);
}
}
}
/**
* Sets the counter to the given value.
* @param value Value to assign to counter.
*/
public void updateCounterValue(int value) throws SQLException {
try (Connection connection = connectionFactory.getConnection(true)) {
updateCounterValue(value, connection);
}
}
/**
* Sets the counter to the given value.
* @param connection Connection to database hosting the counter table.
* @param value Value to assign to counter.
*/
public void updateCounterValue(int value, Connection connection) throws SQLException {
String queryText = String.format("UPDATE %s SET %s = ?", table, field);
LOG.debug("Update counter statement: " + queryText + ". Value: " + value);
try (PreparedStatement statement = connection.prepareStatement(queryText)) {
statement.setInt(1, value);
statement.execute();
}
}
/**
* Increments the counter by the given amount and
* returns the previous counter value.
* @param amount Amount to increase the counter.
* @return Previous counter value.
*/
public int incrementCounterValue(int amount) throws SQLException {
// Disabling auto-commit to ensure that all statements on this transaction
// are committed at once.
try (Connection connection = connectionFactory.getConnection(false)) {
// Preventing dirty reads and non-repeatable reads to ensure that the
// value read will not be updated by a different connection.
if (connection.getTransactionIsolation() < Connection.TRANSACTION_REPEATABLE_READ) {
connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
}
try {
// Reading the counter value "FOR UPDATE" to lock the value record,
// forcing other connections to wait until this transaction is committed.
int lastValue = selectCounterValue(true, connection);
// Calculate the new counter value and handling overflow by
// resetting the counter to 0.
int newValue = lastValue + amount;
if (newValue < 0) {
lastValue = 0;
newValue = amount;
}
updateCounterValue(newValue, connection);
connection.commit();
return lastValue;
} catch (Exception e) {
// Rollback transaction to release table locks
connection.rollback();
throw e;
}
}
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router.security.token;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;
/**
* Class that relies on a HikariDataSource to provide SQL connections.
*/
class HikariDataSourceConnectionFactory implements SQLConnectionFactory {
protected final static String HIKARI_PROPS = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX
+ "connection.hikari.";
private final HikariDataSource dataSource;
HikariDataSourceConnectionFactory(Configuration conf) {
Properties properties = new Properties();
properties.setProperty("jdbcUrl", conf.get(CONNECTION_URL));
properties.setProperty("username", conf.get(CONNECTION_USERNAME));
properties.setProperty("password", conf.get(CONNECTION_PASSWORD));
properties.setProperty("driverClassName", conf.get(CONNECTION_DRIVER));
// Include hikari connection properties
properties.putAll(conf.getPropsWithPrefix(HIKARI_PROPS));
HikariConfig hikariConfig = new HikariConfig(properties);
this.dataSource = new HikariDataSource(hikariConfig);
}
@Override
public Connection getConnection() throws SQLException {
return dataSource.getConnection();
}
@Override
public void shutdown() {
// Close database connections
dataSource.close();
}
@VisibleForTesting
HikariDataSource getDataSource() {
return dataSource;
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router.security.token;
import com.mysql.cj.jdbc.MysqlDataSource;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
/**
* Interface to provide SQL connections to the {@link SQLDelegationTokenSecretManagerImpl}.
*/
public interface SQLConnectionFactory {
String CONNECTION_URL = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX
+ "connection.url";
String CONNECTION_USERNAME = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX
+ "connection.username";
String CONNECTION_PASSWORD = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX
+ "connection.password";
String CONNECTION_DRIVER = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX
+ "connection.driver";
Connection getConnection() throws SQLException;
void shutdown();
default Connection getConnection(boolean autocommit) throws SQLException {
Connection connection = getConnection();
connection.setAutoCommit(autocommit);
return connection;
}
}

View File

@ -0,0 +1,242 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router.security.token;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An implementation of {@link SQLDelegationTokenSecretManager} that
* persists TokenIdentifiers and DelegationKeys in a SQL database.
* This implementation relies on the Datanucleus JDO PersistenceManager, which
* can be configured with datanucleus.* configuration properties.
*/
public class SQLDelegationTokenSecretManagerImpl
extends SQLDelegationTokenSecretManager<AbstractDelegationTokenIdentifier> {
private static final Logger LOG =
LoggerFactory.getLogger(SQLDelegationTokenSecretManagerImpl.class);
private static final String SEQ_NUM_COUNTER_FIELD = "sequenceNum";
private static final String SEQ_NUM_COUNTER_TABLE = "LastSequenceNum";
private static final String KEY_ID_COUNTER_FIELD = "keyId";
private static final String KEY_ID_COUNTER_TABLE = "LastDelegationKeyId";
private final SQLConnectionFactory connectionFactory;
private final DistributedSQLCounter sequenceNumCounter;
private final DistributedSQLCounter delegationKeyIdCounter;
private final SQLSecretManagerRetriableHandler retryHandler;
public SQLDelegationTokenSecretManagerImpl(Configuration conf) {
this(conf, new HikariDataSourceConnectionFactory(conf),
SQLSecretManagerRetriableHandlerImpl.getInstance(conf));
}
public SQLDelegationTokenSecretManagerImpl(Configuration conf,
SQLConnectionFactory connectionFactory, SQLSecretManagerRetriableHandler retryHandler) {
super(conf);
this.connectionFactory = connectionFactory;
this.sequenceNumCounter = new DistributedSQLCounter(SEQ_NUM_COUNTER_FIELD,
SEQ_NUM_COUNTER_TABLE, connectionFactory);
this.delegationKeyIdCounter = new DistributedSQLCounter(KEY_ID_COUNTER_FIELD,
KEY_ID_COUNTER_TABLE, connectionFactory);
this.retryHandler = retryHandler;
try {
super.startThreads();
} catch (IOException e) {
throw new RuntimeException("Error starting threads for MySQL secret manager", e);
}
LOG.info("MySQL delegation token secret manager instantiated");
}
@Override
public DelegationTokenIdentifier createIdentifier() {
return new DelegationTokenIdentifier();
}
@Override
public void stopThreads() {
super.stopThreads();
connectionFactory.shutdown();
}
@Override
protected void insertToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
throws SQLException {
retryHandler.execute(() -> {
try (Connection connection = connectionFactory.getConnection(true);
PreparedStatement statement = connection.prepareStatement(
"INSERT INTO Tokens (sequenceNum, tokenIdentifier, tokenInfo) VALUES (?, ?, ?)")) {
statement.setInt(1, sequenceNum);
statement.setBytes(2, tokenIdentifier);
statement.setBytes(3, tokenInfo);
statement.execute();
}
});
}
@Override
protected void updateToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
throws SQLException {
retryHandler.execute(() -> {
try (Connection connection = connectionFactory.getConnection(true);
PreparedStatement statement = connection.prepareStatement(
"UPDATE Tokens SET tokenInfo = ? WHERE sequenceNum = ? AND tokenIdentifier = ?")) {
statement.setBytes(1, tokenInfo);
statement.setInt(2, sequenceNum);
statement.setBytes(3, tokenIdentifier);
statement.execute();
}
});
}
@Override
protected void deleteToken(int sequenceNum, byte[] tokenIdentifier) throws SQLException {
retryHandler.execute(() -> {
try (Connection connection = connectionFactory.getConnection(true);
PreparedStatement statement = connection.prepareStatement(
"DELETE FROM Tokens WHERE sequenceNum = ? AND tokenIdentifier = ?")) {
statement.setInt(1, sequenceNum);
statement.setBytes(2, tokenIdentifier);
statement.execute();
}
});
}
@Override
protected byte[] selectTokenInfo(int sequenceNum, byte[] tokenIdentifier) throws SQLException {
return retryHandler.execute(() -> {
try (Connection connection = connectionFactory.getConnection();
PreparedStatement statement = connection.prepareStatement(
"SELECT tokenInfo FROM Tokens WHERE sequenceNum = ? AND tokenIdentifier = ?")) {
statement.setInt(1, sequenceNum);
statement.setBytes(2, tokenIdentifier);
try (ResultSet result = statement.executeQuery()) {
if (result.next()) {
return result.getBytes("tokenInfo");
}
}
}
return null;
});
}
@Override
protected void insertDelegationKey(int keyId, byte[] delegationKey) throws SQLException {
retryHandler.execute(() -> {
try (Connection connection = connectionFactory.getConnection(true);
PreparedStatement statement = connection.prepareStatement(
"INSERT INTO DelegationKeys (keyId, delegationKey) VALUES (?, ?)")) {
statement.setInt(1, keyId);
statement.setBytes(2, delegationKey);
statement.execute();
}
});
}
@Override
protected void updateDelegationKey(int keyId, byte[] delegationKey) throws SQLException {
retryHandler.execute(() -> {
try (Connection connection = connectionFactory.getConnection(true);
PreparedStatement statement = connection.prepareStatement(
"UPDATE DelegationKeys SET delegationKey = ? WHERE keyId = ?")) {
statement.setBytes(1, delegationKey);
statement.setInt(2, keyId);
statement.execute();
}
});
}
@Override
protected void deleteDelegationKey(int keyId) throws SQLException {
retryHandler.execute(() -> {
try (Connection connection = connectionFactory.getConnection(true);
PreparedStatement statement = connection.prepareStatement(
"DELETE FROM DelegationKeys WHERE keyId = ?")) {
statement.setInt(1, keyId);
statement.execute();
}
});
}
@Override
protected byte[] selectDelegationKey(int keyId) throws SQLException {
return retryHandler.execute(() -> {
try (Connection connection = connectionFactory.getConnection();
PreparedStatement statement = connection.prepareStatement(
"SELECT delegationKey FROM DelegationKeys WHERE keyId = ?")) {
statement.setInt(1, keyId);
try (ResultSet result = statement.executeQuery()) {
if (result.next()) {
return result.getBytes("delegationKey");
}
}
}
return null;
});
}
@Override
protected int selectSequenceNum() throws SQLException {
return retryHandler.execute(() -> sequenceNumCounter.selectCounterValue());
}
@Override
protected void updateSequenceNum(int value) throws SQLException {
retryHandler.execute(() -> sequenceNumCounter.updateCounterValue(value));
}
@Override
protected int incrementSequenceNum(int amount) throws SQLException {
return retryHandler.execute(() -> sequenceNumCounter.incrementCounterValue(amount));
}
@Override
protected int selectKeyId() throws SQLException {
return retryHandler.execute(delegationKeyIdCounter::selectCounterValue);
}
@Override
protected void updateKeyId(int value) throws SQLException {
retryHandler.execute(() -> delegationKeyIdCounter.updateCounterValue(value));
}
@Override
protected int incrementKeyId(int amount) throws SQLException {
return retryHandler.execute(() -> delegationKeyIdCounter.incrementCounterValue(amount));
}
@VisibleForTesting
protected SQLConnectionFactory getConnectionFactory() {
return connectionFactory;
}
}

View File

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router.security.token;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Interface to handle retries when {@link SQLDelegationTokenSecretManagerImpl}
* throws expected errors.
*/
public interface SQLSecretManagerRetriableHandler {
void execute(SQLCommandVoid command) throws SQLException;
<T> T execute(SQLCommand<T> command) throws SQLException;
@FunctionalInterface
interface SQLCommandVoid {
void doCall() throws SQLException;
}
@FunctionalInterface
interface SQLCommand<T> {
T doCall() throws SQLException;
}
}
/**
* Implementation of {@link SQLSecretManagerRetriableHandler} that uses a
* {@link RetryProxy} to simplify the retryable operations.
*/
class SQLSecretManagerRetriableHandlerImpl implements SQLSecretManagerRetriableHandler {
public final static String MAX_RETRIES =
SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + "max-retries";
public final static int MAX_RETRIES_DEFAULT = 0;
public final static String RETRY_SLEEP_TIME_MS =
SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + "retry-sleep-time-ms";
public final static long RETRY_SLEEP_TIME_MS_DEFAULT = 100;
private static final Logger LOG =
LoggerFactory.getLogger(SQLSecretManagerRetriableHandlerImpl.class);
static SQLSecretManagerRetriableHandler getInstance(Configuration conf) {
return getInstance(conf, new SQLSecretManagerRetriableHandlerImpl());
}
static SQLSecretManagerRetriableHandler getInstance(Configuration conf,
SQLSecretManagerRetriableHandlerImpl retryHandler) {
RetryPolicy basePolicy = RetryPolicies.exponentialBackoffRetry(
conf.getInt(MAX_RETRIES, MAX_RETRIES_DEFAULT),
conf.getLong(RETRY_SLEEP_TIME_MS, RETRY_SLEEP_TIME_MS_DEFAULT),
TimeUnit.MILLISECONDS);
// Configure SQLSecretManagerRetriableException to retry with exponential backoff
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = new HashMap<>();
exceptionToPolicyMap.put(SQLSecretManagerRetriableException.class, basePolicy);
// Configure all other exceptions to fail after one attempt
RetryPolicy retryPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
return (SQLSecretManagerRetriableHandler) RetryProxy.create(
SQLSecretManagerRetriableHandler.class, retryHandler, retryPolicy);
}
/**
* Executes a SQL command and raises retryable errors as
* {@link SQLSecretManagerRetriableException}s so they are recognized by the
* {@link RetryProxy}.
* @param command SQL command to execute
* @throws SQLException When SQL connection errors occur
*/
@Override
public void execute(SQLCommandVoid command) throws SQLException {
try {
command.doCall();
} catch (SQLException e) {
LOG.warn("Failed to execute SQL command", e);
throw new SQLSecretManagerRetriableException(e);
}
}
/**
* Executes a SQL command and raises retryable errors as
* {@link SQLSecretManagerRetriableException}s so they are recognized by the
* {@link RetryProxy}.
* @param command SQL command to execute
* @throws SQLException When SQL connection errors occur
*/
@Override
public <T> T execute(SQLCommand<T> command) throws SQLException {
try {
return command.doCall();
} catch (SQLException e) {
LOG.warn("Failed to execute SQL command", e);
throw new SQLSecretManagerRetriableException(e);
}
}
/**
* Class used to identify errors that can be retried.
*/
static class SQLSecretManagerRetriableException extends SQLException {
SQLSecretManagerRetriableException(Throwable cause) {
super(cause);
}
}
}

View File

@ -0,0 +1,471 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router.security.token;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestSQLDelegationTokenSecretManagerImpl {
private static final String CONNECTION_URL = "jdbc:derby:memory:TokenStore";
private static final int TEST_MAX_RETRIES = 3;
private static Configuration conf;
@Before
public void init() throws SQLException {
createTestDBTables();
}
@After
public void cleanup() throws SQLException {
dropTestDBTables();
}
@BeforeClass
public static void initDatabase() throws SQLException {
DriverManager.getConnection(CONNECTION_URL + ";create=true");
conf = new Configuration();
conf.set(SQLConnectionFactory.CONNECTION_URL, CONNECTION_URL);
conf.set(SQLConnectionFactory.CONNECTION_USERNAME, "testuser");
conf.set(SQLConnectionFactory.CONNECTION_PASSWORD, "testpassword");
conf.set(SQLConnectionFactory.CONNECTION_DRIVER, "org.apache.derby.jdbc.EmbeddedDriver");
conf.setInt(SQLSecretManagerRetriableHandlerImpl.MAX_RETRIES, TEST_MAX_RETRIES);
conf.setInt(SQLSecretManagerRetriableHandlerImpl.RETRY_SLEEP_TIME_MS, 10);
}
@AfterClass
public static void cleanupDatabase() {
try {
DriverManager.getConnection(CONNECTION_URL + ";drop=true");
} catch (SQLException e) {
// SQLException expected when database is dropped
if (!e.getMessage().contains("dropped")) {
throw new RuntimeException(e);
}
}
}
@Test
public void testSingleSecretManager() throws Exception {
DelegationTokenManager tokenManager = createTokenManager();
try {
Token<? extends AbstractDelegationTokenIdentifier> token =
tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo");
validateToken(tokenManager, token);
} finally {
stopTokenManager(tokenManager);
}
}
@Test
public void testMultipleSecretManagers() throws Exception {
DelegationTokenManager tokenManager1 = createTokenManager();
DelegationTokenManager tokenManager2 = createTokenManager();
try {
Token<? extends AbstractDelegationTokenIdentifier> token1 =
tokenManager1.createToken(UserGroupInformation.getCurrentUser(), "foo");
Token<? extends AbstractDelegationTokenIdentifier> token2 =
tokenManager2.createToken(UserGroupInformation.getCurrentUser(), "foo");
validateToken(tokenManager1, token2);
validateToken(tokenManager2, token1);
} finally {
stopTokenManager(tokenManager1);
stopTokenManager(tokenManager2);
}
}
@Test
public void testSequenceNumAllocation() throws Exception {
int tokensPerManager = SQLDelegationTokenSecretManagerImpl.DEFAULT_SEQ_NUM_BATCH_SIZE * 5;
Set<Integer> sequenceNums1 = new HashSet<>();
Set<Integer> sequenceNums2 = new HashSet<>();
Set<Integer> sequenceNums3 = new HashSet<>();
Set<Integer> sequenceNums = new HashSet<>();
DelegationTokenManager tokenManager1 = createTokenManager();
DelegationTokenManager tokenManager2 = createTokenManager();
DelegationTokenManager tokenManager3 = createTokenManager();
try {
for (int i = 0; i < tokensPerManager; i++) {
allocateSequenceNum(tokenManager1, sequenceNums1);
allocateSequenceNum(tokenManager2, sequenceNums2);
allocateSequenceNum(tokenManager3, sequenceNums3);
sequenceNums.addAll(sequenceNums1);
sequenceNums.addAll(sequenceNums2);
sequenceNums.addAll(sequenceNums3);
}
Assert.assertEquals("Verify that all tokens were created with unique sequence numbers",
tokensPerManager * 3, sequenceNums.size());
Assert.assertEquals("Verify that tokenManager1 generated unique sequence numbers",
tokensPerManager, sequenceNums1.size());
Assert.assertEquals("Verify that tokenManager2 generated unique sequence number",
tokensPerManager, sequenceNums2.size());
Assert.assertEquals("Verify that tokenManager3 generated unique sequence numbers",
tokensPerManager, sequenceNums3.size());
// Validate sequence number batches allocated in order to each token manager
int batchSize = SQLDelegationTokenSecretManagerImpl.DEFAULT_SEQ_NUM_BATCH_SIZE;
for (int seqNum = 1; seqNum < tokensPerManager;) {
// First batch allocated tokenManager1
for (int i = 0; i < batchSize; i++, seqNum++) {
Assert.assertTrue(sequenceNums1.contains(seqNum));
}
// Second batch allocated tokenManager2
for (int i = 0; i < batchSize; i++, seqNum++) {
Assert.assertTrue(sequenceNums2.contains(seqNum));
}
// Third batch allocated tokenManager3
for (int i = 0; i < batchSize; i++, seqNum++) {
Assert.assertTrue(sequenceNums3.contains(seqNum));
}
}
SQLDelegationTokenSecretManagerImpl secretManager =
(SQLDelegationTokenSecretManagerImpl) tokenManager1.getDelegationTokenSecretManager();
Assert.assertEquals("Verify that the counter is set to the highest sequence number",
tokensPerManager * 3, secretManager.getDelegationTokenSeqNum());
} finally {
stopTokenManager(tokenManager1);
stopTokenManager(tokenManager2);
stopTokenManager(tokenManager3);
}
}
@Test
public void testSequenceNumRollover() throws Exception {
int tokenBatch = SQLDelegationTokenSecretManagerImpl.DEFAULT_SEQ_NUM_BATCH_SIZE;
Set<Integer> sequenceNums = new HashSet<>();
DelegationTokenManager tokenManager = createTokenManager();
try {
SQLDelegationTokenSecretManagerImpl secretManager =
(SQLDelegationTokenSecretManagerImpl) tokenManager.getDelegationTokenSecretManager();
secretManager.setDelegationTokenSeqNum(Integer.MAX_VALUE - tokenBatch);
// Allocate sequence numbers before they are rolled over
for (int seqNum = Integer.MAX_VALUE - tokenBatch; seqNum < Integer.MAX_VALUE; seqNum++) {
allocateSequenceNum(tokenManager, sequenceNums);
Assert.assertTrue(sequenceNums.contains(seqNum + 1));
}
// Allocate sequence numbers after they are rolled over
for (int seqNum = 0; seqNum < tokenBatch; seqNum++) {
allocateSequenceNum(tokenManager, sequenceNums);
Assert.assertTrue(sequenceNums.contains(seqNum + 1));
}
} finally {
stopTokenManager(tokenManager);
}
}
@Test
public void testDelegationKeyAllocation() throws Exception {
DelegationTokenManager tokenManager1 = createTokenManager();
try {
SQLDelegationTokenSecretManagerImpl secretManager1 =
(SQLDelegationTokenSecretManagerImpl) tokenManager1.getDelegationTokenSecretManager();
// Prevent delegation keys to roll for the rest of the test to avoid race conditions
// between the keys generated and the active keys in the database.
((TestDelegationTokenSecretManager) secretManager1).lockKeyRoll();
int keyId1 = secretManager1.getCurrentKeyId();
// Validate that latest key1 is assigned to tokenManager1 tokens
Token<? extends AbstractDelegationTokenIdentifier> token1 =
tokenManager1.createToken(UserGroupInformation.getCurrentUser(), "foo");
validateKeyId(token1, keyId1);
DelegationTokenManager tokenManager2 = createTokenManager();
try {
SQLDelegationTokenSecretManagerImpl secretManager2 =
(SQLDelegationTokenSecretManagerImpl) tokenManager2.getDelegationTokenSecretManager();
// Prevent delegation keys to roll for the rest of the test
((TestDelegationTokenSecretManager) secretManager2).lockKeyRoll();
int keyId2 = secretManager2.getCurrentKeyId();
Assert.assertNotEquals("Each secret manager has its own key", keyId1, keyId2);
// Validate that latest key2 is assigned to tokenManager2 tokens
Token<? extends AbstractDelegationTokenIdentifier> token2 =
tokenManager2.createToken(UserGroupInformation.getCurrentUser(), "foo");
validateKeyId(token2, keyId2);
// Validate that key1 is still assigned to tokenManager1 tokens
token1 = tokenManager1.createToken(UserGroupInformation.getCurrentUser(), "foo");
validateKeyId(token1, keyId1);
// Validate that key2 is still assigned to tokenManager2 tokens
token2 = tokenManager2.createToken(UserGroupInformation.getCurrentUser(), "foo");
validateKeyId(token2, keyId2);
} finally {
stopTokenManager(tokenManager2);
}
} finally {
stopTokenManager(tokenManager1);
}
}
@Test
public void testHikariConfigs() {
HikariDataSourceConnectionFactory factory1 = new HikariDataSourceConnectionFactory(conf);
int defaultMaximumPoolSize = factory1.getDataSource().getMaximumPoolSize();
factory1.shutdown();
// Changing default maximumPoolSize
Configuration hikariConf = new Configuration(conf);
hikariConf.setInt(HikariDataSourceConnectionFactory.HIKARI_PROPS + "maximumPoolSize",
defaultMaximumPoolSize + 1);
// Verifying property is correctly set in datasource
HikariDataSourceConnectionFactory factory2 = new HikariDataSourceConnectionFactory(hikariConf);
Assert.assertEquals(factory2.getDataSource().getMaximumPoolSize(),
defaultMaximumPoolSize + 1);
factory2.shutdown();
}
@Test
public void testRetries() throws Exception {
DelegationTokenManager tokenManager = createTokenManager();
TestDelegationTokenSecretManager secretManager =
(TestDelegationTokenSecretManager) tokenManager.getDelegationTokenSecretManager();
try {
// Prevent delegation keys to roll for the rest of the test
secretManager.lockKeyRoll();
// Reset counter and expect a single request when inserting a token
TestRetryHandler.resetExecutionAttemptCounter();
tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo");
Assert.assertEquals(1, TestRetryHandler.getExecutionAttempts());
// Breaking database connections to cause retries
secretManager.setReadOnly(true);
// Reset counter and expect a multiple retries when failing to insert a token
TestRetryHandler.resetExecutionAttemptCounter();
tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo");
Assert.assertEquals(TEST_MAX_RETRIES + 1, TestRetryHandler.getExecutionAttempts());
} finally {
// Fix database connections
secretManager.setReadOnly(false);
stopTokenManager(tokenManager);
}
}
private DelegationTokenManager createTokenManager() {
DelegationTokenManager tokenManager = new DelegationTokenManager(new Configuration(), null);
tokenManager.setExternalDelegationTokenSecretManager(new TestDelegationTokenSecretManager());
return tokenManager;
}
private void allocateSequenceNum(DelegationTokenManager tokenManager, Set<Integer> sequenceNums)
throws IOException {
Token<? extends AbstractDelegationTokenIdentifier> token =
tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo");
AbstractDelegationTokenIdentifier tokenIdentifier = token.decodeIdentifier();
Assert.assertFalse("Verify sequence number is unique",
sequenceNums.contains(tokenIdentifier.getSequenceNumber()));
sequenceNums.add(tokenIdentifier.getSequenceNumber());
}
private void validateToken(DelegationTokenManager tokenManager,
Token<? extends AbstractDelegationTokenIdentifier> token)
throws Exception {
SQLDelegationTokenSecretManagerImpl secretManager =
(SQLDelegationTokenSecretManagerImpl) tokenManager.getDelegationTokenSecretManager();
AbstractDelegationTokenIdentifier tokenIdentifier = token.decodeIdentifier();
// Verify token using token manager
tokenManager.verifyToken(token);
byte[] tokenInfo1 = secretManager.selectTokenInfo(tokenIdentifier.getSequenceNumber(),
tokenIdentifier.getBytes());
Assert.assertNotNull("Verify token exists in database", tokenInfo1);
// Renew token using token manager
tokenManager.renewToken(token, "foo");
byte[] tokenInfo2 = secretManager.selectTokenInfo(tokenIdentifier.getSequenceNumber(),
tokenIdentifier.getBytes());
Assert.assertNotNull("Verify token exists in database", tokenInfo2);
Assert.assertFalse("Verify token has been updated in database",
Arrays.equals(tokenInfo1, tokenInfo2));
// Cancel token using token manager
tokenManager.cancelToken(token, "foo");
byte[] tokenInfo3 = secretManager.selectTokenInfo(tokenIdentifier.getSequenceNumber(),
tokenIdentifier.getBytes());
Assert.assertNull("Verify token was removed from database", tokenInfo3);
}
private void validateKeyId(Token<? extends AbstractDelegationTokenIdentifier> token,
int expectedKeyiD) throws IOException {
AbstractDelegationTokenIdentifier tokenIdentifier = token.decodeIdentifier();
Assert.assertEquals("Verify that keyId is assigned to token",
tokenIdentifier.getMasterKeyId(), expectedKeyiD);
}
private static Connection getTestDBConnection() {
try {
return DriverManager.getConnection(CONNECTION_URL);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static void createTestDBTables() throws SQLException {
execute("CREATE TABLE Tokens (sequenceNum INT NOT NULL, "
+ "tokenIdentifier VARCHAR (255) FOR BIT DATA NOT NULL, "
+ "tokenInfo VARCHAR (255) FOR BIT DATA NOT NULL, "
+ "modifiedTime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
+ "PRIMARY KEY(sequenceNum))");
execute("CREATE TABLE DelegationKeys (keyId INT NOT NULL, "
+ "delegationKey VARCHAR (255) FOR BIT DATA NOT NULL, "
+ "modifiedTime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
+ "PRIMARY KEY(keyId))");
execute("CREATE TABLE LastSequenceNum (sequenceNum INT NOT NULL)");
execute("INSERT INTO LastSequenceNum VALUES (0)");
execute("CREATE TABLE LastDelegationKeyId (keyId INT NOT NULL)");
execute("INSERT INTO LastDelegationKeyId VALUES (0)");
}
private static void dropTestDBTables() throws SQLException {
execute("DROP TABLE Tokens");
execute("DROP TABLE DelegationKeys");
execute("DROP TABLE LastSequenceNum");
execute("DROP TABLE LastDelegationKeyId");
}
private static void execute(String statement) throws SQLException {
try (Connection connection = getTestDBConnection()) {
connection.createStatement().execute(statement);
}
}
private void stopTokenManager(DelegationTokenManager tokenManager) {
TestDelegationTokenSecretManager secretManager =
(TestDelegationTokenSecretManager) tokenManager.getDelegationTokenSecretManager();
// Release any locks on tables
secretManager.unlockKeyRoll();
// Stop threads to close database connections
secretManager.stopThreads();
}
static class TestDelegationTokenSecretManager extends SQLDelegationTokenSecretManagerImpl {
private ReentrantLock keyRollLock;
private synchronized ReentrantLock getKeyRollLock() {
if (keyRollLock == null) {
keyRollLock = new ReentrantLock();
}
return keyRollLock;
}
TestDelegationTokenSecretManager() {
super(conf, new TestConnectionFactory(conf),
SQLSecretManagerRetriableHandlerImpl.getInstance(conf, new TestRetryHandler()));
}
// Tests can call this method to prevent delegation keys from
// being rolled in the middle of a test to prevent race conditions
public void lockKeyRoll() {
getKeyRollLock().lock();
}
public void unlockKeyRoll() {
if (getKeyRollLock().isHeldByCurrentThread()) {
getKeyRollLock().unlock();
}
}
@Override
protected void rollMasterKey() throws IOException {
try {
lockKeyRoll();
super.rollMasterKey();
} finally {
unlockKeyRoll();
}
}
public void setReadOnly(boolean readOnly) {
((TestConnectionFactory) getConnectionFactory()).readOnly = readOnly;
}
}
static class TestConnectionFactory extends HikariDataSourceConnectionFactory {
private boolean readOnly = false;
TestConnectionFactory(Configuration conf) {
super(conf);
}
@Override
public Connection getConnection() throws SQLException {
Connection connection = super.getConnection();
// Change to default schema as derby driver looks for user schema
connection.setSchema("APP");
connection.setReadOnly(readOnly);
return connection;
}
}
static class TestRetryHandler extends SQLSecretManagerRetriableHandlerImpl {
// Tracks the amount of times that a SQL command is executed, regardless of
// whether it completed successfully or not.
private static AtomicInteger executionAttemptCounter = new AtomicInteger();
static void resetExecutionAttemptCounter() {
executionAttemptCounter = new AtomicInteger();
}
static int getExecutionAttempts() {
return executionAttemptCounter.get();
}
@Override
public void execute(SQLCommandVoid command) throws SQLException {
executionAttemptCounter.incrementAndGet();
super.execute(command);
}
}
}

View File

@ -133,6 +133,8 @@
<jcache.version>1.0-alpha-1</jcache.version>
<ehcache.version>3.3.1</ehcache.version>
<hikari.version>2.4.12</hikari.version>
<derby.version>10.10.2.0</derby.version>
<mysql-connector-java.version>8.0.29</mysql-connector-java.version>
<mssql.version>6.2.1.jre7</mssql.version>
<okhttp3.version>4.9.3</okhttp3.version>
<kotlin-stdlib.verion>1.4.10</kotlin-stdlib.verion>
@ -1869,6 +1871,16 @@
<artifactId>HikariCP-java7</artifactId>
<version>${hikari.version}</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>${derby.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql-connector-java.version}</version>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>