YARN-5604. [Federation] Add versioning for FederationStateStore. (#5394)

This commit is contained in:
slfan1989 2023-02-25 02:51:19 +08:00 committed by GitHub
parent 4067facae6
commit 27a54955f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 86 additions and 9 deletions

View File

@ -60,7 +60,21 @@ public interface FederationStateStore extends
* Load the version information from the federation state store.
*
* @return the {@link Version} of the federation state store
* @throws Exception an exception occurred in load version.
*/
Version loadVersion();
Version loadVersion() throws Exception;
/**
* Store the Version information in federation state store.
*
* @throws Exception an exception occurred in store version.
*/
void storeVersion() throws Exception;
/**
* Check the version of federation stateStore.
*
* @throws Exception an exception occurred in check version.
*/
void checkVersion() throws Exception;
}

View File

@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.Comparator;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@ -366,12 +367,22 @@ public class MemoryFederationStateStore implements FederationStateStore {
@Override
public Version getCurrentVersion() {
return null;
throw new NotImplementedException("Code is not implemented");
}
@Override
public Version loadVersion() {
return null;
public Version loadVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
}
@Override
public void storeVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
}
@Override
public void checkVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
}
@Override

View File

@ -997,7 +997,17 @@ public class SQLFederationStateStore implements FederationStateStore {
}
@Override
public Version loadVersion() {
public Version loadVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
}
@Override
public void storeVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
}
@Override
public void checkVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
}

View File

@ -30,6 +30,7 @@ import java.util.TimeZone;
import java.util.Comparator;
import java.util.stream.Collectors;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.hadoop.classification.VisibleForTesting;
@ -642,12 +643,22 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
@Override
public Version getCurrentVersion() {
return null;
throw new NotImplementedException("Code is not implemented");
}
@Override
public Version loadVersion() {
return null;
public Version loadVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
}
@Override
public void storeVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
}
@Override
public void checkVersion() throws Exception {
throw new NotImplementedException("Code is not implemented");
}
/**

View File

@ -26,6 +26,7 @@ import java.util.Set;
import java.util.HashSet;
import java.util.TimeZone;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@ -1043,4 +1044,24 @@ public abstract class FederationStateStoreBaseTest {
checkRouterStoreToken(identifier, getStoreTokenResp);
}
@Test(expected = NotImplementedException.class)
public void testGetCurrentVersion() {
stateStore.getCurrentVersion();
}
@Test(expected = NotImplementedException.class)
public void testStoreVersion() throws Exception {
stateStore.storeVersion();
}
@Test(expected = NotImplementedException.class)
public void testLoadVersion() throws Exception {
stateStore.loadVersion();
}
@Test(expected = NotImplementedException.class)
public void testCheckVersion() throws Exception {
stateStore.checkVersion();
}
}

View File

@ -266,10 +266,20 @@ public class FederationStateStoreService extends AbstractService
}
@Override
public Version loadVersion() {
public Version loadVersion() throws Exception {
return stateStoreClient.getCurrentVersion();
}
@Override
public void storeVersion() throws Exception {
stateStoreClient.storeVersion();
}
@Override
public void checkVersion() throws Exception {
stateStoreClient.checkVersion();
}
@Override
public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
GetSubClusterPolicyConfigurationRequest request) throws YarnException {