YARN-11296. [Federation] Fix SQLFederationStateStore#Sql script bug. (#4858)

This commit is contained in:
slfan1989 2022-09-23 07:40:47 +08:00 committed by GitHub
parent e6d2c336cb
commit e526f48fa4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 378 additions and 12 deletions

View File

@ -36,7 +36,7 @@ CREATE TABLE membership(
state varchar(32) NOT NULL, state varchar(32) NOT NULL,
lastStartTime bigint NULL, lastStartTime bigint NULL,
capability varchar(6000), capability varchar(6000),
CONSTRAINT pk_subClusterId PRIMARY KEY (subClusterId) CONSTRAINT pk_subClusterId PRIMARY KEY (subClusterId),
UNIQUE(lastStartTime) UNIQUE(lastStartTime)
); );

View File

@ -77,7 +77,7 @@ IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables
CONSTRAINT [pk_subClusterId] PRIMARY KEY CONSTRAINT [pk_subClusterId] PRIMARY KEY
( (
[subClusterId] [subClusterId]
) ),
CONSTRAINT [uc_lastStartTime] UNIQUE CONSTRAINT [uc_lastStartTime] UNIQUE
( (
[lastStartTime] [lastStartTime]
@ -140,7 +140,7 @@ IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables
CREATE TABLE [dbo].[reservationsHomeSubCluster]( CREATE TABLE [dbo].[reservationsHomeSubCluster](
reservationId VARCHAR(128) COLLATE Latin1_General_100_BIN2 NOT NULL, reservationId VARCHAR(128) COLLATE Latin1_General_100_BIN2 NOT NULL,
homeSubCluster VARCHAR(256) NOT NULL, homeSubCluster VARCHAR(256) NOT NULL,
createTime DATETIME2 NOT NULL CONSTRAINT ts_createAppTime DEFAULT GETUTCDATE(), createTime DATETIME2 NOT NULL CONSTRAINT ts_createResTime DEFAULT GETUTCDATE(),
CONSTRAINT [pk_reservationId] PRIMARY KEY CONSTRAINT [pk_reservationId] PRIMARY KEY
( (

View File

@ -169,6 +169,7 @@
<artifactId>maven-antrun-plugin</artifactId> <artifactId>maven-antrun-plugin</artifactId>
<executions> <executions>
<execution> <execution>
<id>copy-site</id>
<phase>pre-site</phase> <phase>pre-site</phase>
<goals> <goals>
<goal>run</goal> <goal>run</goal>
@ -179,6 +180,21 @@
</tasks> </tasks>
</configuration> </configuration>
</execution> </execution>
<execution>
<id>copy-sql</id>
<phase>test</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<tasks>
<copy file="${basedir}/../../bin/FederationStateStore/MySQL/FederationStateStoreTables.sql" todir="${project.build.directory}/test-classes/MySQL"/>
<copy file="${basedir}/../../bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql" todir="${project.build.directory}/test-classes/MySQL"/>
<copy file="${basedir}/../../bin/FederationStateStore/SQLServer/FederationStateStoreTables.sql" todir="${project.build.directory}/test-classes/SQLServer"/>
<copy file="${basedir}/../../bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql" todir="${project.build.directory}/test-classes/SQLServer"/>
</tasks>
</configuration>
</execution>
</executions> </executions>
</plugin> </plugin>
<plugin> <plugin>

View File

@ -1389,6 +1389,11 @@ public class SQLFederationStateStore implements FederationStateStore {
" according to reservation" + reservationId); " according to reservation" + reservationId);
} }
@VisibleForTesting
public Connection getConn() {
return conn;
}
@Override @Override
public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request) public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request)
throws YarnException, IOException { throws YarnException, IOException {

View File

@ -18,9 +18,18 @@
package org.apache.hadoop.yarn.server.federation.store.impl; package org.apache.hadoop.yarn.server.federation.store.impl;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
@ -301,15 +310,12 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
+ " WHERE reservationId = reservationId_IN;" + " WHERE reservationId = reservationId_IN;"
+ " SET rowCount_OUT = 2; END"; + " SET rowCount_OUT = 2; END";
private List<String> tables = new ArrayList<>();
@Override @Override
public void init(Configuration conf) { public void init(Configuration conf) {
try { try {
super.init(conf); super.init(conf);
} catch (YarnException e1) {
LOG.error("ERROR: failed to init HSQLDB " + e1.getMessage());
}
try {
conn = super.conn; conn = super.conn;
LOG.info("Database Init: Start"); LOG.info("Database Init: Start");
@ -342,8 +348,17 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
conn.prepareStatement(SP_UPDATERESERVATIONHOMESUBCLUSTER).execute(); conn.prepareStatement(SP_UPDATERESERVATIONHOMESUBCLUSTER).execute();
LOG.info("Database Init: Complete"); LOG.info("Database Init: Complete");
} catch (SQLException e) { } catch (Exception e) {
LOG.error("ERROR: failed to inizialize HSQLDB " + e.getMessage()); LOG.error("ERROR: failed to initialize HSQLDB {}.", e.getMessage());
}
}
public void initConnection(Configuration conf) {
try {
super.init(conf);
conn = super.conn;
} catch (YarnException e1) {
LOG.error("ERROR: failed open connection to HSQLDB DB {}.", e1.getMessage());
} }
} }
@ -351,9 +366,38 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
try { try {
conn.close(); conn.close();
} catch (SQLException e) { } catch (SQLException e) {
LOG.error( LOG.error("ERROR: failed to close connection to HSQLDB DB {}.", e.getMessage());
"ERROR: failed to close connection to HSQLDB DB " + e.getMessage());
} }
} }
/**
* Extract The Create Table Sql From The Script.
*
* @param dbIdentifier database identifier, Like Mysql / SqlServer
* @param regex the regex
* @throws IOException IO exception.
*/
protected void extractCreateTableSQL(String dbIdentifier, String regex) throws IOException {
String[] createTableScriptPathItems = new String[] {
".", "target", "test-classes", dbIdentifier, "FederationStateStoreTables.sql" };
String createTableScriptPath = StringUtils.join(createTableScriptPathItems, File.separator);
String createTableSQL =
FileUtils.readFileToString(new File(createTableScriptPath), StandardCharsets.UTF_8);
Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(createTableSQL);
while (m != null && m.find()) {
String group = m.group();
tables.add(group);
}
}
public List<String> getTables() {
return tables;
}
public void setTables(List<String> tables) {
this.tables = tables;
}
} }

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store.impl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* MySQLFederationStateStore implementation of {@link FederationStateStore}.
*/
public class MySQLFederationStateStore extends HSQLDBFederationStateStore {
private static final Logger LOG =
LoggerFactory.getLogger(MySQLFederationStateStore.class);
@Override
public void init(Configuration conf) {
try {
super.initConnection(conf);
// get the sql that creates the table
extractCreateTableSQL("MySQL", "CREATE TABLE.*\\n(.*,\\n){1,10}.*\\n.*");
// print log
LOG.info("Mysql - tables = {}.", getTables().size());
} catch (IOException e) {
LOG.error("ERROR: failed to init HSQLDB {}", e.getMessage());
}
}
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store.impl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
/**
* SQLServerFederationStateStore implementation of {@link FederationStateStore}.
*/
public class SQLServerFederationStateStore extends HSQLDBFederationStateStore {
private static final Logger LOG =
LoggerFactory.getLogger(SQLServerFederationStateStore.class);
@Override
public void init(Configuration conf) {
try {
super.initConnection(conf);
// get the sql that creates the table
extractCreateTableSQL("SQLServer", "CREATE TABLE .*\\n(.*,\\n){1,5}.*(\\n.*){1,15}\\)");
List<String> tables = getTables();
// replacing some incompatible syntaxes
if (tables != null && !tables.isEmpty()) {
tables = tables.stream().map(table -> {
String newTable = table.replace("COLLATE Latin1_General_100_BIN2", "").
replace("DEFAULT GETUTCDATE()", "").
replace("[dbo].", "").
replace("[", "").
replace("]", "");
return newTable;
}).collect(Collectors.toList());
setTables(tables);
}
// print log
LOG.info("SqlServer - tables = {}.", tables.size());
} catch (IOException e) {
LOG.error("ERROR: failed to init HSQLDB {}.", e.getMessage());
}
}
}

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.sql;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
public abstract class FederationSQLAccuracyTest {
protected static final String HSQLDB_DRIVER = "org.hsqldb.jdbc.JDBCDataSource";
protected static final String DATABASE_URL = "jdbc:hsqldb:mem:state";
protected static final String DATABASE_USERNAME = "SA";
protected static final String DATABASE_PASSWORD = "";
private FederationStateStore stateStore;
protected abstract FederationStateStore createStateStore();
private Configuration conf;
@Before
public void before() throws IOException, YarnException {
stateStore = createStateStore();
conf = new YarnConfiguration();
conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_JDBC_CLASS, HSQLDB_DRIVER);
conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_USERNAME, DATABASE_USERNAME);
conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_PASSWORD, DATABASE_PASSWORD);
conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL, getSQLURL());
stateStore.init(conf);
}
@After
public void after() throws Exception {
stateStore.close();
}
protected abstract String getSQLURL();
protected void setConf(Configuration conf) {
this.conf = conf;
}
protected Configuration getConf() {
return conf;
}
protected FederationStateStore getStateStore() {
return stateStore;
}
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.sql;
import org.apache.hadoop.yarn.server.federation.store.impl.MySQLFederationStateStore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.List;
public class TestFederationMySQLScriptAccuracy extends FederationSQLAccuracyTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestFederationMySQLScriptAccuracy.class);
private static final String MYSQL_COMPATIBILITY = ";sql.syntax_mys=true";
@Override
protected MySQLFederationStateStore createStateStore() {
return new MySQLFederationStateStore();
}
@Override
protected String getSQLURL() {
return DATABASE_URL + System.currentTimeMillis() + MYSQL_COMPATIBILITY;
}
@Test
public void checkMysqlScriptAccuracy() throws SQLException {
MySQLFederationStateStore federationStateStore = this.createStateStore();
federationStateStore.initConnection(this.getConf());
// get a list of tables
List<String> tables = federationStateStore.getTables();
for (String table : tables) {
federationStateStore.getConn().prepareStatement(table).execute();
}
LOG.info("FederationStateStore create {} tables.", tables.size());
}
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.sql;
import org.apache.hadoop.yarn.server.federation.store.impl.SQLServerFederationStateStore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.List;
public class TestFederationSQLServerScriptAccuracy extends FederationSQLAccuracyTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestFederationSQLServerScriptAccuracy.class);
private static final String SQLSERVER_COMPATIBILITY = ";sql.syntax_mss=true";
@Override
protected SQLServerFederationStateStore createStateStore() {
return new SQLServerFederationStateStore();
}
@Override
protected String getSQLURL() {
return DATABASE_URL + System.currentTimeMillis() + SQLSERVER_COMPATIBILITY;
}
@Test
public void checkSqlServerScriptAccuracy() throws SQLException {
SQLServerFederationStateStore federationStateStore = this.createStateStore();
federationStateStore.init(getConf());
// get a list of tables
List<String> tables = federationStateStore.getTables();
for (String table : tables) {
federationStateStore.getConn().prepareStatement(table).execute();
}
LOG.info("FederationStateStore create {} tables.", tables.size());
}
}