YARN-8626. Create HomePolicyManager that sends all the requests to the home subcluster. Contributed by Inigo Goiri.

This commit is contained in:
Giovanni Matteo Fumarola 2018-08-07 15:33:16 -07:00
parent 861095f761
commit d838179d8d
8 changed files with 305 additions and 18 deletions

View File

@ -20,9 +20,12 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
import java.util.Map;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
/**
@ -44,4 +47,9 @@ public abstract class AbstractAMRMProxyPolicy extends
}
}
@Override
public void notifyOfResponse(SubClusterId subClusterId,
AllocateResponse response) throws YarnException {
// By default, a stateless policy does not care about responses
}
}

View File

@ -22,7 +22,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
@ -65,10 +64,4 @@ public class BroadcastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
return answer;
}
@Override
public void notifyOfResponse(SubClusterId subClusterId,
AllocateResponse response) throws YarnException {
// stateless policy does not care about responses
}
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
/**
* An implementation of the {@link FederationAMRMProxyPolicy} that simply
* sends the {@link ResourceRequest} to the home subcluster.
*/
public class HomeAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
/** Identifier of the local subcluster. */
private SubClusterId homeSubcluster;
@Override
public void reinitialize(
FederationPolicyInitializationContext policyContext)
throws FederationPolicyInitializationException {
FederationPolicyInitializationContextValidator
.validate(policyContext, this.getClass().getCanonicalName());
setPolicyContext(policyContext);
this.homeSubcluster = policyContext.getHomeSubcluster();
}
@Override
public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
List<ResourceRequest> resourceRequests) throws YarnException {
if (homeSubcluster == null) {
throw new FederationPolicyException("No home subcluster available");
}
Map<SubClusterId, SubClusterInfo> active = getActiveSubclusters();
if (!active.containsKey(homeSubcluster)) {
throw new FederationPolicyException(
"The local subcluster " + homeSubcluster + " is not active");
}
List<ResourceRequest> resourceRequestsCopy =
new ArrayList<>(resourceRequests);
return Collections.singletonMap(homeSubcluster, resourceRequestsCopy);
}
}

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
@ -53,11 +52,4 @@ public class RejectAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
+ "rejects all routing requests by construction.");
}
@Override
public void notifyOfResponse(SubClusterId subClusterId,
AllocateResponse response) throws YarnException {
// This might be invoked for applications started with a previous policy,
// do nothing for this policy.
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.manager;
import java.nio.ByteBuffer;
import java.util.Collections;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.HomeAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
/**
* Policy manager which uses the {@link UniformRandomRouterPolicy} for the
* Router and {@link HomeAMRMProxyPolicy} as the AMRMProxy policy to find the
* RM.
*/
public class HomePolicyManager extends AbstractPolicyManager {
/** Imaginary configuration to fulfill the super class. */
private WeightedPolicyInfo weightedPolicyInfo;
public HomePolicyManager() {
weightedPolicyInfo = new WeightedPolicyInfo();
weightedPolicyInfo.setRouterPolicyWeights(
Collections.singletonMap(new SubClusterIdInfo(""), 1.0f));
weightedPolicyInfo.setAMRMPolicyWeights(
Collections.singletonMap(new SubClusterIdInfo(""), 1.0f));
// Hard-codes two compatible policies for Router and AMRMProxy.
routerFederationPolicy = UniformRandomRouterPolicy.class;
amrmProxyFederationPolicy = HomeAMRMProxyPolicy.class;
}
@Override
public SubClusterPolicyConfiguration serializeConf()
throws FederationPolicyInitializationException {
ByteBuffer buf = weightedPolicyInfo.toByteBuffer();
return SubClusterPolicyConfiguration.newInstance(
getQueue(), this.getClass().getCanonicalName(), buf);
}
}

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
import static org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil.createResourceRequests;
import static org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil.initializePolicyContext;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.junit.Before;
import org.junit.Test;
/**
* Simple test class for the {@link HomeAMRMProxyPolicy}.
*/
public class TestHomeAMRMProxyPolicy extends BaseFederationPoliciesTest {
private static final int NUM_SUBCLUSTERS = 4;
private static final String HOME_SC_NAME = "sc2";
private static final SubClusterId HOME_SC_ID =
SubClusterId.newInstance(HOME_SC_NAME);
@Before
public void setUp() throws Exception {
setPolicy(new HomeAMRMProxyPolicy());
// needed for base test to work
setPolicyInfo(mock(WeightedPolicyInfo.class));
for (int i = 0; i < NUM_SUBCLUSTERS; i++) {
SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
SubClusterInfo sci = mock(SubClusterInfo.class);
when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
when(sci.getSubClusterId()).thenReturn(sc.toId());
getActiveSubclusters().put(sc.toId(), sci);
}
initializePolicyContext(getPolicy(), mock(WeightedPolicyInfo.class),
getActiveSubclusters(), HOME_SC_NAME);
}
@Test
public void testSplitAllocateRequest() throws YarnException {
// Verify the request only goes to the home subcluster
String[] hosts = new String[] {"host0", "host1", "host2", "host3"};
List<ResourceRequest> resourceRequests = createResourceRequests(
hosts, 2 * 1024, 2, 1, 3, null, false);
HomeAMRMProxyPolicy federationPolicy =
(HomeAMRMProxyPolicy)getPolicy();
Map<SubClusterId, List<ResourceRequest>> response =
federationPolicy.splitResourceRequests(resourceRequests);
assertEquals(1, response.size());
assertNotNull(response.get(HOME_SC_ID));
assertEquals(9, response.get(HOME_SC_ID).size());
}
@Test
public void testHomeSubclusterNotActive() throws YarnException {
// We setup the home subcluster to a non-existing one
initializePolicyContext(getPolicy(), mock(WeightedPolicyInfo.class),
getActiveSubclusters(), "badsc");
// Verify the request fails because the home subcluster is not available
try {
String[] hosts = new String[] {"host0", "host1", "host2", "host3"};
List<ResourceRequest> resourceRequests = createResourceRequests(
hosts, 2 * 1024, 2, 1, 3, null, false);
HomeAMRMProxyPolicy federationPolicy = (HomeAMRMProxyPolicy)getPolicy();
federationPolicy.splitResourceRequests(resourceRequests);
fail("It should fail when the home subcluster is not active");
} catch(FederationPolicyException e) {
GenericTestUtils.assertExceptionContains("is not active", e);
}
}
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.manager;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.HomeAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
import org.junit.Before;
/**
* Simple test of {@link HomePolicyManager}.
*/
public class TestHomePolicyManager extends BasePolicyManagerTest {
@Before
public void setup() {
wfp = new HomePolicyManager();
//set expected params that the base test class will use for tests
expectedPolicyManager = HomePolicyManager.class;
expectedAMRMProxyPolicy = HomeAMRMProxyPolicy.class;
expectedRouterPolicy = UniformRandomRouterPolicy.class;
}
}

View File

@ -140,11 +140,21 @@ public final class FederationPoliciesTestUtil {
public static void initializePolicyContext(
ConfigurableFederationPolicy policy,
WeightedPolicyInfo policyInfo, Map<SubClusterId,
SubClusterInfo> activeSubclusters) throws YarnException {
WeightedPolicyInfo policyInfo,
Map<SubClusterId, SubClusterInfo> activeSubclusters)
throws YarnException {
initializePolicyContext(
policy, policyInfo, activeSubclusters, "homesubcluster");
}
public static void initializePolicyContext(
ConfigurableFederationPolicy policy,
WeightedPolicyInfo policyInfo,
Map<SubClusterId, SubClusterInfo> activeSubclusters,
String subclusterId) throws YarnException {
FederationPolicyInitializationContext context =
new FederationPolicyInitializationContext(null, initResolver(),
initFacade(), SubClusterId.newInstance("homesubcluster"));
initFacade(), SubClusterId.newInstance(subclusterId));
initializePolicyContext(context, policy, policyInfo, activeSubclusters);
}