YARN-5408. Compose Federation membership/application/policy APIs into an uber FederationStateStore API. (Ellen Hui via Subru).

(cherry picked from commit 268389d021)
This commit is contained in:
Subru Krishnan 2016-08-08 14:53:38 -07:00 committed by Carlo Curino
parent f1a508bdef
commit e0c3a44396
6 changed files with 99 additions and 94 deletions

View File

@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHom
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.records.Version;
/** /**
* FederationApplicationHomeSubClusterStore maintains the state of all * FederationApplicationHomeSubClusterStore maintains the state of all
@ -49,15 +48,6 @@ import org.apache.hadoop.yarn.server.records.Version;
@Unstable @Unstable
public interface FederationApplicationHomeSubClusterStore { public interface FederationApplicationHomeSubClusterStore {
/**
* Get the {@link Version} of the underlying federation application state
* store.
*
* @return the {@link Version} of the underlying federation application state
* store
*/
Version getApplicationStateStoreVersion();
/** /**
* Register the home {@code SubClusterId} of the newly submitted * Register the home {@code SubClusterId} of the newly submitted
* {@code ApplicationId}. Currently response is empty if the operation was * {@code ApplicationId}. Currently response is empty if the operation was
@ -91,16 +81,16 @@ public interface FederationApplicationHomeSubClusterStore {
* {@code ApplicationId}. * {@code ApplicationId}.
* *
* @param request contains the application queried * @param request contains the application queried
* @return {@code ApplicationHomeSubCluster} containing the application's * @return {@code ApplicationHomeSubCluster} containing the application's home
* home subcluster * subcluster
* @throws YarnException if the request is invalid/fails * @throws YarnException if the request is invalid/fails
*/ */
GetApplicationHomeSubClusterResponse getApplicationHomeSubClusterMap( GetApplicationHomeSubClusterResponse getApplicationHomeSubClusterMap(
GetApplicationHomeSubClusterRequest request) throws YarnException; GetApplicationHomeSubClusterRequest request) throws YarnException;
/** /**
* Get the {@code ApplicationHomeSubCluster} list representing the mapping * Get the {@code ApplicationHomeSubCluster} list representing the mapping of
* of all submitted applications to it's home sub-cluster. * all submitted applications to it's home sub-cluster.
* *
* @param request empty representing all applications * @param request empty representing all applications
* @return the mapping of all submitted application to it's home sub-cluster * @return the mapping of all submitted application to it's home sub-cluster

View File

@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbea
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.records.Version;
/** /**
* FederationMembershipStateStore maintains the state of all * FederationMembershipStateStore maintains the state of all
@ -41,15 +40,6 @@ import org.apache.hadoop.yarn.server.records.Version;
@Unstable @Unstable
public interface FederationMembershipStateStore { public interface FederationMembershipStateStore {
/**
* Get the {@link Version} of the underlying federation membership state
* store.
*
* @return the {@link Version} of the underlying federation membership state
* store
*/
Version getMembershipStateStoreVersion();
/** /**
* Register a <em>subcluster</em> by publishing capabilities as represented by * Register a <em>subcluster</em> by publishing capabilities as represented by
* {@code SubClusterInfo} to indicate participation in federation. This is * {@code SubClusterInfo} to indicate participation in federation. This is
@ -80,7 +70,7 @@ public interface FederationMembershipStateStore {
*/ */
SubClusterDeregisterResponse deregisterSubCluster( SubClusterDeregisterResponse deregisterSubCluster(
SubClusterDeregisterRequest subClusterDeregisterRequest) SubClusterDeregisterRequest subClusterDeregisterRequest)
throws YarnException; throws YarnException;
/** /**
* Periodic heartbeat from a <code>ResourceManager</code> participating in * Periodic heartbeat from a <code>ResourceManager</code> participating in
@ -96,7 +86,7 @@ public interface FederationMembershipStateStore {
*/ */
SubClusterHeartbeatResponse subClusterHeartbeat( SubClusterHeartbeatResponse subClusterHeartbeat(
SubClusterHeartbeatRequest subClusterHeartbeatRequest) SubClusterHeartbeatRequest subClusterHeartbeatRequest)
throws YarnException; throws YarnException;
/** /**
* Get the membership information of <em>subcluster</em> as identified by * Get the membership information of <em>subcluster</em> as identified by

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.store;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.records.Version;
/**
* FederationStore extends the three interfaces used to coordinate the state of
* a federated cluster: {@link FederationApplicationHomeSubClusterStore},
* {@link FederationMembershipStateStore}, and {@link FederationPolicyStore}.
*
*/
public interface FederationStateStore
extends FederationApplicationHomeSubClusterStore,
FederationMembershipStateStore, FederationPolicyStore {
/**
* Initialize the FederationStore.
*
* @param conf the cluster configuration
* @throws YarnException if initialization fails
*/
void init(Configuration conf) throws YarnException;
/**
* Perform any cleanup operations of the StateStore.
*
* @throws Exception if cleanup fails
*/
void close() throws Exception;
/**
* Get the {@link Version} of the underlying federation state store client.
*
* @return the {@link Version} of the underlying federation store client
*/
Version getCurrentVersion();
/**
* Load the version information from the federation state store.
*
* @return the {@link Version} of the federation state store
*/
Version loadVersion();
}

