YARN-5323. Policies APIs for Federation Router and AMRMProxy policies. (Carlo Curino via Subru).

This commit is contained in:
Subru Krishnan 2016-09-07 17:33:34 -07:00 committed by Carlo Curino
parent 9ca2aba9cc
commit f8208fe0b5
17 changed files with 894 additions and 0 deletions

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
/**
* This interface provides a general method to reinitialize a policy. The
* semantics are try-n-swap, so in case of an exception is thrown the
* implmentation must ensure the previous state and configuration is preserved.
*/
public interface ConfigurableFederationPolicy {
/**
* This method is invoked to initialize of update the configuration of
* policies. The implementor should provide try-n-swap semantics, and retain
* state if possible.
*
* @param federationPolicyInitializationContext the new context to provide to
* implementor.
*
* @throws FederationPolicyInitializationException in case the initialization
* fails.
*/
void reinitialize(
FederationPolicyInitializationContext
federationPolicyInitializationContext)
throws FederationPolicyInitializationException;
}

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router
.FederationRouterPolicy;
/**
* Implementors of this interface are capable to instantiate and (re)initalize
* {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy} based on
* a {@link FederationPolicyInitializationContext}. The reason to bind these two
* policies together is to make sure we remain consistent across the router and
* amrmproxy policy decisions.
*/
public interface FederationPolicyConfigurator {
/**
* If the current instance is compatible, this method returns the same
* instance of {@link FederationAMRMProxyPolicy} reinitialized with the
* current context, otherwise a new instance initialized with the current
* context is provided. If the instance is compatible with the current class
* the implementors should attempt to reinitalize (retaining state). To affect
* a complete policy reset oldInstance should be null.
*
* @param federationPolicyInitializationContext the current context
* @param oldInstance the existing (possibly null)
* instance.
*
* @return an updated {@link FederationAMRMProxyPolicy
}.
*
* @throws FederationPolicyInitializationException if the initialization
* cannot be completed
* properly. The oldInstance
* should be still valid in
* case of failed
* initialization.
*/
FederationAMRMProxyPolicy getAMRMPolicy(
FederationPolicyInitializationContext
federationPolicyInitializationContext,
FederationAMRMProxyPolicy oldInstance)
throws FederationPolicyInitializationException;
/**
* If the current instance is compatible, this method returns the same
* instance of {@link FederationRouterPolicy} reinitialized with the current
* context, otherwise a new instance initialized with the current context is
* provided. If the instance is compatible with the current class the
* implementors should attempt to reinitalize (retaining state). To affect a
* complete policy reset oldInstance shoulb be set to null.
*
* @param federationPolicyInitializationContext the current context
* @param oldInstance the existing (possibly null)
* instance.
*
* @return an updated {@link FederationRouterPolicy}.
*
* @throws FederationPolicyInitializationException if the initalization cannot
* be completed properly. The
* oldInstance should be still
* valid in case of failed
* initialization.
*/
FederationRouterPolicy getRouterPolicy(
FederationPolicyInitializationContext
federationPolicyInitializationContext,
FederationRouterPolicy oldInstance)
throws FederationPolicyInitializationException;
}

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
/**
* Context to (re)initialize a {@code FederationAMRMProxyPolicy} and {@code
* FederationRouterPolicy}.
*/
public class FederationPolicyInitializationContext {
private SubClusterPolicyConfiguration federationPolicyConfiguration;
private SubClusterResolver federationSubclusterResolver;
private FederationStateStoreFacade federationStateStoreFacade;
public FederationPolicyInitializationContext() {
federationPolicyConfiguration = null;
federationSubclusterResolver = null;
federationStateStoreFacade = null;
}
public FederationPolicyInitializationContext(SubClusterPolicyConfiguration
policy, SubClusterResolver resolver, FederationStateStoreFacade
storeFacade) {
this.federationPolicyConfiguration = policy;
this.federationSubclusterResolver = resolver;
this.federationStateStoreFacade = storeFacade;
}
/**
* Getter for the {@link SubClusterPolicyConfiguration}.
*
* @return the {@link SubClusterPolicyConfiguration} to be used for
* initialization.
*/
public SubClusterPolicyConfiguration getSubClusterPolicyConfiguration() {
return federationPolicyConfiguration;
}
/**
* Setter for the {@link SubClusterPolicyConfiguration}.
*
* @param federationPolicyConfiguration the
* {@link SubClusterPolicyConfiguration}
* to be used for initialization.
*/
public void setFederationPolicyConfiguration(
SubClusterPolicyConfiguration federationPolicyConfiguration) {
this.federationPolicyConfiguration = federationPolicyConfiguration;
}
/**
* Getter for the {@link SubClusterResolver}.
*
* @return the {@link SubClusterResolver} to be used for initialization.
*/
public SubClusterResolver getFederationSubclusterResolver() {
return federationSubclusterResolver;
}
/**
* Setter for the {@link SubClusterResolver}.
*
* @param federationSubclusterResolver the {@link SubClusterResolver} to be
* used for initialization.
*/
public void setFederationSubclusterResolver(
SubClusterResolver federationSubclusterResolver) {
this.federationSubclusterResolver = federationSubclusterResolver;
}
/**
* Getter for the {@link FederationStateStoreFacade}.
*
* @return the facade.
*/
public FederationStateStoreFacade getFederationStateStoreFacade() {
return federationStateStoreFacade;
}
/**
* Setter for the {@link FederationStateStoreFacade}.
*
* @param federationStateStoreFacade the facade.
*/
public void setFederationStateStoreFacade(
FederationStateStoreFacade federationStateStoreFacade) {
this.federationStateStoreFacade = federationStateStoreFacade;
}
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
/**
* Helper class used to factor out common validation steps for policies.
*/
public final class FederationPolicyInitializationContextValidator {
private FederationPolicyInitializationContextValidator() {
//disable constructor per checkstyle
}
public static void validate(
FederationPolicyInitializationContext
federationPolicyInitializationContext,
String myType) throws FederationPolicyInitializationException {
if (myType == null) {
throw new FederationPolicyInitializationException("The myType parameter"
+ " should not be null.");
}
if (federationPolicyInitializationContext == null) {
throw new FederationPolicyInitializationException(
"The FederationPolicyInitializationContext provided is null. Cannot"
+ " reinitalize "
+ "successfully.");
}
if (federationPolicyInitializationContext.getFederationStateStoreFacade()
== null) {
throw new FederationPolicyInitializationException(
"The FederationStateStoreFacade provided is null. Cannot"
+ " reinitalize successfully.");
}
if (federationPolicyInitializationContext.getFederationSubclusterResolver()
== null) {
throw new FederationPolicyInitializationException(
"The FederationStateStoreFacase provided is null. Cannot"
+ " reinitalize successfully.");
}
if (federationPolicyInitializationContext.getSubClusterPolicyConfiguration()
== null) {
throw new FederationPolicyInitializationException(
"The FederationSubclusterResolver provided is null. Cannot "
+ "reinitalize successfully.");
}
String intendedType =
federationPolicyInitializationContext.getSubClusterPolicyConfiguration()
.getType();
if (!myType.equals(intendedType)) {
throw new FederationPolicyInitializationException(
"The FederationPolicyConfiguration carries a type (" + intendedType
+ ") different then mine (" + myType
+ "). Cannot reinitialize successfully.");
}
}
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
/**
* Implementors of this class are able to serializeConf the configuraiton of a
* policy as a {@link SubClusterPolicyConfiguration}. This is used during the
* lifetime of a policy from the admin APIs or policy engine to serializeConf
* the policy into the policy store.
*/
public interface FederationPolicyWriter {
/**
/**
* This method is invoked to derive a {@link SubClusterPolicyConfiguration}.
* This is to be used when writing a policy object in the federation policy
* store.
*
* @return a valid policy configuration representing this object
* parametrization.
*
* @throws FederationPolicyInitializationException if the current state cannot
* be serialized properly
*/
SubClusterPolicyConfiguration serializeConf()
throws FederationPolicyInitializationException;
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import java.util.List;
import java.util.Map;
/**
* Implementors of this interface provide logic to split the list of {@link
* ResourceRequest}s received by the AM among various RMs.
*/
public interface FederationAMRMProxyPolicy
extends ConfigurableFederationPolicy {
/**
* Splits the {@link ResourceRequest}s from the client across one or more
* sub-clusters based on the policy semantics (e.g., broadcast, load-based).
*
* @param resourceRequests the list of {@link ResourceRequest}s from the
* AM to be split
*
* @return map of sub-cluster as identified by {@link SubClusterId} to the
* list of {@link ResourceRequest}s that should be forwarded to it
*
* @throws YarnException in case the request is malformed or no viable
* sub-clusters can be found.
*/
Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
List<ResourceRequest> resourceRequests)
throws YarnException;
/**
* This method should be invoked to notify the policy about responses being
* received. This is useful for stateful policies that make decisions based on
* previous responses being received.
*
* @param subClusterId the id of the subcluster sending the notification
* @param response the response received from one of the RMs
*
* @throws YarnException in case the response is not valid
*/
void notifyOfResponse(SubClusterId subClusterId,
AllocateResponse response) throws YarnException;
}

View File

@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** AMRMPRoxy policies. **/
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.exceptions;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* Generic policy exception.
*/
public class FederationPolicyException extends YarnException {
public FederationPolicyException(String s) {
super(s);
}
public FederationPolicyException(Throwable t) {
super(t);
}
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.exceptions;
/**
* This exception is thrown when the initialization of a federation policy is
* not successful.
*/
public class FederationPolicyInitializationException
extends FederationPolicyException {
public FederationPolicyInitializationException(String message) {
super(message);
}
public FederationPolicyInitializationException(Throwable j) {
super(j);
}
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.exceptions;
/**
* This exception is thrown when policies cannot locate any active cluster.
*/
public class NoActiveSubclustersException extends FederationPolicyException {
public NoActiveSubclustersException(String s) {
super(s);
}
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.exceptions;
/**
* This exception is thrown whenever a policy is given a {@code SubClusterId}
* that is unknown.
*/
public class UnknownSubclusterException extends FederationPolicyException {
public UnknownSubclusterException(String s) {
super(s);
}
}

View File

@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** Exceptions for policies. **/
package org.apache.hadoop.yarn.server.federation.policies.exceptions;

View File

@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** Federation Policies. **/
package org.apache.hadoop.yarn.server.federation.policies;

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies.router;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
/**
* Implements the logic for determining the routing of an application submission
* based on a policy.
*/
public interface FederationRouterPolicy extends ConfigurableFederationPolicy {
/**
* Determines the sub-cluster that the user application submision should be
* routed to.
*
* @param appSubmissionContext the context for the app being submitted.
*
* @return the sub-cluster as identified by {@link SubClusterId} to route the
* request to.
*
* @throws YarnException if the policy cannot determine a viable subcluster.
*/
SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext)
throws YarnException;
}

View File

@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** Router policies. **/
package org.apache.hadoop.yarn.server.federation.policies.router;

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
/**
* Test class for {@link FederationPolicyInitializationContextValidator}.
*/
public class TestFederationPolicyInitializationContextValidator {
private SubClusterPolicyConfiguration goodConfig;
private SubClusterResolver goodSR;
private FederationStateStoreFacade goodFacade;
private FederationPolicyInitializationContext context;
@Before
public void setUp() throws Exception {
goodFacade = FederationPoliciesTestUtil.initFacade();
goodConfig =
new MockPolicyManager().serializeConf();
goodSR =FederationPoliciesTestUtil.initResolver();
context = new
FederationPolicyInitializationContext(goodConfig, goodSR, goodFacade);
}
@Test
public void correcInit() throws Exception {
FederationPolicyInitializationContextValidator.validate(context,
MockPolicyManager.class.getCanonicalName());
}
@Test(expected = FederationPolicyInitializationException.class)
public void nullContext() throws Exception {
FederationPolicyInitializationContextValidator.validate(null,
MockPolicyManager.class.getCanonicalName());
}
@Test(expected = FederationPolicyInitializationException.class)
public void nullType() throws Exception {
FederationPolicyInitializationContextValidator.validate(context, null);
}
@Test(expected = FederationPolicyInitializationException.class)
public void wrongType() throws Exception {
FederationPolicyInitializationContextValidator.validate(context,
"WrongType");
}
@Test(expected = FederationPolicyInitializationException.class)
public void nullConf() throws Exception {
context.setFederationPolicyConfiguration(null);
FederationPolicyInitializationContextValidator.validate(context,
MockPolicyManager.class.getCanonicalName());
}
@Test(expected = FederationPolicyInitializationException.class)
public void nullResolver() throws Exception {
context.setFederationSubclusterResolver(null);
FederationPolicyInitializationContextValidator.validate(context,
MockPolicyManager.class.getCanonicalName());
}
@Test(expected = FederationPolicyInitializationException.class)
public void nullFacade() throws Exception {
context.setFederationStateStoreFacade(null);
FederationPolicyInitializationContextValidator.validate(context,
MockPolicyManager.class.getCanonicalName());
}
private class MockPolicyManager
implements FederationPolicyWriter, FederationPolicyConfigurator {
@Override
public FederationAMRMProxyPolicy getAMRMPolicy(
FederationPolicyInitializationContext
federationPolicyInitializationContext,
FederationAMRMProxyPolicy oldInstance)
throws FederationPolicyInitializationException {
return null;
}
@Override
public FederationRouterPolicy getRouterPolicy(
FederationPolicyInitializationContext
federationPolicyInitializationContext,
FederationRouterPolicy oldInstance)
throws FederationPolicyInitializationException {
return null;
}
@Override
public SubClusterPolicyConfiguration serializeConf()
throws FederationPolicyInitializationException {
ByteBuffer buf = ByteBuffer.allocate(0);
return SubClusterPolicyConfiguration
.newInstance("queue1", this.getClass().getCanonicalName(), buf);
}
}
}

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import java.net.URL;
import java.util.ArrayList;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Support class providing common initialization methods to test federation
* policies.
*/
public final class FederationPoliciesTestUtil {
private FederationPoliciesTestUtil() {
// disabled.
}
/**
* Initialize a {@link SubClusterResolver}.
*
* @return a subcluster resolver for tests.
*/
public static SubClusterResolver initResolver() {
YarnConfiguration conf = new YarnConfiguration();
SubClusterResolver resolver =
new DefaultSubClusterResolverImpl();
URL url =
Thread.currentThread().getContextClassLoader().getResource("nodes");
if (url == null) {
throw new RuntimeException(
"Could not find 'nodes' dummy file in classpath");
}
conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath());
resolver.setConf(conf);
resolver.load();
return resolver;
}
/**
* Initialiaze a main-memory {@link FederationStateStoreFacade} used for
* testing, wiht a mock resolver.
*
* @return the facade.
*
* @throws YarnException in case the initialization is not successful.
*/
public static FederationStateStoreFacade initFacade() throws YarnException {
FederationStateStoreFacade goodFacade = FederationStateStoreFacade
.getInstance();
FederationStateStore fss = mock(FederationStateStore.class);
GetSubClustersInfoResponse response = GetSubClustersInfoResponse
.newInstance(new ArrayList<>());
when(fss.getSubClusters(any())).thenReturn(response);
goodFacade.reinitialize(fss, new Configuration());
return goodFacade;
}
}