diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java index b5f4757cf53..19254c15b1b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java @@ -234,7 +234,10 @@ public class CommonNodeLabelsManager extends AbstractService { return initNodeLabelStoreInProgress; } - boolean isCentralizedConfiguration() { + /** + * @return true if node label configuration type is not distributed. + */ + public boolean isCentralizedConfiguration() { return isCentralizedNodeLabelConfiguration; } @@ -245,8 +248,7 @@ public class CommonNodeLabelsManager extends AbstractService { conf.getClass(YarnConfiguration.FS_NODE_LABELS_STORE_IMPL_CLASS, FileSystemNodeLabelsStore.class, NodeLabelsStore.class), conf); - this.store.setNodeLabelsManager(this); - this.store.init(conf); + this.store.init(conf, this); this.store.recover(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java index 0ec4ea42f73..e11e6f82377 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java @@ -18,275 +18,89 @@ package org.apache.hadoop.yarn.nodelabels; -import java.io.EOFException; -import java.io.FileNotFoundException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.nodelabels.store.AbstractFSNodeStore; +import org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler; + +import org.apache.hadoop.yarn.nodelabels.store.op.AddClusterLabelOp; +import org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler.StoreType; +import org.apache.hadoop.yarn.nodelabels.store.op.NodeToLabelOp; +import org.apache.hadoop.yarn.nodelabels.store.op.RemoveClusterLabelOp; + import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeLabel; -import 
org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto; -import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; -import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl; -import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl; - -import com.google.common.collect.Sets; - -public class FileSystemNodeLabelsStore extends NodeLabelsStore { - protected static final Log LOG = LogFactory.getLog(FileSystemNodeLabelsStore.class); +public class FileSystemNodeLabelsStore + extends AbstractFSNodeStore + implements NodeLabelsStore { + protected static final Log LOG = + LogFactory.getLog(FileSystemNodeLabelsStore.class); protected static final String DEFAULT_DIR_NAME = "node-labels"; protected static final String MIRROR_FILENAME = "nodelabel.mirror"; protected static final String EDITLOG_FILENAME = "nodelabel.editlog"; - - protected enum SerializedLogType { - ADD_LABELS, NODE_TO_LABELS, REMOVE_LABELS + + FileSystemNodeLabelsStore() { + super(StoreType.NODE_LABEL_STORE); } - Path fsWorkingPath; - FileSystem fs; - private FSDataOutputStream editlogOs; - private Path editLogPath; - private String getDefaultFSNodeLabelsRootDir() throws IOException { // default is in local: /tmp/hadoop-yarn-${user}/node-labels/ - return "file:///tmp/hadoop-yarn-" - + UserGroupInformation.getCurrentUser().getShortUserName() + "/" - + DEFAULT_DIR_NAME; + return "file:///tmp/hadoop-yarn-" + UserGroupInformation.getCurrentUser() + .getShortUserName() + "/" + DEFAULT_DIR_NAME; } @Override - public void init(Configuration conf) throws Exception { - fsWorkingPath = - new Path(conf.get(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR, - getDefaultFSNodeLabelsRootDir())); - - setFileSystem(conf); - - // mkdir of root dir path - fs.mkdirs(fsWorkingPath); + public void init(Configuration conf, CommonNodeLabelsManager mgr) + throws Exception { + StoreSchema schema = new StoreSchema(EDITLOG_FILENAME, MIRROR_FILENAME); + initStore(conf, new Path( + conf.get(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR, + getDefaultFSNodeLabelsRootDir())), schema, mgr); } @Override public void close() throws IOException { - IOUtils.cleanup(LOG, fs, editlogOs); - } - - void setFileSystem(Configuration conf) throws IOException { - Configuration confCopy = new Configuration(conf); - fs = fsWorkingPath.getFileSystem(confCopy); - - // if it's local file system, use RawLocalFileSystem instead of - // LocalFileSystem, the latter one doesn't support append. 
- if (fs.getScheme().equals("file")) { - fs = ((LocalFileSystem)fs).getRaw(); - } - } - - private void ensureAppendEditlogFile() throws IOException { - editlogOs = fs.append(editLogPath); - } - - private void ensureCloseEditlogFile() throws IOException { - editlogOs.close(); + super.closeFSStore(); } @Override - public void updateNodeToLabelsMappings( - Map> nodeToLabels) throws IOException { - try { - ensureAppendEditlogFile(); - editlogOs.writeInt(SerializedLogType.NODE_TO_LABELS.ordinal()); - ((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest - .newInstance(nodeToLabels)).getProto().writeDelimitedTo(editlogOs); - } finally { - ensureCloseEditlogFile(); - } + public void updateNodeToLabelsMappings(Map> nodeToLabels) + throws IOException { + NodeToLabelOp op = new NodeToLabelOp(); + writeToLog(op.setNodeToLabels(nodeToLabels)); } @Override public void storeNewClusterNodeLabels(List labels) throws IOException { - try { - ensureAppendEditlogFile(); - editlogOs.writeInt(SerializedLogType.ADD_LABELS.ordinal()); - ((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequest - .newInstance(labels)).getProto().writeDelimitedTo(editlogOs); - } finally { - ensureCloseEditlogFile(); - } + AddClusterLabelOp op = new AddClusterLabelOp(); + writeToLog(op.setLabels(labels)); } @Override public void removeClusterNodeLabels(Collection labels) throws IOException { - try { - ensureAppendEditlogFile(); - editlogOs.writeInt(SerializedLogType.REMOVE_LABELS.ordinal()); - ((RemoveFromClusterNodeLabelsRequestPBImpl) RemoveFromClusterNodeLabelsRequest.newInstance(Sets - .newHashSet(labels.iterator()))).getProto().writeDelimitedTo(editlogOs); - } finally { - ensureCloseEditlogFile(); - } - } - - protected void loadFromMirror(Path newMirrorPath, Path oldMirrorPath) - throws IOException { - // If mirror.new exists, read from mirror.new, - FSDataInputStream is = null; - try { - is = fs.open(newMirrorPath); - } catch (FileNotFoundException e) { - try { - is = fs.open(oldMirrorPath); - } catch (FileNotFoundException ignored) { - - } - } - if (null != is) { - List labels = new AddToClusterNodeLabelsRequestPBImpl( - AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is)) - .getNodeLabels(); - mgr.addToCluserNodeLabels(labels); - - if (mgr.isCentralizedConfiguration()) { - // Only load node to labels mapping while using centralized configuration - Map> nodeToLabels = - new ReplaceLabelsOnNodeRequestPBImpl( - ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is)) - .getNodeToLabels(); - mgr.replaceLabelsOnNode(nodeToLabels); - } - is.close(); - } + RemoveClusterLabelOp op = new RemoveClusterLabelOp(); + writeToLog(op.setLabels(labels)); } /* (non-Javadoc) - * @see org.apache.hadoop.yarn.nodelabels.NodeLabelsStore#recover(boolean) - */ - @Override - public void recover() throws YarnException, - IOException { - /* - * Steps of recover - * 1) Read from last mirror (from mirror or mirror.old) - * 2) Read from last edit log, and apply such edit log - * 3) Write new mirror to mirror.writing - * 4) Rename mirror to mirror.old - * 5) Move mirror.writing to mirror - * 6) Remove mirror.old - * 7) Remove edit log and create a new empty edit log + * @see org.apache.hadoop.yarn.nodelabels.NodeLabelsStore#recover(boolean) */ - - // Open mirror from serialized file - Path mirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME); - Path oldMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".old"); - - loadFromMirror(mirrorPath, oldMirrorPath); - - // Open and process editlog - editLogPath = new 
Path(fsWorkingPath, EDITLOG_FILENAME); - FSDataInputStream is; - try { - is = fs.open(editLogPath); - } catch (FileNotFoundException e) { - is = null; - } - if (null != is) { - - while (true) { - try { - // read edit log one by one - SerializedLogType type = SerializedLogType.values()[is.readInt()]; - - switch (type) { - case ADD_LABELS: { - List labels = - new AddToClusterNodeLabelsRequestPBImpl( - AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is)) - .getNodeLabels(); - mgr.addToCluserNodeLabels(labels); - break; - } - case REMOVE_LABELS: { - Collection labels = - RemoveFromClusterNodeLabelsRequestProto.parseDelimitedFrom(is) - .getNodeLabelsList(); - mgr.removeFromClusterNodeLabels(labels); - break; - } - case NODE_TO_LABELS: { - Map> map = - new ReplaceLabelsOnNodeRequestPBImpl( - ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is)) - .getNodeToLabels(); - if (mgr.isCentralizedConfiguration()) { - /* - * In case of Distributed NodeLabels setup, - * ignoreNodeToLabelsMappings will be set to true and recover will - * be invoked. As RM will collect the node labels from NM through - * registration/HB - */ - mgr.replaceLabelsOnNode(map); - } - break; - } - } - } catch (EOFException e) { - // EOF hit, break - break; - } - } - is.close(); - } - - // Serialize current mirror to mirror.writing - Path writingMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".writing"); - FSDataOutputStream os = fs.create(writingMirrorPath, true); - ((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequestPBImpl - .newInstance(mgr.getClusterNodeLabels())).getProto().writeDelimitedTo(os); - ((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest - .newInstance(mgr.getNodeLabels())).getProto().writeDelimitedTo(os); - os.close(); - - // Move mirror to mirror.old - if (fs.exists(mirrorPath)) { - fs.delete(oldMirrorPath, false); - fs.rename(mirrorPath, oldMirrorPath); - } - - // move mirror.writing to mirror - fs.rename(writingMirrorPath, mirrorPath); - fs.delete(writingMirrorPath, false); - - // remove mirror.old - fs.delete(oldMirrorPath, false); - - // create a new editlog file - editlogOs = fs.create(editLogPath, true); - editlogOs.close(); - - LOG.info("Finished write mirror at:" + mirrorPath.toString()); - LOG.info("Finished create editlog file at:" + editLogPath.toString()); + @Override + public void recover() throws YarnException, IOException { + super.recoverFromStore(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelsStore.java index aacb920c4a2..e4efd68f92b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelsStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelsStore.java @@ -30,25 +30,27 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.exceptions.YarnException; -public abstract class NodeLabelsStore implements Closeable { - protected CommonNodeLabelsManager mgr; - +/** + * Interface class for Node label store. + */ +public interface NodeLabelsStore extends Closeable { + /** - * Store node {@literal ->} label + * Store node {@literal ->} label. 
*/ - public abstract void updateNodeToLabelsMappings( + void updateNodeToLabelsMappings( Map> nodeToLabels) throws IOException; /** - * Store new labels + * Store new labels. */ - public abstract void storeNewClusterNodeLabels(List label) + void storeNewClusterNodeLabels(List label) throws IOException; /** - * Remove labels + * Remove labels. */ - public abstract void removeClusterNodeLabels(Collection labels) + void removeClusterNodeLabels(Collection labels) throws IOException; /** @@ -56,16 +58,14 @@ public abstract class NodeLabelsStore implements Closeable { * ignoreNodeToLabelsMappings is true then node to labels mappings should not * be recovered. In case of Distributed NodeLabels setup * ignoreNodeToLabelsMappings will be set to true and recover will be invoked - * as RM will collect the node labels from NM through registration/HB + * as RM will collect the node labels from NM through registration/HB. * * @throws IOException * @throws YarnException */ - public abstract void recover() throws IOException, YarnException; - - public void init(Configuration conf) throws Exception {} + void recover() throws IOException, YarnException; + + void init(Configuration conf, CommonNodeLabelsManager mgr) + throws Exception; - public void setNodeLabelsManager(CommonNodeLabelsManager mgr) { - this.mgr = mgr; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NonAppendableFSNodeLabelStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NonAppendableFSNodeLabelStore.java index 989f0279403..29bfff933d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NonAppendableFSNodeLabelStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NonAppendableFSNodeLabelStore.java @@ -18,13 +18,6 @@ package org.apache.hadoop.yarn.nodelabels; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.locks.ReentrantReadWriteLock; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataOutputStream; @@ -32,11 +25,19 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; -import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl; +import org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler; +import org.apache.hadoop.yarn.nodelabels.store.StoreOp; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Store implementation for Non Appendable File Store + */ public class NonAppendableFSNodeLabelStore extends FileSystemNodeLabelsStore { protected static final Log LOG = LogFactory.getLog(NonAppendableFSNodeLabelStore.class); @@ -52,7 +53,7 @@ public class NonAppendableFSNodeLabelStore extends FileSystemNodeLabelsStore { Path 
newMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".new"); Path oldMirrorPath = new Path(fsWorkingPath, MIRROR_FILENAME); loadFromMirror(newMirrorPath, oldMirrorPath); - + // if new mirror exists, remove old mirror and rename new mirror if (fs.exists(newMirrorPath)) { // remove old mirror @@ -91,29 +92,18 @@ public class NonAppendableFSNodeLabelStore extends FileSystemNodeLabelsStore { } private void writeNewMirror() throws IOException { - ReentrantReadWriteLock.ReadLock readLock = mgr.readLock; + ReentrantReadWriteLock.ReadLock readLock = manager.readLock; try { // Acquire readlock to make sure we get cluster node labels and // node-to-labels mapping atomically. readLock.lock(); - List nodeLabels = mgr.getClusterNodeLabels(); - Map> nodeToLabels = mgr.getNodeLabels(); - // Write mirror to mirror.new.tmp file - Path newTmpPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".new.tmp"); - FSDataOutputStream os = fs - .create(newTmpPath, true); - ((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequest - .newInstance(nodeLabels)).getProto().writeDelimitedTo(os); - - if (mgr.isCentralizedConfiguration()) { - // Only save node-to-labels mapping while using centralized configuration - ((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest - .newInstance(nodeToLabels)).getProto().writeDelimitedTo(os); + Path newTmpPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".new.tmp"); + try (FSDataOutputStream os = fs.create(newTmpPath, true)) { + StoreOp op = FSStoreOpHandler.getMirrorOp(getStoreType()); + op.write(os, manager); } - os.close(); - // Rename mirror.new.tmp to mirror.new (will remove .new if it's existed) Path newPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".new"); fs.delete(newPath, false); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/AbstractFSNodeStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/AbstractFSNodeStore.java new file mode 100644 index 00000000000..a47cacf7848 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/AbstractFSNodeStore.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.nodelabels.store; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.yarn.nodelabels.store.op.FSNodeStoreLogOp; +import org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler.StoreType; + +import java.io.EOFException; +import java.io.IOException; + +/** + * Abstract class for File System based store. + * + * @param manager filesystem store.Currently nodelabel will use + * CommonNodeLabelManager. + */ +public abstract class AbstractFSNodeStore { + + protected static final Log LOG = LogFactory.getLog(AbstractFSNodeStore.class); + + private StoreType storeType; + private FSDataOutputStream editlogOs; + + private Path editLogPath; + private StoreSchema schema; + + protected M manager; + protected Path fsWorkingPath; + protected FileSystem fs; + + public AbstractFSNodeStore(StoreType storeType) { + this.storeType = storeType; + } + + protected void initStore(Configuration conf, Path fsStorePath, + StoreSchema schma, M mgr) throws IOException { + this.schema = schma; + this.fsWorkingPath = fsStorePath; + this.manager = mgr; + initFileSystem(conf); + // mkdir of root dir path + fs.mkdirs(fsWorkingPath); + + } + + /** + * Filesystem store schema define the log name and mirror name. + */ + public static class StoreSchema { + private String editLogName; + private String mirrorName; + + public StoreSchema(String editLogName, String mirrorName) { + this.editLogName = editLogName; + this.mirrorName = mirrorName; + } + } + + public void initFileSystem(Configuration conf) throws IOException { + Configuration confCopy = new Configuration(conf); + fs = fsWorkingPath.getFileSystem(confCopy); + // if it's local file system, use RawLocalFileSystem instead of + // LocalFileSystem, the latter one doesn't support append. + if (fs.getScheme().equals("file")) { + fs = ((LocalFileSystem) fs).getRaw(); + } + } + + protected void writeToLog(FSNodeStoreLogOp op) throws IOException { + try { + ensureAppendEditLogFile(); + editlogOs.writeInt(op.getOpCode()); + op.write(editlogOs, manager); + } finally { + ensureCloseEditlogFile(); + } + } + + protected void ensureAppendEditLogFile() throws IOException { + editlogOs = fs.append(editLogPath); + } + + protected void ensureCloseEditlogFile() throws IOException { + editlogOs.close(); + } + + protected void loadFromMirror(Path newMirrorPath, Path oldMirrorPath) + throws IOException { + // If mirror.new exists, read from mirror.new + Path mirrorToRead = fs.exists(newMirrorPath) ? + newMirrorPath : + fs.exists(oldMirrorPath) ? 
oldMirrorPath : null; + if (mirrorToRead != null) { + try (FSDataInputStream is = fs.open(mirrorToRead)) { + StoreOp op = FSStoreOpHandler.getMirrorOp(storeType); + op.recover(is, manager); + } + } + } + + protected StoreType getStoreType() { + return storeType; + } + + public Path getFsWorkingPath() { + return fsWorkingPath; + } + + protected void recoverFromStore() throws IOException { + /* + * Steps of recover + * 1) Read from last mirror (from mirror or mirror.old) + * 2) Read from last edit log, and apply such edit log + * 3) Write new mirror to mirror.writing + * 4) Rename mirror to mirror.old + * 5) Move mirror.writing to mirror + * 6) Remove mirror.old + * 7) Remove edit log and create a new empty edit log + */ + + // Open mirror from serialized file + Path mirrorPath = new Path(fsWorkingPath, schema.mirrorName); + Path oldMirrorPath = new Path(fsWorkingPath, schema.mirrorName + ".old"); + + loadFromMirror(mirrorPath, oldMirrorPath); + + // Open and process editlog + editLogPath = new Path(fsWorkingPath, schema.editLogName); + + loadManagerFromEditLog(editLogPath); + + // Serialize current mirror to mirror.writing + Path writingMirrorPath = + new Path(fsWorkingPath, schema.mirrorName + ".writing"); + + try(FSDataOutputStream os = fs.create(writingMirrorPath, true)){ + StoreOp op = FSStoreOpHandler.getMirrorOp(storeType); + op.write(os, manager); + } + // Move mirror to mirror.old + if (fs.exists(mirrorPath)) { + fs.delete(oldMirrorPath, false); + fs.rename(mirrorPath, oldMirrorPath); + } + + // move mirror.writing to mirror + fs.rename(writingMirrorPath, mirrorPath); + fs.delete(writingMirrorPath, false); + + // remove mirror.old + fs.delete(oldMirrorPath, false); + + // create a new editlog file + editlogOs = fs.create(editLogPath, true); + editlogOs.close(); + + LOG.info("Finished write mirror at:" + mirrorPath.toString()); + LOG.info("Finished create editlog file at:" + editLogPath.toString()); + } + + protected void loadManagerFromEditLog(Path editLogPath) throws IOException { + if (!fs.exists(editLogPath)) { + return; + } + try (FSDataInputStream is = fs.open(editLogPath)) { + while (true) { + try { + StoreOp storeOp = FSStoreOpHandler.get(is.readInt(),storeType); + storeOp.recover(is, manager); + } catch (EOFException e) { + // EOF hit, break + break; + } + } + } + } + + public FileSystem getFs() { + return fs; + } + + public void setFs(FileSystem fs) { + this.fs = fs; + } + + protected void closeFSStore() { + IOUtils.closeStreams(fs, editlogOs); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/FSStoreOpHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/FSStoreOpHandler.java new file mode 100644 index 00000000000..0f7f53dbb40 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/FSStoreOpHandler.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.nodelabels.store; + +import static org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler + .StoreType.NODE_LABEL_STORE; +import org.apache.hadoop.yarn.nodelabels.store.op.AddClusterLabelOp; +import org.apache.hadoop.yarn.nodelabels.store.op.FSNodeStoreLogOp; +import org.apache.hadoop.yarn.nodelabels.store.op.NodeLabelMirrorOp; +import org.apache.hadoop.yarn.nodelabels.store.op.NodeToLabelOp; +import org.apache.hadoop.yarn.nodelabels.store.op.RemoveClusterLabelOp; + +import java.util.HashMap; +import java.util.Map; + +/** + * File system store op handler. + */ +public class FSStoreOpHandler { + + private static Map>> + editLogOp; + private static Map> mirrorOp; + + public enum StoreType { + NODE_LABEL_STORE, + NODE_LABEL_ATTRIBUTE; + } + + static { + editLogOp = new HashMap<>(); + mirrorOp = new HashMap<>(); + + // registerLog edit log operation + registerLog(NODE_LABEL_STORE, AddClusterLabelOp.OPCODE, AddClusterLabelOp.class); + registerLog(NODE_LABEL_STORE, NodeToLabelOp.OPCODE, NodeToLabelOp.class); + registerLog(NODE_LABEL_STORE, RemoveClusterLabelOp.OPCODE, RemoveClusterLabelOp.class); + + // registerLog Mirror op + + registerMirror(NODE_LABEL_STORE, NodeLabelMirrorOp.class); + } + + private static void registerMirror(StoreType type, + Class clazz) { + mirrorOp.put(type, clazz); + } + + private static void registerLog(StoreType type, int opcode, + Class clazz) { + Map> ops = editLogOp.get(type); + Integer code = Integer.valueOf(opcode); + if (ops == null) { + Map> newOps = new HashMap<>(); + newOps.put(code, clazz); + editLogOp.put(type, newOps); + } else { + ops.put(code, clazz); + } + } + + /** + * Get mirror operation of store Type. + * + * @param storeType + * @return instance of FSNodeStoreLogOp. + */ + public static FSNodeStoreLogOp getMirrorOp(StoreType storeType) { + return newInstance(mirrorOp.get(storeType)); + } + + /** + * Will return StoreOp instance basead on opCode and StoreType. + * @param opCode + * @param storeType + * @return instance of FSNodeStoreLogOp. + */ + public static FSNodeStoreLogOp get(int opCode, StoreType storeType) { + return newInstance(editLogOp.get(storeType).get(opCode)); + } + + private static T newInstance(Class clazz) { + FSNodeStoreLogOp instance = null; + if (clazz != null) { + try { + instance = clazz.newInstance(); + } catch (Exception ex) { + throw new RuntimeException("Failed to instantiate " + clazz, ex); + } + } + return (T) instance; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/StoreOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/StoreOp.java new file mode 100644 index 00000000000..c26e1dc40ee --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/StoreOp.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.nodelabels.store; + +import java.io.IOException; + +/** + * Define the interface for store activity. + * Used by for FileSystem based operation. + * + * @param write to be done to + * @param read to be done from + * @param manager used + */ +public interface StoreOp { + + /** + * Write operation to persistent storage + * + * @param write write to be done to + * @param mgr manager used by store + * @throws IOException + */ + void write(W write, M mgr) throws IOException; + + /** + * Read and populate StoreOp + * + * @param read read to be done from + * @param mgr manager used by store + * @throws IOException + */ + void recover(R read, M mgr) throws IOException; +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/AddClusterLabelOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/AddClusterLabelOp.java new file mode 100644 index 00000000000..ce736aae085 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/AddClusterLabelOp.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.nodelabels.store.op; + +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos; +import org.apache.hadoop.yarn.server.api.protocolrecords + .AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb + .AddToClusterNodeLabelsRequestPBImpl; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; + +/** + * Add label operation for file system. 
+ */ +public class AddClusterLabelOp + extends FSNodeStoreLogOp { + + private List labels; + + public static final int OPCODE = 0; + + @Override + public void write(OutputStream os, CommonNodeLabelsManager mgr) + throws IOException { + ((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequest + .newInstance(labels)).getProto().writeDelimitedTo(os); + } + + @Override + public void recover(InputStream is, CommonNodeLabelsManager mgr) + throws IOException { + labels = new AddToClusterNodeLabelsRequestPBImpl( + YarnServerResourceManagerServiceProtos + .AddToClusterNodeLabelsRequestProto + .parseDelimitedFrom(is)).getNodeLabels(); + mgr.addToCluserNodeLabels(labels); + } + + public AddClusterLabelOp setLabels(List labels) { + this.labels = labels; + return this; + } + + public List getLabels() { + return labels; + } + + @Override + public int getOpCode() { + return OPCODE; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/FSNodeStoreLogOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/FSNodeStoreLogOp.java new file mode 100644 index 00000000000..cd739c025af --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/FSNodeStoreLogOp.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.nodelabels.store.op; + +import org.apache.hadoop.yarn.nodelabels.store.StoreOp; + +import java.io.InputStream; +import java.io.OutputStream; + +/** + * Defines all FileSystem editlog operation. All node label and attribute + * store write or read operation will be defined in this class. + * + * @param Manager used for each operation. + */ +public abstract class FSNodeStoreLogOp + implements StoreOp { + + public abstract int getOpCode(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/NodeLabelMirrorOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/NodeLabelMirrorOp.java new file mode 100644 index 00000000000..3ec837b0b3d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/NodeLabelMirrorOp.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.nodelabels.store.op; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos; +import org.apache.hadoop.yarn.server.api.protocolrecords + .ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb + .AddToClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb + .ReplaceLabelsOnNodeRequestPBImpl; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class NodeLabelMirrorOp + extends FSNodeStoreLogOp { + + public NodeLabelMirrorOp() { + super(); + } + + @Override + public void write(OutputStream os, CommonNodeLabelsManager mgr) + throws IOException { + ((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequestPBImpl + .newInstance(mgr.getClusterNodeLabels())).getProto() + .writeDelimitedTo(os); + if (mgr.isCentralizedConfiguration()) { + ((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest + .newInstance(mgr.getNodeLabels())).getProto().writeDelimitedTo(os); + } + } + + @Override + public void recover(InputStream is, CommonNodeLabelsManager mgr) + throws IOException { + List labels = new AddToClusterNodeLabelsRequestPBImpl( + YarnServerResourceManagerServiceProtos + .AddToClusterNodeLabelsRequestProto + .parseDelimitedFrom(is)).getNodeLabels(); + mgr.addToCluserNodeLabels(labels); + + if (mgr.isCentralizedConfiguration()) { + // Only load node to labels mapping while using centralized + // configuration + Map> nodeToLabels = + new ReplaceLabelsOnNodeRequestPBImpl( + YarnServerResourceManagerServiceProtos + .ReplaceLabelsOnNodeRequestProto + .parseDelimitedFrom(is)).getNodeToLabels(); + mgr.replaceLabelsOnNode(nodeToLabels); + } + } + + @Override + public int getOpCode() { + return -1; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/NodeToLabelOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/NodeToLabelOp.java new file mode 100644 index 00000000000..0e1e3985114 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/NodeToLabelOp.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.nodelabels.store.op; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos; +import org.apache.hadoop.yarn.server.api.protocolrecords + .ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb + .ReplaceLabelsOnNodeRequestPBImpl; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; +import java.util.Set; + +/** + * Node to label mapping store operation for label. + */ +public class NodeToLabelOp + extends FSNodeStoreLogOp { + + private Map> nodeToLabels; + public static final int OPCODE = 1; + + @Override + public void write(OutputStream os, CommonNodeLabelsManager mgr) + throws IOException { + ((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest + .newInstance(nodeToLabels)).getProto().writeDelimitedTo(os); + } + + @Override + public void recover(InputStream is, CommonNodeLabelsManager mgr) + throws IOException { + nodeToLabels = new ReplaceLabelsOnNodeRequestPBImpl( + YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto + .parseDelimitedFrom(is)).getNodeToLabels(); + if (mgr.isCentralizedConfiguration()) { + mgr.replaceLabelsOnNode(nodeToLabels); + } + } + + public NodeToLabelOp setNodeToLabels( + Map> nodeToLabels) { + this.nodeToLabels = nodeToLabels; + return this; + } + + public Map> getNodeToLabels() { + return nodeToLabels; + } + + @Override + public int getOpCode() { + return OPCODE; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/RemoveClusterLabelOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/RemoveClusterLabelOp.java new file mode 100644 index 00000000000..4f6d4bdac88 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/op/RemoveClusterLabelOp.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.nodelabels.store.op; + +import com.google.common.collect.Sets; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos; +import org.apache.hadoop.yarn.server.api.protocolrecords + .RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb + .RemoveFromClusterNodeLabelsRequestPBImpl; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collection; + +/** + * Remove label from cluster log store operation. + */ +public class RemoveClusterLabelOp + extends FSNodeStoreLogOp { + + private Collection labels; + + public static final int OPCODE = 2; + + @Override + public void write(OutputStream os, CommonNodeLabelsManager mgr) + throws IOException { + ((RemoveFromClusterNodeLabelsRequestPBImpl) + RemoveFromClusterNodeLabelsRequest + .newInstance(Sets.newHashSet(labels.iterator()))).getProto() + .writeDelimitedTo(os); + } + + @Override + public void recover(InputStream is, CommonNodeLabelsManager mgr) + throws IOException { + labels = + YarnServerResourceManagerServiceProtos + .RemoveFromClusterNodeLabelsRequestProto + .parseDelimitedFrom(is).getNodeLabelsList(); + mgr.removeFromClusterNodeLabels(labels); + } + + public RemoveClusterLabelOp setLabels(Collection labels) { + this.labels = labels; + return this; + } + + public Collection getLabels() { + return labels; + } + + @Override + public int getOpCode() { + return OPCODE; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/package-info.java new file mode 100644 index 00000000000..0444807071a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/store/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +@InterfaceAudience.Private +package org.apache.hadoop.yarn.nodelabels.store; +import org.apache.hadoop.classification.InterfaceAudience; + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoveFromClusterNodeLabelsRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoveFromClusterNodeLabelsRequestPBImpl.java index a9358407149..f6338045f47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoveFromClusterNodeLabelsRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoveFromClusterNodeLabelsRequestPBImpl.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,12 +21,15 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; import java.util.HashSet; import java.util.Set; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProtoOrBuilder; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto; -import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos + .RemoveFromClusterNodeLabelsRequestProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos + .RemoveFromClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.server.api.protocolrecords + .RemoveFromClusterNodeLabelsRequest; -public class RemoveFromClusterNodeLabelsRequestPBImpl extends - RemoveFromClusterNodeLabelsRequest { +public class RemoveFromClusterNodeLabelsRequestPBImpl + extends RemoveFromClusterNodeLabelsRequest { Set labels; RemoveFromClusterNodeLabelsRequestProto proto = RemoveFromClusterNodeLabelsRequestProto.getDefaultInstance(); @@ -102,7 +105,7 @@ public class RemoveFromClusterNodeLabelsRequestPBImpl extends assert false : "hashCode not designed"; return 0; } - + @Override public boolean equals(Object other) { if (other == null) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java index 64c74c2baa8..61373dcdb04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java @@ -43,6 +43,12 @@ public class DummyCommonNodeLabelsManager extends CommonNodeLabelsManager { throws IOException { } + @Override + public void init(Configuration conf, CommonNodeLabelsManager mgr) + throws Exception { + + } + @Override public void removeClusterNodeLabels(Collection labels) throws IOException { @@ -65,8 +71,6 @@ public class DummyCommonNodeLabelsManager extends CommonNodeLabelsManager { // do nothing } }; - - this.store.setNodeLabelsManager(this); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java index ed2f4aa6c74..93c039a5bc3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java @@ -96,7 +96,7 @@ public class TestFileSystemNodeLabelsStore extends NodeLabelTestBase { if (mgr.store instanceof FileSystemNodeLabelsStore) { FileSystemNodeLabelsStore fsStore = ((FileSystemNodeLabelsStore) mgr.store); - fsStore.fs.delete(fsStore.fsWorkingPath, true); + 
fsStore.getFs().delete(fsStore.getFsWorkingPath(), true); } mgr.stop(); } @@ -342,12 +342,12 @@ public class TestFileSystemNodeLabelsStore extends NodeLabelTestBase { public void testRootMkdirOnInitStore() throws Exception { final FileSystem mockFs = Mockito.mock(FileSystem.class); FileSystemNodeLabelsStore mockStore = new FileSystemNodeLabelsStore() { - void setFileSystem(Configuration conf) throws IOException { - fs = mockFs; + public void initFileSystem(Configuration config) throws IOException { + setFs(mockFs); } }; - mockStore.setNodeLabelsManager(mgr); - mockStore.fs = mockFs; + + mockStore.setFs(mockFs); verifyMkdirsCount(mockStore, true, 1); verifyMkdirsCount(mockStore, false, 2); verifyMkdirsCount(mockStore, true, 3); @@ -357,10 +357,10 @@ public class TestFileSystemNodeLabelsStore extends NodeLabelTestBase { private void verifyMkdirsCount(FileSystemNodeLabelsStore store, boolean existsRetVal, int expectedNumOfCalls) throws Exception { - Mockito.when(store.fs.exists(Mockito.any( + Mockito.when(store.getFs().exists(Mockito.any( Path.class))).thenReturn(existsRetVal); - store.init(conf); - Mockito.verify(store.fs,Mockito.times( + store.init(conf, mgr); + Mockito.verify(store.getFs(),Mockito.times( expectedNumOfCalls)).mkdirs(Mockito.any(Path .class)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NullRMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NullRMNodeLabelsManager.java index bb0b45f6917..b8f3fae7da6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NullRMNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NullRMNodeLabelsManager.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.nodelabels.NodeLabelsStore; public class NullRMNodeLabelsManager extends RMNodeLabelsManager { @@ -45,6 +46,12 @@ public class NullRMNodeLabelsManager extends RMNodeLabelsManager { // do nothing } + @Override + public void init(Configuration conf, CommonNodeLabelsManager mgr) + throws Exception { + + } + @Override public void removeClusterNodeLabels(Collection labels) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java index 5e6fd4e5b60..21df698bda1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java @@ -605,7 +605,7 @@ public class TestRMWebServicesNodeLabels extends JerseyTestBase { String expectedmessage = "java.io.IOException: label name should only contains" + " {0-9, a-z, A-Z, -, _} and should not started with" - + " {-,_}, now it is=a&"; + + " {-,_}, now it is= a&"; validateJsonExceptionContent(response, expectedmessage); }
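
Reviewer note (not part of the patch): the refactor above replaces FileSystemNodeLabelsStore's inline switch on SerializedLogType with op classes dispatched by FSStoreOpHandler, so AbstractFSNodeStore no longer needs to know what is being persisted. Below is a minimal sketch of how one edit-log record round-trips through that path. It is an illustration only: it assumes an already-initialized CommonNodeLabelsManager named mgr (for example a test stub like the DummyCommonNodeLabelsManager used in the tests above), uses only classes introduced in this patch, and simplifies the stream handling that AbstractFSNodeStore.writeToLog()/loadManagerFromEditLog() perform against the real edit-log file.

import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler;
import org.apache.hadoop.yarn.nodelabels.store.FSStoreOpHandler.StoreType;
import org.apache.hadoop.yarn.nodelabels.store.op.AddClusterLabelOp;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class EditLogOpRoundTripSketch {

  // Sketch only: writes one AddClusterLabelOp record the way writeToLog() does
  // (op code followed by the op's own serialization), then replays it the way
  // loadManagerFromEditLog() does (look up the op by code, let it recover itself).
  public static void roundTrip(CommonNodeLabelsManager mgr) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream editLog = new DataOutputStream(bytes)) {
      AddClusterLabelOp addOp = new AddClusterLabelOp()
          .setLabels(Arrays.asList(NodeLabel.newInstance("gpu")));
      editLog.writeInt(addOp.getOpCode());   // opcode prefix, as in writeToLog()
      addOp.write(editLog, mgr);             // op serializes its own delimited payload
    }

    try (DataInputStream editLog =
        new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      AddClusterLabelOp replayOp = (AddClusterLabelOp) FSStoreOpHandler
          .get(editLog.readInt(), StoreType.NODE_LABEL_STORE);
      replayOp.recover(editLog, mgr);        // re-applies the recovered labels to mgr
    }
  }
}

The opcode-prefixed records plus the op classes registered in FSStoreOpHandler's static block are what keep the recovery loop generic: supporting another store type (for instance the NODE_LABEL_ATTRIBUTE entry already declared in StoreType) should only require registering its ops with the handler, not changing AbstractFSNodeStore.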