HDFS-16943. RBF: Implements MySQL based StateStoreDriver. (#5469)

This commit is contained in:
Simbarashe Dzinamarira 2023-04-05 16:44:29 -07:00 committed by GitHub
parent 422bf3b24c
commit 47c22e388e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 541 additions and 1 deletions

View File

@ -20,6 +20,8 @@
import java.net.InetAddress;
import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
@ -35,6 +37,8 @@
* provider. Driver implementations will extend this class and implement some of
* the default methods.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class StateStoreDriver implements StateStoreRecordOperations {
private static final Logger LOG =

View File

@ -23,6 +23,8 @@
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
@ -39,6 +41,8 @@
* optimization, such as custom get/put/remove queries, depending on the
* capabilities of the data store.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class StateStoreBaseImpl extends StateStoreDriver {
@Override

View File

@ -0,0 +1,425 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.router.security.token.SQLConnectionFactory;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.DisabledNameservice;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.*;
/**
* StateStoreDriver implementation based on MySQL.
* There is a separate table for each record type. Each table just as two
* columns, recordKey and recordValue.
*/
public class StateStoreMySQLImpl extends StateStoreSerializableImpl {
public static final String SQL_STATE_STORE_CONF_PREFIX = "state-store-mysql.";
public static final String CONNECTION_URL =
SQL_STATE_STORE_CONF_PREFIX + "connection.url";
public static final String CONNECTION_USERNAME =
SQL_STATE_STORE_CONF_PREFIX + "connection.username";
public static final String CONNECTION_PASSWORD =
SQL_STATE_STORE_CONF_PREFIX + "connection.password";
public static final String CONNECTION_DRIVER =
SQL_STATE_STORE_CONF_PREFIX + "connection.driver";
private static final Logger LOG =
LoggerFactory.getLogger(StateStoreSerializableImpl.class);
private SQLConnectionFactory connectionFactory;
/** If the driver has been initialized. */
private boolean initialized = false;
private final static Set<String> VALID_TABLES = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(
MembershipState.class.getSimpleName(),
RouterState.class.getSimpleName(),
MountTable.class.getSimpleName(),
DisabledNameservice.class.getSimpleName()
))
);
@Override
public boolean initDriver() {
Configuration conf = getConf();
connectionFactory = new MySQLStateStoreHikariDataSourceConnectionFactory(conf);
initialized = true;
LOG.info("MySQL state store connection factory initialized");
return true;
}
@Override
public <T extends BaseRecord> boolean initRecordStorage(String className, Class<T> clazz) {
String tableName = getAndValidateTableNameForClass(clazz);
try (Connection connection = connectionFactory.getConnection();
ResultSet resultSet = connection
.getMetaData()
.getTables(null, null, tableName, null)) {
if (resultSet.next()) {
return true;
}
} catch (SQLException e) {
LOG.error("Could not check if table {} able exists", tableName);
}
try (Connection connection = connectionFactory.getConnection();
Statement statement = connection.createStatement()) {
String sql = String.format("CREATE TABLE %s ("
+ "recordKey VARCHAR (255) NOT NULL,"
+ "recordValue VARCHAR (2047) NOT NULL, "
+ "PRIMARY KEY(recordKey))", tableName);
statement.execute(sql);
return true;
} catch (SQLException e) {
LOG.error(String.format("Cannot create table %s for record type %s.",
tableName, className), e.getMessage());
return false;
}
}
@Override
public boolean isDriverReady() {
return this.initialized;
}
@Override
public void close() throws Exception {
connectionFactory.shutdown();
}
@Override
public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
throws IOException {
String tableName = getAndValidateTableNameForClass(clazz);
verifyDriverReady();
long start = Time.monotonicNow();
StateStoreMetrics metrics = getMetrics();
List<T> ret = new ArrayList<>();
try (Connection connection = connectionFactory.getConnection();
PreparedStatement statement = connection.prepareStatement(
String.format("SELECT * FROM %s", tableName))) {
try (ResultSet result = statement.executeQuery()) {
while(result.next()) {
String recordValue = result.getString("recordValue");
T record = newRecord(recordValue, clazz, false);
ret.add(record);
}
}
} catch (SQLException e) {
if (metrics != null) {
metrics.addFailure(Time.monotonicNow() - start);
}
String msg = "Cannot fetch records for " + clazz.getSimpleName();
LOG.error(msg, e);
throw new IOException(msg, e);
}
if (metrics != null) {
metrics.addRead(Time.monotonicNow() - start);
}
return new QueryResult<>(ret, getTime());
}
@Override
public <T extends BaseRecord> boolean putAll(
List<T> records, boolean allowUpdate, boolean errorIfExists) throws IOException {
if (records.isEmpty()) {
return true;
}
verifyDriverReady();
StateStoreMetrics metrics = getMetrics();
long start = Time.monotonicNow();
boolean success = true;
for (T record : records) {
String tableName = getAndValidateTableNameForClass(record.getClass());
String primaryKey = getPrimaryKey(record);
String data = serializeString(record);
if (recordExists(tableName, primaryKey)) {
if (allowUpdate) {
// Update the mod time stamp. Many backends will use their
// own timestamp for the mod time.
record.setDateModified(this.getTime());
if (!updateRecord(tableName, primaryKey, data)) {
LOG.error("Cannot write {} into table {}", primaryKey, tableName);
success = false;
}
} else {
if (errorIfExists) {
LOG.error("Attempted to insert record {} that already exists "
+ "in table {} and updates are disallowed.", primaryKey, tableName);
if (metrics != null) {
metrics.addFailure(Time.monotonicNow() - start);
}
return false;
} else {
LOG.debug("Not updating {} as updates are not allowed", record);
}
}
} else {
if (!insertRecord(tableName, primaryKey, data)) {
LOG.error("Cannot write {} in table {}", primaryKey, tableName);
success = false;
}
}
}
long end = Time.monotonicNow();
if (metrics != null) {
if (success) {
metrics.addWrite(end - start);
} else {
metrics.addFailure(end - start);
}
}
return success;
}
@Override
public <T extends BaseRecord> boolean removeAll(Class<T> clazz) throws IOException {
verifyDriverReady();
long startTimeMs = Time.monotonicNow();
StateStoreMetrics metrics = getMetrics();
boolean success = true;
String tableName = getAndValidateTableNameForClass(clazz);
try (Connection connection = connectionFactory.getConnection(true);
PreparedStatement truncateTable = connection.prepareStatement(
String.format("TRUNCATE TABLE %s", tableName))){
truncateTable.execute();
} catch (SQLException e) {
LOG.error("Could not remove all records in table {}", tableName, e);
success = false;
}
if (metrics != null) {
long durationMs = Time.monotonicNow() - startTimeMs;
if (success) {
metrics.addRemove(durationMs);
} else {
metrics.addFailure(durationMs);
}
}
return success;
}
@Override
public <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query) throws IOException {
verifyDriverReady();
if (query == null) {
return 0;
}
long startTimeMs = Time.monotonicNow();
StateStoreMetrics metrics = getMetrics();
int removed = 0;
// Get the current records
try {
final QueryResult<T> result = get(clazz);
final List<T> existingRecords = result.getRecords();
// Write all of the existing records except those to be removed
final List<T> recordsToRemove = filterMultiple(query, existingRecords);
boolean success = true;
for (T recordToRemove : recordsToRemove) {
String tableName = getAndValidateTableNameForClass(clazz);
String primaryKey = getPrimaryKey(recordToRemove);
if (removeRecord(tableName, primaryKey)) {
removed++;
} else {
LOG.error("Cannot remove record {} from table {}", primaryKey, tableName);
success = false;
}
}
if (!success) {
LOG.error("Cannot remove records {} query {}", clazz, query);
if (metrics != null) {
metrics.addFailure(Time.monotonicNow() - startTimeMs);
}
}
} catch (IOException e) {
LOG.error("Cannot remove records {} query {}", clazz, query, e);
if (metrics != null) {
metrics.addFailure(Time.monotonicNow() - startTimeMs);
}
}
if (removed > 0 && metrics != null) {
metrics.addRemove(Time.monotonicNow() - startTimeMs);
}
return removed;
}
/**
* Insert a record with a given key into the specified table.
* @param tableName Name of table to modify
* @param key Primary key for the record.
* @return True is operation is successful, false otherwise.
*/
protected boolean insertRecord(String tableName, String key, String data) {
try (Connection connection = connectionFactory.getConnection(true);
PreparedStatement statement = connection.prepareStatement(
String.format("INSERT INTO %s (recordKey, recordValue) VALUES (?, ?)", tableName))) {
statement.setString(1, key);
statement.setString(2, data);
statement.execute();
} catch (SQLException e) {
LOG.error("Failed to insert record {} into table {}", key, tableName, e);
return false;
}
return true;
}
/**
* Updates the record with a given key from the specified table.
* @param tableName Name of table to modify
* @param key Primary key for the record.
* @return True is operation is successful, false otherwise.
*/
protected boolean updateRecord(String tableName, String key, String data) {
try (Connection connection = connectionFactory.getConnection(true);
PreparedStatement statement = connection.prepareStatement(
String.format("UPDATE %s SET recordValue = ? WHERE recordKey = ?", tableName))) {
statement.setString(1, data);
statement.setString(2, key);
statement.execute();
} catch (SQLException e){
LOG.error("Failed to update record {} in table {}", key, tableName, e);
return false;
}
return true;
}
/**
* Checks if a record with a given key existing in the specified table.
* @param tableName Name of table to modify
* @param key Primary key for the record.
* @return True is operation is successful, false otherwise.
*/
protected boolean recordExists(String tableName, String key) {
try (Connection connection = connectionFactory.getConnection(true);
PreparedStatement statement = connection.prepareStatement(
String.format("SELECT * FROM %s WHERE recordKey = ?", tableName))) {
statement.setString(1, key);
try (ResultSet result = statement.executeQuery()) {
return result.next();
}
} catch (SQLException e) {
LOG.error("Failed to check existence of record {} in table {}", key, tableName, e);
return false;
}
}
/**
* Removes the record with a given key from the specified table.
* @param tableName Name of table to modify
* @param key Primary key for the record.
* @return True is operation is successful, false otherwise.
*/
protected boolean removeRecord(String tableName, String key) {
try (Connection connection = connectionFactory.getConnection(true);
PreparedStatement statement = connection.prepareStatement(
String.format("DELETE FROM %s WHERE recordKey = ?", tableName))) {
statement.setString(1, key);
statement.execute();
return true;
} catch (SQLException e) {
LOG.error("Failed to remove record {} in table {}", key, tableName, e);
return false;
}
}
/**
* Get the table for a record class and validate is this is one of the supported
* record types.
* @param clazz Class of the record.
* @return Table name for this record class.
*/
private <T extends BaseRecord> String getAndValidateTableNameForClass(final Class<T> clazz) {
String tableName = StateStoreUtils.getRecordName(clazz);
if (VALID_TABLES.contains(tableName)) {
return tableName;
} else {
throw new IllegalArgumentException(tableName + " is not a valid table name");
}
}
/**
* Class that relies on a HikariDataSource to provide SQL connections.
*/
static class MySQLStateStoreHikariDataSourceConnectionFactory
implements SQLConnectionFactory {
protected final static String HIKARI_PROPS = SQL_STATE_STORE_CONF_PREFIX
+ "connection.hikari.";
private final HikariDataSource dataSource;
MySQLStateStoreHikariDataSourceConnectionFactory(Configuration conf) {
Properties properties = new Properties();
properties.setProperty("jdbcUrl", conf.get(StateStoreMySQLImpl.CONNECTION_URL));
properties.setProperty("username", conf.get(StateStoreMySQLImpl.CONNECTION_USERNAME));
properties.setProperty("password", conf.get(StateStoreMySQLImpl.CONNECTION_PASSWORD));
properties.setProperty("driverClassName", conf.get(StateStoreMySQLImpl.CONNECTION_DRIVER));
// Include hikari connection properties
properties.putAll(conf.getPropsWithPrefix(HIKARI_PROPS));
HikariConfig hikariConfig = new HikariConfig(properties);
this.dataSource = new HikariDataSource(hikariConfig);
}
@Override
public Connection getConnection() throws SQLException {
return dataSource.getConnection();
}
@Override
public void shutdown() {
// Close database connections
dataSource.close();
}
}
}

