YARN-5406. In-memory based implementation of the FederationMembershipStateStore. Contributed by Ellen Hui.

(cherry picked from commit 20d1d2be91)
This commit is contained in:
Subru Krishnan 2016-08-04 15:54:38 -07:00 committed by Carlo Curino
parent 3e22896f86
commit dcf8c52d80
6 changed files with 433 additions and 0 deletions

View File

@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.util.MonotonicClock;
import com.google.common.annotations.VisibleForTesting;
/**
* In-memory implementation of FederationMembershipStateStore.
*/
public class MemoryFederationStateStore
implements FederationMembershipStateStore {
private final Map<SubClusterId, SubClusterInfo> membership =
new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
private final MonotonicClock clock = new MonotonicClock();
@Override
public Version getMembershipStateStoreVersion() {
return null;
}
@Override
public SubClusterRegisterResponse registerSubCluster(
SubClusterRegisterRequest request) throws YarnException {
SubClusterInfo subClusterInfo = request.getSubClusterInfo();
subClusterInfo.setLastStartTime(clock.getTime());
membership.put(subClusterInfo.getSubClusterId(), subClusterInfo);
return SubClusterRegisterResponse.newInstance();
}
@Override
public SubClusterDeregisterResponse deregisterSubCluster(
SubClusterDeregisterRequest request) throws YarnException {
SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId());
if (subClusterInfo == null) {
throw new YarnException(
"SubCluster " + request.getSubClusterId().toString() + " not found");
} else {
subClusterInfo.setState(request.getState());
}
return SubClusterDeregisterResponse.newInstance();
}
@Override
public SubClusterHeartbeatResponse subClusterHeartbeat(
SubClusterHeartbeatRequest request) throws YarnException {
SubClusterId subClusterId = request.getSubClusterId();
SubClusterInfo subClusterInfo = membership.get(subClusterId);
if (subClusterInfo == null) {
throw new YarnException("Subcluster " + subClusterId.toString()
+ " does not exist; cannot heartbeat");
}
subClusterInfo.setLastHeartBeat(clock.getTime());
subClusterInfo.setState(request.getState());
subClusterInfo.setCapability(request.getCapability());
return SubClusterHeartbeatResponse.newInstance();
}
@Override
public GetSubClusterInfoResponse getSubCluster(
GetSubClusterInfoRequest request) throws YarnException {
SubClusterId subClusterId = request.getSubClusterId();
if (!membership.containsKey(subClusterId)) {
throw new YarnException(
"Subcluster " + subClusterId.toString() + " does not exist");
}
return GetSubClusterInfoResponse.newInstance(membership.get(subClusterId));
}
@Override
public GetSubClustersInfoResponse getSubClusters(
GetSubClustersInfoRequest request) throws YarnException {
List<SubClusterInfo> result = new ArrayList<SubClusterInfo>();
for (SubClusterInfo info : membership.values()) {
if (!request.getFilterInactiveSubClusters()
|| info.getState().isActive()) {
result.add(info);
}
}
return GetSubClustersInfoResponse.newInstance(result);
}
@VisibleForTesting
public Map<SubClusterId, SubClusterInfo> getMembershipTable() {
return membership;
}
@VisibleForTesting
public void clearMembershipTable() {
membership.clear();
}
}

View File

@ -0,0 +1,17 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.impl;

View File

@ -26,6 +26,10 @@ import org.apache.hadoop.yarn.util.Records;
/**
* Request class to obtain information about all sub-clusters that are
* participating in federation.
*
* If filterInactiveSubClusters is set to true, only active sub-clusters will be
* returned; otherwise, all sub-clusters will be returned regardless of state.
* By default, filterInactiveSubClusters is true.
*/
@Private
@Unstable

View File

