HDFS-10880. Federation Mount Table State Store internal API. Contributed by Jason Kace and Inigo Goiri.

(cherry picked from commit 58b97df661)
(cherry picked from commit 6f0de27318)
This commit is contained in:
Inigo Goiri 2017-08-04 18:00:12 -07:00 committed by vrushali
parent 6f787d262c
commit c77a04b10f
36 changed files with 3437 additions and 76 deletions

View File

@ -26,6 +26,8 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
@ -1070,8 +1072,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// HDFS Router State Store connection
public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =
FEDERATION_ROUTER_PREFIX + "file.resolver.client.class";
public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT =
"org.apache.hadoop.hdfs.server.federation.MockResolver";
public static final Class<? extends FileSubclusterResolver>
FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT =
MountTableResolver.class;
public static final String FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS =
FEDERATION_ROUTER_PREFIX + "namenode.resolver.client.class";
public static final Class<? extends ActiveNamenodeResolver>

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
/**
* Manage a mount table.
*/
public interface MountTableManager {
/**
* Add an entry to the mount table.
*
* @param request Fully populated request object.
* @return True if the mount table entry was successfully committed to the
* data store.
* @throws IOException Throws exception if the data store is not initialized.
*/
AddMountTableEntryResponse addMountTableEntry(
AddMountTableEntryRequest request) throws IOException;
/**
* Updates an existing entry in the mount table.
*
* @param request Fully populated request object.
* @return True if the mount table entry was successfully committed to the
* data store.
* @throws IOException Throws exception if the data store is not initialized.
*/
UpdateMountTableEntryResponse updateMountTableEntry(
UpdateMountTableEntryRequest request) throws IOException;
/**
* Remove an entry from the mount table.
*
* @param request Fully populated request object.
* @return True the mount table entry was removed from the data store.
* @throws IOException Throws exception if the data store is not initialized.
*/
RemoveMountTableEntryResponse removeMountTableEntry(
RemoveMountTableEntryRequest request) throws IOException;
/**
* List all mount table entries present at or below the path. Fetches from the
* state store.
*
* @param request Fully populated request object.
*
* @return List of all mount table entries under the path. Zero-length list if
* none are found.
* @throws IOException Throws exception if the data store cannot be queried.
*/
GetMountTableEntriesResponse getMountTableEntries(
GetMountTableEntriesRequest request) throws IOException;
}

View File

@ -0,0 +1,544 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Mount table to map between global paths and remote locations. This allows the
* {@link org.apache.hadoop.hdfs.server.federation.router.Router Router} to map
* the global HDFS view to the remote namespaces. This is similar to
* {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}.
* This is implemented as a tree.
*/
public class MountTableResolver
implements FileSubclusterResolver, StateStoreCache {
private static final Logger LOG =
LoggerFactory.getLogger(MountTableResolver.class);
/** Reference to Router. */
private final Router router;
/** Reference to the State Store. */
private final StateStoreService stateStore;
/** Interface to the mount table store. */
private MountTableStore mountTableStore;
/** If the tree has been initialized. */
private boolean init = false;
/** Path -> Remote HDFS location. */
private final TreeMap<String, MountTable> tree = new TreeMap<>();
/** Path -> Remote location. */
private final ConcurrentNavigableMap<String, PathLocation> locationCache =
new ConcurrentSkipListMap<>();
/** Default nameservice when no mount matches the math. */
private String defaultNameService = "";
/** Synchronization for both the tree and the cache. */
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
@VisibleForTesting
public MountTableResolver(Configuration conf) {
this(conf, (StateStoreService)null);
}
public MountTableResolver(Configuration conf, Router routerService) {
this.router = routerService;
if (this.router != null) {
this.stateStore = this.router.getStateStore();
} else {
this.stateStore = null;
}
registerCacheExternal();
initDefaultNameService(conf);
}
public MountTableResolver(Configuration conf, StateStoreService store) {
this.router = null;
this.stateStore = store;
registerCacheExternal();
initDefaultNameService(conf);
}
/**
* Request cache updates from the State Store for this resolver.
*/
private void registerCacheExternal() {
if (this.stateStore != null) {
this.stateStore.registerCacheExternal(this);
}
}
/**
* Nameservice for APIs that cannot be resolved to a specific one.
*
* @param conf Configuration for this resolver.
*/
private void initDefaultNameService(Configuration conf) {
try {
this.defaultNameService = conf.get(
DFS_ROUTER_DEFAULT_NAMESERVICE,
DFSUtil.getNamenodeNameServiceId(conf));
} catch (HadoopIllegalArgumentException e) {
LOG.error("Cannot find default name service, setting it to the first");
Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
this.defaultNameService = nsIds.iterator().next();
LOG.info("Default name service: {}", this.defaultNameService);
}
}
/**
* Get a reference for the Router for this resolver.
*
* @return Router for this resolver.
*/
protected Router getRouter() {
return this.router;
}
/**
* Get the mount table store for this resolver.
*
* @return Mount table store.
* @throws IOException If it cannot connect to the State Store.
*/
protected MountTableStore getMountTableStore() throws IOException {
if (this.mountTableStore == null) {
this.mountTableStore = this.stateStore.getRegisteredRecordStore(
MountTableStore.class);
if (this.mountTableStore == null) {
throw new IOException("State Store does not have an interface for " +
MountTableStore.class);
}
}
return this.mountTableStore;
}
/**
* Add a mount entry to the table.
*
* @param entry The mount table record to add from the state store.
*/
public void addEntry(final MountTable entry) {
writeLock.lock();
try {
String srcPath = entry.getSourcePath();
this.tree.put(srcPath, entry);
invalidateLocationCache(srcPath);
} finally {
writeLock.unlock();
}
this.init = true;
}
/**
* Remove a mount table entry.
*
* @param srcPath Source path for the entry to remove.
*/
public void removeEntry(final String srcPath) {
writeLock.lock();
try {
this.tree.remove(srcPath);
invalidateLocationCache(srcPath);
} finally {
writeLock.unlock();
}
}
/**
* Invalidates all cache entries below this path. It requires the write lock.
*
* @param src Source path.
*/
private void invalidateLocationCache(final String path) {
if (locationCache.isEmpty()) {
return;
}
// Determine next lexicographic entry after source path
String nextSrc = path + Character.MAX_VALUE;
ConcurrentNavigableMap<String, PathLocation> subMap =
locationCache.subMap(path, nextSrc);
for (final String key : subMap.keySet()) {
locationCache.remove(key);
}
}
/**
* Updates the mount path tree with a new set of mount table entries. It also
* updates the needed caches.
*
* @param entries Full set of mount table entries to update.
*/
@VisibleForTesting
public void refreshEntries(final Collection<MountTable> entries) {
// The tree read/write must be atomic
writeLock.lock();
try {
// New entries
Map<String, MountTable> newEntries = new ConcurrentHashMap<>();
for (MountTable entry : entries) {
String srcPath = entry.getSourcePath();
newEntries.put(srcPath, entry);
}
// Old entries (reversed to sort from the leaves to the root)
Set<String> oldEntries = new TreeSet<>(Collections.reverseOrder());
for (MountTable entry : getTreeValues("/")) {
String srcPath = entry.getSourcePath();
oldEntries.add(srcPath);
}
// Entries that need to be removed
for (String srcPath : oldEntries) {
if (!newEntries.containsKey(srcPath)) {
this.tree.remove(srcPath);
invalidateLocationCache(srcPath);
LOG.info("Removed stale mount point {} from resolver", srcPath);
}
}
// Entries that need to be added
for (MountTable entry : entries) {
String srcPath = entry.getSourcePath();
if (!oldEntries.contains(srcPath)) {
// Add node, it does not exist
this.tree.put(srcPath, entry);
LOG.info("Added new mount point {} to resolver", srcPath);
} else {
// Node exists, check for updates
MountTable existingEntry = this.tree.get(srcPath);
if (existingEntry != null && !existingEntry.equals(entry)) {
// Entry has changed
invalidateLocationCache(srcPath);
LOG.info("Updated mount point {} in resolver");
}
}
}
} finally {
writeLock.unlock();
}
this.init = true;
}
/**
* Replaces the current in-memory cached of the mount table with a new
* version fetched from the data store.
*/
@Override
public boolean loadCache(boolean force) {
try {
// Our cache depends on the store, update it first
MountTableStore mountTable = this.getMountTableStore();
mountTable.loadCache(force);
GetMountTableEntriesRequest request =
GetMountTableEntriesRequest.newInstance("/");
GetMountTableEntriesResponse response =
mountTable.getMountTableEntries(request);
List<MountTable> records = response.getEntries();
refreshEntries(records);
} catch (IOException e) {
LOG.error("Cannot fetch mount table entries from State Store", e);
return false;
}
return true;
}
/**
* Clears all data.
*/
public void clear() {
LOG.info("Clearing all mount location caches");
writeLock.lock();
try {
this.locationCache.clear();
this.tree.clear();
} finally {
writeLock.unlock();
}
}
@Override
public PathLocation getDestinationForPath(final String path)
throws IOException {
verifyMountTable();
readLock.lock();
try {
return this.locationCache.computeIfAbsent(
path, this::lookupLocation);
} finally {
readLock.unlock();
}
}
/**
* Build the path location to insert into the cache atomically. It must hold
* the read lock.
* @param path Path to check/insert.
* @return New remote location.
*/
public PathLocation lookupLocation(final String path) {
PathLocation ret = null;
MountTable entry = findDeepest(path);
if (entry != null) {
ret = buildLocation(path, entry);
} else {
// Not found, use default location
RemoteLocation remoteLocation =
new RemoteLocation(defaultNameService, path);
List<RemoteLocation> locations =
Collections.singletonList(remoteLocation);
ret = new PathLocation(null, locations);
}
return ret;
}
/**
* Get the mount table entry for a path.
*
* @param path Path to look for.
* @return Mount table entry the path belongs.
* @throws IOException If the State Store could not be reached.
*/
public MountTable getMountPoint(final String path) throws IOException {
verifyMountTable();
return findDeepest(path);
}
@Override
public List<String> getMountPoints(final String path) throws IOException {
verifyMountTable();
Set<String> children = new TreeSet<>();
readLock.lock();
try {
String from = path;
String to = path + Character.MAX_VALUE;
SortedMap<String, MountTable> subMap = this.tree.subMap(from, to);
boolean exists = false;
for (String subPath : subMap.keySet()) {
String child = subPath;
// Special case for /
if (!path.equals(Path.SEPARATOR)) {
// Get the children
int ini = path.length();
child = subPath.substring(ini);
}
if (child.isEmpty()) {
// This is a mount point but without children
exists = true;
} else if (child.startsWith(Path.SEPARATOR)) {
// This is a mount point with children
exists = true;
child = child.substring(1);
// We only return immediate children
int fin = child.indexOf(Path.SEPARATOR);
if (fin > -1) {
child = child.substring(0, fin);
}
if (!child.isEmpty()) {
children.add(child);
}
}
}
if (!exists) {
return null;
}
return new LinkedList<>(children);
} finally {
readLock.unlock();
}
}
/**
* Get all the mount records at or beneath a given path.
* @param path Path to get the mount points from.
* @return List of mount table records under the path or null if the path is
* not found.
* @throws IOException If it's not connected to the State Store.
*/
public List<MountTable> getMounts(final String path) throws IOException {
verifyMountTable();
return getTreeValues(path, false);
}
/**
* Check if the Mount Table is ready to be used.
* @throws StateStoreUnavailableException If it cannot connect to the store.
*/
private void verifyMountTable() throws StateStoreUnavailableException {
if (!this.init) {
throw new StateStoreUnavailableException("Mount Table not initialized");
}
}
@Override
public String toString() {
readLock.lock();
try {
return this.tree.toString();
} finally {
readLock.unlock();
}
}
/**
* Build a location for this result beneath the discovered mount point.
*
* @param result Tree node search result.
* @return PathLocation containing the namespace, local path.
*/
private static PathLocation buildLocation(
final String path, final MountTable entry) {
String srcPath = entry.getSourcePath();
if (!path.startsWith(srcPath)) {
LOG.error("Cannot build location, {} not a child of {}", path, srcPath);
return null;
}
String remainingPath = path.substring(srcPath.length());
if (remainingPath.startsWith(Path.SEPARATOR)) {
remainingPath = remainingPath.substring(1);
}
List<RemoteLocation> locations = new LinkedList<>();
for (RemoteLocation oneDst : entry.getDestinations()) {
String nsId = oneDst.getNameserviceId();
String dest = oneDst.getDest();
String newPath = dest;
if (!newPath.endsWith(Path.SEPARATOR)) {
newPath += Path.SEPARATOR;
}
newPath += remainingPath;
RemoteLocation remoteLocation = new RemoteLocation(nsId, newPath);
locations.add(remoteLocation);
}
DestinationOrder order = entry.getDestOrder();
return new PathLocation(srcPath, locations, order);
}
@Override
public String getDefaultNamespace() {
return this.defaultNameService;
}
/**
* Find the deepest mount point for a path.
* @param path Path to look for.
* @return Mount table entry.
*/
private MountTable findDeepest(final String path) {
readLock.lock();
try {
Entry<String, MountTable> entry = this.tree.floorEntry(path);
while (entry != null && !path.startsWith(entry.getKey())) {
entry = this.tree.lowerEntry(entry.getKey());
}
if (entry == null) {
return null;
}
return entry.getValue();
} finally {
readLock.unlock();
}
}
/**
* Get the mount table entries under a path.
* @param path Path to search from.
* @return Mount Table entries.
*/
private List<MountTable> getTreeValues(final String path) {
return getTreeValues(path, false);
}
/**
* Get the mount table entries under a path.
* @param path Path to search from.
* @param reverse If the order should be reversed.
* @return Mount Table entries.
*/
private List<MountTable> getTreeValues(final String path, boolean reverse) {
LinkedList<MountTable> ret = new LinkedList<>();
readLock.lock();
try {
String from = path;
String to = path + Character.MAX_VALUE;
SortedMap<String, MountTable> subMap = this.tree.subMap(from, to);
for (MountTable entry : subMap.values()) {
if (!reverse) {
ret.add(entry);
} else {
ret.addFirst(entry);
}
}
} finally {
readLock.unlock();
}
return ret;
}
}

View File

@ -23,21 +23,27 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A map of the properties and target destinations (name space + path) for
* a path in the global/federated namespace.
* a path in the global/federated name space.
* This data is generated from the @see MountTable records.
*/
public class PathLocation {
private static final Logger LOG = LoggerFactory.getLogger(PathLocation.class);
/** Source path in global namespace. */
private final String sourcePath;
/** Remote paths in the target namespaces. */
/** Remote paths in the target name spaces. */
private final List<RemoteLocation> destinations;
/** List of name spaces present. */
private final Set<String> namespaces;
/** Order for the destinations. */
private final DestinationOrder destOrder;
/**
@ -45,14 +51,23 @@ public class PathLocation {
*
* @param source Source path in the global name space.
* @param dest Destinations of the mount table entry.
* @param namespaces Unique identifier representing the combination of
* name spaces present in the destination list.
* @param order Order of the locations.
*/
public PathLocation(
String source, List<RemoteLocation> dest, Set<String> nss) {
String source, List<RemoteLocation> dest, DestinationOrder order) {
this.sourcePath = source;
this.destinations = dest;
this.namespaces = nss;
this.destinations = Collections.unmodifiableList(dest);
this.destOrder = order;
}
/**
* Create a new PathLocation with default HASH order.
*
* @param source Source path in the global name space.
* @param dest Destinations of the mount table entry.
*/
public PathLocation(String source, List<RemoteLocation> dest) {
this(source, dest, DestinationOrder.HASH);
}
/**
@ -60,10 +75,55 @@ public class PathLocation {
*
* @param other Other path location to copy from.
*/
public PathLocation(PathLocation other) {
public PathLocation(final PathLocation other) {
this.sourcePath = other.sourcePath;
this.destinations = new LinkedList<RemoteLocation>(other.destinations);
this.namespaces = new HashSet<String>(other.namespaces);
this.destinations = Collections.unmodifiableList(other.destinations);
this.destOrder = other.destOrder;
}
/**
* Create a path location from another path with the destinations sorted.
*
* @param other Other path location to copy from.
* @param firstNsId Identifier of the namespace to place first.
*/
public PathLocation(PathLocation other, String firstNsId) {
this.sourcePath = other.sourcePath;
this.destOrder = other.destOrder;
this.destinations = orderedNamespaces(other.destinations, firstNsId);
}
/**
* Prioritize a location/destination by its name space/nameserviceId.
* This destination might be used by other threads, so the source is not
* modifiable.
*
* @param original List of destinations to order.
* @param nsId The name space/nameserviceID to prioritize.
* @return Prioritized list of detinations that cannot be modified.
*/
private static List<RemoteLocation> orderedNamespaces(
final List<RemoteLocation> original, final String nsId) {
if (original.size() <= 1) {
return original;
}
LinkedList<RemoteLocation> newDestinations = new LinkedList<>();
boolean found = false;
for (RemoteLocation dest : original) {
if (dest.getNameserviceId().equals(nsId)) {
found = true;
newDestinations.addFirst(dest);
} else {
newDestinations.add(dest);
}
}
if (!found) {
LOG.debug("Cannot find location with namespace {} in {}",
nsId, original);
}
return Collections.unmodifiableList(newDestinations);
}
/**
@ -76,16 +136,37 @@ public class PathLocation {
}
/**
* Get the list of subclusters defined for the destinations.
* Get the subclusters defined for the destinations.
*
* @return Set containing the subclusters.
*/
public Set<String> getNamespaces() {
return Collections.unmodifiableSet(this.namespaces);
Set<String> namespaces = new HashSet<>();
List<RemoteLocation> locations = this.getDestinations();
for (RemoteLocation location : locations) {
String nsId = location.getNameserviceId();
namespaces.add(nsId);
}
return namespaces;
}
@Override
public String toString() {
RemoteLocation loc = getDefaultLocation();
return loc.getNameserviceId() + "->" + loc.getDest();
StringBuilder sb = new StringBuilder();
for (RemoteLocation destination : this.destinations) {
String nsId = destination.getNameserviceId();
String path = destination.getDest();
if (sb.length() > 0) {
sb.append(",");
}
sb.append(nsId + "->" + path);
}
if (this.destinations.size() > 1) {
sb.append(" [");
sb.append(this.destOrder.toString());
sb.append("]");
}
return sb.toString();
}
/**
@ -107,6 +188,15 @@ public class PathLocation {
return Collections.unmodifiableList(this.destinations);
}
/**
* Get the order for the destinations.
*
* @return Order for the destinations.
*/
public DestinationOrder getDestinationOrder() {
return this.destOrder;
}
/**
* Get the default or highest priority location.
*

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver.order;
/**
* Order of the destinations when we have multiple of them. When the resolver
* of files to subclusters (FileSubclusterResolver) has multiple destinations,
* this determines which location should be checked first.
*/
public enum DestinationOrder {
HASH, // Follow consistent hashing
LOCAL, // Local first
RANDOM // Random order
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* A federated location can be resolved to multiple subclusters. This package
* takes care of the order in which this multiple destinations should be used.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
package org.apache.hadoop.hdfs.server.federation.resolver.order;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -135,66 +135,20 @@ public final class FederationUtil {
}
}
/**
* Create an instance of an interface with a constructor using a state store
* constructor.
*
* @param conf Configuration
* @param context Context object to pass to the instance.
* @param contextType Type of the context passed to the constructor.
* @param configurationKeyName Configuration key to retrieve the class to load
* @param defaultClassName Default class to load if the configuration key is
* not set
* @param clazz Class/interface that must be implemented by the instance.
* @return New instance of the specified class that implements the desired
* interface and a single parameter constructor containing a
* StateStore reference.
*/
private static <T, R> T newInstance(final Configuration conf,
final R context, final Class<R> contextClass,
final String configKeyName, final String defaultClassName,
final Class<T> clazz) {
String className = conf.get(configKeyName, defaultClassName);
try {
Class<?> instance = conf.getClassByName(className);
if (clazz.isAssignableFrom(instance)) {
if (contextClass == null) {
// Default constructor if no context
@SuppressWarnings("unchecked")
Constructor<T> constructor =
(Constructor<T>) instance.getConstructor();
return constructor.newInstance();
} else {
// Constructor with context
@SuppressWarnings("unchecked")
Constructor<T> constructor = (Constructor<T>) instance.getConstructor(
Configuration.class, contextClass);
return constructor.newInstance(conf, context);
}
} else {
throw new RuntimeException("Class " + className + " not instance of "
+ clazz.getCanonicalName());
}
} catch (ReflectiveOperationException e) {
LOG.error("Could not instantiate: " + className, e);
return null;
}
}
/**
* Creates an instance of a FileSubclusterResolver from the configuration.
*
* @param conf Configuration that defines the file resolver class.
* @param obj Context object passed to class constructor.
* @return FileSubclusterResolver
* @param router Router service.
* @return New file subcluster resolver.
*/
public static FileSubclusterResolver newFileSubclusterResolver(
Configuration conf, StateStoreService stateStore) {
return newInstance(conf, stateStore, StateStoreService.class,
Configuration conf, Router router) {
Class<? extends FileSubclusterResolver> clazz = conf.getClass(
DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT,
FileSubclusterResolver.class);
return newInstance(conf, router, Router.class, clazz);
}
/**

View File

@ -124,8 +124,7 @@ public class Router extends CompositeService {
}
// Lookup interface to map between the global and subcluster name spaces
this.subclusterResolver = newFileSubclusterResolver(
this.conf, this.stateStore);
this.subclusterResolver = newFileSubclusterResolver(this.conf, this);
if (this.subclusterResolver == null) {
throw new IOException("Cannot find subcluster resolver");
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
/**
* Management API for the HDFS mount table information stored in
* {@link org.apache.hadoop.hdfs.server.federation.store.records.MountTable
* MountTable} records. The mount table contains entries that map a particular
* global namespace path one or more HDFS nameservices (NN) + target path. It is
* possible to map mount locations for root folders, directories or individual
* files.
* <p>
* Once fetched from the
* {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
* StateStoreDriver}, MountTable records are cached in a tree for faster access.
* Each path in the global namespace is mapped to a nameserivce ID and local
* path upon request. The cache is periodically updated by the @{link
* StateStoreCacheUpdateService}.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class MountTableStore extends CachedRecordStore<MountTable>
implements MountTableManager {
public MountTableStore(StateStoreDriver driver) {
super(MountTable.class, driver);
}
}

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.service.CompositeService;
@ -136,6 +137,7 @@ public class StateStoreService extends CompositeService {
// Add supported record stores
addRecordStore(MembershipStoreImpl.class);
addRecordStore(MountTableStoreImpl.class);
// Check the connection to the State Store periodically
this.monitorService = new StateStoreConnectionMonitorService(this);

View File

@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.impl;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.util.Time;
/**
* Implementation of the {@link MountTableStore} state store API.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MountTableStoreImpl extends MountTableStore {
public MountTableStoreImpl(StateStoreDriver driver) {
super(driver);
}
@Override
public AddMountTableEntryResponse addMountTableEntry(
AddMountTableEntryRequest request) throws IOException {
boolean status = getDriver().put(request.getEntry(), false, true);
AddMountTableEntryResponse response =
AddMountTableEntryResponse.newInstance();
response.setStatus(status);
return response;
}
@Override
public UpdateMountTableEntryResponse updateMountTableEntry(
UpdateMountTableEntryRequest request) throws IOException {
MountTable entry = request.getEntry();
boolean status = getDriver().put(entry, true, true);
UpdateMountTableEntryResponse response =
UpdateMountTableEntryResponse.newInstance();
response.setStatus(status);
return response;
}
@Override
public RemoveMountTableEntryResponse removeMountTableEntry(
RemoveMountTableEntryRequest request) throws IOException {
final String srcPath = request.getSrcPath();
final MountTable partial = MountTable.newInstance();
partial.setSourcePath(srcPath);
final Query<MountTable> query = new Query<>(partial);
int removedRecords = getDriver().remove(getRecordClass(), query);
boolean status = (removedRecords == 1);
RemoveMountTableEntryResponse response =
RemoveMountTableEntryResponse.newInstance();
response.setStatus(status);
return response;
}
@Override
public GetMountTableEntriesResponse getMountTableEntries(
GetMountTableEntriesRequest request) throws IOException {
// Get all values from the cache
List<MountTable> records = getCachedRecords();
// Sort and filter
Collections.sort(records);
String reqSrcPath = request.getSrcPath();
if (reqSrcPath != null && !reqSrcPath.isEmpty()) {
// Return only entries beneath this path
Iterator<MountTable> it = records.iterator();
while (it.hasNext()) {
MountTable record = it.next();
String srcPath = record.getSourcePath();
if (!srcPath.startsWith(reqSrcPath)) {
it.remove();
}
}
}
GetMountTableEntriesResponse response =
GetMountTableEntriesResponse.newInstance();
response.setEntries(records);
response.setTimestamp(Time.now());
return response;
}
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
/**
* API request for adding a mount table entry to the state store.
*/
public abstract class AddMountTableEntryRequest {
public static AddMountTableEntryRequest newInstance() {
return StateStoreSerializer.newRecord(AddMountTableEntryRequest.class);
}
public static AddMountTableEntryRequest newInstance(MountTable newEntry) {
AddMountTableEntryRequest request = newInstance();
request.setEntry(newEntry);
return request;
}
@Public
@Unstable
public abstract MountTable getEntry();
@Public
@Unstable
public abstract void setEntry(MountTable mount);
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
/**
* API response for adding a mount table entry to the state store.
*/
public abstract class AddMountTableEntryResponse {
public static AddMountTableEntryResponse newInstance() throws IOException {
return StateStoreSerializer.newRecord(AddMountTableEntryResponse.class);
}
@Public
@Unstable
public abstract boolean getStatus();
@Public
@Unstable
public abstract void setStatus(boolean result);
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
/**
* API request for listing mount table entries present in the state store.
*/
public abstract class GetMountTableEntriesRequest {
public static GetMountTableEntriesRequest newInstance() throws IOException {
return StateStoreSerializer.newRecord(GetMountTableEntriesRequest.class);
}
public static GetMountTableEntriesRequest newInstance(String srcPath)
throws IOException {
GetMountTableEntriesRequest request = newInstance();
request.setSrcPath(srcPath);
return request;
}
@Public
@Unstable
public abstract String getSrcPath();
@Public
@Unstable
public abstract void setSrcPath(String path);
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
/**
* API response for listing mount table entries present in the state store.
*/
public abstract class GetMountTableEntriesResponse {
public static GetMountTableEntriesResponse newInstance() throws IOException {
return StateStoreSerializer.newRecord(GetMountTableEntriesResponse.class);
}
@Public
@Unstable
public abstract List<MountTable> getEntries() throws IOException;
@Public
@Unstable
public abstract void setEntries(List<MountTable> entries)
throws IOException;
@Public
@Unstable
public abstract long getTimestamp();
@Public
@Unstable
public abstract void setTimestamp(long time);
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
/**
* API request for removing a mount table path present in the state store.
*/
public abstract class RemoveMountTableEntryRequest {
public static RemoveMountTableEntryRequest newInstance() throws IOException {
return StateStoreSerializer.newRecord(RemoveMountTableEntryRequest.class);
}
public static RemoveMountTableEntryRequest newInstance(String path)
throws IOException {
RemoveMountTableEntryRequest request = newInstance();
request.setSrcPath(path);
return request;
}
@Public
@Unstable
public abstract String getSrcPath();
@Public
@Unstable
public abstract void setSrcPath(String path);
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
/**
* API response for removing a mount table path present in the state store.
*/
public abstract class RemoveMountTableEntryResponse {
public static RemoveMountTableEntryResponse newInstance() throws IOException {
return StateStoreSerializer.newRecord(RemoveMountTableEntryResponse.class);
}
@Public
@Unstable
public abstract boolean getStatus();
@Public
@Unstable
public abstract void setStatus(boolean result);
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
/**
* API request for updating the destination of an existing mount point in the
* state store.
*/
public abstract class UpdateMountTableEntryRequest {
public static UpdateMountTableEntryRequest newInstance() throws IOException {
return StateStoreSerializer.newRecord(UpdateMountTableEntryRequest.class);
}
public static UpdateMountTableEntryRequest newInstance(MountTable entry)
throws IOException {
UpdateMountTableEntryRequest request = newInstance();
request.setEntry(entry);
return request;
}
@Public
@Unstable
public abstract MountTable getEntry() throws IOException;
@Public
@Unstable
public abstract void setEntry(MountTable mount) throws IOException;
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
/**
* API response for updating the destination of an existing mount point in the
* state store.
*/
public abstract class UpdateMountTableEntryResponse {
public static UpdateMountTableEntryResponse newInstance() throws IOException {
return StateStoreSerializer.newRecord(UpdateMountTableEntryResponse.class);
}
@Public
@Unstable
public abstract boolean getStatus();
@Public
@Unstable
public abstract void setStatus(boolean result);
}

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
import java.io.IOException;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryRequestProtoOrBuilder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.MountTablePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the state store API object
* AddMountTableEntryRequest.
*/
public class AddMountTableEntryRequestPBImpl
extends AddMountTableEntryRequest implements PBRecord {
private FederationProtocolPBTranslator<AddMountTableEntryRequestProto,
AddMountTableEntryRequestProto.Builder,
AddMountTableEntryRequestProtoOrBuilder> translator =
new FederationProtocolPBTranslator<AddMountTableEntryRequestProto,
AddMountTableEntryRequestProto.Builder,
AddMountTableEntryRequestProtoOrBuilder>(
AddMountTableEntryRequestProto.class);
public AddMountTableEntryRequestPBImpl() {
}
public AddMountTableEntryRequestPBImpl(AddMountTableEntryRequestProto proto) {
this.translator.setProto(proto);
}
@Override
public AddMountTableEntryRequestProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public MountTable getEntry() {
MountTableRecordProto entryProto =
this.translator.getProtoOrBuilder().getEntry();
return new MountTablePBImpl(entryProto);
}
@Override
public void setEntry(MountTable mount) {
if (mount instanceof MountTablePBImpl) {
MountTablePBImpl mountPB = (MountTablePBImpl)mount;
MountTableRecordProto mountProto = mountPB.getProto();
translator.getBuilder().setEntry(mountProto);
}
}
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
import java.io.IOException;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryResponseProtoOrBuilder;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the state store API object
* AddMountTableEntryResponse.
*/
public class AddMountTableEntryResponsePBImpl
extends AddMountTableEntryResponse implements PBRecord {
private FederationProtocolPBTranslator<AddMountTableEntryResponseProto,
AddMountTableEntryResponseProto.Builder,
AddMountTableEntryResponseProtoOrBuilder> translator =
new FederationProtocolPBTranslator<AddMountTableEntryResponseProto,
AddMountTableEntryResponseProto.Builder,
AddMountTableEntryResponseProtoOrBuilder>(
AddMountTableEntryResponseProto.class);
public AddMountTableEntryResponsePBImpl() {
}
public AddMountTableEntryResponsePBImpl(
AddMountTableEntryResponseProto proto) {
this.translator.setProto(proto);
}
@Override
public AddMountTableEntryResponseProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public boolean getStatus() {
return this.translator.getProtoOrBuilder().getStatus();
}
@Override
public void setStatus(boolean result) {
this.translator.getBuilder().setStatus(result);
}
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
import java.io.IOException;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesRequestProtoOrBuilder;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the state store API object
* GetMountTableEntriesRequest.
*/
public class GetMountTableEntriesRequestPBImpl
extends GetMountTableEntriesRequest implements PBRecord {
private FederationProtocolPBTranslator<GetMountTableEntriesRequestProto,
GetMountTableEntriesRequestProto.Builder,
GetMountTableEntriesRequestProtoOrBuilder> translator =
new FederationProtocolPBTranslator<GetMountTableEntriesRequestProto,
GetMountTableEntriesRequestProto.Builder,
GetMountTableEntriesRequestProtoOrBuilder>(
GetMountTableEntriesRequestProto.class);
public GetMountTableEntriesRequestPBImpl() {
}
public GetMountTableEntriesRequestPBImpl(
GetMountTableEntriesRequestProto proto) {
this.translator.setProto(proto);
}
@Override
public GetMountTableEntriesRequestProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public String getSrcPath() {
return this.translator.getProtoOrBuilder().getSrcPath();
}
@Override
public void setSrcPath(String path) {
this.translator.getBuilder().setSrcPath(path);
}
}

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesResponseProtoOrBuilder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.MountTablePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the state store API object
* GetMountTableEntriesResponse.
*/
public class GetMountTableEntriesResponsePBImpl
extends GetMountTableEntriesResponse implements PBRecord {
private FederationProtocolPBTranslator<GetMountTableEntriesResponseProto,
GetMountTableEntriesResponseProto.Builder,
GetMountTableEntriesResponseProtoOrBuilder> translator =
new FederationProtocolPBTranslator<GetMountTableEntriesResponseProto,
GetMountTableEntriesResponseProto.Builder,
GetMountTableEntriesResponseProtoOrBuilder>(
GetMountTableEntriesResponseProto.class);
public GetMountTableEntriesResponsePBImpl() {
}
public GetMountTableEntriesResponsePBImpl(
GetMountTableEntriesResponseProto proto) {
this.translator.setProto(proto);
}
@Override
public GetMountTableEntriesResponseProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public List<MountTable> getEntries() throws IOException {
List<MountTableRecordProto> entries =
this.translator.getProtoOrBuilder().getEntriesList();
List<MountTable> ret = new ArrayList<MountTable>();
for (MountTableRecordProto entry : entries) {
MountTable record = new MountTablePBImpl(entry);
ret.add(record);
}
return ret;
}
@Override
public void setEntries(List<MountTable> records) throws IOException {
this.translator.getBuilder().clearEntries();
for (MountTable entry : records) {
if (entry instanceof MountTablePBImpl) {
MountTablePBImpl entryPB = (MountTablePBImpl)entry;
this.translator.getBuilder().addEntries(entryPB.getProto());
}
}
}
@Override
public long getTimestamp() {
return this.translator.getProtoOrBuilder().getTimestamp();
}
@Override
public void setTimestamp(long time) {
this.translator.getBuilder().setTimestamp(time);
}
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
import java.io.IOException;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProtoOrBuilder;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the state store API object
* RemoveMountTableEntryRequest.
*/
public class RemoveMountTableEntryRequestPBImpl
extends RemoveMountTableEntryRequest implements PBRecord {
private FederationProtocolPBTranslator<RemoveMountTableEntryRequestProto,
RemoveMountTableEntryRequestProto.Builder,
RemoveMountTableEntryRequestProtoOrBuilder> translator =
new FederationProtocolPBTranslator<RemoveMountTableEntryRequestProto,
RemoveMountTableEntryRequestProto.Builder,
RemoveMountTableEntryRequestProtoOrBuilder>(
RemoveMountTableEntryRequestProto.class);
public RemoveMountTableEntryRequestPBImpl() {
}
public RemoveMountTableEntryRequestPBImpl(
RemoveMountTableEntryRequestProto proto) {
this.setProto(proto);
}
@Override
public RemoveMountTableEntryRequestProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public String getSrcPath() {
return this.translator.getProtoOrBuilder().getSrcPath();
}
@Override
public void setSrcPath(String path) {
this.translator.getBuilder().setSrcPath(path);
}
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
import java.io.IOException;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto.Builder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProtoOrBuilder;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the state store API object
* RemoveMountTableEntryResponse.
*/
public class RemoveMountTableEntryResponsePBImpl
extends RemoveMountTableEntryResponse implements PBRecord {
private FederationProtocolPBTranslator<RemoveMountTableEntryResponseProto,
Builder, RemoveMountTableEntryResponseProtoOrBuilder> translator =
new FederationProtocolPBTranslator<RemoveMountTableEntryResponseProto,
RemoveMountTableEntryResponseProto.Builder,
RemoveMountTableEntryResponseProtoOrBuilder>(
RemoveMountTableEntryResponseProto.class);
public RemoveMountTableEntryResponsePBImpl() {
}
public RemoveMountTableEntryResponsePBImpl(
RemoveMountTableEntryResponseProto proto) {
this.setProto(proto);
}
@Override
public RemoveMountTableEntryResponseProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public boolean getStatus() {
return this.translator.getProtoOrBuilder().getStatus();
}
@Override
public void setStatus(boolean result) {
this.translator.getBuilder().setStatus(result);
}
}

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
import java.io.IOException;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProtoOrBuilder;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.MountTablePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the state store API object
* UpdateMountTableEntryRequest.
*/
public class UpdateMountTableEntryRequestPBImpl
extends UpdateMountTableEntryRequest implements PBRecord {
private FederationProtocolPBTranslator<UpdateMountTableEntryRequestProto,
UpdateMountTableEntryRequestProto.Builder,
UpdateMountTableEntryRequestProtoOrBuilder> translator =
new FederationProtocolPBTranslator<UpdateMountTableEntryRequestProto,
UpdateMountTableEntryRequestProto.Builder,
UpdateMountTableEntryRequestProtoOrBuilder>(
UpdateMountTableEntryRequestProto.class);
public UpdateMountTableEntryRequestPBImpl() {
}
public UpdateMountTableEntryRequestPBImpl(
UpdateMountTableEntryRequestProto proto) {
this.translator.setProto(proto);
}
@Override
public UpdateMountTableEntryRequestProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public MountTable getEntry() throws IOException {
MountTableRecordProto statsProto =
this.translator.getProtoOrBuilder().getEntry();
MountTable stats = StateStoreSerializer.newRecord(MountTable.class);
if (stats instanceof MountTablePBImpl) {
MountTablePBImpl entryPB = (MountTablePBImpl)stats;
entryPB.setProto(statsProto);
return entryPB;
} else {
throw new IOException("Cannot get stats for the membership");
}
}
@Override
public void setEntry(MountTable mount) throws IOException {
if (mount instanceof MountTablePBImpl) {
MountTablePBImpl mountPB = (MountTablePBImpl)mount;
MountTableRecordProto mountProto =
(MountTableRecordProto)mountPB.getProto();
this.translator.getBuilder().setEntry(mountProto);
} else {
throw new IOException("Cannot set mount table entry");
}
}
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
import java.io.IOException;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryResponseProtoOrBuilder;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the state store API object
* UpdateMountTableEntryResponse.
*/
public class UpdateMountTableEntryResponsePBImpl
extends UpdateMountTableEntryResponse implements PBRecord {
private FederationProtocolPBTranslator<UpdateMountTableEntryResponseProto,
UpdateMountTableEntryResponseProto.Builder,
UpdateMountTableEntryResponseProtoOrBuilder> translator =
new FederationProtocolPBTranslator<UpdateMountTableEntryResponseProto,
UpdateMountTableEntryResponseProto.Builder,
UpdateMountTableEntryResponseProtoOrBuilder>(
UpdateMountTableEntryResponseProto.class);
public UpdateMountTableEntryResponsePBImpl() {
}
public UpdateMountTableEntryResponsePBImpl(
UpdateMountTableEntryResponseProto proto) {
this.setProto(proto);
}
@Override
public UpdateMountTableEntryResponseProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public boolean getStatus() {
return this.translator.getProtoOrBuilder().getStatus();
}
@Override
public void setStatus(boolean result) {
this.translator.getBuilder().setStatus(result);
}
}

View File

@ -0,0 +1,301 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.records;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Data schema for
* {@link org.apache.hadoop.hdfs.server.federation.store.
* MountTableStore FederationMountTableStore} data stored in the
* {@link org.apache.hadoop.hdfs.server.federation.store.
* StateStoreService FederationStateStoreService}. Supports string
* serialization.
*/
public abstract class MountTable extends BaseRecord {
private static final Logger LOG = LoggerFactory.getLogger(MountTable.class);
/**
* Default constructor for a mount table entry.
*/
public MountTable() {
super();
}
public static MountTable newInstance() {
MountTable record = StateStoreSerializer.newRecord(MountTable.class);
record.init();
return record;
}
/**
* Constructor for a mount table entry with a single destinations.
*
* @param src Source path in the mount entry.
* @param destinations Nameservice destination of the mount point.
* @param dateCreated Created date.
* @param dateModified Modified date.
* @throws IOException
*/
public static MountTable newInstance(final String src,
final Map<String, String> destinations,
long dateCreated, long dateModified) throws IOException {
MountTable record = newInstance(src, destinations);
record.setDateCreated(dateCreated);
record.setDateModified(dateModified);
return record;
}
/**
* Constructor for a mount table entry with multiple destinations.
*
* @param src Source path in the mount entry.
* @param destinations Nameservice destinations of the mount point.
* @throws IOException
*/
public static MountTable newInstance(final String src,
final Map<String, String> destinations) throws IOException {
MountTable record = newInstance();
// Normalize the mount path
record.setSourcePath(normalizeFileSystemPath(src));
// Build a list of remote locations
final List<RemoteLocation> locations = new LinkedList<>();
for (Entry<String, String> entry : destinations.entrySet()) {
String nsId = entry.getKey();
String path = normalizeFileSystemPath(entry.getValue());
RemoteLocation location = new RemoteLocation(nsId, path);
locations.add(location);
}
// Set the serialized dest string
record.setDestinations(locations);
// Validate
record.validate();
return record;
}
/**
* Get source path in the federated namespace.
*
* @return Source path in the federated namespace.
*/
public abstract String getSourcePath();
/**
* Set source path in the federated namespace.
*
* @param path Source path in the federated namespace.
*/
public abstract void setSourcePath(String path);
/**
* Get a list of destinations (namespace + path) present for this entry.
*
* @return List of RemoteLocation destinations. Null if no destinations.
*/
public abstract List<RemoteLocation> getDestinations();
/**
* Set the destination paths.
*
* @param paths Destination paths.
*/
public abstract void setDestinations(List<RemoteLocation> dests);
/**
* Add a new destination to this mount table entry.
*/
public abstract boolean addDestination(String nsId, String path);
/**
* Check if the entry is read only.
*
* @return If the entry is read only.
*/
public abstract boolean isReadOnly();
/**
* Set an entry to be read only.
*
* @param ro If the entry is read only.
*/
public abstract void setReadOnly(boolean ro);
/**
* Get the order of the destinations for this mount table entry.
*
* @return Order of the destinations.
*/
public abstract DestinationOrder getDestOrder();
/**
* Set the order of the destinations for this mount table entry.
*
* @param order Order of the destinations.
*/
public abstract void setDestOrder(DestinationOrder order);
/**
* Get the default location.
* @return The default location.
*/
public RemoteLocation getDefaultLocation() {
List<RemoteLocation> dests = this.getDestinations();
if (dests == null || dests.isEmpty()) {
return null;
}
return dests.get(0);
}
@Override
public boolean like(final BaseRecord o) {
if (o instanceof MountTable) {
MountTable other = (MountTable)o;
if (getSourcePath() != null &&
!getSourcePath().equals(other.getSourcePath())) {
return false;
}
if (getDestinations() != null &&
!getDestinations().equals(other.getDestinations())) {
return false;
}
return true;
}
return false;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.getSourcePath());
sb.append("->");
List<RemoteLocation> destinations = this.getDestinations();
sb.append(destinations);
if (destinations != null && destinations.size() > 1) {
sb.append("[" + this.getDestOrder() + "]");
}
if (this.isReadOnly()) {
sb.append("[RO]");
}
return sb.toString();
}
@Override
public SortedMap<String, String> getPrimaryKeys() {
SortedMap<String, String> map = new TreeMap<>();
map.put("sourcePath", this.getSourcePath());
return map;
}
@Override
public boolean validate() {
boolean ret = super.validate();
if (this.getSourcePath() == null || this.getSourcePath().length() == 0) {
LOG.error("Invalid entry, no source path specified ", this);
ret = false;
}
if (!this.getSourcePath().startsWith("/")) {
LOG.error("Invalid entry, all mount points must start with / ", this);
ret = false;
}
if (this.getDestinations() == null || this.getDestinations().size() == 0) {
LOG.error("Invalid entry, no destination paths specified ", this);
ret = false;
}
for (RemoteLocation loc : getDestinations()) {
String nsId = loc.getNameserviceId();
if (nsId == null || nsId.length() == 0) {
LOG.error("Invalid entry, invalid destination nameservice ", this);
ret = false;
}
if (loc.getDest() == null || loc.getDest().length() == 0) {
LOG.error("Invalid entry, invalid destination path ", this);
ret = false;
}
if (!loc.getDest().startsWith("/")) {
LOG.error("Invalid entry, all destination must start with / ", this);
ret = false;
}
}
return ret;
}
@Override
public long getExpirationMs() {
return 0;
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 31)
.append(this.getSourcePath())
.append(this.getDestinations())
.append(this.isReadOnly())
.append(this.getDestOrder())
.toHashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof MountTable) {
MountTable other = (MountTable)obj;
if (!this.getSourcePath().equals(other.getSourcePath())) {
return false;
} else if (!this.getDestinations().equals(other.getDestinations())) {
return false;
} else if (this.isReadOnly() != other.isReadOnly()) {
return false;
} else if (!this.getDestOrder().equals(other.getDestOrder())) {
return false;
}
return true;
}
return false;
}
/**
* Normalize a path for that filesystem.
*
* @param path Path to normalize.
* @return Normalized path.
*/
private static String normalizeFileSystemPath(final String path) {
Path normalizedPath = new Path(path);
return normalizedPath.toString();
}
}

View File

@ -0,0 +1,213 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.records.impl.pb;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto.Builder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto.DestOrder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProtoOrBuilder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoteLocationProto;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.FederationProtocolPBTranslator;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the MountTable record.
*/
public class MountTablePBImpl extends MountTable implements PBRecord {
private FederationProtocolPBTranslator<MountTableRecordProto, Builder,
MountTableRecordProtoOrBuilder> translator =
new FederationProtocolPBTranslator<MountTableRecordProto, Builder,
MountTableRecordProtoOrBuilder>(MountTableRecordProto.class);
public MountTablePBImpl() {
}
public MountTablePBImpl(MountTableRecordProto proto) {
this.setProto(proto);
}
@Override
public MountTableRecordProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public String getSourcePath() {
MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
if (!proto.hasSrcPath()) {
return null;
}
return proto.getSrcPath();
}
@Override
public void setSourcePath(String path) {
Builder builder = this.translator.getBuilder();
if (path == null) {
builder.clearSrcPath();
} else {
builder.setSrcPath(path);
}
}
@Override
public List<RemoteLocation> getDestinations() {
MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
if (proto.getDestinationsCount() == 0) {
return null;
}
final List<RemoteLocation> ret = new LinkedList<>();
final List<RemoteLocationProto> destList = proto.getDestinationsList();
for (RemoteLocationProto dest : destList) {
String nsId = dest.getNameserviceId();
String path = dest.getPath();
RemoteLocation loc = new RemoteLocation(nsId, path);
ret.add(loc);
}
return ret;
}
@Override
public void setDestinations(final List<RemoteLocation> dests) {
Builder builder = this.translator.getBuilder();
builder.clearDestinations();
for (RemoteLocation dest : dests) {
RemoteLocationProto.Builder itemBuilder =
RemoteLocationProto.newBuilder();
String nsId = dest.getNameserviceId();
String path = dest.getDest();
itemBuilder.setNameserviceId(nsId);
itemBuilder.setPath(path);
RemoteLocationProto item = itemBuilder.build();
builder.addDestinations(item);
}
}
@Override
public boolean addDestination(String nsId, String path) {
// Check if the location is already there
List<RemoteLocation> dests = getDestinations();
for (RemoteLocation dest : dests) {
if (dest.getNameserviceId().equals(nsId) && dest.getDest().equals(path)) {
return false;
}
}
// Add it to the existing list
Builder builder = this.translator.getBuilder();
RemoteLocationProto.Builder itemBuilder =
RemoteLocationProto.newBuilder();
itemBuilder.setNameserviceId(nsId);
itemBuilder.setPath(path);
RemoteLocationProto item = itemBuilder.build();
builder.addDestinations(item);
return true;
}
@Override
public void setDateModified(long time) {
this.translator.getBuilder().setDateModified(time);
}
@Override
public long getDateModified() {
return this.translator.getProtoOrBuilder().getDateModified();
}
@Override
public void setDateCreated(long time) {
this.translator.getBuilder().setDateCreated(time);
}
@Override
public long getDateCreated() {
return this.translator.getProtoOrBuilder().getDateCreated();
}
@Override
public boolean isReadOnly() {
MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
if (!proto.hasReadOnly()) {
return false;
}
return proto.getReadOnly();
}
@Override
public void setReadOnly(boolean ro) {
this.translator.getBuilder().setReadOnly(ro);
}
@Override
public DestinationOrder getDestOrder() {
MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
return convert(proto.getDestOrder());
}
@Override
public void setDestOrder(DestinationOrder order) {
Builder builder = this.translator.getBuilder();
if (order == null) {
builder.clearDestOrder();
} else {
builder.setDestOrder(convert(order));
}
}
private DestinationOrder convert(DestOrder order) {
switch (order) {
case LOCAL:
return DestinationOrder.LOCAL;
case RANDOM:
return DestinationOrder.RANDOM;
default:
return DestinationOrder.HASH;
}
}
private DestOrder convert(DestinationOrder order) {
switch (order) {
case LOCAL:
return DestOrder.LOCAL;
case RANDOM:
return DestOrder.RANDOM;
default:
return DestOrder.HASH;
}
}
}

View File

@ -104,4 +104,63 @@ message NamenodeHeartbeatRequestProto {
message NamenodeHeartbeatResponseProto {
optional bool status = 1;
}
}
/////////////////////////////////////////////////
// Mount table
/////////////////////////////////////////////////
message RemoteLocationProto {
optional string nameserviceId = 1;
optional string path = 2;
}
message MountTableRecordProto {
optional string srcPath = 1;
repeated RemoteLocationProto destinations = 2;
optional uint64 dateCreated = 3;
optional uint64 dateModified = 4;
optional bool readOnly = 5 [default = false];
enum DestOrder {
HASH = 0;
LOCAL = 1;
RANDOM = 2;
}
optional DestOrder destOrder = 6 [default = HASH];
}
message AddMountTableEntryRequestProto {
optional MountTableRecordProto entry = 1;
}
message AddMountTableEntryResponseProto {
optional bool status = 1;
}
message UpdateMountTableEntryRequestProto {
optional MountTableRecordProto entry = 1;
}
message UpdateMountTableEntryResponseProto {
optional bool status = 1;
}
message RemoveMountTableEntryRequestProto {
optional string srcPath = 1;
}
message RemoveMountTableEntryResponseProto{
optional bool status = 1;
}
message GetMountTableEntriesRequestProto {
optional string srcPath = 1;
}
message GetMountTableEntriesResponseProto {
repeated MountTableRecordProto entries = 1;
optional uint64 timestamp = 2;
}

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.NamenodePriorityCompara
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.util.Time;
@ -68,6 +69,10 @@ public class MockResolver
this();
}
public MockResolver(Configuration conf, Router router) {
this();
}
public void addLocation(String mount, String nsId, String location) {
List<RemoteLocation> locationsList = this.locations.get(mount);
if (locationsList == null) {
@ -258,7 +263,6 @@ public class MockResolver
@Override
public PathLocation getDestinationForPath(String path) throws IOException {
Set<String> namespaceSet = new HashSet<>();
List<RemoteLocation> remoteLocations = new LinkedList<>();
for (String key : this.locations.keySet()) {
if (path.startsWith(key)) {
@ -268,7 +272,6 @@ public class MockResolver
RemoteLocation remoteLocation =
new RemoteLocation(nameservice, finalPath);
remoteLocations.add(remoteLocation);
namespaceSet.add(nameservice);
}
break;
}
@ -277,7 +280,7 @@ public class MockResolver
// Path isn't supported, mimic resolver behavior.
return null;
}
return new PathLocation(path, remoteLocations, namespaceSet);
return new PathLocation(path, remoteLocations);
}
@Override

View File

@ -0,0 +1,396 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test the {@link MountTableStore} from the {@link Router}.
*/
public class TestMountTableResolver {
private static final Logger LOG =
LoggerFactory.getLogger(TestMountTableResolver.class);
private MountTableResolver mountTable;
private Map<String, String> getMountTableEntry(
String subcluster, String path) {
Map<String, String> ret = new HashMap<>();
ret.put(subcluster, path);
return ret;
}
/**
* Setup the mount table.
* / -> 1:/
* __tmp -> 2:/tmp
* __user -> 3:/user
* ____a -> 2:/user/test
* ______demo
* ________test
* __________a -> 1:/user/test
* __________b -> 3:/user/test
* ____b
* ______file1.txt -> 4:/user/file1.txt
* __usr
* ____bin -> 2:/bin
*
* @throws IOException If it cannot set the mount table.
*/
private void setupMountTable() throws IOException {
Configuration conf = new Configuration();
mountTable = new MountTableResolver(conf);
// Root mount point
Map<String, String> map = getMountTableEntry("1", "/");
mountTable.addEntry(MountTable.newInstance("/", map));
// /tmp
map = getMountTableEntry("2", "/");
mountTable.addEntry(MountTable.newInstance("/tmp", map));
// /user
map = getMountTableEntry("3", "/user");
mountTable.addEntry(MountTable.newInstance("/user", map));
// /usr/bin
map = getMountTableEntry("2", "/bin");
mountTable.addEntry(MountTable.newInstance("/usr/bin", map));
// /user/a
map = getMountTableEntry("2", "/user/test");
mountTable.addEntry(MountTable.newInstance("/user/a", map));
// /user/b/file1.txt
map = getMountTableEntry("4", "/user/file1.txt");
mountTable.addEntry(MountTable.newInstance("/user/b/file1.txt", map));
// /user/a/demo/test/a
map = getMountTableEntry("1", "/user/test");
mountTable.addEntry(MountTable.newInstance("/user/a/demo/test/a", map));
// /user/a/demo/test/b
map = getMountTableEntry("3", "/user/test");
mountTable.addEntry(MountTable.newInstance("/user/a/demo/test/b", map));
}
@Before
public void setup() throws IOException {
setupMountTable();
}
@Test
public void testDestination() throws IOException {
// Check files
assertEquals("1->/tesfile1.txt",
mountTable.getDestinationForPath("/tesfile1.txt").toString());
assertEquals("3->/user/testfile2.txt",
mountTable.getDestinationForPath("/user/testfile2.txt").toString());
assertEquals("2->/user/test/testfile3.txt",
mountTable.getDestinationForPath("/user/a/testfile3.txt").toString());
assertEquals("3->/user/b/testfile4.txt",
mountTable.getDestinationForPath("/user/b/testfile4.txt").toString());
assertEquals("1->/share/file5.txt",
mountTable.getDestinationForPath("/share/file5.txt").toString());
assertEquals("2->/bin/file7.txt",
mountTable.getDestinationForPath("/usr/bin/file7.txt").toString());
assertEquals("1->/usr/file8.txt",
mountTable.getDestinationForPath("/usr/file8.txt").toString());
assertEquals("2->/user/test/demo/file9.txt",
mountTable.getDestinationForPath("/user/a/demo/file9.txt").toString());
// Check folders
assertEquals("3->/user/testfolder",
mountTable.getDestinationForPath("/user/testfolder").toString());
assertEquals("2->/user/test/b",
mountTable.getDestinationForPath("/user/a/b").toString());
assertEquals("3->/user/test/a",
mountTable.getDestinationForPath("/user/test/a").toString());
}
private void compareLists(List<String> list1, String[] list2) {
assertEquals(list1.size(), list2.length);
for (String item : list2) {
assertTrue(list1.contains(item));
}
}
@Test
public void testGetMountPoints() throws IOException {
// Check getting all mount points (virtual and real) beneath a path
List<String> mounts = mountTable.getMountPoints("/");
assertEquals(3, mounts.size());
compareLists(mounts, new String[] {"tmp", "user", "usr"});
mounts = mountTable.getMountPoints("/user");
assertEquals(2, mounts.size());
compareLists(mounts, new String[] {"a", "b"});
mounts = mountTable.getMountPoints("/user/a");
assertEquals(1, mounts.size());
compareLists(mounts, new String[] {"demo"});
mounts = mountTable.getMountPoints("/user/a/demo");
assertEquals(1, mounts.size());
compareLists(mounts, new String[] {"test"});
mounts = mountTable.getMountPoints("/user/a/demo/test");
assertEquals(2, mounts.size());
compareLists(mounts, new String[] {"a", "b"});
mounts = mountTable.getMountPoints("/tmp");
assertEquals(0, mounts.size());
mounts = mountTable.getMountPoints("/t");
assertNull(mounts);
mounts = mountTable.getMountPoints("/unknownpath");
assertNull(mounts);
}
private void compareRecords(List<MountTable> list1, String[] list2) {
assertEquals(list1.size(), list2.length);
for (String item : list2) {
for (MountTable record : list1) {
if (record.getSourcePath().equals(item)) {
return;
}
}
}
fail();
}
@Test
public void testGetMounts() throws IOException {
// Check listing the mount table records at or beneath a path
List<MountTable> records = mountTable.getMounts("/");
assertEquals(8, records.size());
compareRecords(records, new String[] {"/", "/tmp", "/user", "/usr/bin",
"user/a", "/user/a/demo/a", "/user/a/demo/b", "/user/b/file1.txt"});
records = mountTable.getMounts("/user");
assertEquals(5, records.size());
compareRecords(records, new String[] {"/user", "/user/a/demo/a",
"/user/a/demo/b", "user/a", "/user/b/file1.txt"});
records = mountTable.getMounts("/user/a");
assertEquals(3, records.size());
compareRecords(records,
new String[] {"/user/a/demo/a", "/user/a/demo/b", "/user/a"});
records = mountTable.getMounts("/tmp");
assertEquals(1, records.size());
compareRecords(records, new String[] {"/tmp"});
}
@Test
public void testRemoveSubTree()
throws UnsupportedOperationException, IOException {
// 3 mount points are present /tmp, /user, /usr
compareLists(mountTable.getMountPoints("/"),
new String[] {"user", "usr", "tmp"});
// /tmp currently points to namespace 2
assertEquals("2", mountTable.getDestinationForPath("/tmp/testfile.txt")
.getDefaultLocation().getNameserviceId());
// Remove tmp
mountTable.removeEntry("/tmp");
// Now 2 mount points are present /user, /usr
compareLists(mountTable.getMountPoints("/"),
new String[] {"user", "usr"});
// /tmp no longer exists, uses default namespace for mapping /
assertEquals("1", mountTable.getDestinationForPath("/tmp/testfile.txt")
.getDefaultLocation().getNameserviceId());
}
@Test
public void testRemoveVirtualNode()
throws UnsupportedOperationException, IOException {
// 3 mount points are present /tmp, /user, /usr
compareLists(mountTable.getMountPoints("/"),
new String[] {"user", "usr", "tmp"});
// /usr is virtual, uses namespace 1->/
assertEquals("1", mountTable.getDestinationForPath("/usr/testfile.txt")
.getDefaultLocation().getNameserviceId());
// Attempt to remove /usr
mountTable.removeEntry("/usr");
// Verify the remove failed
compareLists(mountTable.getMountPoints("/"),
new String[] {"user", "usr", "tmp"});
}
@Test
public void testRemoveLeafNode()
throws UnsupportedOperationException, IOException {
// /user/a/demo/test/a currently points to namespace 1
assertEquals("1", mountTable.getDestinationForPath("/user/a/demo/test/a")
.getDefaultLocation().getNameserviceId());
// Remove /user/a/demo/test/a
mountTable.removeEntry("/user/a/demo/test/a");
// Now /user/a/demo/test/a points to namespace 2 using the entry for /user/a
assertEquals("2", mountTable.getDestinationForPath("/user/a/demo/test/a")
.getDefaultLocation().getNameserviceId());
// Verify the virtual node at /user/a/demo still exists and was not deleted
compareLists(mountTable.getMountPoints("/user/a"), new String[] {"demo"});
// Verify the sibling node was unaffected and still points to ns 3
assertEquals("3", mountTable.getDestinationForPath("/user/a/demo/test/b")
.getDefaultLocation().getNameserviceId());
}
@Test
public void testRefreshEntries()
throws UnsupportedOperationException, IOException {
// Initial table loaded
testDestination();
assertEquals(8, mountTable.getMounts("/").size());
// Replace table with /1 and /2
List<MountTable> records = new ArrayList<>();
Map<String, String> map1 = getMountTableEntry("1", "/");
records.add(MountTable.newInstance("/1", map1));
Map<String, String> map2 = getMountTableEntry("2", "/");
records.add(MountTable.newInstance("/2", map2));
mountTable.refreshEntries(records);
// Verify addition
PathLocation destination1 = mountTable.getDestinationForPath("/1");
RemoteLocation defaultLoc1 = destination1.getDefaultLocation();
assertEquals("1", defaultLoc1.getNameserviceId());
PathLocation destination2 = mountTable.getDestinationForPath("/2");
RemoteLocation defaultLoc2 = destination2.getDefaultLocation();
assertEquals("2", defaultLoc2.getNameserviceId());
// Verify existing entries were removed
assertEquals(2, mountTable.getMounts("/").size());
boolean assertionThrown = false;
try {
testDestination();
fail();
} catch (AssertionError e) {
// The / entry was removed, so it triggers an exception
assertionThrown = true;
}
assertTrue(assertionThrown);
}
@Test
public void testMountTableScalability() throws IOException {
List<MountTable> emptyList = new ArrayList<>();
mountTable.refreshEntries(emptyList);
// Add 100,000 entries in flat list
for (int i = 0; i < 100000; i++) {
Map<String, String> map = getMountTableEntry("1", "/" + i);
MountTable record = MountTable.newInstance("/" + i, map);
mountTable.addEntry(record);
if (i % 10000 == 0) {
LOG.info("Adding flat mount record {}: {}", i, record);
}
}
assertEquals(100000, mountTable.getMountPoints("/").size());
assertEquals(100000, mountTable.getMounts("/").size());
// Add 1000 entries in deep list
mountTable.refreshEntries(emptyList);
String parent = "/";
for (int i = 0; i < 1000; i++) {
final int index = i;
Map<String, String> map = getMountTableEntry("1", "/" + index);
if (i > 0) {
parent = parent + "/";
}
parent = parent + i;
MountTable record = MountTable.newInstance(parent, map);
mountTable.addEntry(record);
}
assertEquals(1, mountTable.getMountPoints("/").size());
assertEquals(1000, mountTable.getMounts("/").size());
// Add 100,000 entries in deep and wide tree
mountTable.refreshEntries(emptyList);
Random rand = new Random();
parent = "/" + Integer.toString(rand.nextInt());
int numRootTrees = 1;
for (int i = 0; i < 100000; i++) {
final int index = i;
Map<String, String> map = getMountTableEntry("1", "/" + index);
parent = parent + "/" + i;
if (parent.length() > 2000) {
// Start new tree
parent = "/" + Integer.toString(rand.nextInt());
numRootTrees++;
}
MountTable record = MountTable.newInstance(parent, map);
mountTable.addEntry(record);
}
assertEquals(numRootTrees, mountTable.getMountPoints("/").size());
assertEquals(100000, mountTable.getMounts("/").size());
}
}

View File

@ -25,7 +25,9 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFile
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.util.Time;
/**
@ -234,6 +237,19 @@ public final class FederationStateStoreTestUtils {
return false;
}
public static List<MountTable> createMockMountTable(
List<String> nameservices) throws IOException {
// create table entries
List<MountTable> entries = new ArrayList<>();
for (String ns : nameservices) {
Map<String, String> destMap = new HashMap<>();
destMap.put(ns, "/target-" + ns);
MountTable entry = MountTable.newInstance("/" + ns, destMap);
entries.add(entry);
}
return entries;
}
public static MembershipState createMockRegistrationForNamenode(
String nameserviceId, String namenodeId,
FederationNamenodeServiceState state) throws IOException {

View File

@ -0,0 +1,250 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockMountTable;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Test the basic {@link StateStoreService}
* {@link MountTableStore} functionality.
*/
public class TestStateStoreMountTable extends TestStateStoreBase {
private static List<String> nameservices;
private static MountTableStore mountStore;
@BeforeClass
public static void create() throws IOException {
nameservices = new ArrayList<>();
nameservices.add(NAMESERVICES[0]);
nameservices.add(NAMESERVICES[1]);
}
@Before
public void setup() throws IOException, InterruptedException {
mountStore =
getStateStore().getRegisteredRecordStore(MountTableStore.class);
// Clear Mount table registrations
assertTrue(clearRecords(getStateStore(), MountTable.class));
}
@Test
public void testStateStoreDisconnected() throws Exception {
// Close the data store driver
getStateStore().closeDriver();
assertFalse(getStateStore().isDriverReady());
// Test APIs that access the store to check they throw the correct exception
AddMountTableEntryRequest addRequest =
AddMountTableEntryRequest.newInstance();
verifyException(mountStore, "addMountTableEntry",
StateStoreUnavailableException.class,
new Class[] {AddMountTableEntryRequest.class},
new Object[] {addRequest});
UpdateMountTableEntryRequest updateRequest =
UpdateMountTableEntryRequest.newInstance();
verifyException(mountStore, "updateMountTableEntry",
StateStoreUnavailableException.class,
new Class[] {UpdateMountTableEntryRequest.class},
new Object[] {updateRequest});
RemoveMountTableEntryRequest removeRequest =
RemoveMountTableEntryRequest.newInstance();
verifyException(mountStore, "removeMountTableEntry",
StateStoreUnavailableException.class,
new Class[] {RemoveMountTableEntryRequest.class},
new Object[] {removeRequest});
GetMountTableEntriesRequest getRequest =
GetMountTableEntriesRequest.newInstance();
mountStore.loadCache(true);
verifyException(mountStore, "getMountTableEntries",
StateStoreUnavailableException.class,
new Class[] {GetMountTableEntriesRequest.class},
new Object[] {getRequest});
}
@Test
public void testSynchronizeMountTable() throws IOException {
// Synchronize and get mount table entries
List<MountTable> entries = createMockMountTable(nameservices);
assertTrue(synchronizeRecords(getStateStore(), entries, MountTable.class));
for (MountTable e : entries) {
mountStore.loadCache(true);
MountTable entry = getMountTableEntry(e.getSourcePath());
assertNotNull(entry);
assertEquals(e.getDefaultLocation().getDest(),
entry.getDefaultLocation().getDest());
}
}
@Test
public void testAddMountTableEntry() throws IOException {
// Add 1
List<MountTable> entries = createMockMountTable(nameservices);
List<MountTable> entries1 = getMountTableEntries("/").getRecords();
assertEquals(0, entries1.size());
MountTable entry0 = entries.get(0);
AddMountTableEntryRequest request =
AddMountTableEntryRequest.newInstance(entry0);
AddMountTableEntryResponse response =
mountStore.addMountTableEntry(request);
assertTrue(response.getStatus());
mountStore.loadCache(true);
List<MountTable> entries2 = getMountTableEntries("/").getRecords();
assertEquals(1, entries2.size());
}
@Test
public void testRemoveMountTableEntry() throws IOException {
// Add many
List<MountTable> entries = createMockMountTable(nameservices);
synchronizeRecords(getStateStore(), entries, MountTable.class);
mountStore.loadCache(true);
List<MountTable> entries1 = getMountTableEntries("/").getRecords();
assertEquals(entries.size(), entries1.size());
// Remove 1
RemoveMountTableEntryRequest request =
RemoveMountTableEntryRequest.newInstance();
request.setSrcPath(entries.get(0).getSourcePath());
assertTrue(mountStore.removeMountTableEntry(request).getStatus());
// Verify remove
mountStore.loadCache(true);
List<MountTable> entries2 = getMountTableEntries("/").getRecords();
assertEquals(entries.size() - 1, entries2.size());
}
@Test
public void testUpdateMountTableEntry() throws IOException {
// Add 1
List<MountTable> entries = createMockMountTable(nameservices);
MountTable entry0 = entries.get(0);
String srcPath = entry0.getSourcePath();
String nsId = entry0.getDefaultLocation().getNameserviceId();
AddMountTableEntryRequest request =
AddMountTableEntryRequest.newInstance(entry0);
AddMountTableEntryResponse response =
mountStore.addMountTableEntry(request);
assertTrue(response.getStatus());
// Verify
mountStore.loadCache(true);
MountTable matchingEntry0 = getMountTableEntry(srcPath);
assertNotNull(matchingEntry0);
assertEquals(nsId, matchingEntry0.getDefaultLocation().getNameserviceId());
// Edit destination nameservice for source path
Map<String, String> destMap =
Collections.singletonMap("testnameservice", "/");
MountTable replacement =
MountTable.newInstance(srcPath, destMap);
UpdateMountTableEntryRequest updateRequest =
UpdateMountTableEntryRequest.newInstance(replacement);
UpdateMountTableEntryResponse updateResponse =
mountStore.updateMountTableEntry(updateRequest);
assertTrue(updateResponse.getStatus());
// Verify
mountStore.loadCache(true);
MountTable matchingEntry1 = getMountTableEntry(srcPath);
assertNotNull(matchingEntry1);
assertEquals("testnameservice",
matchingEntry1.getDefaultLocation().getNameserviceId());
}
/**
* Gets an existing mount table record in the state store.
*
* @param mount The mount point of the record to remove.
* @return The matching record if found, null if it is not found.
* @throws IOException If the state store could not be accessed.
*/
private MountTable getMountTableEntry(String mount) throws IOException {
GetMountTableEntriesRequest request =
GetMountTableEntriesRequest.newInstance(mount);
GetMountTableEntriesResponse response =
mountStore.getMountTableEntries(request);
List<MountTable> results = response.getEntries();
if (results.size() > 0) {
// First result is sorted to have the shortest mount string length
return results.get(0);
}
return null;
}
/**
* Fetch all mount table records beneath a root path.
*
* @param store FederationMountTableStore instance to commit the data.
* @param mount The root search path, enter "/" to return all mount table
* records.
*
* @return A list of all mount table records found below the root mount.
*
* @throws IOException If the state store could not be accessed.
*/
private QueryResult<MountTable> getMountTableEntries(String mount)
throws IOException {
if (mount == null) {
throw new IOException("Please specify a root search path");
}
GetMountTableEntriesRequest request =
GetMountTableEntriesRequest.newInstance();
request.setSrcPath(mount);
GetMountTableEntriesResponse response =
mountStore.getMountTableEntries(request);
List<MountTable> records = response.getEntries();
long timestamp = response.getTimestamp();
return new QueryResult<MountTable>(records, timestamp);
}
}

View File

@ -27,6 +27,7 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUt
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.junit.AfterClass;
@ -109,6 +111,11 @@ public class TestStateStoreDriverBase {
generateRandomString(), generateRandomString(),
generateRandomString(), generateRandomString(),
generateRandomEnum(FederationNamenodeServiceState.class), false);
} else if (recordClass == MountTable.class) {
String src = "/" + generateRandomString();
Map<String, String> destMap = Collections.singletonMap(
generateRandomString(), "/" + generateRandomString());
return (T) MountTable.newInstance(src, destMap);
}
return null;
@ -155,6 +162,7 @@ public class TestStateStoreDriverBase {
public static void removeAll(StateStoreDriver driver) throws IOException {
driver.removeAll(MembershipState.class);
driver.removeAll(MountTable.class);
}
public <T extends BaseRecord> void testInsert(
@ -347,22 +355,26 @@ public class TestStateStoreDriverBase {
public void testInsert(StateStoreDriver driver)
throws IllegalArgumentException, IllegalAccessException, IOException {
testInsert(driver, MembershipState.class);
testInsert(driver, MountTable.class);
}
public void testPut(StateStoreDriver driver)
throws IllegalArgumentException, ReflectiveOperationException,
IOException, SecurityException {
testPut(driver, MembershipState.class);
testPut(driver, MountTable.class);
}
public void testRemove(StateStoreDriver driver)
throws IllegalArgumentException, IllegalAccessException, IOException {
testRemove(driver, MembershipState.class);
testRemove(driver, MountTable.class);
}
public void testFetchErrors(StateStoreDriver driver)
throws IllegalArgumentException, IllegalAccessException, IOException {
testFetchErrors(driver, MembershipState.class);
testFetchErrors(driver, MountTable.class);
}
/**

View File

@ -0,0 +1,176 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.records;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.junit.Test;
/**
* Test the Mount Table entry in the State Store.
*/
public class TestMountTable {
private static final String SRC = "/test";
private static final String DST_NS_0 = "ns0";
private static final String DST_NS_1 = "ns1";
private static final String DST_PATH_0 = "/path1";
private static final String DST_PATH_1 = "/path/path2";
private static final List<RemoteLocation> DST = new LinkedList<>();
static {
DST.add(new RemoteLocation(DST_NS_0, DST_PATH_0));
DST.add(new RemoteLocation(DST_NS_1, DST_PATH_1));
}
private static final Map<String, String> DST_MAP = new LinkedHashMap<>();
static {
DST_MAP.put(DST_NS_0, DST_PATH_0);
DST_MAP.put(DST_NS_1, DST_PATH_1);
}
private static final long DATE_CREATED = 100;
private static final long DATE_MOD = 200;
@Test
public void testGetterSetter() throws IOException {
MountTable record = MountTable.newInstance(SRC, DST_MAP);
validateDestinations(record);
assertEquals(SRC, record.getSourcePath());
assertEquals(DST, record.getDestinations());
assertTrue(DATE_CREATED > 0);
assertTrue(DATE_MOD > 0);
MountTable record2 =
MountTable.newInstance(SRC, DST_MAP, DATE_CREATED, DATE_MOD);
validateDestinations(record2);
assertEquals(SRC, record2.getSourcePath());
assertEquals(DST, record2.getDestinations());
assertEquals(DATE_CREATED, record2.getDateCreated());
assertEquals(DATE_MOD, record2.getDateModified());
assertFalse(record.isReadOnly());
assertEquals(DestinationOrder.HASH, record.getDestOrder());
}
@Test
public void testSerialization() throws IOException {
testSerialization(DestinationOrder.RANDOM);
testSerialization(DestinationOrder.HASH);
testSerialization(DestinationOrder.LOCAL);
}
private void testSerialization(final DestinationOrder order)
throws IOException {
MountTable record = MountTable.newInstance(
SRC, DST_MAP, DATE_CREATED, DATE_MOD);
record.setReadOnly(true);
record.setDestOrder(order);
StateStoreSerializer serializer = StateStoreSerializer.getSerializer();
String serializedString = serializer.serializeString(record);
MountTable record2 =
serializer.deserialize(serializedString, MountTable.class);
validateDestinations(record2);
assertEquals(SRC, record2.getSourcePath());
assertEquals(DST, record2.getDestinations());
assertEquals(DATE_CREATED, record2.getDateCreated());
assertEquals(DATE_MOD, record2.getDateModified());
assertTrue(record2.isReadOnly());
assertEquals(order, record2.getDestOrder());
}
@Test
public void testReadOnly() throws IOException {
Map<String, String> dest = new HashMap<>();
dest.put(DST_NS_0, DST_PATH_0);
dest.put(DST_NS_1, DST_PATH_1);
MountTable record1 = MountTable.newInstance(SRC, dest);
record1.setReadOnly(true);
validateDestinations(record1);
assertEquals(SRC, record1.getSourcePath());
assertEquals(DST, record1.getDestinations());
assertTrue(DATE_CREATED > 0);
assertTrue(DATE_MOD > 0);
assertTrue(record1.isReadOnly());
MountTable record2 = MountTable.newInstance(
SRC, DST_MAP, DATE_CREATED, DATE_MOD);
record2.setReadOnly(true);
validateDestinations(record2);
assertEquals(SRC, record2.getSourcePath());
assertEquals(DST, record2.getDestinations());
assertEquals(DATE_CREATED, record2.getDateCreated());
assertEquals(DATE_MOD, record2.getDateModified());
assertTrue(record2.isReadOnly());
}
@Test
public void testOrder() throws IOException {
testOrder(DestinationOrder.HASH);
testOrder(DestinationOrder.LOCAL);
testOrder(DestinationOrder.RANDOM);
}
private void testOrder(final DestinationOrder order)
throws IOException {
MountTable record = MountTable.newInstance(
SRC, DST_MAP, DATE_CREATED, DATE_MOD);
record.setDestOrder(order);
validateDestinations(record);
assertEquals(SRC, record.getSourcePath());
assertEquals(DST, record.getDestinations());
assertEquals(DATE_CREATED, record.getDateCreated());
assertEquals(DATE_MOD, record.getDateModified());
assertEquals(order, record.getDestOrder());
}
private void validateDestinations(MountTable record) {
assertEquals(SRC, record.getSourcePath());
assertEquals(2, record.getDestinations().size());
RemoteLocation location1 = record.getDestinations().get(0);
assertEquals(DST_NS_0, location1.getNameserviceId());
assertEquals(DST_PATH_0, location1.getDest());
RemoteLocation location2 = record.getDestinations().get(1);
assertEquals(DST_NS_1, location2.getNameserviceId());
assertEquals(DST_PATH_1, location2.getDest());
}
}