YARN-5408. Compose Federation membership/application/policy APIs into an uber FederationStateStore API. (Ellen Hui via Subru).

(cherry picked from commit 268389d021)
(cherry picked from commit e0c3a44396)
This commit is contained in:
Subru Krishnan 2016-08-08 14:53:38 -07:00 committed by Carlo Curino
parent d0dddb4ebd
commit 4c3d45a1cf
6 changed files with 99 additions and 94 deletions

View File

@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHom
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.records.Version;
/**
* FederationApplicationHomeSubClusterStore maintains the state of all
@ -49,15 +48,6 @@ import org.apache.hadoop.yarn.server.records.Version;
@Unstable
public interface FederationApplicationHomeSubClusterStore {
/**
* Get the {@link Version} of the underlying federation application state
* store.
*
* @return the {@link Version} of the underlying federation application state
* store
*/
Version getApplicationStateStoreVersion();
/**
* Register the home {@code SubClusterId} of the newly submitted
* {@code ApplicationId}. Currently response is empty if the operation was
@ -91,16 +81,16 @@ public interface FederationApplicationHomeSubClusterStore {
* {@code ApplicationId}.
*
* @param request contains the application queried
* @return {@code ApplicationHomeSubCluster} containing the application's
* home subcluster
* @return {@code ApplicationHomeSubCluster} containing the application's home
* subcluster
* @throws YarnException if the request is invalid/fails
*/
GetApplicationHomeSubClusterResponse getApplicationHomeSubClusterMap(
GetApplicationHomeSubClusterRequest request) throws YarnException;
/**
* Get the {@code ApplicationHomeSubCluster} list representing the mapping
* of all submitted applications to it's home sub-cluster.
* Get the {@code ApplicationHomeSubCluster} list representing the mapping of
* all submitted applications to it's home sub-cluster.
*
* @param request empty representing all applications
* @return the mapping of all submitted application to it's home sub-cluster

View File

@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbea
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.records.Version;
/**
* FederationMembershipStateStore maintains the state of all
@ -41,15 +40,6 @@ import org.apache.hadoop.yarn.server.records.Version;
@Unstable
public interface FederationMembershipStateStore {
/**
* Get the {@link Version} of the underlying federation membership state
* store.
*
* @return the {@link Version} of the underlying federation membership state
* store
*/
Version getMembershipStateStoreVersion();
/**
* Register a <em>subcluster</em> by publishing capabilities as represented by
* {@code SubClusterInfo} to indicate participation in federation. This is
@ -80,7 +70,7 @@ public interface FederationMembershipStateStore {
*/
SubClusterDeregisterResponse deregisterSubCluster(
SubClusterDeregisterRequest subClusterDeregisterRequest)
throws YarnException;
throws YarnException;
/**
* Periodic heartbeat from a <code>ResourceManager</code> participating in
@ -96,7 +86,7 @@ public interface FederationMembershipStateStore {
*/
SubClusterHeartbeatResponse subClusterHeartbeat(
SubClusterHeartbeatRequest subClusterHeartbeatRequest)
throws YarnException;
throws YarnException;
/**
* Get the membership information of <em>subcluster</em> as identified by

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.records.Version;
/**
* FederationStore extends the three interfaces used to coordinate the state of
* a federated cluster: {@link FederationApplicationHomeSubClusterStore},
* {@link FederationMembershipStateStore}, and {@link FederationPolicyStore}.
*
*/
public interface FederationStateStore
extends FederationApplicationHomeSubClusterStore,
FederationMembershipStateStore, FederationPolicyStore {
/**
* Initialize the FederationStore.
*
* @param conf the cluster configuration
* @throws YarnException if initialization fails
*/
void init(Configuration conf) throws YarnException;
/**
* Perform any cleanup operations of the StateStore.
*
* @throws Exception if cleanup fails
*/
void close() throws Exception;
/**
* Get the {@link Version} of the underlying federation state store client.
*
* @return the {@link Version} of the underlying federation store client
*/
Version getCurrentVersion();
/**
* Load the version information from the federation state store.
*
* @return the {@link Version} of the federation state store
*/
Version loadVersion();
}

View File

