YARN-7988. Refactor FSNodeLabelStore code for Node Attributes store support. Contributed by Bibin A Chundatt.

This commit is contained in:
Sunil G 2018-03-29 17:12:08 +05:30
parent 3b3b6efe21
commit 6f4bc49c6d
18 changed files with 850 additions and 299 deletions

View File

@ -234,7 +234,10 @@ protected boolean isInitNodeLabelStoreInProgress() {
return initNodeLabelStoreInProgress;
}
boolean isCentralizedConfiguration() {
/**
* @return true if node label configuration type is not distributed.
*/
public boolean isCentralizedConfiguration() {
return isCentralizedNodeLabelConfiguration;
}
@ -245,8 +248,7 @@ protected void initNodeLabelStore(Configuration conf) throws Exception {
conf.getClass(YarnConfiguration.FS_NODE_LABELS_STORE_IMPL_CLASS,
FileSystemNodeLabelsStore.class, NodeLabelsStore.class),
conf);
this.store.setNodeLabelsManager(this);
this.store.init(conf);
this.store.init(conf, this);
this.store.recover();
}

View File

@ -18,275 +18,89 @@
package org.apache.hadoop.yarn.nodelabels;
import java.io.EOFException;
import java.io.FileNotFoundException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.nodelabels.store.AbstractFSNodeStore;
import org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler;
import org.apache.hadoop.yarn.nodelabels.store.op.AddClusterLabelOp;
import org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler.StoreType;
import org.apache.hadoop.yarn.nodelabels.store.op.NodeToLabelOp;
import org.apache.hadoop.yarn.nodelabels.store.op.RemoveClusterLabelOp;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl;
import com.google.common.collect.Sets;
public class FileSystemNodeLabelsStore extends NodeLabelsStore {
protected static final Log LOG = LogFactory.getLog(FileSystemNodeLabelsStore.class);
public class FileSystemNodeLabelsStore
extends AbstractFSNodeStore<CommonNodeLabelsManager>
implements NodeLabelsStore {
protected static final Log LOG =
LogFactory.getLog(FileSystemNodeLabelsStore.class);
protected static final String DEFAULT_DIR_NAME = "node-labels";
protected static final String MIRROR_FILENAME = "nodelabel.mirror";
protected static final String EDITLOG_FILENAME = "nodelabel.editlog";
protected enum SerializedLogType {
ADD_LABELS, NODE_TO_LABELS, REMOVE_LABELS
FileSystemNodeLabelsStore() {
super(StoreType.NODE_LABEL_STORE);
}
Path fsWorkingPath;
FileSystem fs;
private FSDataOutputStream editlogOs;
private Path editLogPath;
private String getDefaultFSNodeLabelsRootDir() throws IOException {
// default is in local: /tmp/hadoop-yarn-${user}/node-labels/
return "file:///tmp/hadoop-yarn-"
+ UserGroupInformation.getCurrentUser().getShortUserName() + "/"
+ DEFAULT_DIR_NAME;
return "file:///tmp/hadoop-yarn-" + UserGroupInformation.getCurrentUser()
.getShortUserName() + "/" + DEFAULT_DIR_NAME;
}
@Override
public void init(Configuration conf) throws Exception {
fsWorkingPath =
new Path(conf.get(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR,
getDefaultFSNodeLabelsRootDir()));
setFileSystem(conf);
// mkdir of root dir path
fs.mkdirs(fsWorkingPath);
public void init(Configuration conf, CommonNodeLabelsManager mgr)
throws Exception {
StoreSchema schema = new StoreSchema(EDITLOG_FILENAME, MIRROR_FILENAME);
initStore(conf, new Path(
conf.get(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR,
getDefaultFSNodeLabelsRootDir())), schema, mgr);
}
@Override
public void close() throws IOException {
IOUtils.cleanup(LOG, fs, editlogOs);
}
void setFileSystem(Configuration conf) throws IOException {
Configuration confCopy = new Configuration(conf);
fs = fsWorkingPath.getFileSystem(confCopy);
// if it's local file system, use RawLocalFileSystem instead of
// LocalFileSystem, the latter one doesn't support append.
if (fs.getScheme().equals("file")) {
fs = ((LocalFileSystem)fs).getRaw();
}
}
private void ensureAppendEditlogFile() throws IOException {
editlogOs = fs.append(editLogPath);
}
private void ensureCloseEditlogFile() throws IOException {
editlogOs.close();
super.closeFSStore();
}
@Override
public void updateNodeToLabelsMappings(
Map<NodeId, Set<String>> nodeToLabels) throws IOException {
try {
ensureAppendEditlogFile();
editlogOs.writeInt(SerializedLogType.NODE_TO_LABELS.ordinal());
((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest
.newInstance(nodeToLabels)).getProto().writeDelimitedTo(editlogOs);
} finally {
ensureCloseEditlogFile();
}
public void updateNodeToLabelsMappings(Map<NodeId, Set<String>> nodeToLabels)
throws IOException {
NodeToLabelOp op = new NodeToLabelOp();
writeToLog(op.setNodeToLabels(nodeToLabels));
}
@Override
public void storeNewClusterNodeLabels(List<NodeLabel> labels)
throws IOException {
try {
ensureAppendEditlogFile();
editlogOs.writeInt(SerializedLogType.ADD_LABELS.ordinal());
((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequest
.newInstance(labels)).getProto().writeDelimitedTo(editlogOs);
} finally {
ensureCloseEditlogFile();
}
AddClusterLabelOp op = new AddClusterLabelOp();
writeToLog(op.setLabels(labels));
}
@Override
public void removeClusterNodeLabels(Collection<String> labels)
throws IOException {
try {
ensureAppendEditlogFile();
editlogOs.writeInt(SerializedLogType.REMOVE_LABELS.ordinal());
((RemoveFromClusterNodeLabelsRequestPBImpl) RemoveFromClusterNodeLabelsRequest.newInstance(Sets
.newHashSet(labels.iterator()))).getProto().writeDelimitedTo(editlogOs);
} finally {
ensureCloseEditlogFile();
}
}
protected void loadFromMirror(Path newMirrorPath, Path oldMirrorPath)
throws IOException {
// If mirror.new exists, read from mirror.new,
FSDataInputStream is = null;
try {
is = fs.open(newMirrorPath);
} catch (FileNotFoundException e) {
try {
is = fs.open(oldMirrorPath);
} catch (FileNotFoundException ignored) {
}
}
if (null != is) {
List<NodeLabel> labels = new AddToClusterNodeLabelsRequestPBImpl(
AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is))
.getNodeLabels();
mgr.addToCluserNodeLabels(labels);
if (mgr.isCentralizedConfiguration()) {
// Only load node to labels mapping while using centralized configuration
Map<NodeId, Set<String>> nodeToLabels =
new ReplaceLabelsOnNodeRequestPBImpl(
ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is))
.getNodeToLabels();
mgr.replaceLabelsOnNode(nodeToLabels);
}
is.close();
}
RemoveClusterLabelOp op = new RemoveClusterLabelOp();
writeToLog(op.setLabels(labels));
}
/* (non-Javadoc)
* @see org.apache.hadoop.yarn.nodelabels.NodeLabelsStore#recover(boolean)
*/
@Override
public void recover() throws YarnException,
IOException {
/*
* Steps of recover
* 1) Read from last mirror (from mirror or mirror.old)
* 2) Read from last edit log, and apply such edit log
* 3) Write new mirror to mirror.writing
* 4) Rename mirror to mirror.old
* 5) Move mirror.writing to mirror
* 6) Remove mirror.old
* 7) Remove edit log and create a new empty edit log
*/
// Open mirror from serialized file
Path mirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME);
Path oldMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".old");
loadFromMirror(mirrorPath, oldMirrorPath);
// Open and process editlog
editLogPath = new Path(fsWorkingPath, EDITLOG_FILENAME);
FSDataInputStream is;
try {
is = fs.open(editLogPath);
} catch (FileNotFoundException e) {
is = null;
}
if (null != is) {
while (true) {
try {
// read edit log one by one
SerializedLogType type = SerializedLogType.values()[is.readInt()];
switch (type) {
case ADD_LABELS: {
List<NodeLabel> labels =
new AddToClusterNodeLabelsRequestPBImpl(
AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is))
.getNodeLabels();
mgr.addToCluserNodeLabels(labels);
break;
}
case REMOVE_LABELS: {
Collection<String> labels =
RemoveFromClusterNodeLabelsRequestProto.parseDelimitedFrom(is)
.getNodeLabelsList();
mgr.removeFromClusterNodeLabels(labels);
break;
}
case NODE_TO_LABELS: {
Map<NodeId, Set<String>> map =
new ReplaceLabelsOnNodeRequestPBImpl(
ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is))
.getNodeToLabels();
if (mgr.isCentralizedConfiguration()) {
/*
* In case of Distributed NodeLabels setup,
* ignoreNodeToLabelsMappings will be set to true and recover will
* be invoked. As RM will collect the node labels from NM through
* registration/HB
*/
mgr.replaceLabelsOnNode(map);
}
break;
}
}
} catch (EOFException e) {
// EOF hit, break
break;
}
}
is.close();
}
// Serialize current mirror to mirror.writing
Path writingMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".writing");
FSDataOutputStream os = fs.create(writingMirrorPath, true);
((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequestPBImpl
.newInstance(mgr.getClusterNodeLabels())).getProto().writeDelimitedTo(os);
((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest
.newInstance(mgr.getNodeLabels())).getProto().writeDelimitedTo(os);
os.close();
// Move mirror to mirror.old
if (fs.exists(mirrorPath)) {
fs.delete(oldMirrorPath, false);
fs.rename(mirrorPath, oldMirrorPath);
}
// move mirror.writing to mirror
fs.rename(writingMirrorPath, mirrorPath);
fs.delete(writingMirrorPath, false);
// remove mirror.old
fs.delete(oldMirrorPath, false);
// create a new editlog file
editlogOs = fs.create(editLogPath, true);
editlogOs.close();
LOG.info("Finished write mirror at:" + mirrorPath.toString());
LOG.info("Finished create editlog file at:" + editLogPath.toString());
public void recover() throws YarnException, IOException {
super.recoverFromStore();
}
}

View File

@ -30,25 +30,27 @@
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.exceptions.YarnException;
public abstract class NodeLabelsStore implements Closeable {
protected CommonNodeLabelsManager mgr;
/**
* Interface class for Node label store.
*/
public interface NodeLabelsStore extends Closeable {
/**
* Store node {@literal ->} label
* Store node {@literal ->} label.
*/
public abstract void updateNodeToLabelsMappings(
void updateNodeToLabelsMappings(
Map<NodeId, Set<String>> nodeToLabels) throws IOException;
/**
* Store new labels
* Store new labels.
*/
public abstract void storeNewClusterNodeLabels(List<NodeLabel> label)
void storeNewClusterNodeLabels(List<NodeLabel> label)
throws IOException;
/**
* Remove labels
* Remove labels.
*/
public abstract void removeClusterNodeLabels(Collection<String> labels)
void removeClusterNodeLabels(Collection<String> labels)
throws IOException;
/**
@ -56,16 +58,14 @@ public abstract void removeClusterNodeLabels(Collection<String> labels)
* ignoreNodeToLabelsMappings is true then node to labels mappings should not
* be recovered. In case of Distributed NodeLabels setup
* ignoreNodeToLabelsMappings will be set to true and recover will be invoked
* as RM will collect the node labels from NM through registration/HB
* as RM will collect the node labels from NM through registration/HB.
*
* @throws IOException
* @throws YarnException
*/
public abstract void recover() throws IOException, YarnException;
void recover() throws IOException, YarnException;
public void init(Configuration conf) throws Exception {}
void init(Configuration conf, CommonNodeLabelsManager mgr)
throws Exception;
public void setNodeLabelsManager(CommonNodeLabelsManager mgr) {
this.mgr = mgr;
}
}

View File

@ -18,13 +18,6 @@
package org.apache.hadoop.yarn.nodelabels;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -32,11 +25,19 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl;
import org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler;
import org.apache.hadoop.yarn.nodelabels.store.StoreOp;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Store implementation for Non Appendable File Store
*/
public class NonAppendableFSNodeLabelStore extends FileSystemNodeLabelsStore {
protected static final Log
LOG = LogFactory.getLog(NonAppendableFSNodeLabelStore.class);
@ -91,29 +92,18 @@ public void removeClusterNodeLabels(Collection<String> labels)
}
private void writeNewMirror() throws IOException {
ReentrantReadWriteLock.ReadLock readLock = mgr.readLock;
ReentrantReadWriteLock.ReadLock readLock = manager.readLock;
try {
// Acquire readlock to make sure we get cluster node labels and
// node-to-labels mapping atomically.
readLock.lock();
List<NodeLabel> nodeLabels = mgr.getClusterNodeLabels();
Map<NodeId, Set<String>> nodeToLabels = mgr.getNodeLabels();
// Write mirror to mirror.new.tmp file
Path newTmpPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".new.tmp");
FSDataOutputStream os = fs
.create(newTmpPath, true);
((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequest
.newInstance(nodeLabels)).getProto().writeDelimitedTo(os);
if (mgr.isCentralizedConfiguration()) {
// Only save node-to-labels mapping while using centralized configuration
((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest
.newInstance(nodeToLabels)).getProto().writeDelimitedTo(os);
try (FSDataOutputStream os = fs.create(newTmpPath, true)) {
StoreOp op = FSStoreOpHandler.getMirrorOp(getStoreType());
op.write(os, manager);
}
os.close();
// Rename mirror.new.tmp to mirror.new (will remove .new if it's existed)
Path newPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".new");
fs.delete(newPath, false);

View File

@ -0,0 +1,213 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels.store;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.yarn.nodelabels.store.op.FSNodeStoreLogOp;
import org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler.StoreType;
import java.io.EOFException;
import java.io.IOException;
/**
* Abstract class for File System based store.
*
* @param <M> manager filesystem store.Currently nodelabel will use
* CommonNodeLabelManager.
*/
public abstract class AbstractFSNodeStore<M> {
protected static final Log LOG = LogFactory.getLog(AbstractFSNodeStore.class);
private StoreType storeType;
private FSDataOutputStream editlogOs;
private Path editLogPath;
private StoreSchema schema;
protected M manager;
protected Path fsWorkingPath;
protected FileSystem fs;
public AbstractFSNodeStore(StoreType storeType) {
this.storeType = storeType;
}
protected void initStore(Configuration conf, Path fsStorePath,
StoreSchema schma, M mgr) throws IOException {
this.schema = schma;
this.fsWorkingPath = fsStorePath;
this.manager = mgr;
initFileSystem(conf);
// mkdir of root dir path
fs.mkdirs(fsWorkingPath);
}
/**
* Filesystem store schema define the log name and mirror name.
*/
public static class StoreSchema {
private String editLogName;
private String mirrorName;
public StoreSchema(String editLogName, String mirrorName) {
this.editLogName = editLogName;
this.mirrorName = mirrorName;
}
}
public void initFileSystem(Configuration conf) throws IOException {
Configuration confCopy = new Configuration(conf);
fs = fsWorkingPath.getFileSystem(confCopy);
// if it's local file system, use RawLocalFileSystem instead of
// LocalFileSystem, the latter one doesn't support append.
if (fs.getScheme().equals("file")) {
fs = ((LocalFileSystem) fs).getRaw();
}
}
protected void writeToLog(FSNodeStoreLogOp op) throws IOException {
try {
ensureAppendEditLogFile();
editlogOs.writeInt(op.getOpCode());
op.write(editlogOs, manager);
} finally {
ensureCloseEditlogFile();
}
}
protected void ensureAppendEditLogFile() throws IOException {
editlogOs = fs.append(editLogPath);
}
protected void ensureCloseEditlogFile() throws IOException {
editlogOs.close();
}
protected void loadFromMirror(Path newMirrorPath, Path oldMirrorPath)
throws IOException {
// If mirror.new exists, read from mirror.new
Path mirrorToRead = fs.exists(newMirrorPath) ?
newMirrorPath :
fs.exists(oldMirrorPath) ? oldMirrorPath : null;
if (mirrorToRead != null) {
try (FSDataInputStream is = fs.open(mirrorToRead)) {
StoreOp op = FSStoreOpHandler.getMirrorOp(storeType);
op.recover(is, manager);
}
}
}
protected StoreType getStoreType() {
return storeType;
}
public Path getFsWorkingPath() {
return fsWorkingPath;
}
protected void recoverFromStore() throws IOException {
/*
* Steps of recover
* 1) Read from last mirror (from mirror or mirror.old)
* 2) Read from last edit log, and apply such edit log
* 3) Write new mirror to mirror.writing
* 4) Rename mirror to mirror.old
* 5) Move mirror.writing to mirror
* 6) Remove mirror.old
* 7) Remove edit log and create a new empty edit log
*/
// Open mirror from serialized file
Path mirrorPath = new Path(fsWorkingPath, schema.mirrorName);
Path oldMirrorPath = new Path(fsWorkingPath, schema.mirrorName + ".old");
loadFromMirror(mirrorPath, oldMirrorPath);
// Open and process editlog
editLogPath = new Path(fsWorkingPath, schema.editLogName);
loadManagerFromEditLog(editLogPath);
// Serialize current mirror to mirror.writing
Path writingMirrorPath =
new Path(fsWorkingPath, schema.mirrorName + ".writing");
try(FSDataOutputStream os = fs.create(writingMirrorPath, true)){
StoreOp op = FSStoreOpHandler.getMirrorOp(storeType);
op.write(os, manager);
}
// Move mirror to mirror.old
if (fs.exists(mirrorPath)) {
fs.delete(oldMirrorPath, false);
fs.rename(mirrorPath, oldMirrorPath);
}
// move mirror.writing to mirror
fs.rename(writingMirrorPath, mirrorPath);
fs.delete(writingMirrorPath, false);
// remove mirror.old
fs.delete(oldMirrorPath, false);
// create a new editlog file
editlogOs = fs.create(editLogPath, true);
editlogOs.close();
LOG.info("Finished write mirror at:" + mirrorPath.toString());
LOG.info("Finished create editlog file at:" + editLogPath.toString());
}
protected void loadManagerFromEditLog(Path editLogPath) throws IOException {
if (!fs.exists(editLogPath)) {
return;
}
try (FSDataInputStream is = fs.open(editLogPath)) {
while (true) {
try {
StoreOp storeOp = FSStoreOpHandler.get(is.readInt(),storeType);
storeOp.recover(is, manager);
} catch (EOFException e) {
// EOF hit, break
break;
}
}
}
}
public FileSystem getFs() {
return fs;
}
public void setFs(FileSystem fs) {
this.fs = fs;
}
protected void closeFSStore() {
IOUtils.closeStreams(fs, editlogOs);
}
}

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels.store;
import static org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler
.StoreType.NODE_LABEL_STORE;
import org.apache.hadoop.yarn.nodelabels.store.op.AddClusterLabelOp;
import org.apache.hadoop.yarn.nodelabels.store.op.FSNodeStoreLogOp;
import org.apache.hadoop.yarn.nodelabels.store.op.NodeLabelMirrorOp;
import org.apache.hadoop.yarn.nodelabels.store.op.NodeToLabelOp;
import org.apache.hadoop.yarn.nodelabels.store.op.RemoveClusterLabelOp;
import java.util.HashMap;
import java.util.Map;
/**
* File system store op handler.
*/
public class FSStoreOpHandler {
private static Map<StoreType, Map<Integer, Class<? extends FSNodeStoreLogOp>>>
editLogOp;
private static Map<StoreType, Class<? extends FSNodeStoreLogOp>> mirrorOp;
public enum StoreType {
NODE_LABEL_STORE,
NODE_LABEL_ATTRIBUTE;
}
static {
editLogOp = new HashMap<>();
mirrorOp = new HashMap<>();
// registerLog edit log operation
registerLog(NODE_LABEL_STORE, AddClusterLabelOp.OPCODE, AddClusterLabelOp.class);
registerLog(NODE_LABEL_STORE, NodeToLabelOp.OPCODE, NodeToLabelOp.class);
registerLog(NODE_LABEL_STORE, RemoveClusterLabelOp.OPCODE, RemoveClusterLabelOp.class);
// registerLog Mirror op
registerMirror(NODE_LABEL_STORE, NodeLabelMirrorOp.class);
}
private static void registerMirror(StoreType type,
Class<? extends FSNodeStoreLogOp> clazz) {
mirrorOp.put(type, clazz);
}
private static void registerLog(StoreType type, int opcode,
Class<? extends FSNodeStoreLogOp> clazz) {
Map<Integer, Class<? extends FSNodeStoreLogOp>> ops = editLogOp.get(type);
Integer code = Integer.valueOf(opcode);
if (ops == null) {
Map<Integer, Class<? extends FSNodeStoreLogOp>> newOps = new HashMap<>();
newOps.put(code, clazz);
editLogOp.put(type, newOps);
} else {
ops.put(code, clazz);
}
}
/**
* Get mirror operation of store Type.
*
* @param storeType
* @return instance of FSNodeStoreLogOp.
*/
public static FSNodeStoreLogOp getMirrorOp(StoreType storeType) {
return newInstance(mirrorOp.get(storeType));
}
/**
* Will return StoreOp instance basead on opCode and StoreType.
* @param opCode
* @param storeType
* @return instance of FSNodeStoreLogOp.
*/
public static FSNodeStoreLogOp get(int opCode, StoreType storeType) {
return newInstance(editLogOp.get(storeType).get(opCode));
}
private static <T extends FSNodeStoreLogOp> T newInstance(Class<T> clazz) {
FSNodeStoreLogOp instance = null;
if (clazz != null) {
try {
instance = clazz.newInstance();
} catch (Exception ex) {
throw new RuntimeException("Failed to instantiate " + clazz, ex);
}
}
return (T) instance;
}
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels.store;
import java.io.IOException;
/**
* Define the interface for store activity.
* Used by for FileSystem based operation.
*
* @param <W> write to be done to
* @param <R> read to be done from
* @param <M> manager used
*/
public interface StoreOp<W, R, M> {
/**
* Write operation to persistent storage
*
* @param write write to be done to
* @param mgr manager used by store
* @throws IOException
*/
void write(W write, M mgr) throws IOException;
/**
* Read and populate StoreOp
*
* @param read read to be done from
* @param mgr manager used by store
* @throws IOException
*/
void recover(R read, M mgr) throws IOException;
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels.store.op;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
import org.apache.hadoop.yarn.server.api.protocolrecords
.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
.AddToClusterNodeLabelsRequestPBImpl;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
/**
* Add label operation for file system.
*/
public class AddClusterLabelOp
extends FSNodeStoreLogOp<CommonNodeLabelsManager> {
private List<NodeLabel> labels;
public static final int OPCODE = 0;
@Override
public void write(OutputStream os, CommonNodeLabelsManager mgr)
throws IOException {
((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequest
.newInstance(labels)).getProto().writeDelimitedTo(os);
}
@Override
public void recover(InputStream is, CommonNodeLabelsManager mgr)
throws IOException {
labels = new AddToClusterNodeLabelsRequestPBImpl(
YarnServerResourceManagerServiceProtos
.AddToClusterNodeLabelsRequestProto
.parseDelimitedFrom(is)).getNodeLabels();
mgr.addToCluserNodeLabels(labels);
}
public AddClusterLabelOp setLabels(List<NodeLabel> labels) {
this.labels = labels;
return this;
}
public List<NodeLabel> getLabels() {
return labels;
}
@Override
public int getOpCode() {
return OPCODE;
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels.store.op;
import org.apache.hadoop.yarn.nodelabels.store.StoreOp;
import java.io.InputStream;
import java.io.OutputStream;
/**
* Defines all FileSystem editlog operation. All node label and attribute
* store write or read operation will be defined in this class.
*
* @param <M> Manager used for each operation.
*/
public abstract class FSNodeStoreLogOp<M>
implements StoreOp<OutputStream, InputStream, M> {
public abstract int getOpCode();
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels.store.op;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
import org.apache.hadoop.yarn.server.api.protocolrecords
.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
.AddToClusterNodeLabelsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
.ReplaceLabelsOnNodeRequestPBImpl;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class NodeLabelMirrorOp
extends FSNodeStoreLogOp<CommonNodeLabelsManager> {
public NodeLabelMirrorOp() {
super();
}
@Override
public void write(OutputStream os, CommonNodeLabelsManager mgr)
throws IOException {
((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequestPBImpl
.newInstance(mgr.getClusterNodeLabels())).getProto()
.writeDelimitedTo(os);
if (mgr.isCentralizedConfiguration()) {
((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest
.newInstance(mgr.getNodeLabels())).getProto().writeDelimitedTo(os);
}
}
@Override
public void recover(InputStream is, CommonNodeLabelsManager mgr)
throws IOException {
List<NodeLabel> labels = new AddToClusterNodeLabelsRequestPBImpl(
YarnServerResourceManagerServiceProtos
.AddToClusterNodeLabelsRequestProto
.parseDelimitedFrom(is)).getNodeLabels();
mgr.addToCluserNodeLabels(labels);
if (mgr.isCentralizedConfiguration()) {
// Only load node to labels mapping while using centralized
// configuration
Map<NodeId, Set<String>> nodeToLabels =
new ReplaceLabelsOnNodeRequestPBImpl(
YarnServerResourceManagerServiceProtos
.ReplaceLabelsOnNodeRequestProto
.parseDelimitedFrom(is)).getNodeToLabels();
mgr.replaceLabelsOnNode(nodeToLabels);
}
}
@Override
public int getOpCode() {
return -1;
}
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels.store.op;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
import org.apache.hadoop.yarn.server.api.protocolrecords
.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
.ReplaceLabelsOnNodeRequestPBImpl;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.Set;
/**
* Node to label mapping store operation for label.
*/
public class NodeToLabelOp
extends FSNodeStoreLogOp<CommonNodeLabelsManager> {
private Map<NodeId, Set<String>> nodeToLabels;
public static final int OPCODE = 1;
@Override
public void write(OutputStream os, CommonNodeLabelsManager mgr)
throws IOException {
((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest
.newInstance(nodeToLabels)).getProto().writeDelimitedTo(os);
}
@Override
public void recover(InputStream is, CommonNodeLabelsManager mgr)
throws IOException {
nodeToLabels = new ReplaceLabelsOnNodeRequestPBImpl(
YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto
.parseDelimitedFrom(is)).getNodeToLabels();
if (mgr.isCentralizedConfiguration()) {
mgr.replaceLabelsOnNode(nodeToLabels);
}
}
public NodeToLabelOp setNodeToLabels(
Map<NodeId, Set<String>> nodeToLabels) {
this.nodeToLabels = nodeToLabels;
return this;
}
public Map<NodeId, Set<String>> getNodeToLabels() {
return nodeToLabels;
}
@Override
public int getOpCode() {
return OPCODE;
}
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels.store.op;
import com.google.common.collect.Sets;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
import org.apache.hadoop.yarn.server.api.protocolrecords
.RemoveFromClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
.RemoveFromClusterNodeLabelsRequestPBImpl;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
/**
* Remove label from cluster log store operation.
*/
public class RemoveClusterLabelOp
extends FSNodeStoreLogOp<CommonNodeLabelsManager> {
private Collection<String> labels;
public static final int OPCODE = 2;
@Override
public void write(OutputStream os, CommonNodeLabelsManager mgr)
throws IOException {
((RemoveFromClusterNodeLabelsRequestPBImpl)
RemoveFromClusterNodeLabelsRequest
.newInstance(Sets.newHashSet(labels.iterator()))).getProto()
.writeDelimitedTo(os);
}
@Override
public void recover(InputStream is, CommonNodeLabelsManager mgr)
throws IOException {
labels =
YarnServerResourceManagerServiceProtos
.RemoveFromClusterNodeLabelsRequestProto
.parseDelimitedFrom(is).getNodeLabelsList();
mgr.removeFromClusterNodeLabels(labels);
}
public RemoveClusterLabelOp setLabels(Collection<String> labels) {
this.labels = labels;
return this;
}
public Collection<String> getLabels() {
return labels;
}
@Override
public int getOpCode() {
return OPCODE;
}
}

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Private
package org.apache.hadoop.yarn.nodelabels.store;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -21,12 +21,15 @@
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos
.RemoveFromClusterNodeLabelsRequestProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos
.RemoveFromClusterNodeLabelsRequestProto;
import org.apache.hadoop.yarn.server.api.protocolrecords
.RemoveFromClusterNodeLabelsRequest;
public class RemoveFromClusterNodeLabelsRequestPBImpl extends
RemoveFromClusterNodeLabelsRequest {
public class RemoveFromClusterNodeLabelsRequestPBImpl
extends RemoveFromClusterNodeLabelsRequest {
Set<String> labels;
RemoveFromClusterNodeLabelsRequestProto proto =
RemoveFromClusterNodeLabelsRequestProto.getDefaultInstance();

View File

@ -43,6 +43,12 @@ public void recover()
throws IOException {
}
@Override
public void init(Configuration conf, CommonNodeLabelsManager mgr)
throws Exception {
}
@Override
public void removeClusterNodeLabels(Collection<String> labels)
throws IOException {
@ -65,8 +71,6 @@ public void close() throws IOException {
// do nothing
}
};
this.store.setNodeLabelsManager(this);
}
@Override

View File

@ -96,7 +96,7 @@ public void after() throws IOException {
if (mgr.store instanceof FileSystemNodeLabelsStore) {
FileSystemNodeLabelsStore fsStore =
((FileSystemNodeLabelsStore) mgr.store);
fsStore.fs.delete(fsStore.fsWorkingPath, true);
fsStore.getFs().delete(fsStore.getFsWorkingPath(), true);
}
mgr.stop();
}
@ -342,12 +342,12 @@ public void testSerilizationAfterRecovery() throws Exception {
public void testRootMkdirOnInitStore() throws Exception {
final FileSystem mockFs = Mockito.mock(FileSystem.class);
FileSystemNodeLabelsStore mockStore = new FileSystemNodeLabelsStore() {
void setFileSystem(Configuration conf) throws IOException {
fs = mockFs;
public void initFileSystem(Configuration config) throws IOException {
setFs(mockFs);
}
};
mockStore.setNodeLabelsManager(mgr);
mockStore.fs = mockFs;
mockStore.setFs(mockFs);
verifyMkdirsCount(mockStore, true, 1);
verifyMkdirsCount(mockStore, false, 2);
verifyMkdirsCount(mockStore, true, 3);
@ -357,10 +357,10 @@ void setFileSystem(Configuration conf) throws IOException {
private void verifyMkdirsCount(FileSystemNodeLabelsStore store,
boolean existsRetVal, int expectedNumOfCalls)
throws Exception {
Mockito.when(store.fs.exists(Mockito.any(
Mockito.when(store.getFs().exists(Mockito.any(
Path.class))).thenReturn(existsRetVal);
store.init(conf);
Mockito.verify(store.fs,Mockito.times(
store.init(conf, mgr);
Mockito.verify(store.getFs(),Mockito.times(
expectedNumOfCalls)).mkdirs(Mockito.any(Path
.class));
}

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.nodelabels.NodeLabelsStore;
public class NullRMNodeLabelsManager extends RMNodeLabelsManager {
@ -45,6 +46,12 @@ public void recover()
// do nothing
}
@Override
public void init(Configuration conf, CommonNodeLabelsManager mgr)
throws Exception {
}
@Override
public void removeClusterNodeLabels(Collection<String> labels)
throws IOException {