@ -53,6 +53,10 @@ public enum SubClusterState {
return (this != SC_RUNNING && this != SC_NEW);
}
public boolean isActive() {
return this == SC_RUNNING;
}
public boolean isFinal() {
return (this == SC_UNREGISTERED || this == SC_DECOMMISSIONED
|| this == SC_LOST);

View File

@ -0,0 +1,221 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.impl;
import java.io.IOException;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.junit.Assert;
import org.junit.Test;
/**
* Base class for FederationMembershipStateStore implementations.
*/
public abstract class FederationStateStoreBaseTest {
static final Logger LOG =
LoggerFactory.getLogger(FederationStateStoreBaseTest.class);
private static final MonotonicClock CLOCK = new MonotonicClock();
private FederationMembershipStateStore stateStore = getStateStore();
@Before
public void before() throws IOException {
clearMembership();
}
@Test
public void testRegisterSubCluster() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
SubClusterRegisterResponse result = stateStore.registerSubCluster(
SubClusterRegisterRequest.newInstance(subClusterInfo));
Map<SubClusterId, SubClusterInfo> membership = getMembership();
Assert.assertNotNull(membership.get(subClusterId));
Assert.assertNotNull(result);
Assert.assertEquals(subClusterInfo, membership.get(subClusterId));
}
@Test
public void testDeregisterSubCluster() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
stateStore.registerSubCluster(
SubClusterRegisterRequest.newInstance(subClusterInfo));
SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest
.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED);
stateStore.deregisterSubCluster(deregisterRequest);
Map<SubClusterId, SubClusterInfo> membership = getMembership();
Assert.assertNotNull(membership.get(subClusterId));
Assert.assertEquals(membership.get(subClusterId).getState(),
SubClusterState.SC_UNREGISTERED);
}
@Test
public void testDeregisterSubClusterUnknownSubCluster() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest
.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED);
try {
stateStore.deregisterSubCluster(deregisterRequest);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().startsWith("SubCluster SC not found"));
}
}
@Test
public void testGetSubClusterInfo() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
stateStore.registerSubCluster(
SubClusterRegisterRequest.newInstance(subClusterInfo));
GetSubClusterInfoRequest request =
GetSubClusterInfoRequest.newInstance(subClusterId);
Assert.assertEquals(subClusterInfo,
stateStore.getSubCluster(request).getSubClusterInfo());
}
@Test
public void testGetSubClusterInfoUnknownSubCluster() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
GetSubClusterInfoRequest request =
GetSubClusterInfoRequest.newInstance(subClusterId);
try {
stateStore.getSubCluster(request).getSubClusterInfo();
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Subcluster SC does not exist"));
}
}
@Test
public void testGetAllSubClustersInfo() throws Exception {
SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
SubClusterInfo subClusterInfo1 = createSubClusterInfo(subClusterId1);
SubClusterId subClusterId2 = SubClusterId.newInstance("SC2");
SubClusterInfo subClusterInfo2 = createSubClusterInfo(subClusterId2);
stateStore.registerSubCluster(
SubClusterRegisterRequest.newInstance(subClusterInfo1));
stateStore.registerSubCluster(
SubClusterRegisterRequest.newInstance(subClusterInfo2));
stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest
.newInstance(subClusterId1, SubClusterState.SC_RUNNING, ""));
stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest
.newInstance(subClusterId2, SubClusterState.SC_UNHEALTHY, ""));
Assert.assertTrue(
stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true))
.getSubClusters().contains(subClusterInfo1));
Assert.assertFalse(
stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true))
.getSubClusters().contains(subClusterInfo2));
Assert.assertTrue(
stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(false))
.getSubClusters().contains(subClusterInfo1));
Assert.assertTrue(
stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(false))
.getSubClusters().contains(subClusterInfo2));
}
@Test
public void testSubClusterHeartbeat() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
stateStore.registerSubCluster(
SubClusterRegisterRequest.newInstance(subClusterInfo));
SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest
.newInstance(subClusterId, SubClusterState.SC_RUNNING, "cabability");
stateStore.subClusterHeartbeat(heartbeatRequest);
Map<SubClusterId, SubClusterInfo> membership = getMembership();
Assert.assertEquals(membership.get(subClusterId).getState(),
SubClusterState.SC_RUNNING);
Assert.assertNotNull(membership.get(subClusterId).getLastHeartBeat());
}
@Test
public void testSubClusterHeartbeatUnknownSubCluster() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest
.newInstance(subClusterId, SubClusterState.SC_RUNNING, "cabability");
try {
stateStore.subClusterHeartbeat(heartbeatRequest);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage()
.startsWith("Subcluster SC does not exist; cannot heartbeat"));
}
}
private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) {
String amRMAddress = "1.2.3.4:1";
String clientRMAddress = "1.2.3.4:2";
String rmAdminAddress = "1.2.3.4:3";
String webAppAddress = "1.2.3.4:4";
return SubClusterInfo.newInstance(subClusterId, amRMAddress,
clientRMAddress, rmAdminAddress, webAppAddress, SubClusterState.SC_NEW,
CLOCK.getTime(), "cabability");
}
protected abstract Map<SubClusterId, SubClusterInfo> getMembership();
protected abstract void clearMembership();
protected abstract FederationMembershipStateStore getStateStore();
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.impl;
import java.util.Map;
import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
/**
* Unit tests for MemoryFederationStateStore.
*/
public class TestMemoryFederationStateStore
extends FederationStateStoreBaseTest {
private static final MemoryFederationStateStore STATESTORE =
new MemoryFederationStateStore();
@Override
protected Map<SubClusterId, SubClusterInfo> getMembership() {
return STATESTORE.getMembershipTable();
}
@Override
protected void clearMembership() {
STATESTORE.clearMembershipTable();
}
@Override
protected FederationMembershipStateStore getStateStore() {
return STATESTORE;
}
}