Merge -c 1415029 from trunk to branch-2 to fix YARN-229. Remove old unused RM recovery code. Contributed by Bikas Saha.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1415030 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2012-11-29 01:59:49 +00:00
parent b432cf076e
commit 7b092edd2f
53 changed files with 145 additions and 1006 deletions

View File

@ -98,6 +98,8 @@ Release 2.0.3-alpha - Unreleased
MAPREDUCE-4778. Fair scheduler event log is only written if directory
exists on HDFS. (Sandy Ryza via tomwhite)
YARN-229. Remove old unused RM recovery code. (Bikas Saha via acmurthy)
Release 2.0.2-alpha - 2012-09-07
YARN-9. Rename YARN_HOME to HADOOP_YARN_HOME. (vinodkv via acmurthy)

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.tools.GetGroupsTestBase;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.service.Service.STATE;
import org.junit.AfterClass;
@ -46,7 +46,7 @@ public class TestGetGroups extends GetGroupsTestBase {
@BeforeClass
public static void setUpResourceManager() throws IOException, InterruptedException {
conf = new YarnConfiguration();
Store store = StoreFactory.getStore(conf);
RMStateStore store = StoreFactory.getStore(conf);
resourceManager = new ResourceManager(store) {
@Override
protected void doSecureLogin() throws IOException {

View File

@ -228,16 +228,7 @@ public class YarnConfiguration extends Configuration {
/** The class to use as the persistent store.*/
public static final String RM_STORE = RM_PREFIX + "store.class";
/** The address of the zookeeper instance to use with ZK store.*/
public static final String RM_ZK_STORE_ADDRESS =
RM_PREFIX + "zookeeper-store.address";
/** The zookeeper session timeout for the zookeeper store.*/
public static final String RM_ZK_STORE_TIMEOUT_MS =
RM_PREFIX + "zookeeper-store.session.timeout-ms";
public static final int DEFAULT_RM_ZK_STORE_TIMEOUT_MS = 60000;
/** The maximum number of completed applications RM keeps. */
public static final String RM_MAX_COMPLETED_APPLICATIONS =
RM_PREFIX + "max-completed-applications";

View File

@ -209,17 +209,6 @@
<name>yarn.resourcemanager.store.class</name>
</property>
<property>
<description>The address of the zookeeper instance to use with ZK store.</description>
<name>yarn.resourcemanager.zookeeper-store.address</name>
</property>
<property>
<description>The zookeeper session timeout for the zookeeper store.</description>
<name>yarn.resourcemanager.zookeeper-store.session.timeout-ms</name>
<value>60000</value>
</property>
<property>
<description>The maximum number of completed applications RM keeps. </description>
<name>yarn.resourcemanager.max-completed-applications</name>

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -251,17 +250,12 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent> {
YarnConfiguration.DEFAULT_APPLICATION_NAME);
}
// Store application for recovery
ApplicationStore appStore = rmContext.getApplicationsStore()
.createApplicationStore(submissionContext.getApplicationId(),
submissionContext);
// Create RMApp
application =
new RMAppImpl(applicationId, rmContext, this.conf,
submissionContext.getApplicationName(),
submissionContext.getUser(), submissionContext.getQueue(),
submissionContext, clientTokenStr, appStore, this.scheduler,
submissionContext, clientTokenStr, this.scheduler,
this.masterService, submitTime);
// Sanity check - duplicate?

View File

@ -23,8 +23,6 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NodeStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@ -41,10 +39,6 @@ public interface RMContext {
Dispatcher getDispatcher();
NodeStore getNodeStore();
ApplicationsStore getApplicationsStore();
ConcurrentMap<ApplicationId, RMApp> getRMApps();
ConcurrentMap<String, RMNode> getInactiveRMNodes();

View File

@ -24,9 +24,6 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NodeStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
@ -39,7 +36,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSe
public class RMContextImpl implements RMContext {
private final Dispatcher rmDispatcher;
private final Store store;
private final ConcurrentMap<ApplicationId, RMApp> applications
= new ConcurrentHashMap<ApplicationId, RMApp>();
@ -58,7 +54,7 @@ public class RMContextImpl implements RMContext {
private final RMContainerTokenSecretManager containerTokenSecretManager;
private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
public RMContextImpl(Store store, Dispatcher rmDispatcher,
public RMContextImpl(Dispatcher rmDispatcher,
ContainerAllocationExpirer containerAllocationExpirer,
AMLivelinessMonitor amLivelinessMonitor,
AMLivelinessMonitor amFinishingMonitor,
@ -66,7 +62,6 @@ public class RMContextImpl implements RMContext {
ApplicationTokenSecretManager appTokenSecretManager,
RMContainerTokenSecretManager containerTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientTokenSecretManager) {
this.store = store;
this.rmDispatcher = rmDispatcher;
this.containerAllocationExpirer = containerAllocationExpirer;
this.amLivelinessMonitor = amLivelinessMonitor;
@ -82,16 +77,6 @@ public class RMContextImpl implements RMContext {
return this.rmDispatcher;
}
@Override
public NodeStore getNodeStore() {
return store;
}
@Override
public ApplicationsStore getApplicationsStore() {
return store;
}
@Override
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
return this.applications;

View File

@ -46,8 +46,8 @@ import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -119,12 +119,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected RMDelegationTokenSecretManager rmDTSecretManager;
private WebApp webApp;
protected RMContext rmContext;
private final Store store;
private final RMStateStore store;
protected ResourceTrackerService resourceTracker;
private Configuration conf;
public ResourceManager(Store store) {
public ResourceManager(RMStateStore store) {
super("ResourceManager");
this.store = store;
}
@ -161,7 +161,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
this.containerTokenSecretManager = createContainerTokenSecretManager(conf);
this.rmContext =
new RMContextImpl(this.store, this.rmDispatcher,
new RMContextImpl(this.rmDispatcher,
this.containerAllocationExpirer, amLivelinessMonitor,
amFinishingMonitor, tokenRenewer, this.appTokenSecretManager,
this.containerTokenSecretManager, this.clientToAMSecretManager);
@ -643,8 +643,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Override
public void recover(RMState state) throws Exception {
resourceTracker.recover(state);
scheduler.recover(state);
}
public static void main(String argv[]) {
@ -652,14 +650,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
try {
Configuration conf = new YarnConfiguration();
Store store = StoreFactory.getStore(conf);
RMStateStore store = StoreFactory.getStore(conf);
ResourceManager resourceManager = new ResourceManager(store);
ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(resourceManager),
SHUTDOWN_HOOK_PRIORITY);
resourceManager.init(conf);
//resourceManager.recover(store.restore());
//store.doneWithRecovery();
resourceManager.start();
} catch (Throwable t) {
LOG.fatal("Error starting ResourceManager", t);

View File

@ -44,7 +44,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@ -297,28 +297,6 @@ public class ResourceTrackerService extends AbstractService implements
return nodeHeartBeatResponse;
}
public void recover(RMState state) {
//
// List<RMNode> nodeManagers = state.getStoredNodeManagers();
// for (RMNode nm : nodeManagers) {
// createNewNode(nm.getNodeID(), nm.getNodeHostName(), nm
// .getCommandPort(), nm.getHttpPort(), nm.getNode(), nm
// .getTotalCapability());
// }
// for (Map.Entry<ApplicationId, ApplicationInfo> entry : state
// .getStoredApplications().entrySet()) {
// List<Container> containers = entry.getValue().getContainers();
// List<Container> containersToAdd = new ArrayList<Container>();
// for (Container c : containers) {
// RMNode containerNode = this.rmContext.getNodesCollection()
// .getNodeInfo(c.getNodeId());
// containersToAdd.add(c);
// containerNode.allocateContainer(entry.getKey(), containersToAdd);
// containersToAdd.clear();
// }
// }
}
/**
* resolving the network topology.
* @param hostName the hostname of this node.

View File

@ -1,39 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.io.IOException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
public interface ApplicationsStore {
public ApplicationStore createApplicationStore(ApplicationId applicationId,
ApplicationSubmissionContext context) throws IOException;
public void removeApplication(ApplicationId application) throws IOException;
public interface ApplicationStore {
public void storeContainer(Container container) throws IOException;
public void removeContainer(Container container) throws IOException;
public void storeMasterContainer(Container container) throws IOException;
public void updateApplicationState(ApplicationMaster master) throws IOException;
public boolean isLoggable();
}
}

View File

@ -15,18 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.io.IOException;
public class FileRMStateStore implements RMStateStore {
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public interface NodeStore {
public void storeNode(RMNode node) throws IOException;
public void removeNode(RMNode node) throws IOException;
public NodeId getNextNodeId() throws IOException;
public boolean isLoggable();
}
}

View File

@ -1,128 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public class MemStore implements Store {
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private NodeId nodeId;
private boolean doneWithRecovery = false;
public MemStore() {
nodeId = recordFactory.newRecordInstance(NodeId.class);
nodeId.setHost("TODO");
nodeId.setPort(-1);
}
@Override
public void storeNode(RMNode node) throws IOException {}
@Override
public void removeNode(RMNode node) throws IOException {}
private class ApplicationStoreImpl implements ApplicationStore {
@Override
public void storeContainer(Container container) throws IOException {}
@Override
public void removeContainer(Container container) throws IOException {}
@Override
public void storeMasterContainer(Container container) throws IOException {}
@Override
public void updateApplicationState(
ApplicationMaster master) throws IOException {}
@Override
public boolean isLoggable() {
return doneWithRecovery;
}
}
@Override
public ApplicationStore createApplicationStore(ApplicationId application,
ApplicationSubmissionContext context) throws IOException {
return new ApplicationStoreImpl();
}
@Override
public void removeApplication(ApplicationId application) throws IOException {}
@Override
public RMState restore() throws IOException {
MemRMState state = new MemRMState();
return state;
}
@Override
public synchronized NodeId getNextNodeId() throws IOException {
// TODO: FIXMEVinodkv
// int num = nodeId.getId();
// num++;
// nodeId.setId(num);
return nodeId;
}
private class MemRMState implements RMState {
public MemRMState() {
nodeId = recordFactory.newRecordInstance(NodeId.class);
}
@Override
public List<RMNode> getStoredNodeManagers() {
return new ArrayList<RMNode>();
}
@Override
public NodeId getLastLoggedNodeId() {
return nodeId;
}
@Override
public Map<ApplicationId, ApplicationInfo> getStoredApplications() {
return new HashMap<ApplicationId, Store.ApplicationInfo>();
}
}
@Override
public boolean isLoggable() {
return doneWithRecovery;
}
@Override
public void doneWithRecovery() {
doneWithRecovery = true;
}
}

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
public interface RMStateStore {
public interface RMState {
}
}

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
public interface Recoverable {
public void recover(RMState state) throws Exception;

View File

@ -1,46 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public interface Store extends NodeStore, ApplicationsStore {
public interface ApplicationInfo {
public ApplicationMaster getApplicationMaster();
public Container getMasterContainer();
public ApplicationSubmissionContext getApplicationSubmissionContext();
public List<Container> getContainers();
}
public interface RMState {
public List<RMNode> getStoredNodeManagers() ;
public Map<ApplicationId, ApplicationInfo> getStoredApplications();
public NodeId getLastLoggedNodeId();
}
public RMState restore() throws IOException;
public void doneWithRecovery();
}

View File

@ -17,53 +17,17 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
public class StoreFactory {
public static Store getStore(Configuration conf) {
Store store = ReflectionUtils.newInstance(
public static RMStateStore getStore(Configuration conf) {
RMStateStore store = ReflectionUtils.newInstance(
conf.getClass(YarnConfiguration.RM_STORE,
MemStore.class, Store.class),
FileRMStateStore.class, RMStateStore.class),
conf);
return store;
}
public static ApplicationStore createVoidAppStore() {
return new VoidApplicationStore();
}
private static class VoidApplicationStore implements ApplicationStore {
public VoidApplicationStore() {}
@Override
public void storeContainer(Container container) throws IOException {
}
@Override
public void removeContainer(Container container) throws IOException {
}
@Override
public void storeMasterContainer(Container container) throws IOException {
}
@Override
public void updateApplicationState(ApplicationMaster master)
throws IOException {
}
@Override
public boolean isLoggable() {
return false;
}
}
}

View File

@ -1,509 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationMasterPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationMasterProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class ZKStore implements Store {
private final Configuration conf;
private final ZooKeeper zkClient;
private static final Log LOG = LogFactory.getLog(ZKStore.class);
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static final String NODES = "nodes/";
private static final String APPS = "apps/";
private static final String ZK_PATH_SEPARATOR = "/";
private static final String NODE_ID = "nodeid";
private static final String APP_MASTER = "master";
private static final String APP_MASTER_CONTAINER = "mastercontainer";
private final String ZK_ADDRESS;
private final int ZK_TIMEOUT;
private boolean doneWithRecovery = false;
/** TODO make this generic **/
private NodeIdPBImpl nodeId = new NodeIdPBImpl();
/**
* TODO fix this for later to handle all kinds of events
* of connection and session events.
*
*/
private static class ZKWatcher implements Watcher {
@Override
public void process(WatchedEvent arg0) {
}
}
public ZKStore(Configuration conf) throws IOException {
this.conf = conf;
this.ZK_ADDRESS = conf.get(YarnConfiguration.RM_ZK_STORE_ADDRESS);
this.ZK_TIMEOUT = conf.getInt(YarnConfiguration.RM_ZK_STORE_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_STORE_TIMEOUT_MS);
zkClient = new ZooKeeper(this.ZK_ADDRESS,
this.ZK_TIMEOUT,
createZKWatcher()
);
// TODO: FIXMEVinodkv
// this.nodeId.setId(0);
}
protected Watcher createZKWatcher() {
return new ZKWatcher();
}
private NodeReportPBImpl createNodeManagerInfo(RMNode rmNode) {
NodeReport node =
recordFactory.newRecordInstance(NodeReport.class);
node.setNodeId(rmNode.getNodeID());
node.setRackName(rmNode.getRackName());
node.setCapability(rmNode.getTotalCapability());
// TODO: FIXME
// node.setUsed(nodeInfo.getUsedResource());
// TODO: acm: refactor2 FIXME
// node.setNumContainers(rmNode.getNumContainers());
return (NodeReportPBImpl)node;
}
@Override
public synchronized void storeNode(RMNode node) throws IOException {
/** create a storage node and store it in zk **/
if (!doneWithRecovery) return;
// TODO: FIXMEVinodkv
// NodeReportPBImpl nodeManagerInfo = createNodeManagerInfo(node);
// byte[] bytes = nodeManagerInfo.getProto().toByteArray();
// try {
// zkClient.create(NODES + Integer.toString(node.getNodeID().getId()), bytes, null,
// CreateMode.PERSISTENT);
// } catch(InterruptedException ie) {
// LOG.info("Interrupted", ie);
// throw new InterruptedIOException("Interrupted");
// } catch(KeeperException ke) {
// LOG.info("Keeper exception", ke);
// throw convertToIOException(ke);
// }
}
@Override
public synchronized void removeNode(RMNode node) throws IOException {
if (!doneWithRecovery) return;
// TODO: FIXME VINODKV
// /** remove a storage node **/
// try {
// zkClient.delete(NODES + Integer.toString(node.getNodeID().getId()), -1);
// } catch(InterruptedException ie) {
// LOG.info("Interrupted", ie);
// throw new InterruptedIOException("Interrupted");
// } catch(KeeperException ke) {
// LOG.info("Keeper exception", ke);
// throw convertToIOException(ke);
// }
}
private static IOException convertToIOException(KeeperException ke) {
IOException io = new IOException();
io.setStackTrace(ke.getStackTrace());
return io;
}
@Override
public synchronized NodeId getNextNodeId() throws IOException {
// TODO: FIXME VINODKV
// int num = nodeId.getId();
// num++;
// nodeId.setId(num);
// try {
// zkClient.setData(NODES + NODE_ID, nodeId.getProto().toByteArray() , -1);
// } catch(InterruptedException ie) {
// LOG.info("Interrupted", ie);
// throw new InterruptedIOException(ie.getMessage());
// } catch(KeeperException ke) {
// throw convertToIOException(ke);
// }
return nodeId;
}
private String containerPathFromContainerId(ContainerId containerId) {
String appString = ConverterUtils.toString(
containerId.getApplicationAttemptId().getApplicationId());
return appString + "/" + containerId.getId();
}
private class ZKApplicationStore implements ApplicationStore {
private final ApplicationId applicationId;
public ZKApplicationStore(ApplicationId applicationId) {
this.applicationId = applicationId;
}
@Override
public void storeMasterContainer(Container container) throws IOException {
if (!doneWithRecovery) return;
ContainerPBImpl containerPBImpl = (ContainerPBImpl) container;
try {
zkClient.setData(APPS +
ConverterUtils.toString(
container.getId().getApplicationAttemptId().getApplicationId())
+
ZK_PATH_SEPARATOR + APP_MASTER_CONTAINER
, containerPBImpl.getProto().toByteArray(), -1);
} catch(InterruptedException ie) {
LOG.info("Interrupted", ie);
throw new InterruptedIOException(ie.getMessage());
} catch(KeeperException ke) {
LOG.info("Keeper exception", ke);
throw convertToIOException(ke);
}
}
@Override
public synchronized void storeContainer(Container container) throws IOException {
if (!doneWithRecovery) return;
ContainerPBImpl containerPBImpl = (ContainerPBImpl) container;
try {
zkClient.create(APPS + containerPathFromContainerId(container.getId())
, containerPBImpl.getProto().toByteArray(), null, CreateMode.PERSISTENT);
} catch(InterruptedException ie) {
LOG.info("Interrupted", ie);
throw new InterruptedIOException(ie.getMessage());
} catch(KeeperException ke) {
LOG.info("Keeper exception", ke);
throw convertToIOException(ke);
}
}
@Override
public synchronized void removeContainer(Container container) throws IOException {
if (!doneWithRecovery) return;
try {
zkClient.delete(APPS + containerPathFromContainerId(container.getId()),
-1);
} catch(InterruptedException ie) {
throw new InterruptedIOException(ie.getMessage());
} catch(KeeperException ke) {
LOG.info("Keeper exception", ke);
throw convertToIOException(ke);
}
}
@Override
public void updateApplicationState(
ApplicationMaster master) throws IOException {
if (!doneWithRecovery) return;
String appString = APPS + ConverterUtils.toString(applicationId);
ApplicationMasterPBImpl masterPBImpl = (ApplicationMasterPBImpl) master;
try {
zkClient.setData(appString, masterPBImpl.getProto().toByteArray(), -1);
} catch(InterruptedException ie) {
LOG.info("Interrupted", ie);
throw new InterruptedIOException(ie.getMessage());
} catch(KeeperException ke) {
LOG.info("Keeper exception", ke);
throw convertToIOException(ke);
}
}
@Override
public boolean isLoggable() {
return doneWithRecovery;
}
}
@Override
public synchronized ApplicationStore createApplicationStore(ApplicationId application,
ApplicationSubmissionContext context) throws IOException {
if (!doneWithRecovery) return new ZKApplicationStore(application);
ApplicationSubmissionContextPBImpl contextPBImpl = (ApplicationSubmissionContextPBImpl) context;
String appString = APPS + ConverterUtils.toString(application);
ApplicationMasterPBImpl masterPBImpl = new ApplicationMasterPBImpl();
ContainerPBImpl container = new ContainerPBImpl();
try {
zkClient.create(appString, contextPBImpl.getProto()
.toByteArray(), null, CreateMode.PERSISTENT);
zkClient.create(appString + ZK_PATH_SEPARATOR + APP_MASTER,
masterPBImpl.getProto().toByteArray(), null, CreateMode.PERSISTENT);
zkClient.create(appString + ZK_PATH_SEPARATOR + APP_MASTER_CONTAINER,
container.getProto().toByteArray(), null, CreateMode.PERSISTENT);
} catch(InterruptedException ie) {
LOG.info("Interrupted", ie);
throw new InterruptedIOException(ie.getMessage());
} catch(KeeperException ke) {
LOG.info("Keeper exception", ke);
throw convertToIOException(ke);
}
return new ZKApplicationStore(application);
}
@Override
public synchronized void removeApplication(ApplicationId application) throws IOException {
if (!doneWithRecovery) return;
try {
zkClient.delete(APPS + ConverterUtils.toString(application), -1);
} catch(InterruptedException ie) {
LOG.info("Interrupted", ie);
throw new InterruptedIOException(ie.getMessage());
} catch(KeeperException ke) {
LOG.info("Keeper Exception", ke);
throw convertToIOException(ke);
}
}
@Override
public boolean isLoggable() {
return doneWithRecovery;
}
@Override
public void doneWithRecovery() {
this.doneWithRecovery = true;
}
@Override
public synchronized RMState restore() throws IOException {
ZKRMState rmState = new ZKRMState();
rmState.load();
return rmState;
}
private static class ApplicationInfoImpl implements ApplicationInfo {
private ApplicationMaster master;
private Container masterContainer;
private final ApplicationSubmissionContext context;
private final List<Container> containers = new ArrayList<Container>();
public ApplicationInfoImpl(ApplicationSubmissionContext context) {
this.context = context;
}
public void setApplicationMaster(ApplicationMaster master) {
this.master = master;
}
public void setMasterContainer(Container container) {
this.masterContainer = container;
}
@Override
public ApplicationMaster getApplicationMaster() {
return this.master;
}
@Override
public ApplicationSubmissionContext getApplicationSubmissionContext() {
return this.context;
}
@Override
public Container getMasterContainer() {
return this.masterContainer;
}
@Override
public List<Container> getContainers() {
return this.containers;
}
public void addContainer(Container container) {
containers.add(container);
}
}
private class ZKRMState implements RMState {
private List<RMNode> nodeManagers = new ArrayList<RMNode>();
private Map<ApplicationId, ApplicationInfo> applications = new
HashMap<ApplicationId, ApplicationInfo>();
public ZKRMState() {
LOG.info("Restoring RM state from ZK");
}
private synchronized List<NodeReport> listStoredNodes() throws IOException {
/** get the list of nodes stored in zk **/
//TODO PB
List<NodeReport> nodes = new ArrayList<NodeReport>();
Stat stat = new Stat();
try {
List<String> children = zkClient.getChildren(NODES, false);
for (String child: children) {
byte[] data = zkClient.getData(NODES + child, false, stat);
NodeReportPBImpl nmImpl = new NodeReportPBImpl(
NodeReportProto.parseFrom(data));
nodes.add(nmImpl);
}
} catch (InterruptedException ie) {
LOG.info("Interrupted" , ie);
throw new InterruptedIOException("Interrupted");
} catch(KeeperException ke) {
LOG.error("Failed to list nodes", ke);
throw convertToIOException(ke);
}
return nodes;
}
@Override
public List<RMNode> getStoredNodeManagers() {
return nodeManagers;
}
@Override
public NodeId getLastLoggedNodeId() {
return nodeId;
}
private void readLastNodeId() throws IOException {
Stat stat = new Stat();
try {
byte[] data = zkClient.getData(NODES + NODE_ID, false, stat);
nodeId = new NodeIdPBImpl(NodeIdProto.parseFrom(data));
} catch(InterruptedException ie) {
LOG.info("Interrupted", ie);
throw new InterruptedIOException(ie.getMessage());
} catch(KeeperException ke) {
LOG.info("Keeper Exception", ke);
throw convertToIOException(ke);
}
}
private ApplicationInfo getAppInfo(String app) throws IOException {
ApplicationInfoImpl info = null;
Stat stat = new Stat();
try {
ApplicationSubmissionContext context = null;
byte[] data = zkClient.getData(APPS + app, false, stat);
context = new ApplicationSubmissionContextPBImpl(
ApplicationSubmissionContextProto.parseFrom(data));
info = new ApplicationInfoImpl(context);
List<String> children = zkClient.getChildren(APPS + app, false, stat);
ApplicationMaster master = null;
for (String child: children) {
byte[] childdata = zkClient.getData(APPS + app + ZK_PATH_SEPARATOR + child, false, stat);
if (APP_MASTER.equals(child)) {
master = new ApplicationMasterPBImpl(ApplicationMasterProto.parseFrom(childdata));
info.setApplicationMaster(master);
} else if (APP_MASTER_CONTAINER.equals(child)) {
Container masterContainer = new ContainerPBImpl(ContainerProto.parseFrom(data));
info.setMasterContainer(masterContainer);
} else {
Container container = new ContainerPBImpl(ContainerProto.parseFrom(data));
info.addContainer(container);
}
}
} catch(InterruptedException ie) {
LOG.info("Interrupted", ie);
throw new InterruptedIOException(ie.getMessage());
} catch(KeeperException ke) {
throw convertToIOException(ke);
}
return info;
}
private void load() throws IOException {
List<NodeReport> nodeInfos = listStoredNodes();
final Pattern trackerPattern = Pattern.compile(".*:.*");
final Matcher m = trackerPattern.matcher("");
for (NodeReport node: nodeInfos) {
m.reset(node.getNodeId().getHost());
if (!m.find()) {
LOG.info("Skipping node, bad node-address "
+ node.getNodeId().getHost());
continue;
}
String hostName = m.group(0);
int cmPort = Integer.valueOf(m.group(1));
m.reset(node.getHttpAddress());
if (!m.find()) {
LOG.info("Skipping node, bad http-address " + node.getHttpAddress());
continue;
}
int httpPort = Integer.valueOf(m.group(1));
// TODO: FindBugs warns passing null below. Commenting this for later.
// RMNode nm = new RMNodeImpl(node.getNodeId(), null,
// hostName, cmPort, httpPort,
// ResourceTrackerService.resolve(node.getNodeId().getHost()),
// node.getCapability());
// nodeManagers.add(nm);
}
readLastNodeId();
/* make sure we get all the applications */
List<String> apps = null;
try {
apps = zkClient.getChildren(APPS, false);
} catch(InterruptedException ie) {
LOG.info("Interrupted", ie);
throw new InterruptedIOException(ie.getMessage());
} catch(KeeperException ke) {
throw convertToIOException(ke);
}
for (String app: apps) {
ApplicationInfo info = getAppInfo(app);
applications.put(info.getApplicationMaster().getApplicationId(), info);
}
}
@Override
public Map<ApplicationId, ApplicationInfo> getStoredApplications() {
return applications;
}
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -131,13 +130,6 @@ public interface RMApp extends EventHandler<RMAppEvent> {
*/
int pullRMNodeUpdates(Collection<RMNode> updatedNodes);
/**
* Application level metadata is stored in {@link ApplicationStore} which
* can persist the information.
* @return the {@link ApplicationStore} for this {@link RMApp}.
*/
ApplicationStore getApplicationStore();
/**
* The finish time of the {@link RMApp}
* @return the finish time of the application.,

View File

@ -49,7 +49,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -81,7 +80,6 @@ public class RMAppImpl implements RMApp {
private final String name;
private final ApplicationSubmissionContext submissionContext;
private final String clientTokenStr;
private final ApplicationStore appStore;
private final Dispatcher dispatcher;
private final YarnScheduler scheduler;
private final ApplicationMasterService masterService;
@ -213,7 +211,6 @@ public class RMAppImpl implements RMApp {
public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
Configuration config, String name, String user, String queue,
ApplicationSubmissionContext submissionContext, String clientTokenStr,
ApplicationStore appStore,
YarnScheduler scheduler, ApplicationMasterService masterService,
long submitTime) {
@ -227,7 +224,6 @@ public class RMAppImpl implements RMApp {
this.queue = queue;
this.submissionContext = submissionContext;
this.clientTokenStr = clientTokenStr;
this.appStore = appStore;
this.scheduler = scheduler;
this.masterService = masterService;
this.submitTime = submitTime;
@ -340,11 +336,6 @@ public class RMAppImpl implements RMApp {
}
}
@Override
public ApplicationStore getApplicationStore() {
return this.appStore;
}
private YarnApplicationState createApplicationState(RMAppState rmAppState) {
switch(rmAppState) {
case NEW:

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -70,14 +69,12 @@ public class AppSchedulingInfo {
boolean pending = true; // for app metrics
public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
ApplicationStore store) {
String user, Queue queue, ActiveUsersManager activeUsersManager) {
this.applicationAttemptId = appAttemptId;
this.applicationId = appAttemptId.getApplicationId();
this.queue = queue;
this.queueName = queue.getQueueName();
this.user = user;
//this.store = store;
this.activeUsersManager = activeUsersManager;
}

View File

@ -49,7 +49,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@ -365,7 +365,7 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
// TODO: Fix store
FiCaSchedulerApp SchedulerApp =
new FiCaSchedulerApp(applicationAttemptId, user, queue,
queue.getActiveUsersManager(), rmContext, null);
queue.getActiveUsersManager(), rmContext);
// Submit to the queue
try {
@ -767,18 +767,7 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
@Override
@Lock(Lock.NoLock.class)
public void recover(RMState state) throws Exception {
// TODO: VINDOKVFIXME recovery
// applications.clear();
// for (Map.Entry<ApplicationId, ApplicationInfo> entry : state.getStoredApplications().entrySet()) {
// ApplicationId appId = entry.getKey();
// ApplicationInfo appInfo = entry.getValue();
// SchedulerApp app = applications.get(appId);
// app.allocate(appInfo.getContainers());
// for (Container c: entry.getValue().getContainers()) {
// Queue queue = queues.get(appInfo.getApplicationSubmissionContext().getQueue());
// queue.recoverContainer(clusterResource, applications.get(appId), c);
// }
// }
// NOT IMPLEMENTED
}
@Override

View File

@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -111,11 +110,11 @@ public class FiCaSchedulerApp extends SchedulerApplication {
private final RMContext rmContext;
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext, ApplicationStore store) {
RMContext rmContext) {
this.rmContext = rmContext;
this.appSchedulingInfo =
new AppSchedulingInfo(applicationAttemptId, user, queue,
activeUsersManager, store);
activeUsersManager);
this.queue = queue;
}

View File

@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -103,11 +102,11 @@ public class FSSchedulerApp extends SchedulerApplication {
private final RMContext rmContext;
public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext, ApplicationStore store) {
RMContext rmContext) {
this.rmContext = rmContext;
this.appSchedulingInfo =
new AppSchedulingInfo(applicationAttemptId, user, queue,
activeUsersManager, store);
activeUsersManager);
this.queue = queue;
}

View File

@ -50,7 +50,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@ -486,7 +486,7 @@ public class FairScheduler implements ResourceScheduler {
FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, user,
queue.getQueueSchedulable(), new ActiveUsersManager(getRootQueueMetrics()),
rmContext, null);
rmContext);
// Inforce ACLs
UserGroupInformation userUgi;

View File

@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@ -292,7 +292,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
// TODO: Fix store
FiCaSchedulerApp schedulerApp =
new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
this.rmContext, null);
this.rmContext);
applications.put(appAttemptId, schedulerApp);
metrics.submitApp(user, appAttemptId.getAttemptId());
LOG.info("Application Submission: " + appAttemptId.getApplicationId() +
@ -763,13 +763,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
@Override
public void recover(RMState state) {
// TODO fix recovery
// for (Map.Entry<ApplicationId, ApplicationInfo> entry: state.getStoredApplications().entrySet()) {
// ApplicationId appId = entry.getKey();
// ApplicationInfo appInfo = entry.getValue();
// SchedulerApp app = applications.get(appId);
// app.allocate(appInfo.getContainers());
// }
// NOT IMPLEMENTED
}
@Override

View File

@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -93,7 +92,7 @@ public class TestAppManager{
rmDispatcher);
AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(
rmDispatcher);
return new RMContextImpl(new MemStore(), rmDispatcher,
return new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, null, null, null) {
@Override
@ -366,7 +365,6 @@ public class TestAppManager{
YarnConfiguration.DEFAULT_QUEUE_NAME,
app.getQueue());
Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
Assert.assertNotNull("app store is null", app.getApplicationStore());
// wait for event to be processed
int timeoutSecs = 0;
@ -413,7 +411,6 @@ public class TestAppManager{
Assert.assertEquals("app name doesn't match", "testApp1", app.getName());
Assert.assertEquals("app queue doesn't match", "testQueue", app.getQueue());
Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
Assert.assertNotNull("app store is null", app.getApplicationStore());
// wait for event to be processed
int timeoutSecs = 0;

View File

@ -50,7 +50,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.service.Service.STATE;
@ -85,7 +85,7 @@ public class TestApplicationACLs {
@BeforeClass
public static void setup() throws InterruptedException, IOException {
Store store = StoreFactory.getStore(conf);
RMStateStore store = StoreFactory.getStore(conf);
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
AccessControlList adminACL = new AccessControlList("");
adminACL.addGroup(SUPER_GROUP);

View File

@ -182,7 +182,7 @@ public class TestClientRMService {
private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler,
ApplicationId applicationId3, YarnConfiguration config, String queueName) {
return new RMAppImpl(applicationId3, rmContext, config, null, null,
queueName, null, null, null, yarnScheduler, null, System
queueName, null, null, yarnScheduler, null, System
.currentTimeMillis());
}
}

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
@ -79,7 +78,7 @@ public class TestRMNodeTransitions {
InlineDispatcher rmDispatcher = new InlineDispatcher();
rmContext =
new RMContextImpl(new MemStore(), rmDispatcher, null, null, null,
new RMContextImpl(rmDispatcher, null, null, null,
mock(DelegationTokenRenewer.class), null, null, null);
scheduler = mock(YarnScheduler.class);
doAnswer(

View File

@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -47,7 +47,7 @@ public class TestResourceManager {
@Before
public void setUp() throws Exception {
Configuration conf = new YarnConfiguration();
Store store = StoreFactory.getStore(conf);
RMStateStore store = StoreFactory.getStore(conf);
resourceManager = new ResourceManager(store);
resourceManager.init(conf);
}

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@ -194,10 +193,6 @@ public abstract class MockAsm extends MockApps {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public ApplicationStore getApplicationStore() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public float getProgress() {
throw new UnsupportedOperationException("Not supported yet.");
}

View File

@ -49,8 +49,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;

View File

@ -53,9 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.junit.After;

View File

@ -36,8 +36,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.junit.Before;
import org.junit.Test;

View File

@ -43,9 +43,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;

View File

@ -39,7 +39,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@ -70,7 +69,7 @@ public class TestNMExpiry {
Configuration conf = new Configuration();
// Dispatcher that processes events inline
Dispatcher dispatcher = new InlineDispatcher();
RMContext context = new RMContextImpl(new MemStore(), dispatcher, null,
RMContext context = new RMContextImpl(dispatcher, null,
null, null, null, null, null, null);
dispatcher.register(SchedulerEventType.class,
new InlineDispatcher.EmptyEventHandler());

View File

@ -39,7 +39,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@ -66,7 +65,7 @@ public class TestRMNMRPCResponseId {
}
});
RMContext context =
new RMContextImpl(new MemStore(), dispatcher, null, null, null, null,
new RMContextImpl(dispatcher, null, null, null, null,
null, null, null);
dispatcher.register(RMNodeEventType.class,
new ResourceManager.NodeEventDispatcher(context));

View File

@ -28,7 +28,6 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -137,11 +136,6 @@ public class MockRMApp implements RMApp {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public ApplicationStore getApplicationStore() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public long getFinishTime() {
return finish;

View File

@ -40,8 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@ -140,7 +138,7 @@ public class TestRMAppTransitions {
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
this.rmContext =
new RMContextImpl(new MemStore(), rmDispatcher,
new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, new ApplicationTokenSecretManager(conf),
new RMContainerTokenSecretManager(conf),
@ -171,7 +169,6 @@ public class TestRMAppTransitions {
// ensure max retries set to known value
conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, maxRetries);
String clientTokenStr = "bogusstring";
ApplicationStore appStore = mock(ApplicationStore.class);
YarnScheduler scheduler = mock(YarnScheduler.class);
ApplicationMasterService masterService =
new ApplicationMasterService(rmContext, scheduler);
@ -183,7 +180,7 @@ public class TestRMAppTransitions {
RMApp application = new RMAppImpl(applicationId, rmContext,
conf, name, user,
queue, submissionContext, clientTokenStr,
appStore, scheduler,
scheduler,
masterService, System.currentTimeMillis());
testAppStartState(applicationId, user, name, queue, application);

View File

@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -162,7 +161,7 @@ public class TestRMAppAttemptTransitions {
amFinishingMonitor = mock(AMLivelinessMonitor.class);
Configuration conf = new Configuration();
rmContext =
new RMContextImpl(new MemStore(), rmDispatcher,
new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, new ApplicationTokenSecretManager(conf),
new RMContainerTokenSecretManager(conf),

View File

@ -479,7 +479,7 @@ public class TestApplicationLimits {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0_0 =
spy(new FiCaSchedulerApp(appAttemptId_0_0, user_0, queue,
queue.getActiveUsersManager(), rmContext, null));
queue.getActiveUsersManager(), rmContext));
queue.submitApplication(app_0_0, user_0, A);
List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
@ -498,7 +498,7 @@ public class TestApplicationLimits {
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_0_1 =
spy(new FiCaSchedulerApp(appAttemptId_0_1, user_0, queue,
queue.getActiveUsersManager(), rmContext, null));
queue.getActiveUsersManager(), rmContext));
queue.submitApplication(app_0_1, user_0, A);
List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
@ -517,7 +517,7 @@ public class TestApplicationLimits {
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_1_0 =
spy(new FiCaSchedulerApp(appAttemptId_1_0, user_1, queue,
queue.getActiveUsersManager(), rmContext, null));
queue.getActiveUsersManager(), rmContext));
queue.submitApplication(app_1_0, user_1, A);
List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();

View File

@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -76,7 +76,7 @@ public class TestCapacityScheduler {
@Before
public void setUp() throws Exception {
Store store = StoreFactory.getStore(new Configuration());
RMStateStore store = StoreFactory.getStore(new Configuration());
resourceManager = new ResourceManager(store);
CapacitySchedulerConfiguration csConf
= new CapacitySchedulerConfiguration();
@ -251,7 +251,7 @@ public class TestCapacityScheduler {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
cs.setConf(new YarnConfiguration());
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null,
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new ClientToAMTokenSecretManagerInRM()));
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
@ -349,7 +349,7 @@ public class TestCapacityScheduler {
conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null,
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new ClientToAMTokenSecretManagerInRM()));
}
@ -361,7 +361,7 @@ public class TestCapacityScheduler {
setupQueueConfiguration(csConf);
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(new YarnConfiguration());
cs.reinitialize(csConf, new RMContextImpl(null, null, null, null, null,
cs.reinitialize(csConf, new RMContextImpl(null, null, null, null,
null, null, new RMContainerTokenSecretManager(csConf),
new ClientToAMTokenSecretManagerInRM()));
@ -387,7 +387,7 @@ public class TestCapacityScheduler {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
cs.setConf(new YarnConfiguration());
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null,
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new ClientToAMTokenSecretManagerInRM()));
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);

View File

@ -250,14 +250,14 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
mock(ActiveUsersManager.class), rmContext);
a.submitApplication(app_0, user_0, B);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
mock(ActiveUsersManager.class), rmContext);
a.submitApplication(app_1, user_0, B); // same user
@ -295,14 +295,14 @@ public class TestLeafQueue {
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 1);
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_d, d, null,
rmContext, null);
rmContext);
d.submitApplication(app_0, user_d, D);
// Attempt the same application again
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(0, 2);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_d, d, null,
rmContext, null);
rmContext);
d.submitApplication(app_1, user_d, D); // same user
}
@ -320,7 +320,7 @@ public class TestLeafQueue {
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 1);
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, null,
rmContext, null);
rmContext);
a.submitApplication(app_0, user_0, B);
when(cs.getApplication(appAttemptId_0)).thenReturn(app_0);
@ -335,7 +335,7 @@ public class TestLeafQueue {
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(0, 2);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, null,
rmContext, null);
rmContext);
a.submitApplication(app_1, user_0, B); // same user
assertEquals(1, a.getMetrics().getAppsSubmitted());
@ -371,14 +371,14 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
mock(ActiveUsersManager.class), rmContext);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
mock(ActiveUsersManager.class), rmContext);
a.submitApplication(app_1, user_0, A); // same user
@ -495,21 +495,21 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.getActiveUsersManager(), rmContext);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.getActiveUsersManager(), rmContext);
a.submitApplication(app_1, user_0, A); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext, null);
a.getActiveUsersManager(), rmContext);
a.submitApplication(app_2, user_1, A);
// Setup some nodes
@ -588,21 +588,21 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.getActiveUsersManager(), rmContext);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.getActiveUsersManager(), rmContext);
a.submitApplication(app_1, user_0, A); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext, null);
a.getActiveUsersManager(), rmContext);
a.submitApplication(app_2, user_1, A);
// Setup some nodes
@ -699,28 +699,28 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.getActiveUsersManager(), rmContext);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.getActiveUsersManager(), rmContext);
a.submitApplication(app_1, user_0, A); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext, null);
a.getActiveUsersManager(), rmContext);
a.submitApplication(app_2, user_1, A);
final ApplicationAttemptId appAttemptId_3 =
TestUtils.getMockApplicationAttemptId(3, 0);
FiCaSchedulerApp app_3 =
new FiCaSchedulerApp(appAttemptId_3, user_2, a,
a.getActiveUsersManager(), rmContext, null);
a.getActiveUsersManager(), rmContext);
a.submitApplication(app_3, user_2, A);
// Setup some nodes
@ -874,14 +874,14 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
mock(ActiveUsersManager.class), rmContext);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext, null);
mock(ActiveUsersManager.class), rmContext);
a.submitApplication(app_1, user_1, A);
// Setup some nodes
@ -973,14 +973,14 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
mock(ActiveUsersManager.class), rmContext);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext, null);
mock(ActiveUsersManager.class), rmContext);
a.submitApplication(app_1, user_1, A);
// Setup some nodes
@ -1072,14 +1072,14 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null);
mock(ActiveUsersManager.class), rmContext);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_1, a,
mock(ActiveUsersManager.class), rmContext, null);
mock(ActiveUsersManager.class), rmContext);
a.submitApplication(app_1, user_1, A);
// Setup some nodes
@ -1187,7 +1187,7 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null));
mock(ActiveUsersManager.class), rmContext));
a.submitApplication(app_0, user_0, A);
// Setup some nodes and racks
@ -1327,7 +1327,7 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null));
mock(ActiveUsersManager.class), rmContext));
a.submitApplication(app_0, user_0, A);
// Setup some nodes and racks
@ -1457,7 +1457,7 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), rmContext, null));
mock(ActiveUsersManager.class), rmContext));
a.submitApplication(app_0, user_0, A);
// Setup some nodes and racks

View File

@ -43,7 +43,7 @@ public class TestQueueParsing {
CapacityScheduler capacityScheduler = new CapacityScheduler();
capacityScheduler.setConf(conf);
capacityScheduler.reinitialize(conf, new RMContextImpl(null, null, null,
capacityScheduler.reinitialize(conf, new RMContextImpl(null, null,
null, null, null, null, new RMContainerTokenSecretManager(conf),
new ClientToAMTokenSecretManagerInRM()));

View File

@ -83,7 +83,7 @@ public class TestUtils {
Configuration conf = new Configuration();
RMContext rmContext =
new RMContextImpl(null, nullDispatcher, cae, null, null, null,
new RMContextImpl(nullDispatcher, cae, null, null, null,
new ApplicationTokenSecretManager(conf),
new RMContainerTokenSecretManager(conf),
new ClientToAMTokenSecretManagerInRM());

View File

@ -53,7 +53,7 @@ public class TestFSSchedulerApp {
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, "user1", queue , null, null, null);
new FSSchedulerApp(applicationAttemptId, "user1", queue , null, null);
// Default level should be node-local
assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel(
@ -111,7 +111,7 @@ public class TestFSSchedulerApp {
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, "user1", queue , null, null, null);
new FSSchedulerApp(applicationAttemptId, "user1", queue , null, null);
assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel(
prio, 10, -1.0, -1.0));
}

View File

@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@ -97,7 +97,7 @@ public class TestFairScheduler {
Configuration conf = createConfiguration();
// All tests assume only one assignment per node update
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
Store store = StoreFactory.getStore(conf);
RMStateStore store = StoreFactory.getStore(conf);
resourceManager = new ResourceManager(store);
resourceManager.init(conf);
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();

View File

@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.junit.After;
@ -50,7 +50,7 @@ public class TestFairSchedulerEventLog {
// All tests assume only one assignment per node update
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
Store store = StoreFactory.getStore(conf);
RMStateStore store = StoreFactory.getStore(conf);
resourceManager = new ResourceManager(store);
resourceManager.init(conf);
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();

View File

@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
@ -59,7 +59,7 @@ public class TestFifoScheduler {
@Before
public void setUp() throws Exception {
Store store = StoreFactory.getStore(new Configuration());
RMStateStore store = StoreFactory.getStore(new Configuration());
resourceManager = new ResourceManager(store);
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER,
@ -91,7 +91,7 @@ public class TestFifoScheduler {
@Test
public void testAppAttemptMetrics() throws Exception {
AsyncDispatcher dispatcher = new InlineDispatcher();
RMContext rmContext = new RMContextImpl(null, dispatcher, null,
RMContext rmContext = new RMContextImpl(dispatcher, null,
null, null, null, null, null, null);
FifoScheduler schedular = new FifoScheduler();

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.MockAsm;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -160,7 +159,7 @@ public class TestRMWebApp {
for (RMNode node : deactivatedNodes) {
deactivatedNodesMap.put(node.getHostName(), node);
}
return new RMContextImpl(new MemStore(), null, null, null, null,
return new RMContextImpl(null, null, null, null,
null, null, null, null) {
@Override
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
@ -201,7 +200,7 @@ public class TestRMWebApp {
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(new YarnConfiguration());
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null,
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new ClientToAMTokenSecretManagerInRM()));
return cs;

View File

@ -48,7 +48,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
@ -154,7 +154,7 @@ public class MiniYARNCluster extends CompositeService {
getConfig().set(YarnConfiguration.RM_WEBAPP_ADDRESS,
MiniYARNCluster.getHostname() + ":0");
}
Store store = StoreFactory.getStore(getConfig());
RMStateStore store = StoreFactory.getStore(getConfig());
resourceManager = new ResourceManager(store) {
@Override
protected void doSecureLogin() throws IOException {