View File

@ -36,11 +36,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbea
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.util.MonotonicClock; import org.apache.hadoop.yarn.util.MonotonicClock;
import com.google.common.annotations.VisibleForTesting;
/** /**
* In-memory implementation of FederationMembershipStateStore. * In-memory implementation of FederationMembershipStateStore.
*/ */
@ -51,11 +48,6 @@ public class MemoryFederationStateStore
new ConcurrentHashMap<SubClusterId, SubClusterInfo>(); new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
private final MonotonicClock clock = new MonotonicClock(); private final MonotonicClock clock = new MonotonicClock();
@Override
public Version getMembershipStateStoreVersion() {
return null;
}
@Override @Override
public SubClusterRegisterResponse registerSubCluster( public SubClusterRegisterResponse registerSubCluster(
SubClusterRegisterRequest request) throws YarnException { SubClusterRegisterRequest request) throws YarnException {
@ -122,17 +114,6 @@ public class MemoryFederationStateStore
} }
} }
return GetSubClustersInfoResponse.newInstance(result); return GetSubClustersInfoResponse.newInstance(result);
}
@VisibleForTesting
public Map<SubClusterId, SubClusterInfo> getMembershipTable() {
return membership;
}
@VisibleForTesting
public void clearMembershipTable() {
membership.clear();
} }
} }

View File