@ -36,11 +36,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbea
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.util.MonotonicClock;
import com.google.common.annotations.VisibleForTesting;
/**
* In-memory implementation of FederationMembershipStateStore.
*/
@ -51,11 +48,6 @@ public class MemoryFederationStateStore
new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
private final MonotonicClock clock = new MonotonicClock();
@Override
public Version getMembershipStateStoreVersion() {
return null;
}
@Override
public SubClusterRegisterResponse registerSubCluster(
SubClusterRegisterRequest request) throws YarnException {
@ -122,17 +114,6 @@ public class MemoryFederationStateStore
}
}
return GetSubClustersInfoResponse.newInstance(result);
}
@VisibleForTesting
public Map<SubClusterId, SubClusterInfo> getMembershipTable() {
return membership;
}
@VisibleForTesting
public void clearMembershipTable() {
membership.clear();
}
}

View File

@ -19,25 +19,21 @@ package org.apache.hadoop.yarn.server.federation.store.impl;
import java.io.IOException;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
@ -45,17 +41,22 @@ import org.junit.Test;
*/
public abstract class FederationStateStoreBaseTest {
static final Logger LOG =
LoggerFactory.getLogger(FederationStateStoreBaseTest.class);
private static final MonotonicClock CLOCK = new MonotonicClock();
private FederationMembershipStateStore stateStore = getStateStore();
private FederationMembershipStateStore stateStore;
@Before
public void before() throws IOException {
clearMembership();
stateStore = getCleanStateStore();
}
@After
public void after() {
stateStore = null;
}
protected abstract FederationMembershipStateStore getCleanStateStore();
@Test
public void testRegisterSubCluster() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
@ -63,11 +64,9 @@ public abstract class FederationStateStoreBaseTest {
SubClusterRegisterResponse result = stateStore.registerSubCluster(
SubClusterRegisterRequest.newInstance(subClusterInfo));
Map<SubClusterId, SubClusterInfo> membership = getMembership();
Assert.assertNotNull(membership.get(subClusterId));
Assert.assertNotNull(result);
Assert.assertEquals(subClusterInfo, membership.get(subClusterId));
Assert.assertEquals(subClusterInfo, querySubClusterInfo(subClusterId));
}
@Test
@ -83,10 +82,8 @@ public abstract class FederationStateStoreBaseTest {
stateStore.deregisterSubCluster(deregisterRequest);
Map<SubClusterId, SubClusterInfo> membership = getMembership();
Assert.assertNotNull(membership.get(subClusterId));
Assert.assertEquals(membership.get(subClusterId).getState(),
SubClusterState.SC_UNREGISTERED);
Assert.assertEquals(SubClusterState.SC_UNREGISTERED,
querySubClusterInfo(subClusterId).getState());
}
@Test
@ -179,10 +176,9 @@ public abstract class FederationStateStoreBaseTest {
.newInstance(subClusterId, SubClusterState.SC_RUNNING, "cabability");
stateStore.subClusterHeartbeat(heartbeatRequest);
Map<SubClusterId, SubClusterInfo> membership = getMembership();
Assert.assertEquals(membership.get(subClusterId).getState(),
SubClusterState.SC_RUNNING);
Assert.assertNotNull(membership.get(subClusterId).getLastHeartBeat());
Assert.assertEquals(SubClusterState.SC_RUNNING,
querySubClusterInfo(subClusterId).getState());
Assert.assertNotNull(querySubClusterInfo(subClusterId).getLastHeartBeat());
}
@Test
@ -212,10 +208,11 @@ public abstract class FederationStateStoreBaseTest {
CLOCK.getTime(), "cabability");
}
protected abstract Map<SubClusterId, SubClusterInfo> getMembership();
protected abstract void clearMembership();
protected abstract FederationMembershipStateStore getStateStore();
private SubClusterInfo querySubClusterInfo(SubClusterId subClusterId)
throws YarnException {
GetSubClusterInfoRequest request =
GetSubClusterInfoRequest.newInstance(subClusterId);
return stateStore.getSubCluster(request).getSubClusterInfo();
}
}

View File

@ -17,11 +17,7 @@
package org.apache.hadoop.yarn.server.federation.store.impl;
import java.util.Map;
import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
/**
* Unit tests for MemoryFederationStateStore.
@ -29,21 +25,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
public class TestMemoryFederationStateStore
extends FederationStateStoreBaseTest {
private static final MemoryFederationStateStore STATESTORE =
new MemoryFederationStateStore();
@Override
protected Map<SubClusterId, SubClusterInfo> getMembership() {
return STATESTORE.getMembershipTable();
}
@Override
protected void clearMembership() {
STATESTORE.clearMembershipTable();
}
@Override
protected FederationMembershipStateStore getStateStore() {
return STATESTORE;
protected FederationMembershipStateStore getCleanStateStore() {
return new MemoryFederationStateStore();
}
}