View File

@ -20,6 +20,8 @@
import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
@ -29,6 +31,8 @@
* State Store driver that stores a serialization of the records. The serializer
* is pluggable.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl {
/** Mark for slashes in path names. */

View File

@ -362,7 +362,8 @@
Class to implement the State Store. There are three implementation classes currently
being supported:
org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl,
org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileSystemImpl and
org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileSystemImpl,
org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreMySQLImpl and
org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl.
These implementation classes use the local file, filesystem and ZooKeeper as a backend respectively.
By default it uses the ZooKeeper as the default State Store.

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.driver;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreMySQLImpl;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.*;
/**
* Test the FileSystem (e.g., HDFS) implementation of the State Store driver.
*/
public class TestStateStoreMySQL extends TestStateStoreDriverBase {
private static final String CONNECTION_URL = "jdbc:derby:memory:StateStore";
@BeforeClass
public static void initDatabase() throws Exception {
Connection connection = DriverManager.getConnection(CONNECTION_URL + ";create=true");
Statement s = connection.createStatement();
s.execute("CREATE SCHEMA TESTUSER");
Configuration conf =
getStateStoreConfiguration(StateStoreMySQLImpl.class);
conf.set(StateStoreMySQLImpl.CONNECTION_URL, CONNECTION_URL);
conf.set(StateStoreMySQLImpl.CONNECTION_USERNAME, "testuser");
conf.set(StateStoreMySQLImpl.CONNECTION_PASSWORD, "testpassword");
conf.set(StateStoreMySQLImpl.CONNECTION_DRIVER, "org.apache.derby.jdbc.EmbeddedDriver");
getStateStore(conf);
}
@Before
public void startup() throws IOException {
removeAll(getStateStoreDriver());
}
@AfterClass
public static void cleanupDatabase() {
try {
DriverManager.getConnection(CONNECTION_URL + ";drop=true");
} catch (SQLException e) {
// SQLException expected when database is dropped
if (!e.getMessage().contains("dropped")) {
throw new RuntimeException(e);
}
}
}
@Test
public void testInsert()
throws IllegalArgumentException, IllegalAccessException, IOException {
testInsert(getStateStoreDriver());
}
@Test
public void testUpdate()
throws IllegalArgumentException, ReflectiveOperationException,
IOException, SecurityException {
testPut(getStateStoreDriver());
}
@Test
public void testDelete()
throws IllegalArgumentException, IllegalAccessException, IOException {
testRemove(getStateStoreDriver());
}
@Test
public void testFetchErrors()
throws IllegalArgumentException, IllegalAccessException, IOException {
testFetchErrors(getStateStoreDriver());
}
@Test
public void testMetrics()
throws IllegalArgumentException, IllegalAccessException, IOException {
testMetrics(getStateStoreDriver());
}
}