@ -19,25 +19,21 @@ package org.apache.hadoop.yarn.server.federation.store.impl;
import java.io.IOException; import java.io.IOException;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore; import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.util.MonotonicClock; import org.apache.hadoop.yarn.util.MonotonicClock;
import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
/** /**
@ -45,17 +41,22 @@ import org.junit.Test;
*/ */
public abstract class FederationStateStoreBaseTest { public abstract class FederationStateStoreBaseTest {
static final Logger LOG =
LoggerFactory.getLogger(FederationStateStoreBaseTest.class);
private static final MonotonicClock CLOCK = new MonotonicClock(); private static final MonotonicClock CLOCK = new MonotonicClock();
private FederationMembershipStateStore stateStore = getStateStore(); private FederationMembershipStateStore stateStore;
@Before @Before
public void before() throws IOException { public void before() throws IOException {
clearMembership(); stateStore = getCleanStateStore();
} }
@After
public void after() {
stateStore = null;
}
protected abstract FederationMembershipStateStore getCleanStateStore();
@Test @Test
public void testRegisterSubCluster() throws Exception { public void testRegisterSubCluster() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance("SC"); SubClusterId subClusterId = SubClusterId.newInstance("SC");
@ -63,11 +64,9 @@ public abstract class FederationStateStoreBaseTest {
SubClusterRegisterResponse result = stateStore.registerSubCluster( SubClusterRegisterResponse result = stateStore.registerSubCluster(
SubClusterRegisterRequest.newInstance(subClusterInfo)); SubClusterRegisterRequest.newInstance(subClusterInfo));
Map<SubClusterId, SubClusterInfo> membership = getMembership();
Assert.assertNotNull(membership.get(subClusterId));
Assert.assertNotNull(result); Assert.assertNotNull(result);
Assert.assertEquals(subClusterInfo, membership.get(subClusterId)); Assert.assertEquals(subClusterInfo, querySubClusterInfo(subClusterId));
} }
@Test @Test
@ -83,10 +82,8 @@ public abstract class FederationStateStoreBaseTest {
stateStore.deregisterSubCluster(deregisterRequest); stateStore.deregisterSubCluster(deregisterRequest);
Map<SubClusterId, SubClusterInfo> membership = getMembership(); Assert.assertEquals(SubClusterState.SC_UNREGISTERED,
Assert.assertNotNull(membership.get(subClusterId)); querySubClusterInfo(subClusterId).getState());
Assert.assertEquals(membership.get(subClusterId).getState(),
SubClusterState.SC_UNREGISTERED);
} }
@Test @Test
@ -179,10 +176,9 @@ public abstract class FederationStateStoreBaseTest {
.newInstance(subClusterId, SubClusterState.SC_RUNNING, "cabability"); .newInstance(subClusterId, SubClusterState.SC_RUNNING, "cabability");
stateStore.subClusterHeartbeat(heartbeatRequest); stateStore.subClusterHeartbeat(heartbeatRequest);
Map<SubClusterId, SubClusterInfo> membership = getMembership(); Assert.assertEquals(SubClusterState.SC_RUNNING,
Assert.assertEquals(membership.get(subClusterId).getState(), querySubClusterInfo(subClusterId).getState());
SubClusterState.SC_RUNNING); Assert.assertNotNull(querySubClusterInfo(subClusterId).getLastHeartBeat());
Assert.assertNotNull(membership.get(subClusterId).getLastHeartBeat());
} }
@Test @Test
@ -212,10 +208,11 @@ public abstract class FederationStateStoreBaseTest {
CLOCK.getTime(), "cabability"); CLOCK.getTime(), "cabability");
} }
protected abstract Map<SubClusterId, SubClusterInfo> getMembership(); private SubClusterInfo querySubClusterInfo(SubClusterId subClusterId)
throws YarnException {
protected abstract void clearMembership(); GetSubClusterInfoRequest request =
GetSubClusterInfoRequest.newInstance(subClusterId);
protected abstract FederationMembershipStateStore getStateStore(); return stateStore.getSubCluster(request).getSubClusterInfo();
}
} }

View File

@ -17,11 +17,7 @@
package org.apache.hadoop.yarn.server.federation.store.impl; package org.apache.hadoop.yarn.server.federation.store.impl;
import java.util.Map;
import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore; import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
/** /**
* Unit tests for MemoryFederationStateStore. * Unit tests for MemoryFederationStateStore.
@ -29,21 +25,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
public class TestMemoryFederationStateStore public class TestMemoryFederationStateStore
extends FederationStateStoreBaseTest { extends FederationStateStoreBaseTest {
private static final MemoryFederationStateStore STATESTORE =
new MemoryFederationStateStore();
@Override @Override
protected Map<SubClusterId, SubClusterInfo> getMembership() { protected FederationMembershipStateStore getCleanStateStore() {
return STATESTORE.getMembershipTable(); return new MemoryFederationStateStore();
}
@Override
protected void clearMembership() {
STATESTORE.clearMembershipTable();
}
@Override
protected FederationMembershipStateStore getStateStore() {
return STATESTORE;
} }
} }