YARN-5406. In-memory based implementation of the FederationMembershipStateStore. Contributed by Ellen Hui.
This commit is contained in:
parent
5424d0899c
commit
20d1d2be91
|
@ -0,0 +1,138 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.federation.store.impl;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
|
||||
import org.apache.hadoop.yarn.server.records.Version;
|
||||
import org.apache.hadoop.yarn.util.MonotonicClock;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* In-memory implementation of FederationMembershipStateStore.
|
||||
*/
|
||||
public class MemoryFederationStateStore
|
||||
implements FederationMembershipStateStore {
|
||||
|
||||
private final Map<SubClusterId, SubClusterInfo> membership =
|
||||
new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
|
||||
private final MonotonicClock clock = new MonotonicClock();
|
||||
|
||||
@Override
|
||||
public Version getMembershipStateStoreVersion() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SubClusterRegisterResponse registerSubCluster(
|
||||
SubClusterRegisterRequest request) throws YarnException {
|
||||
SubClusterInfo subClusterInfo = request.getSubClusterInfo();
|
||||
subClusterInfo.setLastStartTime(clock.getTime());
|
||||
membership.put(subClusterInfo.getSubClusterId(), subClusterInfo);
|
||||
return SubClusterRegisterResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SubClusterDeregisterResponse deregisterSubCluster(
|
||||
SubClusterDeregisterRequest request) throws YarnException {
|
||||
SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId());
|
||||
if (subClusterInfo == null) {
|
||||
throw new YarnException(
|
||||
"SubCluster " + request.getSubClusterId().toString() + " not found");
|
||||
} else {
|
||||
subClusterInfo.setState(request.getState());
|
||||
}
|
||||
|
||||
return SubClusterDeregisterResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SubClusterHeartbeatResponse subClusterHeartbeat(
|
||||
SubClusterHeartbeatRequest request) throws YarnException {
|
||||
|
||||
SubClusterId subClusterId = request.getSubClusterId();
|
||||
SubClusterInfo subClusterInfo = membership.get(subClusterId);
|
||||
|
||||
if (subClusterInfo == null) {
|
||||
throw new YarnException("Subcluster " + subClusterId.toString()
|
||||
+ " does not exist; cannot heartbeat");
|
||||
}
|
||||
|
||||
subClusterInfo.setLastHeartBeat(clock.getTime());
|
||||
subClusterInfo.setState(request.getState());
|
||||
subClusterInfo.setCapability(request.getCapability());
|
||||
|
||||
return SubClusterHeartbeatResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetSubClusterInfoResponse getSubCluster(
|
||||
GetSubClusterInfoRequest request) throws YarnException {
|
||||
SubClusterId subClusterId = request.getSubClusterId();
|
||||
if (!membership.containsKey(subClusterId)) {
|
||||
throw new YarnException(
|
||||
"Subcluster " + subClusterId.toString() + " does not exist");
|
||||
}
|
||||
|
||||
return GetSubClusterInfoResponse.newInstance(membership.get(subClusterId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetSubClustersInfoResponse getSubClusters(
|
||||
GetSubClustersInfoRequest request) throws YarnException {
|
||||
List<SubClusterInfo> result = new ArrayList<SubClusterInfo>();
|
||||
|
||||
for (SubClusterInfo info : membership.values()) {
|
||||
if (!request.getFilterInactiveSubClusters()
|
||||
|| info.getState().isActive()) {
|
||||
result.add(info);
|
||||
}
|
||||
}
|
||||
return GetSubClustersInfoResponse.newInstance(result);
|
||||
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Map<SubClusterId, SubClusterInfo> getMembershipTable() {
|
||||
return membership;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void clearMembershipTable() {
|
||||
membership.clear();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.federation.store.impl;
|
|
@ -26,6 +26,10 @@ import org.apache.hadoop.yarn.util.Records;
|
|||
/**
|
||||
* Request class to obtain information about all sub-clusters that are
|
||||
* participating in federation.
|
||||
*
|
||||
* If filterInactiveSubClusters is set to true, only active sub-clusters will be
|
||||
* returned; otherwise, all sub-clusters will be returned regardless of state.
|
||||
* By default, filterInactiveSubClusters is true.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
|
|
|
@ -53,6 +53,10 @@ public enum SubClusterState {
|
|||
return (this != SC_RUNNING && this != SC_NEW);
|
||||
}
|
||||
|
||||
public boolean isActive() {
|
||||
return this == SC_RUNNING;
|
||||
}
|
||||
|
||||
public boolean isFinal() {
|
||||
return (this == SC_UNREGISTERED || this == SC_DECOMMISSIONED
|
||||
|| this == SC_LOST);
|
||||
|
|
|
@ -0,0 +1,221 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.federation.store.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
|
||||
import org.apache.hadoop.yarn.util.MonotonicClock;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Base class for FederationMembershipStateStore implementations.
|
||||
*/
|
||||
public abstract class FederationStateStoreBaseTest {
|
||||
|
||||
static final Logger LOG =
|
||||
LoggerFactory.getLogger(FederationStateStoreBaseTest.class);
|
||||
private static final MonotonicClock CLOCK = new MonotonicClock();
|
||||
|
||||
private FederationMembershipStateStore stateStore = getStateStore();
|
||||
|
||||
@Before
|
||||
public void before() throws IOException {
|
||||
clearMembership();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegisterSubCluster() throws Exception {
|
||||
SubClusterId subClusterId = SubClusterId.newInstance("SC");
|
||||
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
|
||||
|
||||
SubClusterRegisterResponse result = stateStore.registerSubCluster(
|
||||
SubClusterRegisterRequest.newInstance(subClusterInfo));
|
||||
Map<SubClusterId, SubClusterInfo> membership = getMembership();
|
||||
|
||||
Assert.assertNotNull(membership.get(subClusterId));
|
||||
Assert.assertNotNull(result);
|
||||
Assert.assertEquals(subClusterInfo, membership.get(subClusterId));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeregisterSubCluster() throws Exception {
|
||||
SubClusterId subClusterId = SubClusterId.newInstance("SC");
|
||||
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
|
||||
|
||||
stateStore.registerSubCluster(
|
||||
SubClusterRegisterRequest.newInstance(subClusterInfo));
|
||||
|
||||
SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest
|
||||
.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED);
|
||||
|
||||
stateStore.deregisterSubCluster(deregisterRequest);
|
||||
|
||||
Map<SubClusterId, SubClusterInfo> membership = getMembership();
|
||||
Assert.assertNotNull(membership.get(subClusterId));
|
||||
Assert.assertEquals(membership.get(subClusterId).getState(),
|
||||
SubClusterState.SC_UNREGISTERED);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeregisterSubClusterUnknownSubCluster() throws Exception {
|
||||
SubClusterId subClusterId = SubClusterId.newInstance("SC");
|
||||
|
||||
SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest
|
||||
.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED);
|
||||
try {
|
||||
stateStore.deregisterSubCluster(deregisterRequest);
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(e.getMessage().startsWith("SubCluster SC not found"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSubClusterInfo() throws Exception {
|
||||
|
||||
SubClusterId subClusterId = SubClusterId.newInstance("SC");
|
||||
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
|
||||
|
||||
stateStore.registerSubCluster(
|
||||
SubClusterRegisterRequest.newInstance(subClusterInfo));
|
||||
|
||||
GetSubClusterInfoRequest request =
|
||||
GetSubClusterInfoRequest.newInstance(subClusterId);
|
||||
Assert.assertEquals(subClusterInfo,
|
||||
stateStore.getSubCluster(request).getSubClusterInfo());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSubClusterInfoUnknownSubCluster() throws Exception {
|
||||
SubClusterId subClusterId = SubClusterId.newInstance("SC");
|
||||
GetSubClusterInfoRequest request =
|
||||
GetSubClusterInfoRequest.newInstance(subClusterId);
|
||||
|
||||
try {
|
||||
stateStore.getSubCluster(request).getSubClusterInfo();
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(
|
||||
e.getMessage().startsWith("Subcluster SC does not exist"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAllSubClustersInfo() throws Exception {
|
||||
|
||||
SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
|
||||
SubClusterInfo subClusterInfo1 = createSubClusterInfo(subClusterId1);
|
||||
|
||||
SubClusterId subClusterId2 = SubClusterId.newInstance("SC2");
|
||||
SubClusterInfo subClusterInfo2 = createSubClusterInfo(subClusterId2);
|
||||
|
||||
stateStore.registerSubCluster(
|
||||
SubClusterRegisterRequest.newInstance(subClusterInfo1));
|
||||
stateStore.registerSubCluster(
|
||||
SubClusterRegisterRequest.newInstance(subClusterInfo2));
|
||||
|
||||
stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest
|
||||
.newInstance(subClusterId1, SubClusterState.SC_RUNNING, ""));
|
||||
stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest
|
||||
.newInstance(subClusterId2, SubClusterState.SC_UNHEALTHY, ""));
|
||||
|
||||
Assert.assertTrue(
|
||||
stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true))
|
||||
.getSubClusters().contains(subClusterInfo1));
|
||||
Assert.assertFalse(
|
||||
stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true))
|
||||
.getSubClusters().contains(subClusterInfo2));
|
||||
|
||||
Assert.assertTrue(
|
||||
stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(false))
|
||||
.getSubClusters().contains(subClusterInfo1));
|
||||
Assert.assertTrue(
|
||||
stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(false))
|
||||
.getSubClusters().contains(subClusterInfo2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSubClusterHeartbeat() throws Exception {
|
||||
SubClusterId subClusterId = SubClusterId.newInstance("SC");
|
||||
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
|
||||
|
||||
stateStore.registerSubCluster(
|
||||
SubClusterRegisterRequest.newInstance(subClusterInfo));
|
||||
|
||||
SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest
|
||||
.newInstance(subClusterId, SubClusterState.SC_RUNNING, "cabability");
|
||||
stateStore.subClusterHeartbeat(heartbeatRequest);
|
||||
|
||||
Map<SubClusterId, SubClusterInfo> membership = getMembership();
|
||||
Assert.assertEquals(membership.get(subClusterId).getState(),
|
||||
SubClusterState.SC_RUNNING);
|
||||
Assert.assertNotNull(membership.get(subClusterId).getLastHeartBeat());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSubClusterHeartbeatUnknownSubCluster() throws Exception {
|
||||
SubClusterId subClusterId = SubClusterId.newInstance("SC");
|
||||
SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest
|
||||
.newInstance(subClusterId, SubClusterState.SC_RUNNING, "cabability");
|
||||
|
||||
try {
|
||||
stateStore.subClusterHeartbeat(heartbeatRequest);
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(e.getMessage()
|
||||
.startsWith("Subcluster SC does not exist; cannot heartbeat"));
|
||||
}
|
||||
}
|
||||
|
||||
private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) {
|
||||
|
||||
String amRMAddress = "1.2.3.4:1";
|
||||
String clientRMAddress = "1.2.3.4:2";
|
||||
String rmAdminAddress = "1.2.3.4:3";
|
||||
String webAppAddress = "1.2.3.4:4";
|
||||
|
||||
return SubClusterInfo.newInstance(subClusterId, amRMAddress,
|
||||
clientRMAddress, rmAdminAddress, webAppAddress, SubClusterState.SC_NEW,
|
||||
CLOCK.getTime(), "cabability");
|
||||
}
|
||||
|
||||
protected abstract Map<SubClusterId, SubClusterInfo> getMembership();
|
||||
|
||||
protected abstract void clearMembership();
|
||||
|
||||
protected abstract FederationMembershipStateStore getStateStore();
|
||||
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.federation.store.impl;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
|
||||
/**
|
||||
* Unit tests for MemoryFederationStateStore.
|
||||
*/
|
||||
public class TestMemoryFederationStateStore
|
||||
extends FederationStateStoreBaseTest {
|
||||
|
||||
private static final MemoryFederationStateStore STATESTORE =
|
||||
new MemoryFederationStateStore();
|
||||
|
||||
@Override
|
||||
protected Map<SubClusterId, SubClusterInfo> getMembership() {
|
||||
return STATESTORE.getMembershipTable();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void clearMembership() {
|
||||
STATESTORE.clearMembershipTable();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected FederationMembershipStateStore getStateStore() {
|
||||
return STATESTORE;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue