YARN-6413. FileSystem based Yarn Registry implementation. (Ellen Hui via Subru).

(cherry picked from commit ed24da3dd7)
This commit is contained in:
Subru Krishnan 2017-10-31 12:05:43 -07:00
parent 15b839c446
commit ac41107423
3 changed files with 611 additions and 0 deletions

View File

@ -0,0 +1,249 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.registry.client.impl;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.registry.client.api.BindFlags;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.exceptions.InvalidPathnameException;
import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
import org.apache.hadoop.registry.client.exceptions.NoRecordException;
import org.apache.hadoop.registry.client.types.RegistryPathStatus;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* Filesystem-based implementation of RegistryOperations. This class relies
* entirely on the configured FS for security and does no extra checks.
*/
public class FSRegistryOperationsService extends CompositeService
implements RegistryOperations {
private FileSystem fs;
private static final Logger LOG =
LoggerFactory.getLogger(FSRegistryOperationsService.class);
private final RegistryUtils.ServiceRecordMarshal serviceRecordMarshal =
new RegistryUtils.ServiceRecordMarshal();
public FSRegistryOperationsService() {
super(FSRegistryOperationsService.class.getName());
}
@VisibleForTesting
public FileSystem getFs() {
return this.fs;
}
@Override
protected void serviceInit(Configuration conf) {
try {
this.fs = FileSystem.get(conf);
LOG.info("Initialized Yarn-registry with Filesystem "
+ fs.getClass().getCanonicalName());
} catch (IOException e) {
LOG.error("Failed to get FileSystem for registry", e);
throw new YarnRuntimeException(e);
}
}
private Path makePath(String path) {
return new Path(path);
}
private Path formatDataPath(String basePath) {
return Path.mergePaths(new Path(basePath), new Path("/_record"));
}
private String relativize(String basePath, String childPath) {
String relative = new File(basePath).toURI()
.relativize(new File(childPath).toURI()).getPath();
return relative;
}
@Override
public boolean mknode(String path, boolean createParents)
throws PathNotFoundException, InvalidPathnameException, IOException {
Path registryPath = makePath(path);
// getFileStatus throws FileNotFound if the path doesn't exist. If the
// file already exists, return.
try {
fs.getFileStatus(registryPath);
return false;
} catch (FileNotFoundException e) {
}
if (createParents) {
// By default, mkdirs creates any parent dirs it needs
fs.mkdirs(registryPath);
} else {
FileStatus parentStatus = null;
if (registryPath.getParent() != null) {
parentStatus = fs.getFileStatus(registryPath.getParent());
}
if (registryPath.getParent() == null || parentStatus.isDirectory()) {
fs.mkdirs(registryPath);
} else {
throw new PathNotFoundException("no parent for " + path);
}
}
return true;
}
@Override
public void bind(String path, ServiceRecord record, int flags)
throws PathNotFoundException, FileAlreadyExistsException,
InvalidPathnameException, IOException {
// Preserve same overwrite semantics as ZK implementation
Preconditions.checkArgument(record != null, "null record");
RegistryTypeUtils.validateServiceRecord(path, record);
Path dataPath = formatDataPath(path);
Boolean overwrite = ((flags & BindFlags.OVERWRITE) != 0);
if (fs.exists(dataPath) && !overwrite) {
throw new FileAlreadyExistsException();
} else {
// Either the file doesn't exist, or it exists and we're
// overwriting. Create overwrites by default and creates parent dirs if
// needed.
FSDataOutputStream stream = fs.create(dataPath);
byte[] bytes = serviceRecordMarshal.toBytes(record);
stream.write(bytes);
stream.close();
LOG.info("Bound record to path " + dataPath);
}
}
@Override
public ServiceRecord resolve(String path) throws PathNotFoundException,
NoRecordException, InvalidRecordException, IOException {
// Read the entire file into byte array, should be small metadata
Long size = fs.getFileStatus(formatDataPath(path)).getLen();
byte[] bytes = new byte[size.intValue()];
FSDataInputStream instream = fs.open(formatDataPath(path));
int bytesRead = instream.read(bytes);
instream.close();
if (bytesRead < size) {
throw new InvalidRecordException(path,
"Expected " + size + " bytes, but read " + bytesRead);
}
// Unmarshal, check, and return
ServiceRecord record = serviceRecordMarshal.fromBytes(path, bytes);
RegistryTypeUtils.validateServiceRecord(path, record);
return record;
}
@Override
public RegistryPathStatus stat(String path)
throws PathNotFoundException, InvalidPathnameException, IOException {
FileStatus fstat = fs.getFileStatus(formatDataPath(path));
int numChildren = fs.listStatus(makePath(path)).length;
RegistryPathStatus regstat =
new RegistryPathStatus(fstat.getPath().toString(),
fstat.getModificationTime(), fstat.getLen(), numChildren);
return regstat;
}
@Override
public boolean exists(String path) throws IOException {
return fs.exists(makePath(path));
}
@Override
public List<String> list(String path)
throws PathNotFoundException, InvalidPathnameException, IOException {
FileStatus[] statArray = fs.listStatus(makePath(path));
String basePath = fs.getFileStatus(makePath(path)).getPath().toString();
List<String> paths = new ArrayList<String>();
FileStatus stat;
// Only count dirs; the _record files are hidden.
for (int i = 0; i < statArray.length; i++) {
stat = statArray[i];
if (stat.isDirectory()) {
String relativePath = relativize(basePath, stat.getPath().toString());
paths.add(relativePath);
}
}
return paths;
}
@Override
public void delete(String path, boolean recursive)
throws PathNotFoundException, PathIsNotEmptyDirectoryException,
InvalidPathnameException, IOException {
Path dirPath = makePath(path);
if (!fs.exists(dirPath)) {
throw new PathNotFoundException(path);
}
// If recursive == true, or dir is empty, delete.
if (recursive || list(path).isEmpty()) {
fs.delete(makePath(path), true);
return;
}
throw new PathIsNotEmptyDirectoryException(path);
}
@Override
public boolean addWriteAccessor(String id, String pass) throws IOException {
throw new NotImplementedException();
}
@Override
public void clearWriteAccessors() {
throw new NotImplementedException();
}
}

View File

@ -245,5 +245,69 @@ public class ServiceRecord implements Cloneable {
return super.clone();
}
@Override
public int hashCode() {
// Generated by eclipse
final int prime = 31;
int result = 1;
result =
prime * result + ((attributes == null) ? 0 : attributes.hashCode());
result =
prime * result + ((description == null) ? 0 : description.hashCode());
result = prime * result + ((external == null) ? 0 : external.hashCode());
result = prime * result + ((internal == null) ? 0 : internal.hashCode());
result = prime * result + ((type == null) ? 0 : type.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
ServiceRecord other = (ServiceRecord) obj;
if (attributes == null) {
if (other.attributes != null) {
return false;
}
} else if (!attributes.equals(other.attributes)) {
return false;
}
if (description == null) {
if (other.description != null) {
return false;
}
} else if (!description.equals(other.description)) {
return false;
}
if (external == null) {
if (other.external != null) {
return false;
}
} else if (!external.equals(other.external)) {
return false;
}
if (internal == null) {
if (other.internal != null) {
return false;
}
} else if (!internal.equals(other.internal)) {
return false;
}
if (type == null) {
if (other.type != null) {
return false;
}
} else if (!type.equals(other.type)) {
return false;
}
return true;
}
}

View File

@ -0,0 +1,298 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.registry.client.impl;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.registry.client.exceptions.InvalidPathnameException;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
/**
* FSRegistryOperationsService test, using the local filesystem.
*/
public class TestFSRegistryOperationsService {
private static FSRegistryOperationsService registry =
new FSRegistryOperationsService();
private static FileSystem fs;
@BeforeClass
public static void initRegistry() throws IOException {
Assert.assertNotNull(registry);
registry.init(new Configuration());
fs = registry.getFs();
fs.delete(new Path("test"), true);
}
@Before
public void createTestDir() throws IOException {
fs.mkdirs(new Path("test"));
}
@After
public void cleanTestDir() throws IOException {
fs.delete(new Path("test"), true);
}
@Test
public void testMkNodeNonRecursive()
throws InvalidPathnameException, PathNotFoundException, IOException {
boolean result = false;
System.out.println("Make node with parent already made, nonrecursive");
result = registry.mknode("test/registryTestNode", false);
Assert.assertTrue(result);
Assert.assertTrue(fs.exists(new Path("test/registryTestNode")));
// Expected to fail
try {
System.out.println("Try to make node with no parent, nonrecursive");
registry.mknode("test/parent/registryTestNode", false);
Assert.fail("Should not have created node");
} catch (IOException e) {
}
Assert.assertFalse(fs.exists(new Path("test/parent/registryTestNode")));
}
@Test
public void testMkNodeRecursive() throws IOException {
boolean result = false;
System.out.println("Make node with parent already made, recursive");
result = registry.mknode("test/registryTestNode", true);
Assert.assertTrue(result);
Assert.assertTrue(fs.exists(new Path("test/registryTestNode")));
result = false;
System.out.println("Try to make node with no parent, recursive");
result = registry.mknode("test/parent/registryTestNode", true);
Assert.assertTrue(result);
Assert.assertTrue(fs.exists(new Path("test/parent/registryTestNode")));
}
@Test
public void testMkNodeAlreadyExists() throws IOException {
System.out.println("pre-create test path");
fs.mkdirs(new Path("test/registryTestNode"));
System.out.println(
"Try to mknode existing path -- should be noop and return false");
Assert.assertFalse(registry.mknode("test/registryTestNode", true));
Assert.assertFalse(registry.mknode("test/registryTestNode", false));
}
@Test
public void testBindParentPath() throws InvalidPathnameException,
PathNotFoundException, FileAlreadyExistsException, IOException {
ServiceRecord record = createRecord("0");
System.out.println("pre-create test path");
fs.mkdirs(new Path("test/parent1/registryTestNode"));
registry.bind("test/parent1/registryTestNode", record, 1);
Assert.assertTrue(
fs.exists(new Path("test/parent1/registryTestNode/_record")));
// Test without pre-creating path
registry.bind("test/parent2/registryTestNode", record, 1);
Assert.assertTrue(fs.exists(new Path("test/parent2/registryTestNode")));
}
@Test
public void testBindAlreadyExists() throws IOException {
ServiceRecord record1 = createRecord("1");
ServiceRecord record2 = createRecord("2");
System.out.println("Bind record1");
registry.bind("test/registryTestNode", record1, 1);
Assert.assertTrue(fs.exists(new Path("test/registryTestNode/_record")));
System.out.println("Bind record2, overwrite = 1");
registry.bind("test/registryTestNode", record2, 1);
Assert.assertTrue(fs.exists(new Path("test/registryTestNode/_record")));
// The record should have been overwritten
ServiceRecord readRecord = registry.resolve("test/registryTestNode");
Assert.assertTrue(readRecord.equals(record2));
System.out.println("Bind record3, overwrite = 0");
try {
registry.bind("test/registryTestNode", record1, 0);
Assert.fail("Should not overwrite record");
} catch (IOException e) {
}
// The record should not be overwritten
readRecord = registry.resolve("test/registryTestNode");
Assert.assertTrue(readRecord.equals(record2));
}
@Test
public void testResolve() throws IOException {
ServiceRecord record = createRecord("0");
registry.bind("test/registryTestNode", record, 1);
Assert.assertTrue(fs.exists(new Path("test/registryTestNode/_record")));
System.out.println("Read record that exists");
ServiceRecord readRecord = registry.resolve("test/registryTestNode");
Assert.assertNotNull(readRecord);
Assert.assertTrue(record.equals(readRecord));
System.out.println("Try to read record that does not exist");
try {
readRecord = registry.resolve("test/nonExistentNode");
Assert.fail("Should throw an error, record does not exist");
} catch (IOException e) {
}
}
@Test
public void testExists() throws IOException {
System.out.println("pre-create test path");
fs.mkdirs(new Path("test/registryTestNode"));
System.out.println("Check for existing node");
boolean exists = registry.exists("test/registryTestNode");
Assert.assertTrue(exists);
System.out.println("Check for non-existing node");
exists = registry.exists("test/nonExistentNode");
Assert.assertFalse(exists);
}
@Test
public void testDeleteDirsOnly() throws IOException {
System.out.println("pre-create test path with children");
fs.mkdirs(new Path("test/registryTestNode"));
fs.mkdirs(new Path("test/registryTestNode/child1"));
fs.mkdirs(new Path("test/registryTestNode/child2"));
try {
registry.delete("test/registryTestNode", false);
Assert.fail("Deleted dir wich children, nonrecursive flag set");
} catch (IOException e) {
}
// Make sure nothing was deleted
Assert.assertTrue(fs.exists(new Path("test/registryTestNode")));
Assert.assertTrue(fs.exists(new Path("test/registryTestNode/child1")));
Assert.assertTrue(fs.exists(new Path("test/registryTestNode/child2")));
System.out.println("Delete leaf path 'test/registryTestNode/child2'");
registry.delete("test/registryTestNode/child2", false);
Assert.assertTrue(fs.exists(new Path("test/registryTestNode")));
Assert.assertTrue(fs.exists(new Path("test/registryTestNode/child1")));
Assert.assertFalse(fs.exists(new Path("test/registryTestNode/child2")));
System.out
.println("Recursively delete non-leaf path 'test/registryTestNode'");
registry.delete("test/registryTestNode", true);
Assert.assertFalse(fs.exists(new Path("test/registryTestNode")));
}
@Test
public void testDeleteWithRecords() throws IOException {
System.out.println("pre-create test path with children and mocked records");
fs.mkdirs(new Path("test/registryTestNode"));
fs.mkdirs(new Path("test/registryTestNode/child1"));
fs.mkdirs(new Path("test/registryTestNode/child2"));
// Create and close stream immediately so they aren't blocking
fs.create(new Path("test/registryTestNode/_record")).close();
fs.create(new Path("test/registryTestNode/child1/_record")).close();
System.out.println("Delete dir with child nodes and record file");
try {
registry.delete("test/registryTestNode", false);
Assert.fail("Nonrecursive delete of non-empty dir");
} catch (PathIsNotEmptyDirectoryException e) {
}
Assert.assertTrue(fs.exists(new Path("test/registryTestNode/_record")));
Assert.assertTrue(
fs.exists(new Path("test/registryTestNode/child1/_record")));
Assert.assertTrue(fs.exists(new Path("test/registryTestNode/child2")));
System.out.println("Delete dir with record file and no child dirs");
registry.delete("test/registryTestNode/child1", false);
Assert.assertFalse(fs.exists(new Path("test/registryTestNode/child1")));
Assert.assertTrue(fs.exists(new Path("test/registryTestNode/child2")));
System.out.println("Delete dir with child dir and no record file");
try {
registry.delete("test/registryTestNode", false);
Assert.fail("Nonrecursive delete of non-empty dir");
} catch (PathIsNotEmptyDirectoryException e) {
}
Assert.assertTrue(fs.exists(new Path("test/registryTestNode/child2")));
}
@Test
public void testList() throws IOException {
System.out.println("pre-create test path with children and mocked records");
fs.mkdirs(new Path("test/registryTestNode"));
fs.mkdirs(new Path("test/registryTestNode/child1"));
fs.mkdirs(new Path("test/registryTestNode/child2"));
// Create and close stream immediately so they aren't blocking
fs.create(new Path("test/registryTestNode/_record")).close();
fs.create(new Path("test/registryTestNode/child1/_record")).close();
List<String> ls = null;
ls = registry.list("test/registryTestNode");
Assert.assertNotNull(ls);
Assert.assertEquals(2, ls.size());
System.out.println(ls);
Assert.assertTrue(ls.contains("child1"));
Assert.assertTrue(ls.contains("child2"));
ls = null;
ls = registry.list("test/registryTestNode/child1");
Assert.assertNotNull(ls);
Assert.assertTrue(ls.isEmpty());
ls = null;
ls = registry.list("test/registryTestNode/child2");
Assert.assertNotNull(ls);
Assert.assertTrue(ls.isEmpty());
}
private ServiceRecord createRecord(String id) {
System.out.println("Creating mock service record");
ServiceRecord record = new ServiceRecord();
record.set(YarnRegistryAttributes.YARN_ID, id);
record.description = "testRecord";
return record;
}
}