From 47c22e388ee5631c99a7f926d11a8747aa51e5e4 Mon Sep 17 00:00:00 2001 From: Simbarashe Dzinamarira Date: Wed, 5 Apr 2023 16:44:29 -0700 Subject: [PATCH] HDFS-16943. RBF: Implements MySQL based StateStoreDriver. (#5469) --- .../store/driver/StateStoreDriver.java | 4 + .../store/driver/impl/StateStoreBaseImpl.java | 4 + .../driver/impl/StateStoreMySQLImpl.java | 425 ++++++++++++++++++ .../impl/StateStoreSerializableImpl.java | 4 + .../src/main/resources/hdfs-rbf-default.xml | 3 +- .../store/driver/TestStateStoreMySQL.java | 102 +++++ 6 files changed, 541 insertions(+), 1 deletion(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreMySQL.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java index a4e9c1ce82b..778ac3ecea5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java @@ -20,6 +20,8 @@ import java.net.InetAddress; import java.util.Collection; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; @@ -35,6 +37,8 @@ * provider. Driver implementations will extend this class and implement some of * the default methods. */ +@InterfaceAudience.Public +@InterfaceStability.Evolving public abstract class StateStoreDriver implements StateStoreRecordOperations { private static final Logger LOG = diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java index 30686f104b7..f7a6174226e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java @@ -23,6 +23,8 @@ import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; @@ -39,6 +41,8 @@ * optimization, such as custom get/put/remove queries, depending on the * capabilities of the data store. */ +@InterfaceAudience.Public +@InterfaceStability.Evolving public abstract class StateStoreBaseImpl extends StateStoreDriver { @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java new file mode 100644 index 00000000000..72644bb816e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java @@ -0,0 +1,425 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; +import org.apache.hadoop.hdfs.server.federation.router.security.token.SQLConnectionFactory; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.DisabledNameservice; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.*; + +/** + * StateStoreDriver implementation based on MySQL. + * There is a separate table for each record type. Each table just as two + * columns, recordKey and recordValue. + */ +public class StateStoreMySQLImpl extends StateStoreSerializableImpl { + public static final String SQL_STATE_STORE_CONF_PREFIX = "state-store-mysql."; + public static final String CONNECTION_URL = + SQL_STATE_STORE_CONF_PREFIX + "connection.url"; + public static final String CONNECTION_USERNAME = + SQL_STATE_STORE_CONF_PREFIX + "connection.username"; + public static final String CONNECTION_PASSWORD = + SQL_STATE_STORE_CONF_PREFIX + "connection.password"; + public static final String CONNECTION_DRIVER = + SQL_STATE_STORE_CONF_PREFIX + "connection.driver"; + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreSerializableImpl.class); + private SQLConnectionFactory connectionFactory; + /** If the driver has been initialized. */ + private boolean initialized = false; + private final static Set VALID_TABLES = Collections.unmodifiableSet( + new HashSet<>(Arrays.asList( + MembershipState.class.getSimpleName(), + RouterState.class.getSimpleName(), + MountTable.class.getSimpleName(), + DisabledNameservice.class.getSimpleName() + )) + ); + + @Override + public boolean initDriver() { + Configuration conf = getConf(); + connectionFactory = new MySQLStateStoreHikariDataSourceConnectionFactory(conf); + initialized = true; + LOG.info("MySQL state store connection factory initialized"); + return true; + } + + @Override + public boolean initRecordStorage(String className, Class clazz) { + String tableName = getAndValidateTableNameForClass(clazz); + try (Connection connection = connectionFactory.getConnection(); + ResultSet resultSet = connection + .getMetaData() + .getTables(null, null, tableName, null)) { + if (resultSet.next()) { + return true; + } + } catch (SQLException e) { + LOG.error("Could not check if table {} able exists", tableName); + } + + try (Connection connection = connectionFactory.getConnection(); + Statement statement = connection.createStatement()) { + String sql = String.format("CREATE TABLE %s (" + + "recordKey VARCHAR (255) NOT NULL," + + "recordValue VARCHAR (2047) NOT NULL, " + + "PRIMARY KEY(recordKey))", tableName); + statement.execute(sql); + return true; + } catch (SQLException e) { + LOG.error(String.format("Cannot create table %s for record type %s.", + tableName, className), e.getMessage()); + return false; + } + } + + @Override + public boolean isDriverReady() { + return this.initialized; + } + + @Override + public void close() throws Exception { + connectionFactory.shutdown(); + } + + @Override + public QueryResult get(Class clazz) + throws IOException { + String tableName = getAndValidateTableNameForClass(clazz); + verifyDriverReady(); + long start = Time.monotonicNow(); + StateStoreMetrics metrics = getMetrics(); + List ret = new ArrayList<>(); + try (Connection connection = connectionFactory.getConnection(); + PreparedStatement statement = connection.prepareStatement( + String.format("SELECT * FROM %s", tableName))) { + try (ResultSet result = statement.executeQuery()) { + while(result.next()) { + String recordValue = result.getString("recordValue"); + T record = newRecord(recordValue, clazz, false); + ret.add(record); + } + } + } catch (SQLException e) { + if (metrics != null) { + metrics.addFailure(Time.monotonicNow() - start); + } + String msg = "Cannot fetch records for " + clazz.getSimpleName(); + LOG.error(msg, e); + throw new IOException(msg, e); + } + + if (metrics != null) { + metrics.addRead(Time.monotonicNow() - start); + } + return new QueryResult<>(ret, getTime()); + } + + @Override + public boolean putAll( + List records, boolean allowUpdate, boolean errorIfExists) throws IOException { + if (records.isEmpty()) { + return true; + } + + verifyDriverReady(); + StateStoreMetrics metrics = getMetrics(); + + long start = Time.monotonicNow(); + + boolean success = true; + for (T record : records) { + String tableName = getAndValidateTableNameForClass(record.getClass()); + String primaryKey = getPrimaryKey(record); + String data = serializeString(record); + + if (recordExists(tableName, primaryKey)) { + if (allowUpdate) { + // Update the mod time stamp. Many backends will use their + // own timestamp for the mod time. + record.setDateModified(this.getTime()); + if (!updateRecord(tableName, primaryKey, data)) { + LOG.error("Cannot write {} into table {}", primaryKey, tableName); + success = false; + } + } else { + if (errorIfExists) { + LOG.error("Attempted to insert record {} that already exists " + + "in table {} and updates are disallowed.", primaryKey, tableName); + if (metrics != null) { + metrics.addFailure(Time.monotonicNow() - start); + } + return false; + } else { + LOG.debug("Not updating {} as updates are not allowed", record); + } + } + } else { + if (!insertRecord(tableName, primaryKey, data)) { + LOG.error("Cannot write {} in table {}", primaryKey, tableName); + success = false; + } + } + } + + long end = Time.monotonicNow(); + if (metrics != null) { + if (success) { + metrics.addWrite(end - start); + } else { + metrics.addFailure(end - start); + } + } + return success; + } + + @Override + public boolean removeAll(Class clazz) throws IOException { + verifyDriverReady(); + long startTimeMs = Time.monotonicNow(); + StateStoreMetrics metrics = getMetrics(); + boolean success = true; + String tableName = getAndValidateTableNameForClass(clazz); + try (Connection connection = connectionFactory.getConnection(true); + PreparedStatement truncateTable = connection.prepareStatement( + String.format("TRUNCATE TABLE %s", tableName))){ + truncateTable.execute(); + } catch (SQLException e) { + LOG.error("Could not remove all records in table {}", tableName, e); + success = false; + } + + if (metrics != null) { + long durationMs = Time.monotonicNow() - startTimeMs; + if (success) { + metrics.addRemove(durationMs); + } else { + metrics.addFailure(durationMs); + } + } + return success; + } + + @Override + public int remove(Class clazz, Query query) throws IOException { + verifyDriverReady(); + + if (query == null) { + return 0; + } + + long startTimeMs = Time.monotonicNow(); + StateStoreMetrics metrics = getMetrics(); + int removed = 0; + // Get the current records + try { + final QueryResult result = get(clazz); + final List existingRecords = result.getRecords(); + // Write all of the existing records except those to be removed + final List recordsToRemove = filterMultiple(query, existingRecords); + boolean success = true; + for (T recordToRemove : recordsToRemove) { + String tableName = getAndValidateTableNameForClass(clazz); + String primaryKey = getPrimaryKey(recordToRemove); + if (removeRecord(tableName, primaryKey)) { + removed++; + } else { + LOG.error("Cannot remove record {} from table {}", primaryKey, tableName); + success = false; + } + } + if (!success) { + LOG.error("Cannot remove records {} query {}", clazz, query); + if (metrics != null) { + metrics.addFailure(Time.monotonicNow() - startTimeMs); + } + } + } catch (IOException e) { + LOG.error("Cannot remove records {} query {}", clazz, query, e); + if (metrics != null) { + metrics.addFailure(Time.monotonicNow() - startTimeMs); + } + } + + if (removed > 0 && metrics != null) { + metrics.addRemove(Time.monotonicNow() - startTimeMs); + } + return removed; + } + + /** + * Insert a record with a given key into the specified table. + * @param tableName Name of table to modify + * @param key Primary key for the record. + * @return True is operation is successful, false otherwise. + */ + protected boolean insertRecord(String tableName, String key, String data) { + try (Connection connection = connectionFactory.getConnection(true); + PreparedStatement statement = connection.prepareStatement( + String.format("INSERT INTO %s (recordKey, recordValue) VALUES (?, ?)", tableName))) { + statement.setString(1, key); + statement.setString(2, data); + statement.execute(); + } catch (SQLException e) { + LOG.error("Failed to insert record {} into table {}", key, tableName, e); + return false; + } + return true; + } + + /** + * Updates the record with a given key from the specified table. + * @param tableName Name of table to modify + * @param key Primary key for the record. + * @return True is operation is successful, false otherwise. + */ + protected boolean updateRecord(String tableName, String key, String data) { + try (Connection connection = connectionFactory.getConnection(true); + PreparedStatement statement = connection.prepareStatement( + String.format("UPDATE %s SET recordValue = ? WHERE recordKey = ?", tableName))) { + statement.setString(1, data); + statement.setString(2, key); + statement.execute(); + } catch (SQLException e){ + LOG.error("Failed to update record {} in table {}", key, tableName, e); + return false; + } + return true; + } + + /** + * Checks if a record with a given key existing in the specified table. + * @param tableName Name of table to modify + * @param key Primary key for the record. + * @return True is operation is successful, false otherwise. + */ + protected boolean recordExists(String tableName, String key) { + try (Connection connection = connectionFactory.getConnection(true); + PreparedStatement statement = connection.prepareStatement( + String.format("SELECT * FROM %s WHERE recordKey = ?", tableName))) { + statement.setString(1, key); + try (ResultSet result = statement.executeQuery()) { + return result.next(); + } + } catch (SQLException e) { + LOG.error("Failed to check existence of record {} in table {}", key, tableName, e); + return false; + } + } + + /** + * Removes the record with a given key from the specified table. + * @param tableName Name of table to modify + * @param key Primary key for the record. + * @return True is operation is successful, false otherwise. + */ + protected boolean removeRecord(String tableName, String key) { + try (Connection connection = connectionFactory.getConnection(true); + PreparedStatement statement = connection.prepareStatement( + String.format("DELETE FROM %s WHERE recordKey = ?", tableName))) { + statement.setString(1, key); + statement.execute(); + return true; + } catch (SQLException e) { + LOG.error("Failed to remove record {} in table {}", key, tableName, e); + return false; + } + } + + /** + * Get the table for a record class and validate is this is one of the supported + * record types. + * @param clazz Class of the record. + * @return Table name for this record class. + */ + private String getAndValidateTableNameForClass(final Class clazz) { + String tableName = StateStoreUtils.getRecordName(clazz); + if (VALID_TABLES.contains(tableName)) { + return tableName; + } else { + throw new IllegalArgumentException(tableName + " is not a valid table name"); + } + } + + + /** + * Class that relies on a HikariDataSource to provide SQL connections. + */ + static class MySQLStateStoreHikariDataSourceConnectionFactory + implements SQLConnectionFactory { + protected final static String HIKARI_PROPS = SQL_STATE_STORE_CONF_PREFIX + + "connection.hikari."; + private final HikariDataSource dataSource; + + MySQLStateStoreHikariDataSourceConnectionFactory(Configuration conf) { + Properties properties = new Properties(); + properties.setProperty("jdbcUrl", conf.get(StateStoreMySQLImpl.CONNECTION_URL)); + properties.setProperty("username", conf.get(StateStoreMySQLImpl.CONNECTION_USERNAME)); + properties.setProperty("password", conf.get(StateStoreMySQLImpl.CONNECTION_PASSWORD)); + properties.setProperty("driverClassName", conf.get(StateStoreMySQLImpl.CONNECTION_DRIVER)); + + // Include hikari connection properties + properties.putAll(conf.getPropsWithPrefix(HIKARI_PROPS)); + + HikariConfig hikariConfig = new HikariConfig(properties); + this.dataSource = new HikariDataSource(hikariConfig); + } + + @Override + public Connection getConnection() throws SQLException { + return dataSource.getConnection(); + } + + @Override + public void shutdown() { + // Close database connections + dataSource.close(); + } + } + +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java index 7bc93de84bc..8f766c65c5b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.util.Collection; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; @@ -29,6 +31,8 @@ * State Store driver that stores a serialization of the records. The serializer * is pluggable. */ +@InterfaceAudience.Public +@InterfaceStability.Evolving public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl { /** Mark for slashes in path names. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 780fb76a2da..c7b403ce634 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -362,7 +362,8 @@ Class to implement the State Store. There are three implementation classes currently being supported: org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl, - org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileSystemImpl and + org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileSystemImpl, + org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreMySQLImpl and org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl. These implementation classes use the local file, filesystem and ZooKeeper as a backend respectively. By default it uses the ZooKeeper as the default State Store. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreMySQL.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreMySQL.java new file mode 100644 index 00000000000..ebac2c0b93b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreMySQL.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreMySQLImpl; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.*; + +/** + * Test the FileSystem (e.g., HDFS) implementation of the State Store driver. + */ +public class TestStateStoreMySQL extends TestStateStoreDriverBase { + private static final String CONNECTION_URL = "jdbc:derby:memory:StateStore"; + + @BeforeClass + public static void initDatabase() throws Exception { + Connection connection = DriverManager.getConnection(CONNECTION_URL + ";create=true"); + Statement s = connection.createStatement(); + s.execute("CREATE SCHEMA TESTUSER"); + + Configuration conf = + getStateStoreConfiguration(StateStoreMySQLImpl.class); + conf.set(StateStoreMySQLImpl.CONNECTION_URL, CONNECTION_URL); + conf.set(StateStoreMySQLImpl.CONNECTION_USERNAME, "testuser"); + conf.set(StateStoreMySQLImpl.CONNECTION_PASSWORD, "testpassword"); + conf.set(StateStoreMySQLImpl.CONNECTION_DRIVER, "org.apache.derby.jdbc.EmbeddedDriver"); + getStateStore(conf); + } + + @Before + public void startup() throws IOException { + removeAll(getStateStoreDriver()); + } + + @AfterClass + public static void cleanupDatabase() { + try { + DriverManager.getConnection(CONNECTION_URL + ";drop=true"); + } catch (SQLException e) { + // SQLException expected when database is dropped + if (!e.getMessage().contains("dropped")) { + throw new RuntimeException(e); + } + } + } + + @Test + public void testInsert() + throws IllegalArgumentException, IllegalAccessException, IOException { + testInsert(getStateStoreDriver()); + } + + @Test + public void testUpdate() + throws IllegalArgumentException, ReflectiveOperationException, + IOException, SecurityException { + testPut(getStateStoreDriver()); + } + + @Test + public void testDelete() + throws IllegalArgumentException, IllegalAccessException, IOException { + testRemove(getStateStoreDriver()); + } + + @Test + public void testFetchErrors() + throws IllegalArgumentException, IllegalAccessException, IOException { + testFetchErrors(getStateStoreDriver()); + } + + @Test + public void testMetrics() + throws IllegalArgumentException, IllegalAccessException, IOException { + testMetrics(getStateStoreDriver()); + } +} \ No newline at end of file