YARN-5323. Policies APIs for Federation Router and AMRMProxy policies. (Carlo Curino via Subru).
(cherry picked from commit f8208fe0b536f29aa65af71d20c3b3e3765679fd) (cherry picked from commit 81472778d7ef013ea6b2714bc734bc6fc3ca32fa)
This commit is contained in:
parent
aac8755125
commit
23c42408ba
@ -0,0 +1,44 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This interface provides a general method to reinitialize a policy. The
|
||||||
|
* semantics are try-n-swap, so in case of an exception is thrown the
|
||||||
|
* implmentation must ensure the previous state and configuration is preserved.
|
||||||
|
*/
|
||||||
|
public interface ConfigurableFederationPolicy {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method is invoked to initialize of update the configuration of
|
||||||
|
* policies. The implementor should provide try-n-swap semantics, and retain
|
||||||
|
* state if possible.
|
||||||
|
*
|
||||||
|
* @param federationPolicyInitializationContext the new context to provide to
|
||||||
|
* implementor.
|
||||||
|
*
|
||||||
|
* @throws FederationPolicyInitializationException in case the initialization
|
||||||
|
* fails.
|
||||||
|
*/
|
||||||
|
void reinitialize(
|
||||||
|
FederationPolicyInitializationContext
|
||||||
|
federationPolicyInitializationContext)
|
||||||
|
throws FederationPolicyInitializationException;
|
||||||
|
}
|
@ -0,0 +1,91 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.router
|
||||||
|
.FederationRouterPolicy;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementors of this interface are capable to instantiate and (re)initalize
|
||||||
|
* {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy} based on
|
||||||
|
* a {@link FederationPolicyInitializationContext}. The reason to bind these two
|
||||||
|
* policies together is to make sure we remain consistent across the router and
|
||||||
|
* amrmproxy policy decisions.
|
||||||
|
*/
|
||||||
|
public interface FederationPolicyConfigurator {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If the current instance is compatible, this method returns the same
|
||||||
|
* instance of {@link FederationAMRMProxyPolicy} reinitialized with the
|
||||||
|
* current context, otherwise a new instance initialized with the current
|
||||||
|
* context is provided. If the instance is compatible with the current class
|
||||||
|
* the implementors should attempt to reinitalize (retaining state). To affect
|
||||||
|
* a complete policy reset oldInstance should be null.
|
||||||
|
*
|
||||||
|
* @param federationPolicyInitializationContext the current context
|
||||||
|
* @param oldInstance the existing (possibly null)
|
||||||
|
* instance.
|
||||||
|
*
|
||||||
|
* @return an updated {@link FederationAMRMProxyPolicy
|
||||||
|
}.
|
||||||
|
*
|
||||||
|
* @throws FederationPolicyInitializationException if the initialization
|
||||||
|
* cannot be completed
|
||||||
|
* properly. The oldInstance
|
||||||
|
* should be still valid in
|
||||||
|
* case of failed
|
||||||
|
* initialization.
|
||||||
|
*/
|
||||||
|
FederationAMRMProxyPolicy getAMRMPolicy(
|
||||||
|
FederationPolicyInitializationContext
|
||||||
|
federationPolicyInitializationContext,
|
||||||
|
FederationAMRMProxyPolicy oldInstance)
|
||||||
|
throws FederationPolicyInitializationException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If the current instance is compatible, this method returns the same
|
||||||
|
* instance of {@link FederationRouterPolicy} reinitialized with the current
|
||||||
|
* context, otherwise a new instance initialized with the current context is
|
||||||
|
* provided. If the instance is compatible with the current class the
|
||||||
|
* implementors should attempt to reinitalize (retaining state). To affect a
|
||||||
|
* complete policy reset oldInstance shoulb be set to null.
|
||||||
|
*
|
||||||
|
* @param federationPolicyInitializationContext the current context
|
||||||
|
* @param oldInstance the existing (possibly null)
|
||||||
|
* instance.
|
||||||
|
*
|
||||||
|
* @return an updated {@link FederationRouterPolicy}.
|
||||||
|
*
|
||||||
|
* @throws FederationPolicyInitializationException if the initalization cannot
|
||||||
|
* be completed properly. The
|
||||||
|
* oldInstance should be still
|
||||||
|
* valid in case of failed
|
||||||
|
* initialization.
|
||||||
|
*/
|
||||||
|
FederationRouterPolicy getRouterPolicy(
|
||||||
|
FederationPolicyInitializationContext
|
||||||
|
federationPolicyInitializationContext,
|
||||||
|
FederationRouterPolicy oldInstance)
|
||||||
|
throws FederationPolicyInitializationException;
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,109 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Context to (re)initialize a {@code FederationAMRMProxyPolicy} and {@code
|
||||||
|
* FederationRouterPolicy}.
|
||||||
|
*/
|
||||||
|
public class FederationPolicyInitializationContext {
|
||||||
|
|
||||||
|
private SubClusterPolicyConfiguration federationPolicyConfiguration;
|
||||||
|
private SubClusterResolver federationSubclusterResolver;
|
||||||
|
private FederationStateStoreFacade federationStateStoreFacade;
|
||||||
|
|
||||||
|
public FederationPolicyInitializationContext() {
|
||||||
|
federationPolicyConfiguration = null;
|
||||||
|
federationSubclusterResolver = null;
|
||||||
|
federationStateStoreFacade = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FederationPolicyInitializationContext(SubClusterPolicyConfiguration
|
||||||
|
policy, SubClusterResolver resolver, FederationStateStoreFacade
|
||||||
|
storeFacade) {
|
||||||
|
this.federationPolicyConfiguration = policy;
|
||||||
|
this.federationSubclusterResolver = resolver;
|
||||||
|
this.federationStateStoreFacade = storeFacade;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Getter for the {@link SubClusterPolicyConfiguration}.
|
||||||
|
*
|
||||||
|
* @return the {@link SubClusterPolicyConfiguration} to be used for
|
||||||
|
* initialization.
|
||||||
|
*/
|
||||||
|
public SubClusterPolicyConfiguration getSubClusterPolicyConfiguration() {
|
||||||
|
return federationPolicyConfiguration;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Setter for the {@link SubClusterPolicyConfiguration}.
|
||||||
|
*
|
||||||
|
* @param federationPolicyConfiguration the
|
||||||
|
* {@link SubClusterPolicyConfiguration}
|
||||||
|
* to be used for initialization.
|
||||||
|
*/
|
||||||
|
public void setFederationPolicyConfiguration(
|
||||||
|
SubClusterPolicyConfiguration federationPolicyConfiguration) {
|
||||||
|
this.federationPolicyConfiguration = federationPolicyConfiguration;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Getter for the {@link SubClusterResolver}.
|
||||||
|
*
|
||||||
|
* @return the {@link SubClusterResolver} to be used for initialization.
|
||||||
|
*/
|
||||||
|
public SubClusterResolver getFederationSubclusterResolver() {
|
||||||
|
return federationSubclusterResolver;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Setter for the {@link SubClusterResolver}.
|
||||||
|
*
|
||||||
|
* @param federationSubclusterResolver the {@link SubClusterResolver} to be
|
||||||
|
* used for initialization.
|
||||||
|
*/
|
||||||
|
public void setFederationSubclusterResolver(
|
||||||
|
SubClusterResolver federationSubclusterResolver) {
|
||||||
|
this.federationSubclusterResolver = federationSubclusterResolver;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Getter for the {@link FederationStateStoreFacade}.
|
||||||
|
*
|
||||||
|
* @return the facade.
|
||||||
|
*/
|
||||||
|
public FederationStateStoreFacade getFederationStateStoreFacade() {
|
||||||
|
return federationStateStoreFacade;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Setter for the {@link FederationStateStoreFacade}.
|
||||||
|
*
|
||||||
|
* @param federationStateStoreFacade the facade.
|
||||||
|
*/
|
||||||
|
public void setFederationStateStoreFacade(
|
||||||
|
FederationStateStoreFacade federationStateStoreFacade) {
|
||||||
|
this.federationStateStoreFacade = federationStateStoreFacade;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,82 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper class used to factor out common validation steps for policies.
|
||||||
|
*/
|
||||||
|
public final class FederationPolicyInitializationContextValidator {
|
||||||
|
|
||||||
|
private FederationPolicyInitializationContextValidator() {
|
||||||
|
//disable constructor per checkstyle
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void validate(
|
||||||
|
FederationPolicyInitializationContext
|
||||||
|
federationPolicyInitializationContext,
|
||||||
|
String myType) throws FederationPolicyInitializationException {
|
||||||
|
|
||||||
|
if (myType == null) {
|
||||||
|
throw new FederationPolicyInitializationException("The myType parameter"
|
||||||
|
+ " should not be null.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (federationPolicyInitializationContext == null) {
|
||||||
|
throw new FederationPolicyInitializationException(
|
||||||
|
"The FederationPolicyInitializationContext provided is null. Cannot"
|
||||||
|
+ " reinitalize "
|
||||||
|
+ "successfully.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (federationPolicyInitializationContext.getFederationStateStoreFacade()
|
||||||
|
== null) {
|
||||||
|
throw new FederationPolicyInitializationException(
|
||||||
|
"The FederationStateStoreFacade provided is null. Cannot"
|
||||||
|
+ " reinitalize successfully.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (federationPolicyInitializationContext.getFederationSubclusterResolver()
|
||||||
|
== null) {
|
||||||
|
throw new FederationPolicyInitializationException(
|
||||||
|
"The FederationStateStoreFacase provided is null. Cannot"
|
||||||
|
+ " reinitalize successfully.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (federationPolicyInitializationContext.getSubClusterPolicyConfiguration()
|
||||||
|
== null) {
|
||||||
|
throw new FederationPolicyInitializationException(
|
||||||
|
"The FederationSubclusterResolver provided is null. Cannot "
|
||||||
|
+ "reinitalize successfully.");
|
||||||
|
}
|
||||||
|
|
||||||
|
String intendedType =
|
||||||
|
federationPolicyInitializationContext.getSubClusterPolicyConfiguration()
|
||||||
|
.getType();
|
||||||
|
|
||||||
|
if (!myType.equals(intendedType)) {
|
||||||
|
throw new FederationPolicyInitializationException(
|
||||||
|
"The FederationPolicyConfiguration carries a type (" + intendedType
|
||||||
|
+ ") different then mine (" + myType
|
||||||
|
+ "). Cannot reinitialize successfully.");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,45 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementors of this class are able to serializeConf the configuraiton of a
|
||||||
|
* policy as a {@link SubClusterPolicyConfiguration}. This is used during the
|
||||||
|
* lifetime of a policy from the admin APIs or policy engine to serializeConf
|
||||||
|
* the policy into the policy store.
|
||||||
|
*/
|
||||||
|
public interface FederationPolicyWriter {
|
||||||
|
|
||||||
|
/**
|
||||||
|
/**
|
||||||
|
* This method is invoked to derive a {@link SubClusterPolicyConfiguration}.
|
||||||
|
* This is to be used when writing a policy object in the federation policy
|
||||||
|
* store.
|
||||||
|
*
|
||||||
|
* @return a valid policy configuration representing this object
|
||||||
|
* parametrization.
|
||||||
|
*
|
||||||
|
* @throws FederationPolicyInitializationException if the current state cannot
|
||||||
|
* be serialized properly
|
||||||
|
*/
|
||||||
|
SubClusterPolicyConfiguration serializeConf()
|
||||||
|
throws FederationPolicyInitializationException;
|
||||||
|
}
|
@ -0,0 +1,66 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementors of this interface provide logic to split the list of {@link
|
||||||
|
* ResourceRequest}s received by the AM among various RMs.
|
||||||
|
*/
|
||||||
|
public interface FederationAMRMProxyPolicy
|
||||||
|
extends ConfigurableFederationPolicy {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Splits the {@link ResourceRequest}s from the client across one or more
|
||||||
|
* sub-clusters based on the policy semantics (e.g., broadcast, load-based).
|
||||||
|
*
|
||||||
|
* @param resourceRequests the list of {@link ResourceRequest}s from the
|
||||||
|
* AM to be split
|
||||||
|
*
|
||||||
|
* @return map of sub-cluster as identified by {@link SubClusterId} to the
|
||||||
|
* list of {@link ResourceRequest}s that should be forwarded to it
|
||||||
|
*
|
||||||
|
* @throws YarnException in case the request is malformed or no viable
|
||||||
|
* sub-clusters can be found.
|
||||||
|
*/
|
||||||
|
Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
|
||||||
|
List<ResourceRequest> resourceRequests)
|
||||||
|
throws YarnException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method should be invoked to notify the policy about responses being
|
||||||
|
* received. This is useful for stateful policies that make decisions based on
|
||||||
|
* previous responses being received.
|
||||||
|
*
|
||||||
|
* @param subClusterId the id of the subcluster sending the notification
|
||||||
|
* @param response the response received from one of the RMs
|
||||||
|
*
|
||||||
|
* @throws YarnException in case the response is not valid
|
||||||
|
*/
|
||||||
|
void notifyOfResponse(SubClusterId subClusterId,
|
||||||
|
AllocateResponse response) throws YarnException;
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,20 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
/** AMRMPRoxy policies. **/
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
|
||||||
|
|
@ -0,0 +1,33 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies.exceptions;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generic policy exception.
|
||||||
|
*/
|
||||||
|
public class FederationPolicyException extends YarnException {
|
||||||
|
public FederationPolicyException(String s) {
|
||||||
|
super(s);
|
||||||
|
}
|
||||||
|
|
||||||
|
public FederationPolicyException(Throwable t) {
|
||||||
|
super(t);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,33 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies.exceptions;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This exception is thrown when the initialization of a federation policy is
|
||||||
|
* not successful.
|
||||||
|
*/
|
||||||
|
public class FederationPolicyInitializationException
|
||||||
|
extends FederationPolicyException {
|
||||||
|
public FederationPolicyInitializationException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public FederationPolicyInitializationException(Throwable j) {
|
||||||
|
super(j);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,27 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies.exceptions;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This exception is thrown when policies cannot locate any active cluster.
|
||||||
|
*/
|
||||||
|
public class NoActiveSubclustersException extends FederationPolicyException {
|
||||||
|
public NoActiveSubclustersException(String s) {
|
||||||
|
super(s);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,28 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies.exceptions;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This exception is thrown whenever a policy is given a {@code SubClusterId}
|
||||||
|
* that is unknown.
|
||||||
|
*/
|
||||||
|
public class UnknownSubclusterException extends FederationPolicyException {
|
||||||
|
public UnknownSubclusterException(String s) {
|
||||||
|
super(s);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,20 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
/** Exceptions for policies. **/
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies.exceptions;
|
||||||
|
|
@ -0,0 +1,20 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
/** Federation Policies. **/
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies;
|
||||||
|
|
@ -0,0 +1,45 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies.router;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implements the logic for determining the routing of an application submission
|
||||||
|
* based on a policy.
|
||||||
|
*/
|
||||||
|
public interface FederationRouterPolicy extends ConfigurableFederationPolicy {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determines the sub-cluster that the user application submision should be
|
||||||
|
* routed to.
|
||||||
|
*
|
||||||
|
* @param appSubmissionContext the context for the app being submitted.
|
||||||
|
*
|
||||||
|
* @return the sub-cluster as identified by {@link SubClusterId} to route the
|
||||||
|
* request to.
|
||||||
|
*
|
||||||
|
* @throws YarnException if the policy cannot determine a viable subcluster.
|
||||||
|
*/
|
||||||
|
SubClusterId getHomeSubcluster(
|
||||||
|
ApplicationSubmissionContext appSubmissionContext)
|
||||||
|
throws YarnException;
|
||||||
|
}
|
@ -0,0 +1,20 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
/** Router policies. **/
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies.router;
|
||||||
|
|
@ -0,0 +1,128 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.federation.policies;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test class for {@link FederationPolicyInitializationContextValidator}.
|
||||||
|
*/
|
||||||
|
public class TestFederationPolicyInitializationContextValidator {
|
||||||
|
|
||||||
|
private SubClusterPolicyConfiguration goodConfig;
|
||||||
|
private SubClusterResolver goodSR;
|
||||||
|
private FederationStateStoreFacade goodFacade;
|
||||||
|
private FederationPolicyInitializationContext context;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
goodFacade = FederationPoliciesTestUtil.initFacade();
|
||||||
|
goodConfig =
|
||||||
|
new MockPolicyManager().serializeConf();
|
||||||
|
goodSR =FederationPoliciesTestUtil.initResolver();
|
||||||
|
context = new
|
||||||
|
FederationPolicyInitializationContext(goodConfig, goodSR, goodFacade);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void correcInit() throws Exception {
|
||||||
|
FederationPolicyInitializationContextValidator.validate(context,
|
||||||
|
MockPolicyManager.class.getCanonicalName());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = FederationPolicyInitializationException.class)
|
||||||
|
public void nullContext() throws Exception {
|
||||||
|
FederationPolicyInitializationContextValidator.validate(null,
|
||||||
|
MockPolicyManager.class.getCanonicalName());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = FederationPolicyInitializationException.class)
|
||||||
|
public void nullType() throws Exception {
|
||||||
|
FederationPolicyInitializationContextValidator.validate(context, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = FederationPolicyInitializationException.class)
|
||||||
|
public void wrongType() throws Exception {
|
||||||
|
FederationPolicyInitializationContextValidator.validate(context,
|
||||||
|
"WrongType");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = FederationPolicyInitializationException.class)
|
||||||
|
public void nullConf() throws Exception {
|
||||||
|
context.setFederationPolicyConfiguration(null);
|
||||||
|
FederationPolicyInitializationContextValidator.validate(context,
|
||||||
|
MockPolicyManager.class.getCanonicalName());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = FederationPolicyInitializationException.class)
|
||||||
|
public void nullResolver() throws Exception {
|
||||||
|
context.setFederationSubclusterResolver(null);
|
||||||
|
FederationPolicyInitializationContextValidator.validate(context,
|
||||||
|
MockPolicyManager.class.getCanonicalName());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = FederationPolicyInitializationException.class)
|
||||||
|
public void nullFacade() throws Exception {
|
||||||
|
context.setFederationStateStoreFacade(null);
|
||||||
|
FederationPolicyInitializationContextValidator.validate(context,
|
||||||
|
MockPolicyManager.class.getCanonicalName());
|
||||||
|
}
|
||||||
|
|
||||||
|
private class MockPolicyManager
|
||||||
|
implements FederationPolicyWriter, FederationPolicyConfigurator {
|
||||||
|
@Override
|
||||||
|
public FederationAMRMProxyPolicy getAMRMPolicy(
|
||||||
|
FederationPolicyInitializationContext
|
||||||
|
federationPolicyInitializationContext,
|
||||||
|
FederationAMRMProxyPolicy oldInstance)
|
||||||
|
throws FederationPolicyInitializationException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FederationRouterPolicy getRouterPolicy(
|
||||||
|
FederationPolicyInitializationContext
|
||||||
|
federationPolicyInitializationContext,
|
||||||
|
FederationRouterPolicy oldInstance)
|
||||||
|
throws FederationPolicyInitializationException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SubClusterPolicyConfiguration serializeConf()
|
||||||
|
throws FederationPolicyInitializationException {
|
||||||
|
ByteBuffer buf = ByteBuffer.allocate(0);
|
||||||
|
return SubClusterPolicyConfiguration
|
||||||
|
.newInstance("queue1", this.getClass().getCanonicalName(), buf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,83 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.federation.utils;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
|
||||||
|
|
||||||
|
import java.net.URL;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Support class providing common initialization methods to test federation
|
||||||
|
* policies.
|
||||||
|
*/
|
||||||
|
public final class FederationPoliciesTestUtil {
|
||||||
|
|
||||||
|
private FederationPoliciesTestUtil() {
|
||||||
|
// disabled.
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize a {@link SubClusterResolver}.
|
||||||
|
*
|
||||||
|
* @return a subcluster resolver for tests.
|
||||||
|
*/
|
||||||
|
public static SubClusterResolver initResolver() {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
SubClusterResolver resolver =
|
||||||
|
new DefaultSubClusterResolverImpl();
|
||||||
|
URL url =
|
||||||
|
Thread.currentThread().getContextClassLoader().getResource("nodes");
|
||||||
|
if (url == null) {
|
||||||
|
throw new RuntimeException(
|
||||||
|
"Could not find 'nodes' dummy file in classpath");
|
||||||
|
}
|
||||||
|
conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath());
|
||||||
|
resolver.setConf(conf);
|
||||||
|
resolver.load();
|
||||||
|
return resolver;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialiaze a main-memory {@link FederationStateStoreFacade} used for
|
||||||
|
* testing, wiht a mock resolver.
|
||||||
|
*
|
||||||
|
* @return the facade.
|
||||||
|
*
|
||||||
|
* @throws YarnException in case the initialization is not successful.
|
||||||
|
*/
|
||||||
|
public static FederationStateStoreFacade initFacade() throws YarnException {
|
||||||
|
FederationStateStoreFacade goodFacade = FederationStateStoreFacade
|
||||||
|
.getInstance();
|
||||||
|
FederationStateStore fss = mock(FederationStateStore.class);
|
||||||
|
GetSubClustersInfoResponse response = GetSubClustersInfoResponse
|
||||||
|
.newInstance(new ArrayList<>());
|
||||||
|
when(fss.getSubClusters(any())).thenReturn(response);
|
||||||
|
goodFacade.reinitialize(fss, new Configuration());
|
||||||
|
return goodFacade;
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user