diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 2871bdc8a18..9b77a908dba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -27,6 +27,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; @@ -1181,8 +1183,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { // HDFS Router State Store connection public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS = FEDERATION_ROUTER_PREFIX + "file.resolver.client.class"; - public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT = - "org.apache.hadoop.hdfs.server.federation.MockResolver"; + public static final Class + FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT = + MountTableResolver.class; public static final String FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS = FEDERATION_ROUTER_PREFIX + "namenode.resolver.client.class"; public static final Class diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java new file mode 100644 index 00000000000..c2e4a5b4473 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableManager.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; + +/** + * Manage a mount table. + */ +public interface MountTableManager { + + /** + * Add an entry to the mount table. + * + * @param request Fully populated request object. + * @return True if the mount table entry was successfully committed to the + * data store. + * @throws IOException Throws exception if the data store is not initialized. + */ + AddMountTableEntryResponse addMountTableEntry( + AddMountTableEntryRequest request) throws IOException; + + /** + * Updates an existing entry in the mount table. + * + * @param request Fully populated request object. + * @return True if the mount table entry was successfully committed to the + * data store. + * @throws IOException Throws exception if the data store is not initialized. + */ + UpdateMountTableEntryResponse updateMountTableEntry( + UpdateMountTableEntryRequest request) throws IOException; + + /** + * Remove an entry from the mount table. + * + * @param request Fully populated request object. + * @return True the mount table entry was removed from the data store. + * @throws IOException Throws exception if the data store is not initialized. + */ + RemoveMountTableEntryResponse removeMountTableEntry( + RemoveMountTableEntryRequest request) throws IOException; + + /** + * List all mount table entries present at or below the path. Fetches from the + * state store. + * + * @param request Fully populated request object. + * + * @return List of all mount table entries under the path. Zero-length list if + * none are found. + * @throws IOException Throws exception if the data store cannot be queried. + */ + GetMountTableEntriesResponse getMountTableEntries( + GetMountTableEntriesRequest request) throws IOException; +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java new file mode 100644 index 00000000000..13e3db39e4c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java @@ -0,0 +1,544 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Mount table to map between global paths and remote locations. This allows the + * {@link org.apache.hadoop.hdfs.server.federation.router.Router Router} to map + * the global HDFS view to the remote namespaces. This is similar to + * {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}. + * This is implemented as a tree. + */ +public class MountTableResolver + implements FileSubclusterResolver, StateStoreCache { + + private static final Logger LOG = + LoggerFactory.getLogger(MountTableResolver.class); + + /** Reference to Router. */ + private final Router router; + /** Reference to the State Store. */ + private final StateStoreService stateStore; + /** Interface to the mount table store. */ + private MountTableStore mountTableStore; + + /** If the tree has been initialized. */ + private boolean init = false; + /** Path -> Remote HDFS location. */ + private final TreeMap tree = new TreeMap<>(); + /** Path -> Remote location. */ + private final ConcurrentNavigableMap locationCache = + new ConcurrentSkipListMap<>(); + + /** Default nameservice when no mount matches the math. */ + private String defaultNameService = ""; + + /** Synchronization for both the tree and the cache. */ + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); + private final Lock writeLock = readWriteLock.writeLock(); + + + @VisibleForTesting + public MountTableResolver(Configuration conf) { + this(conf, (StateStoreService)null); + } + + public MountTableResolver(Configuration conf, Router routerService) { + this.router = routerService; + if (this.router != null) { + this.stateStore = this.router.getStateStore(); + } else { + this.stateStore = null; + } + + registerCacheExternal(); + initDefaultNameService(conf); + } + + public MountTableResolver(Configuration conf, StateStoreService store) { + this.router = null; + this.stateStore = store; + + registerCacheExternal(); + initDefaultNameService(conf); + } + + /** + * Request cache updates from the State Store for this resolver. + */ + private void registerCacheExternal() { + if (this.stateStore != null) { + this.stateStore.registerCacheExternal(this); + } + } + + /** + * Nameservice for APIs that cannot be resolved to a specific one. + * + * @param conf Configuration for this resolver. + */ + private void initDefaultNameService(Configuration conf) { + try { + this.defaultNameService = conf.get( + DFS_ROUTER_DEFAULT_NAMESERVICE, + DFSUtil.getNamenodeNameServiceId(conf)); + } catch (HadoopIllegalArgumentException e) { + LOG.error("Cannot find default name service, setting it to the first"); + Collection nsIds = DFSUtilClient.getNameServiceIds(conf); + this.defaultNameService = nsIds.iterator().next(); + LOG.info("Default name service: {}", this.defaultNameService); + } + } + + /** + * Get a reference for the Router for this resolver. + * + * @return Router for this resolver. + */ + protected Router getRouter() { + return this.router; + } + + /** + * Get the mount table store for this resolver. + * + * @return Mount table store. + * @throws IOException If it cannot connect to the State Store. + */ + protected MountTableStore getMountTableStore() throws IOException { + if (this.mountTableStore == null) { + this.mountTableStore = this.stateStore.getRegisteredRecordStore( + MountTableStore.class); + if (this.mountTableStore == null) { + throw new IOException("State Store does not have an interface for " + + MountTableStore.class); + } + } + return this.mountTableStore; + } + + /** + * Add a mount entry to the table. + * + * @param entry The mount table record to add from the state store. + */ + public void addEntry(final MountTable entry) { + writeLock.lock(); + try { + String srcPath = entry.getSourcePath(); + this.tree.put(srcPath, entry); + invalidateLocationCache(srcPath); + } finally { + writeLock.unlock(); + } + this.init = true; + } + + /** + * Remove a mount table entry. + * + * @param srcPath Source path for the entry to remove. + */ + public void removeEntry(final String srcPath) { + writeLock.lock(); + try { + this.tree.remove(srcPath); + invalidateLocationCache(srcPath); + } finally { + writeLock.unlock(); + } + } + + /** + * Invalidates all cache entries below this path. It requires the write lock. + * + * @param src Source path. + */ + private void invalidateLocationCache(final String path) { + if (locationCache.isEmpty()) { + return; + } + // Determine next lexicographic entry after source path + String nextSrc = path + Character.MAX_VALUE; + ConcurrentNavigableMap subMap = + locationCache.subMap(path, nextSrc); + for (final String key : subMap.keySet()) { + locationCache.remove(key); + } + } + + /** + * Updates the mount path tree with a new set of mount table entries. It also + * updates the needed caches. + * + * @param entries Full set of mount table entries to update. + */ + @VisibleForTesting + public void refreshEntries(final Collection entries) { + // The tree read/write must be atomic + writeLock.lock(); + try { + // New entries + Map newEntries = new ConcurrentHashMap<>(); + for (MountTable entry : entries) { + String srcPath = entry.getSourcePath(); + newEntries.put(srcPath, entry); + } + + // Old entries (reversed to sort from the leaves to the root) + Set oldEntries = new TreeSet<>(Collections.reverseOrder()); + for (MountTable entry : getTreeValues("/")) { + String srcPath = entry.getSourcePath(); + oldEntries.add(srcPath); + } + + // Entries that need to be removed + for (String srcPath : oldEntries) { + if (!newEntries.containsKey(srcPath)) { + this.tree.remove(srcPath); + invalidateLocationCache(srcPath); + LOG.info("Removed stale mount point {} from resolver", srcPath); + } + } + + // Entries that need to be added + for (MountTable entry : entries) { + String srcPath = entry.getSourcePath(); + if (!oldEntries.contains(srcPath)) { + // Add node, it does not exist + this.tree.put(srcPath, entry); + LOG.info("Added new mount point {} to resolver", srcPath); + } else { + // Node exists, check for updates + MountTable existingEntry = this.tree.get(srcPath); + if (existingEntry != null && !existingEntry.equals(entry)) { + // Entry has changed + invalidateLocationCache(srcPath); + LOG.info("Updated mount point {} in resolver"); + } + } + } + } finally { + writeLock.unlock(); + } + this.init = true; + } + + /** + * Replaces the current in-memory cached of the mount table with a new + * version fetched from the data store. + */ + @Override + public boolean loadCache(boolean force) { + try { + // Our cache depends on the store, update it first + MountTableStore mountTable = this.getMountTableStore(); + mountTable.loadCache(force); + + GetMountTableEntriesRequest request = + GetMountTableEntriesRequest.newInstance("/"); + GetMountTableEntriesResponse response = + mountTable.getMountTableEntries(request); + List records = response.getEntries(); + refreshEntries(records); + } catch (IOException e) { + LOG.error("Cannot fetch mount table entries from State Store", e); + return false; + } + return true; + } + + /** + * Clears all data. + */ + public void clear() { + LOG.info("Clearing all mount location caches"); + writeLock.lock(); + try { + this.locationCache.clear(); + this.tree.clear(); + } finally { + writeLock.unlock(); + } + } + + @Override + public PathLocation getDestinationForPath(final String path) + throws IOException { + verifyMountTable(); + readLock.lock(); + try { + return this.locationCache.computeIfAbsent( + path, this::lookupLocation); + } finally { + readLock.unlock(); + } + } + + /** + * Build the path location to insert into the cache atomically. It must hold + * the read lock. + * @param path Path to check/insert. + * @return New remote location. + */ + public PathLocation lookupLocation(final String path) { + PathLocation ret = null; + MountTable entry = findDeepest(path); + if (entry != null) { + ret = buildLocation(path, entry); + } else { + // Not found, use default location + RemoteLocation remoteLocation = + new RemoteLocation(defaultNameService, path); + List locations = + Collections.singletonList(remoteLocation); + ret = new PathLocation(null, locations); + } + return ret; + } + + /** + * Get the mount table entry for a path. + * + * @param path Path to look for. + * @return Mount table entry the path belongs. + * @throws IOException If the State Store could not be reached. + */ + public MountTable getMountPoint(final String path) throws IOException { + verifyMountTable(); + return findDeepest(path); + } + + @Override + public List getMountPoints(final String path) throws IOException { + verifyMountTable(); + + Set children = new TreeSet<>(); + readLock.lock(); + try { + String from = path; + String to = path + Character.MAX_VALUE; + SortedMap subMap = this.tree.subMap(from, to); + + boolean exists = false; + for (String subPath : subMap.keySet()) { + String child = subPath; + + // Special case for / + if (!path.equals(Path.SEPARATOR)) { + // Get the children + int ini = path.length(); + child = subPath.substring(ini); + } + + if (child.isEmpty()) { + // This is a mount point but without children + exists = true; + } else if (child.startsWith(Path.SEPARATOR)) { + // This is a mount point with children + exists = true; + child = child.substring(1); + + // We only return immediate children + int fin = child.indexOf(Path.SEPARATOR); + if (fin > -1) { + child = child.substring(0, fin); + } + if (!child.isEmpty()) { + children.add(child); + } + } + } + if (!exists) { + return null; + } + return new LinkedList<>(children); + } finally { + readLock.unlock(); + } + } + + /** + * Get all the mount records at or beneath a given path. + * @param path Path to get the mount points from. + * @return List of mount table records under the path or null if the path is + * not found. + * @throws IOException If it's not connected to the State Store. + */ + public List getMounts(final String path) throws IOException { + verifyMountTable(); + + return getTreeValues(path, false); + } + + /** + * Check if the Mount Table is ready to be used. + * @throws StateStoreUnavailableException If it cannot connect to the store. + */ + private void verifyMountTable() throws StateStoreUnavailableException { + if (!this.init) { + throw new StateStoreUnavailableException("Mount Table not initialized"); + } + } + + @Override + public String toString() { + readLock.lock(); + try { + return this.tree.toString(); + } finally { + readLock.unlock(); + } + } + + /** + * Build a location for this result beneath the discovered mount point. + * + * @param result Tree node search result. + * @return PathLocation containing the namespace, local path. + */ + private static PathLocation buildLocation( + final String path, final MountTable entry) { + + String srcPath = entry.getSourcePath(); + if (!path.startsWith(srcPath)) { + LOG.error("Cannot build location, {} not a child of {}", path, srcPath); + return null; + } + String remainingPath = path.substring(srcPath.length()); + if (remainingPath.startsWith(Path.SEPARATOR)) { + remainingPath = remainingPath.substring(1); + } + + List locations = new LinkedList<>(); + for (RemoteLocation oneDst : entry.getDestinations()) { + String nsId = oneDst.getNameserviceId(); + String dest = oneDst.getDest(); + String newPath = dest; + if (!newPath.endsWith(Path.SEPARATOR)) { + newPath += Path.SEPARATOR; + } + newPath += remainingPath; + RemoteLocation remoteLocation = new RemoteLocation(nsId, newPath); + locations.add(remoteLocation); + } + DestinationOrder order = entry.getDestOrder(); + return new PathLocation(srcPath, locations, order); + } + + @Override + public String getDefaultNamespace() { + return this.defaultNameService; + } + + /** + * Find the deepest mount point for a path. + * @param path Path to look for. + * @return Mount table entry. + */ + private MountTable findDeepest(final String path) { + readLock.lock(); + try { + Entry entry = this.tree.floorEntry(path); + while (entry != null && !path.startsWith(entry.getKey())) { + entry = this.tree.lowerEntry(entry.getKey()); + } + if (entry == null) { + return null; + } + return entry.getValue(); + } finally { + readLock.unlock(); + } + } + + /** + * Get the mount table entries under a path. + * @param path Path to search from. + * @return Mount Table entries. + */ + private List getTreeValues(final String path) { + return getTreeValues(path, false); + } + + /** + * Get the mount table entries under a path. + * @param path Path to search from. + * @param reverse If the order should be reversed. + * @return Mount Table entries. + */ + private List getTreeValues(final String path, boolean reverse) { + LinkedList ret = new LinkedList<>(); + readLock.lock(); + try { + String from = path; + String to = path + Character.MAX_VALUE; + SortedMap subMap = this.tree.subMap(from, to); + for (MountTable entry : subMap.values()) { + if (!reverse) { + ret.add(entry); + } else { + ret.addFirst(entry); + } + } + } finally { + readLock.unlock(); + } + return ret; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/PathLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/PathLocation.java index d90565c21d4..945d81df516 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/PathLocation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/PathLocation.java @@ -23,21 +23,27 @@ import java.util.LinkedList; import java.util.List; import java.util.Set; +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A map of the properties and target destinations (name space + path) for - * a path in the global/federated namespace. + * a path in the global/federated name space. * This data is generated from the @see MountTable records. */ public class PathLocation { + private static final Logger LOG = LoggerFactory.getLogger(PathLocation.class); + + /** Source path in global namespace. */ private final String sourcePath; - /** Remote paths in the target namespaces. */ + /** Remote paths in the target name spaces. */ private final List destinations; - - /** List of name spaces present. */ - private final Set namespaces; + /** Order for the destinations. */ + private final DestinationOrder destOrder; /** @@ -45,14 +51,23 @@ public class PathLocation { * * @param source Source path in the global name space. * @param dest Destinations of the mount table entry. - * @param namespaces Unique identifier representing the combination of - * name spaces present in the destination list. + * @param order Order of the locations. */ public PathLocation( - String source, List dest, Set nss) { + String source, List dest, DestinationOrder order) { this.sourcePath = source; - this.destinations = dest; - this.namespaces = nss; + this.destinations = Collections.unmodifiableList(dest); + this.destOrder = order; + } + + /** + * Create a new PathLocation with default HASH order. + * + * @param source Source path in the global name space. + * @param dest Destinations of the mount table entry. + */ + public PathLocation(String source, List dest) { + this(source, dest, DestinationOrder.HASH); } /** @@ -60,10 +75,55 @@ public class PathLocation { * * @param other Other path location to copy from. */ - public PathLocation(PathLocation other) { + public PathLocation(final PathLocation other) { this.sourcePath = other.sourcePath; - this.destinations = new LinkedList(other.destinations); - this.namespaces = new HashSet(other.namespaces); + this.destinations = Collections.unmodifiableList(other.destinations); + this.destOrder = other.destOrder; + } + + /** + * Create a path location from another path with the destinations sorted. + * + * @param other Other path location to copy from. + * @param firstNsId Identifier of the namespace to place first. + */ + public PathLocation(PathLocation other, String firstNsId) { + this.sourcePath = other.sourcePath; + this.destOrder = other.destOrder; + this.destinations = orderedNamespaces(other.destinations, firstNsId); + } + + /** + * Prioritize a location/destination by its name space/nameserviceId. + * This destination might be used by other threads, so the source is not + * modifiable. + * + * @param original List of destinations to order. + * @param nsId The name space/nameserviceID to prioritize. + * @return Prioritized list of detinations that cannot be modified. + */ + private static List orderedNamespaces( + final List original, final String nsId) { + if (original.size() <= 1) { + return original; + } + + LinkedList newDestinations = new LinkedList<>(); + boolean found = false; + for (RemoteLocation dest : original) { + if (dest.getNameserviceId().equals(nsId)) { + found = true; + newDestinations.addFirst(dest); + } else { + newDestinations.add(dest); + } + } + + if (!found) { + LOG.debug("Cannot find location with namespace {} in {}", + nsId, original); + } + return Collections.unmodifiableList(newDestinations); } /** @@ -76,16 +136,37 @@ public class PathLocation { } /** - * Get the list of subclusters defined for the destinations. + * Get the subclusters defined for the destinations. + * + * @return Set containing the subclusters. */ public Set getNamespaces() { - return Collections.unmodifiableSet(this.namespaces); + Set namespaces = new HashSet<>(); + List locations = this.getDestinations(); + for (RemoteLocation location : locations) { + String nsId = location.getNameserviceId(); + namespaces.add(nsId); + } + return namespaces; } @Override public String toString() { - RemoteLocation loc = getDefaultLocation(); - return loc.getNameserviceId() + "->" + loc.getDest(); + StringBuilder sb = new StringBuilder(); + for (RemoteLocation destination : this.destinations) { + String nsId = destination.getNameserviceId(); + String path = destination.getDest(); + if (sb.length() > 0) { + sb.append(","); + } + sb.append(nsId + "->" + path); + } + if (this.destinations.size() > 1) { + sb.append(" ["); + sb.append(this.destOrder.toString()); + sb.append("]"); + } + return sb.toString(); } /** @@ -107,6 +188,15 @@ public class PathLocation { return Collections.unmodifiableList(this.destinations); } + /** + * Get the order for the destinations. + * + * @return Order for the destinations. + */ + public DestinationOrder getDestinationOrder() { + return this.destOrder; + } + /** * Get the default or highest priority location. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java new file mode 100644 index 00000000000..4bccf1097ac --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver.order; + +/** + * Order of the destinations when we have multiple of them. When the resolver + * of files to subclusters (FileSubclusterResolver) has multiple destinations, + * this determines which location should be checked first. + */ +public enum DestinationOrder { + HASH, // Follow consistent hashing + LOCAL, // Local first + RANDOM // Random order +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/package-info.java new file mode 100644 index 00000000000..f90152fc729 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/package-info.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * A federated location can be resolved to multiple subclusters. This package + * takes care of the order in which this multiple destinations should be used. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving + +package org.apache.hadoop.hdfs.server.federation.resolver.order; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java index 78c473a3d46..99af2d83db9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java @@ -135,66 +135,20 @@ public final class FederationUtil { } } - /** - * Create an instance of an interface with a constructor using a state store - * constructor. - * - * @param conf Configuration - * @param context Context object to pass to the instance. - * @param contextType Type of the context passed to the constructor. - * @param configurationKeyName Configuration key to retrieve the class to load - * @param defaultClassName Default class to load if the configuration key is - * not set - * @param clazz Class/interface that must be implemented by the instance. - * @return New instance of the specified class that implements the desired - * interface and a single parameter constructor containing a - * StateStore reference. - */ - private static T newInstance(final Configuration conf, - final R context, final Class contextClass, - final String configKeyName, final String defaultClassName, - final Class clazz) { - - String className = conf.get(configKeyName, defaultClassName); - try { - Class instance = conf.getClassByName(className); - if (clazz.isAssignableFrom(instance)) { - if (contextClass == null) { - // Default constructor if no context - @SuppressWarnings("unchecked") - Constructor constructor = - (Constructor) instance.getConstructor(); - return constructor.newInstance(); - } else { - // Constructor with context - @SuppressWarnings("unchecked") - Constructor constructor = (Constructor) instance.getConstructor( - Configuration.class, contextClass); - return constructor.newInstance(conf, context); - } - } else { - throw new RuntimeException("Class " + className + " not instance of " - + clazz.getCanonicalName()); - } - } catch (ReflectiveOperationException e) { - LOG.error("Could not instantiate: " + className, e); - return null; - } - } - /** * Creates an instance of a FileSubclusterResolver from the configuration. * * @param conf Configuration that defines the file resolver class. - * @param obj Context object passed to class constructor. - * @return FileSubclusterResolver + * @param router Router service. + * @return New file subcluster resolver. */ public static FileSubclusterResolver newFileSubclusterResolver( - Configuration conf, StateStoreService stateStore) { - return newInstance(conf, stateStore, StateStoreService.class, + Configuration conf, Router router) { + Class clazz = conf.getClass( DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS, DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT, FileSubclusterResolver.class); + return newInstance(conf, router, Router.class, clazz); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java index cfddf200b66..213a58fc110 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java @@ -124,8 +124,7 @@ public class Router extends CompositeService { } // Lookup interface to map between the global and subcluster name spaces - this.subclusterResolver = newFileSubclusterResolver( - this.conf, this.stateStore); + this.subclusterResolver = newFileSubclusterResolver(this.conf, this); if (this.subclusterResolver == null) { throw new IOException("Cannot find subcluster resolver"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java new file mode 100644 index 00000000000..b43965997d0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; + +/** + * Management API for the HDFS mount table information stored in + * {@link org.apache.hadoop.hdfs.server.federation.store.records.MountTable + * MountTable} records. The mount table contains entries that map a particular + * global namespace path one or more HDFS nameservices (NN) + target path. It is + * possible to map mount locations for root folders, directories or individual + * files. + *

+ * Once fetched from the + * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver + * StateStoreDriver}, MountTable records are cached in a tree for faster access. + * Each path in the global namespace is mapped to a nameserivce ID and local + * path upon request. The cache is periodically updated by the @{link + * StateStoreCacheUpdateService}. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class MountTableStore extends CachedRecordStore + implements MountTableManager { + + public MountTableStore(StateStoreDriver driver) { + super(MountTable.class, driver); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java index 73f607f0dd3..3aa3ffd394b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl; +import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; import org.apache.hadoop.service.CompositeService; @@ -136,6 +137,7 @@ public class StateStoreService extends CompositeService { // Add supported record stores addRecordStore(MembershipStoreImpl.class); + addRecordStore(MountTableStoreImpl.class); // Check the connection to the State Store periodically this.monitorService = new StateStoreConnectionMonitorService(this); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java new file mode 100644 index 00000000000..e6affb2cae8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.impl; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.apache.hadoop.util.Time; + +/** + * Implementation of the {@link MountTableStore} state store API. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class MountTableStoreImpl extends MountTableStore { + + public MountTableStoreImpl(StateStoreDriver driver) { + super(driver); + } + + @Override + public AddMountTableEntryResponse addMountTableEntry( + AddMountTableEntryRequest request) throws IOException { + boolean status = getDriver().put(request.getEntry(), false, true); + AddMountTableEntryResponse response = + AddMountTableEntryResponse.newInstance(); + response.setStatus(status); + return response; + } + + @Override + public UpdateMountTableEntryResponse updateMountTableEntry( + UpdateMountTableEntryRequest request) throws IOException { + MountTable entry = request.getEntry(); + boolean status = getDriver().put(entry, true, true); + UpdateMountTableEntryResponse response = + UpdateMountTableEntryResponse.newInstance(); + response.setStatus(status); + return response; + } + + @Override + public RemoveMountTableEntryResponse removeMountTableEntry( + RemoveMountTableEntryRequest request) throws IOException { + final String srcPath = request.getSrcPath(); + final MountTable partial = MountTable.newInstance(); + partial.setSourcePath(srcPath); + final Query query = new Query<>(partial); + int removedRecords = getDriver().remove(getRecordClass(), query); + boolean status = (removedRecords == 1); + RemoveMountTableEntryResponse response = + RemoveMountTableEntryResponse.newInstance(); + response.setStatus(status); + return response; + } + + @Override + public GetMountTableEntriesResponse getMountTableEntries( + GetMountTableEntriesRequest request) throws IOException { + + // Get all values from the cache + List records = getCachedRecords(); + + // Sort and filter + Collections.sort(records); + String reqSrcPath = request.getSrcPath(); + if (reqSrcPath != null && !reqSrcPath.isEmpty()) { + // Return only entries beneath this path + Iterator it = records.iterator(); + while (it.hasNext()) { + MountTable record = it.next(); + String srcPath = record.getSourcePath(); + if (!srcPath.startsWith(reqSrcPath)) { + it.remove(); + } + } + } + + GetMountTableEntriesResponse response = + GetMountTableEntriesResponse.newInstance(); + response.setEntries(records); + response.setTimestamp(Time.now()); + return response; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java new file mode 100644 index 00000000000..2d9f1027764 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryRequest.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; + +/** + * API request for adding a mount table entry to the state store. + */ +public abstract class AddMountTableEntryRequest { + + public static AddMountTableEntryRequest newInstance() { + return StateStoreSerializer.newRecord(AddMountTableEntryRequest.class); + } + + public static AddMountTableEntryRequest newInstance(MountTable newEntry) { + AddMountTableEntryRequest request = newInstance(); + request.setEntry(newEntry); + return request; + } + + @Public + @Unstable + public abstract MountTable getEntry(); + + @Public + @Unstable + public abstract void setEntry(MountTable mount); +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryResponse.java new file mode 100644 index 00000000000..9bc7f920d54 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/AddMountTableEntryResponse.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; + +/** + * API response for adding a mount table entry to the state store. + */ +public abstract class AddMountTableEntryResponse { + + public static AddMountTableEntryResponse newInstance() throws IOException { + return StateStoreSerializer.newRecord(AddMountTableEntryResponse.class); + } + + @Public + @Unstable + public abstract boolean getStatus(); + + @Public + @Unstable + public abstract void setStatus(boolean result); +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetMountTableEntriesRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetMountTableEntriesRequest.java new file mode 100644 index 00000000000..cd6c2781780 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetMountTableEntriesRequest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; + +/** + * API request for listing mount table entries present in the state store. + */ +public abstract class GetMountTableEntriesRequest { + + public static GetMountTableEntriesRequest newInstance() throws IOException { + return StateStoreSerializer.newRecord(GetMountTableEntriesRequest.class); + } + + public static GetMountTableEntriesRequest newInstance(String srcPath) + throws IOException { + GetMountTableEntriesRequest request = newInstance(); + request.setSrcPath(srcPath); + return request; + } + + @Public + @Unstable + public abstract String getSrcPath(); + + @Public + @Unstable + public abstract void setSrcPath(String path); +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetMountTableEntriesResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetMountTableEntriesResponse.java new file mode 100644 index 00000000000..cebc3f6288d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/GetMountTableEntriesResponse.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; + +/** + * API response for listing mount table entries present in the state store. + */ +public abstract class GetMountTableEntriesResponse { + + public static GetMountTableEntriesResponse newInstance() throws IOException { + return StateStoreSerializer.newRecord(GetMountTableEntriesResponse.class); + } + + @Public + @Unstable + public abstract List getEntries() throws IOException; + + @Public + @Unstable + public abstract void setEntries(List entries) + throws IOException; + + @Public + @Unstable + public abstract long getTimestamp(); + + @Public + @Unstable + public abstract void setTimestamp(long time); +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RemoveMountTableEntryRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RemoveMountTableEntryRequest.java new file mode 100644 index 00000000000..642ee0dbd28 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RemoveMountTableEntryRequest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; + +/** + * API request for removing a mount table path present in the state store. + */ +public abstract class RemoveMountTableEntryRequest { + + public static RemoveMountTableEntryRequest newInstance() throws IOException { + return StateStoreSerializer.newRecord(RemoveMountTableEntryRequest.class); + } + + public static RemoveMountTableEntryRequest newInstance(String path) + throws IOException { + RemoveMountTableEntryRequest request = newInstance(); + request.setSrcPath(path); + return request; + } + + @Public + @Unstable + public abstract String getSrcPath(); + + @Public + @Unstable + public abstract void setSrcPath(String path); +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RemoveMountTableEntryResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RemoveMountTableEntryResponse.java new file mode 100644 index 00000000000..70f117d590a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RemoveMountTableEntryResponse.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; + +/** + * API response for removing a mount table path present in the state store. + */ +public abstract class RemoveMountTableEntryResponse { + + public static RemoveMountTableEntryResponse newInstance() throws IOException { + return StateStoreSerializer.newRecord(RemoveMountTableEntryResponse.class); + } + + @Public + @Unstable + public abstract boolean getStatus(); + + @Public + @Unstable + public abstract void setStatus(boolean result); +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateMountTableEntryRequest.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateMountTableEntryRequest.java new file mode 100644 index 00000000000..afd5128910d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateMountTableEntryRequest.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; + +/** + * API request for updating the destination of an existing mount point in the + * state store. + */ +public abstract class UpdateMountTableEntryRequest { + + public static UpdateMountTableEntryRequest newInstance() throws IOException { + return StateStoreSerializer.newRecord(UpdateMountTableEntryRequest.class); + } + + public static UpdateMountTableEntryRequest newInstance(MountTable entry) + throws IOException { + UpdateMountTableEntryRequest request = newInstance(); + request.setEntry(entry); + return request; + } + + @Public + @Unstable + public abstract MountTable getEntry() throws IOException; + + @Public + @Unstable + public abstract void setEntry(MountTable mount) throws IOException; +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateMountTableEntryResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateMountTableEntryResponse.java new file mode 100644 index 00000000000..7097e1047fb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/UpdateMountTableEntryResponse.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; + +/** + * API response for updating the destination of an existing mount point in the + * state store. + */ +public abstract class UpdateMountTableEntryResponse { + + public static UpdateMountTableEntryResponse newInstance() throws IOException { + return StateStoreSerializer.newRecord(UpdateMountTableEntryResponse.class); + } + + @Public + @Unstable + public abstract boolean getStatus(); + + @Public + @Unstable + public abstract void setStatus(boolean result); +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/AddMountTableEntryRequestPBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/AddMountTableEntryRequestPBImpl.java new file mode 100644 index 00000000000..35455d2dc40 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/AddMountTableEntryRequestPBImpl.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryRequestProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryRequestProtoOrBuilder; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.MountTablePBImpl; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the state store API object + * AddMountTableEntryRequest. + */ +public class AddMountTableEntryRequestPBImpl + extends AddMountTableEntryRequest implements PBRecord { + + private FederationProtocolPBTranslator translator = + new FederationProtocolPBTranslator( + AddMountTableEntryRequestProto.class); + + public AddMountTableEntryRequestPBImpl() { + } + + public AddMountTableEntryRequestPBImpl(AddMountTableEntryRequestProto proto) { + this.translator.setProto(proto); + } + + @Override + public AddMountTableEntryRequestProto getProto() { + return this.translator.build(); + } + + @Override + public void setProto(Message proto) { + this.translator.setProto(proto); + } + + @Override + public void readInstance(String base64String) throws IOException { + this.translator.readInstance(base64String); + } + + @Override + public MountTable getEntry() { + MountTableRecordProto entryProto = + this.translator.getProtoOrBuilder().getEntry(); + return new MountTablePBImpl(entryProto); + } + + @Override + public void setEntry(MountTable mount) { + if (mount instanceof MountTablePBImpl) { + MountTablePBImpl mountPB = (MountTablePBImpl)mount; + MountTableRecordProto mountProto = mountPB.getProto(); + translator.getBuilder().setEntry(mountProto); + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/AddMountTableEntryResponsePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/AddMountTableEntryResponsePBImpl.java new file mode 100644 index 00000000000..c1d9a6544ab --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/AddMountTableEntryResponsePBImpl.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryResponseProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryResponseProtoOrBuilder; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the state store API object + * AddMountTableEntryResponse. + */ +public class AddMountTableEntryResponsePBImpl + extends AddMountTableEntryResponse implements PBRecord { + + private FederationProtocolPBTranslator translator = + new FederationProtocolPBTranslator( + AddMountTableEntryResponseProto.class); + + public AddMountTableEntryResponsePBImpl() { + } + + public AddMountTableEntryResponsePBImpl( + AddMountTableEntryResponseProto proto) { + this.translator.setProto(proto); + } + + @Override + public AddMountTableEntryResponseProto getProto() { + return this.translator.build(); + } + + @Override + public void setProto(Message proto) { + this.translator.setProto(proto); + } + + @Override + public void readInstance(String base64String) throws IOException { + this.translator.readInstance(base64String); + } + + @Override + public boolean getStatus() { + return this.translator.getProtoOrBuilder().getStatus(); + } + + @Override + public void setStatus(boolean result) { + this.translator.getBuilder().setStatus(result); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetMountTableEntriesRequestPBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetMountTableEntriesRequestPBImpl.java new file mode 100644 index 00000000000..3e0d1a61d7f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetMountTableEntriesRequestPBImpl.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesRequestProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesRequestProtoOrBuilder; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the state store API object + * GetMountTableEntriesRequest. + */ +public class GetMountTableEntriesRequestPBImpl + extends GetMountTableEntriesRequest implements PBRecord { + + private FederationProtocolPBTranslator translator = + new FederationProtocolPBTranslator( + GetMountTableEntriesRequestProto.class); + + public GetMountTableEntriesRequestPBImpl() { + } + + public GetMountTableEntriesRequestPBImpl( + GetMountTableEntriesRequestProto proto) { + this.translator.setProto(proto); + } + + @Override + public GetMountTableEntriesRequestProto getProto() { + return this.translator.build(); + } + + @Override + public void setProto(Message proto) { + this.translator.setProto(proto); + } + + @Override + public void readInstance(String base64String) throws IOException { + this.translator.readInstance(base64String); + } + + @Override + public String getSrcPath() { + return this.translator.getProtoOrBuilder().getSrcPath(); + } + + @Override + public void setSrcPath(String path) { + this.translator.getBuilder().setSrcPath(path); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetMountTableEntriesResponsePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetMountTableEntriesResponsePBImpl.java new file mode 100644 index 00000000000..9d64bc9bfa7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/GetMountTableEntriesResponsePBImpl.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesResponseProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesResponseProtoOrBuilder; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.MountTablePBImpl; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the state store API object + * GetMountTableEntriesResponse. + */ +public class GetMountTableEntriesResponsePBImpl + extends GetMountTableEntriesResponse implements PBRecord { + + private FederationProtocolPBTranslator translator = + new FederationProtocolPBTranslator( + GetMountTableEntriesResponseProto.class); + + public GetMountTableEntriesResponsePBImpl() { + } + + public GetMountTableEntriesResponsePBImpl( + GetMountTableEntriesResponseProto proto) { + this.translator.setProto(proto); + } + + @Override + public GetMountTableEntriesResponseProto getProto() { + return this.translator.build(); + } + + @Override + public void setProto(Message proto) { + this.translator.setProto(proto); + } + + @Override + public void readInstance(String base64String) throws IOException { + this.translator.readInstance(base64String); + } + + @Override + public List getEntries() throws IOException { + List entries = + this.translator.getProtoOrBuilder().getEntriesList(); + List ret = new ArrayList(); + for (MountTableRecordProto entry : entries) { + MountTable record = new MountTablePBImpl(entry); + ret.add(record); + } + return ret; + } + + @Override + public void setEntries(List records) throws IOException { + this.translator.getBuilder().clearEntries(); + for (MountTable entry : records) { + if (entry instanceof MountTablePBImpl) { + MountTablePBImpl entryPB = (MountTablePBImpl)entry; + this.translator.getBuilder().addEntries(entryPB.getProto()); + } + } + } + + @Override + public long getTimestamp() { + return this.translator.getProtoOrBuilder().getTimestamp(); + } + + @Override + public void setTimestamp(long time) { + this.translator.getBuilder().setTimestamp(time); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RemoveMountTableEntryRequestPBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RemoveMountTableEntryRequestPBImpl.java new file mode 100644 index 00000000000..7f7c998f270 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RemoveMountTableEntryRequestPBImpl.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryRequestProtoOrBuilder; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the state store API object + * RemoveMountTableEntryRequest. + */ +public class RemoveMountTableEntryRequestPBImpl + extends RemoveMountTableEntryRequest implements PBRecord { + + private FederationProtocolPBTranslator translator = + new FederationProtocolPBTranslator( + RemoveMountTableEntryRequestProto.class); + + public RemoveMountTableEntryRequestPBImpl() { + } + + public RemoveMountTableEntryRequestPBImpl( + RemoveMountTableEntryRequestProto proto) { + this.setProto(proto); + } + + @Override + public RemoveMountTableEntryRequestProto getProto() { + return this.translator.build(); + } + + @Override + public void setProto(Message proto) { + this.translator.setProto(proto); + } + + @Override + public void readInstance(String base64String) throws IOException { + this.translator.readInstance(base64String); + } + + @Override + public String getSrcPath() { + return this.translator.getProtoOrBuilder().getSrcPath(); + } + + @Override + public void setSrcPath(String path) { + this.translator.getBuilder().setSrcPath(path); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RemoveMountTableEntryResponsePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RemoveMountTableEntryResponsePBImpl.java new file mode 100644 index 00000000000..0c943ac3249 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/RemoveMountTableEntryResponsePBImpl.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProto.Builder; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoveMountTableEntryResponseProtoOrBuilder; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the state store API object + * RemoveMountTableEntryResponse. + */ +public class RemoveMountTableEntryResponsePBImpl + extends RemoveMountTableEntryResponse implements PBRecord { + + private FederationProtocolPBTranslator translator = + new FederationProtocolPBTranslator( + RemoveMountTableEntryResponseProto.class); + + public RemoveMountTableEntryResponsePBImpl() { + } + + public RemoveMountTableEntryResponsePBImpl( + RemoveMountTableEntryResponseProto proto) { + this.setProto(proto); + } + + @Override + public RemoveMountTableEntryResponseProto getProto() { + return this.translator.build(); + } + + @Override + public void setProto(Message proto) { + this.translator.setProto(proto); + } + + @Override + public void readInstance(String base64String) throws IOException { + this.translator.readInstance(base64String); + } + + @Override + public boolean getStatus() { + return this.translator.getProtoOrBuilder().getStatus(); + } + + @Override + public void setStatus(boolean result) { + this.translator.getBuilder().setStatus(result); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/UpdateMountTableEntryRequestPBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/UpdateMountTableEntryRequestPBImpl.java new file mode 100644 index 00000000000..621bb3a091b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/UpdateMountTableEntryRequestPBImpl.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryRequestProtoOrBuilder; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.MountTablePBImpl; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the state store API object + * UpdateMountTableEntryRequest. + */ +public class UpdateMountTableEntryRequestPBImpl + extends UpdateMountTableEntryRequest implements PBRecord { + + private FederationProtocolPBTranslator translator = + new FederationProtocolPBTranslator( + UpdateMountTableEntryRequestProto.class); + + public UpdateMountTableEntryRequestPBImpl() { + } + + public UpdateMountTableEntryRequestPBImpl( + UpdateMountTableEntryRequestProto proto) { + this.translator.setProto(proto); + } + + @Override + public UpdateMountTableEntryRequestProto getProto() { + return this.translator.build(); + } + + @Override + public void setProto(Message proto) { + this.translator.setProto(proto); + } + + @Override + public void readInstance(String base64String) throws IOException { + this.translator.readInstance(base64String); + } + + @Override + public MountTable getEntry() throws IOException { + MountTableRecordProto statsProto = + this.translator.getProtoOrBuilder().getEntry(); + MountTable stats = StateStoreSerializer.newRecord(MountTable.class); + if (stats instanceof MountTablePBImpl) { + MountTablePBImpl entryPB = (MountTablePBImpl)stats; + entryPB.setProto(statsProto); + return entryPB; + } else { + throw new IOException("Cannot get stats for the membership"); + } + } + + @Override + public void setEntry(MountTable mount) throws IOException { + if (mount instanceof MountTablePBImpl) { + MountTablePBImpl mountPB = (MountTablePBImpl)mount; + MountTableRecordProto mountProto = + (MountTableRecordProto)mountPB.getProto(); + this.translator.getBuilder().setEntry(mountProto); + } else { + throw new IOException("Cannot set mount table entry"); + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/UpdateMountTableEntryResponsePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/UpdateMountTableEntryResponsePBImpl.java new file mode 100644 index 00000000000..5d566d6d0d8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/impl/pb/UpdateMountTableEntryResponsePBImpl.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryResponseProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.UpdateMountTableEntryResponseProtoOrBuilder; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the state store API object + * UpdateMountTableEntryResponse. + */ +public class UpdateMountTableEntryResponsePBImpl + extends UpdateMountTableEntryResponse implements PBRecord { + + private FederationProtocolPBTranslator translator = + new FederationProtocolPBTranslator( + UpdateMountTableEntryResponseProto.class); + + public UpdateMountTableEntryResponsePBImpl() { + } + + public UpdateMountTableEntryResponsePBImpl( + UpdateMountTableEntryResponseProto proto) { + this.setProto(proto); + } + + @Override + public UpdateMountTableEntryResponseProto getProto() { + return this.translator.build(); + } + + @Override + public void setProto(Message proto) { + this.translator.setProto(proto); + } + + @Override + public void readInstance(String base64String) throws IOException { + this.translator.readInstance(base64String); + } + + @Override + public boolean getStatus() { + return this.translator.getProtoOrBuilder().getStatus(); + } + + @Override + public void setStatus(boolean result) { + this.translator.getBuilder().setStatus(result); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java new file mode 100644 index 00000000000..16f2b8ba20c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.records; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Data schema for + * {@link org.apache.hadoop.hdfs.server.federation.store. + * MountTableStore FederationMountTableStore} data stored in the + * {@link org.apache.hadoop.hdfs.server.federation.store. + * StateStoreService FederationStateStoreService}. Supports string + * serialization. + */ +public abstract class MountTable extends BaseRecord { + + private static final Logger LOG = LoggerFactory.getLogger(MountTable.class); + + + /** + * Default constructor for a mount table entry. + */ + public MountTable() { + super(); + } + + public static MountTable newInstance() { + MountTable record = StateStoreSerializer.newRecord(MountTable.class); + record.init(); + return record; + } + + /** + * Constructor for a mount table entry with a single destinations. + * + * @param src Source path in the mount entry. + * @param destinations Nameservice destination of the mount point. + * @param dateCreated Created date. + * @param dateModified Modified date. + * @throws IOException + */ + public static MountTable newInstance(final String src, + final Map destinations, + long dateCreated, long dateModified) throws IOException { + + MountTable record = newInstance(src, destinations); + record.setDateCreated(dateCreated); + record.setDateModified(dateModified); + return record; + } + + /** + * Constructor for a mount table entry with multiple destinations. + * + * @param src Source path in the mount entry. + * @param destinations Nameservice destinations of the mount point. + * @throws IOException + */ + public static MountTable newInstance(final String src, + final Map destinations) throws IOException { + MountTable record = newInstance(); + + // Normalize the mount path + record.setSourcePath(normalizeFileSystemPath(src)); + + // Build a list of remote locations + final List locations = new LinkedList<>(); + for (Entry entry : destinations.entrySet()) { + String nsId = entry.getKey(); + String path = normalizeFileSystemPath(entry.getValue()); + RemoteLocation location = new RemoteLocation(nsId, path); + locations.add(location); + } + + // Set the serialized dest string + record.setDestinations(locations); + + // Validate + record.validate(); + return record; + } + + /** + * Get source path in the federated namespace. + * + * @return Source path in the federated namespace. + */ + public abstract String getSourcePath(); + + /** + * Set source path in the federated namespace. + * + * @param path Source path in the federated namespace. + */ + public abstract void setSourcePath(String path); + + /** + * Get a list of destinations (namespace + path) present for this entry. + * + * @return List of RemoteLocation destinations. Null if no destinations. + */ + public abstract List getDestinations(); + + /** + * Set the destination paths. + * + * @param paths Destination paths. + */ + public abstract void setDestinations(List dests); + + /** + * Add a new destination to this mount table entry. + */ + public abstract boolean addDestination(String nsId, String path); + + /** + * Check if the entry is read only. + * + * @return If the entry is read only. + */ + public abstract boolean isReadOnly(); + + /** + * Set an entry to be read only. + * + * @param ro If the entry is read only. + */ + public abstract void setReadOnly(boolean ro); + + /** + * Get the order of the destinations for this mount table entry. + * + * @return Order of the destinations. + */ + public abstract DestinationOrder getDestOrder(); + + /** + * Set the order of the destinations for this mount table entry. + * + * @param order Order of the destinations. + */ + public abstract void setDestOrder(DestinationOrder order); + + /** + * Get the default location. + * @return The default location. + */ + public RemoteLocation getDefaultLocation() { + List dests = this.getDestinations(); + if (dests == null || dests.isEmpty()) { + return null; + } + return dests.get(0); + } + + @Override + public boolean like(final BaseRecord o) { + if (o instanceof MountTable) { + MountTable other = (MountTable)o; + if (getSourcePath() != null && + !getSourcePath().equals(other.getSourcePath())) { + return false; + } + if (getDestinations() != null && + !getDestinations().equals(other.getDestinations())) { + return false; + } + return true; + } + return false; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(this.getSourcePath()); + sb.append("->"); + List destinations = this.getDestinations(); + sb.append(destinations); + if (destinations != null && destinations.size() > 1) { + sb.append("[" + this.getDestOrder() + "]"); + } + if (this.isReadOnly()) { + sb.append("[RO]"); + } + return sb.toString(); + } + + @Override + public SortedMap getPrimaryKeys() { + SortedMap map = new TreeMap<>(); + map.put("sourcePath", this.getSourcePath()); + return map; + } + + @Override + public boolean validate() { + boolean ret = super.validate(); + if (this.getSourcePath() == null || this.getSourcePath().length() == 0) { + LOG.error("Invalid entry, no source path specified ", this); + ret = false; + } + if (!this.getSourcePath().startsWith("/")) { + LOG.error("Invalid entry, all mount points must start with / ", this); + ret = false; + } + if (this.getDestinations() == null || this.getDestinations().size() == 0) { + LOG.error("Invalid entry, no destination paths specified ", this); + ret = false; + } + for (RemoteLocation loc : getDestinations()) { + String nsId = loc.getNameserviceId(); + if (nsId == null || nsId.length() == 0) { + LOG.error("Invalid entry, invalid destination nameservice ", this); + ret = false; + } + if (loc.getDest() == null || loc.getDest().length() == 0) { + LOG.error("Invalid entry, invalid destination path ", this); + ret = false; + } + if (!loc.getDest().startsWith("/")) { + LOG.error("Invalid entry, all destination must start with / ", this); + ret = false; + } + } + return ret; + } + + @Override + public long getExpirationMs() { + return 0; + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 31) + .append(this.getSourcePath()) + .append(this.getDestinations()) + .append(this.isReadOnly()) + .append(this.getDestOrder()) + .toHashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof MountTable) { + MountTable other = (MountTable)obj; + if (!this.getSourcePath().equals(other.getSourcePath())) { + return false; + } else if (!this.getDestinations().equals(other.getDestinations())) { + return false; + } else if (this.isReadOnly() != other.isReadOnly()) { + return false; + } else if (!this.getDestOrder().equals(other.getDestOrder())) { + return false; + } + return true; + } + return false; + } + + /** + * Normalize a path for that filesystem. + * + * @param path Path to normalize. + * @return Normalized path. + */ + private static String normalizeFileSystemPath(final String path) { + Path normalizedPath = new Path(path); + return normalizedPath.toString(); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java new file mode 100644 index 00000000000..d2870bdbe1d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.records.impl.pb; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto.Builder; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto.DestOrder; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProtoOrBuilder; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RemoteLocationProto; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.FederationProtocolPBTranslator; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; + +import com.google.protobuf.Message; + +/** + * Protobuf implementation of the MountTable record. + */ +public class MountTablePBImpl extends MountTable implements PBRecord { + + private FederationProtocolPBTranslator translator = + new FederationProtocolPBTranslator(MountTableRecordProto.class); + + public MountTablePBImpl() { + } + + public MountTablePBImpl(MountTableRecordProto proto) { + this.setProto(proto); + } + + @Override + public MountTableRecordProto getProto() { + return this.translator.build(); + } + + @Override + public void setProto(Message proto) { + this.translator.setProto(proto); + } + + @Override + public void readInstance(String base64String) throws IOException { + this.translator.readInstance(base64String); + } + + @Override + public String getSourcePath() { + MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder(); + if (!proto.hasSrcPath()) { + return null; + } + return proto.getSrcPath(); + } + + @Override + public void setSourcePath(String path) { + Builder builder = this.translator.getBuilder(); + if (path == null) { + builder.clearSrcPath(); + } else { + builder.setSrcPath(path); + } + } + + @Override + public List getDestinations() { + MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder(); + if (proto.getDestinationsCount() == 0) { + return null; + } + + final List ret = new LinkedList<>(); + final List destList = proto.getDestinationsList(); + for (RemoteLocationProto dest : destList) { + String nsId = dest.getNameserviceId(); + String path = dest.getPath(); + RemoteLocation loc = new RemoteLocation(nsId, path); + ret.add(loc); + } + return ret; + } + + @Override + public void setDestinations(final List dests) { + Builder builder = this.translator.getBuilder(); + builder.clearDestinations(); + for (RemoteLocation dest : dests) { + RemoteLocationProto.Builder itemBuilder = + RemoteLocationProto.newBuilder(); + String nsId = dest.getNameserviceId(); + String path = dest.getDest(); + itemBuilder.setNameserviceId(nsId); + itemBuilder.setPath(path); + RemoteLocationProto item = itemBuilder.build(); + builder.addDestinations(item); + } + } + + @Override + public boolean addDestination(String nsId, String path) { + // Check if the location is already there + List dests = getDestinations(); + for (RemoteLocation dest : dests) { + if (dest.getNameserviceId().equals(nsId) && dest.getDest().equals(path)) { + return false; + } + } + + // Add it to the existing list + Builder builder = this.translator.getBuilder(); + RemoteLocationProto.Builder itemBuilder = + RemoteLocationProto.newBuilder(); + itemBuilder.setNameserviceId(nsId); + itemBuilder.setPath(path); + RemoteLocationProto item = itemBuilder.build(); + builder.addDestinations(item); + return true; + } + + @Override + public void setDateModified(long time) { + this.translator.getBuilder().setDateModified(time); + } + + @Override + public long getDateModified() { + return this.translator.getProtoOrBuilder().getDateModified(); + } + + @Override + public void setDateCreated(long time) { + this.translator.getBuilder().setDateCreated(time); + } + + @Override + public long getDateCreated() { + return this.translator.getProtoOrBuilder().getDateCreated(); + } + + @Override + public boolean isReadOnly() { + MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder(); + if (!proto.hasReadOnly()) { + return false; + } + return proto.getReadOnly(); + } + + @Override + public void setReadOnly(boolean ro) { + this.translator.getBuilder().setReadOnly(ro); + } + + @Override + public DestinationOrder getDestOrder() { + MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder(); + return convert(proto.getDestOrder()); + } + + @Override + public void setDestOrder(DestinationOrder order) { + Builder builder = this.translator.getBuilder(); + if (order == null) { + builder.clearDestOrder(); + } else { + builder.setDestOrder(convert(order)); + } + } + + private DestinationOrder convert(DestOrder order) { + switch (order) { + case LOCAL: + return DestinationOrder.LOCAL; + case RANDOM: + return DestinationOrder.RANDOM; + default: + return DestinationOrder.HASH; + } + } + + private DestOrder convert(DestinationOrder order) { + switch (order) { + case LOCAL: + return DestOrder.LOCAL; + case RANDOM: + return DestOrder.RANDOM; + default: + return DestOrder.HASH; + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto index 487fe46bd5f..32a6250f54e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/FederationProtocol.proto @@ -104,4 +104,63 @@ message NamenodeHeartbeatRequestProto { message NamenodeHeartbeatResponseProto { optional bool status = 1; -} \ No newline at end of file +} + + +///////////////////////////////////////////////// +// Mount table +///////////////////////////////////////////////// + +message RemoteLocationProto { + optional string nameserviceId = 1; + optional string path = 2; +} + +message MountTableRecordProto { + optional string srcPath = 1; + repeated RemoteLocationProto destinations = 2; + optional uint64 dateCreated = 3; + optional uint64 dateModified = 4; + optional bool readOnly = 5 [default = false]; + + enum DestOrder { + HASH = 0; + LOCAL = 1; + RANDOM = 2; + } + optional DestOrder destOrder = 6 [default = HASH]; +} + +message AddMountTableEntryRequestProto { + optional MountTableRecordProto entry = 1; +} + +message AddMountTableEntryResponseProto { + optional bool status = 1; +} + +message UpdateMountTableEntryRequestProto { + optional MountTableRecordProto entry = 1; +} + +message UpdateMountTableEntryResponseProto { + optional bool status = 1; +} + +message RemoveMountTableEntryRequestProto { + optional string srcPath = 1; +} + +message RemoveMountTableEntryResponseProto{ + optional bool status = 1; +} + +message GetMountTableEntriesRequestProto { + optional string srcPath = 1; +} + +message GetMountTableEntriesResponseProto { + repeated MountTableRecordProto entries = 1; + optional uint64 timestamp = 2; +} + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java index 87427fd6bec..a48155324b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.NamenodePriorityCompara import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.router.Router; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; import org.apache.hadoop.util.Time; @@ -68,6 +69,10 @@ public class MockResolver this(); } + public MockResolver(Configuration conf, Router router) { + this(); + } + public void addLocation(String mount, String nsId, String location) { List locationsList = this.locations.get(mount); if (locationsList == null) { @@ -258,7 +263,6 @@ public class MockResolver @Override public PathLocation getDestinationForPath(String path) throws IOException { - Set namespaceSet = new HashSet<>(); List remoteLocations = new LinkedList<>(); for (String key : this.locations.keySet()) { if (path.startsWith(key)) { @@ -268,7 +272,6 @@ public class MockResolver RemoteLocation remoteLocation = new RemoteLocation(nameservice, finalPath); remoteLocations.add(remoteLocation); - namespaceSet.add(nameservice); } break; } @@ -277,7 +280,7 @@ public class MockResolver // Path isn't supported, mimic resolver behavior. return null; } - return new PathLocation(path, remoteLocations, namespaceSet); + return new PathLocation(path, remoteLocations); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java new file mode 100644 index 00000000000..682d5698db0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestMountTableResolver.java @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.resolver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test the {@link MountTableStore} from the {@link Router}. + */ +public class TestMountTableResolver { + + private static final Logger LOG = + LoggerFactory.getLogger(TestMountTableResolver.class); + + private MountTableResolver mountTable; + + private Map getMountTableEntry( + String subcluster, String path) { + Map ret = new HashMap<>(); + ret.put(subcluster, path); + return ret; + } + + /** + * Setup the mount table. + * / -> 1:/ + * __tmp -> 2:/tmp + * __user -> 3:/user + * ____a -> 2:/user/test + * ______demo + * ________test + * __________a -> 1:/user/test + * __________b -> 3:/user/test + * ____b + * ______file1.txt -> 4:/user/file1.txt + * __usr + * ____bin -> 2:/bin + * + * @throws IOException If it cannot set the mount table. + */ + private void setupMountTable() throws IOException { + Configuration conf = new Configuration(); + mountTable = new MountTableResolver(conf); + + // Root mount point + Map map = getMountTableEntry("1", "/"); + mountTable.addEntry(MountTable.newInstance("/", map)); + + // /tmp + map = getMountTableEntry("2", "/"); + mountTable.addEntry(MountTable.newInstance("/tmp", map)); + + // /user + map = getMountTableEntry("3", "/user"); + mountTable.addEntry(MountTable.newInstance("/user", map)); + + // /usr/bin + map = getMountTableEntry("2", "/bin"); + mountTable.addEntry(MountTable.newInstance("/usr/bin", map)); + + // /user/a + map = getMountTableEntry("2", "/user/test"); + mountTable.addEntry(MountTable.newInstance("/user/a", map)); + + // /user/b/file1.txt + map = getMountTableEntry("4", "/user/file1.txt"); + mountTable.addEntry(MountTable.newInstance("/user/b/file1.txt", map)); + + // /user/a/demo/test/a + map = getMountTableEntry("1", "/user/test"); + mountTable.addEntry(MountTable.newInstance("/user/a/demo/test/a", map)); + + // /user/a/demo/test/b + map = getMountTableEntry("3", "/user/test"); + mountTable.addEntry(MountTable.newInstance("/user/a/demo/test/b", map)); + } + + @Before + public void setup() throws IOException { + setupMountTable(); + } + + @Test + public void testDestination() throws IOException { + + // Check files + assertEquals("1->/tesfile1.txt", + mountTable.getDestinationForPath("/tesfile1.txt").toString()); + + assertEquals("3->/user/testfile2.txt", + mountTable.getDestinationForPath("/user/testfile2.txt").toString()); + + assertEquals("2->/user/test/testfile3.txt", + mountTable.getDestinationForPath("/user/a/testfile3.txt").toString()); + + assertEquals("3->/user/b/testfile4.txt", + mountTable.getDestinationForPath("/user/b/testfile4.txt").toString()); + + assertEquals("1->/share/file5.txt", + mountTable.getDestinationForPath("/share/file5.txt").toString()); + + assertEquals("2->/bin/file7.txt", + mountTable.getDestinationForPath("/usr/bin/file7.txt").toString()); + + assertEquals("1->/usr/file8.txt", + mountTable.getDestinationForPath("/usr/file8.txt").toString()); + + assertEquals("2->/user/test/demo/file9.txt", + mountTable.getDestinationForPath("/user/a/demo/file9.txt").toString()); + + // Check folders + assertEquals("3->/user/testfolder", + mountTable.getDestinationForPath("/user/testfolder").toString()); + + assertEquals("2->/user/test/b", + mountTable.getDestinationForPath("/user/a/b").toString()); + + assertEquals("3->/user/test/a", + mountTable.getDestinationForPath("/user/test/a").toString()); + + } + + private void compareLists(List list1, String[] list2) { + assertEquals(list1.size(), list2.length); + for (String item : list2) { + assertTrue(list1.contains(item)); + } + } + + @Test + public void testGetMountPoints() throws IOException { + + // Check getting all mount points (virtual and real) beneath a path + List mounts = mountTable.getMountPoints("/"); + assertEquals(3, mounts.size()); + compareLists(mounts, new String[] {"tmp", "user", "usr"}); + + mounts = mountTable.getMountPoints("/user"); + assertEquals(2, mounts.size()); + compareLists(mounts, new String[] {"a", "b"}); + + mounts = mountTable.getMountPoints("/user/a"); + assertEquals(1, mounts.size()); + compareLists(mounts, new String[] {"demo"}); + + mounts = mountTable.getMountPoints("/user/a/demo"); + assertEquals(1, mounts.size()); + compareLists(mounts, new String[] {"test"}); + + mounts = mountTable.getMountPoints("/user/a/demo/test"); + assertEquals(2, mounts.size()); + compareLists(mounts, new String[] {"a", "b"}); + + mounts = mountTable.getMountPoints("/tmp"); + assertEquals(0, mounts.size()); + + mounts = mountTable.getMountPoints("/t"); + assertNull(mounts); + + mounts = mountTable.getMountPoints("/unknownpath"); + assertNull(mounts); + } + + private void compareRecords(List list1, String[] list2) { + assertEquals(list1.size(), list2.length); + for (String item : list2) { + for (MountTable record : list1) { + if (record.getSourcePath().equals(item)) { + return; + } + } + } + fail(); + } + + @Test + public void testGetMounts() throws IOException { + + // Check listing the mount table records at or beneath a path + List records = mountTable.getMounts("/"); + assertEquals(8, records.size()); + compareRecords(records, new String[] {"/", "/tmp", "/user", "/usr/bin", + "user/a", "/user/a/demo/a", "/user/a/demo/b", "/user/b/file1.txt"}); + + records = mountTable.getMounts("/user"); + assertEquals(5, records.size()); + compareRecords(records, new String[] {"/user", "/user/a/demo/a", + "/user/a/demo/b", "user/a", "/user/b/file1.txt"}); + + records = mountTable.getMounts("/user/a"); + assertEquals(3, records.size()); + compareRecords(records, + new String[] {"/user/a/demo/a", "/user/a/demo/b", "/user/a"}); + + records = mountTable.getMounts("/tmp"); + assertEquals(1, records.size()); + compareRecords(records, new String[] {"/tmp"}); + } + + @Test + public void testRemoveSubTree() + throws UnsupportedOperationException, IOException { + + // 3 mount points are present /tmp, /user, /usr + compareLists(mountTable.getMountPoints("/"), + new String[] {"user", "usr", "tmp"}); + + // /tmp currently points to namespace 2 + assertEquals("2", mountTable.getDestinationForPath("/tmp/testfile.txt") + .getDefaultLocation().getNameserviceId()); + + // Remove tmp + mountTable.removeEntry("/tmp"); + + // Now 2 mount points are present /user, /usr + compareLists(mountTable.getMountPoints("/"), + new String[] {"user", "usr"}); + + // /tmp no longer exists, uses default namespace for mapping / + assertEquals("1", mountTable.getDestinationForPath("/tmp/testfile.txt") + .getDefaultLocation().getNameserviceId()); + } + + @Test + public void testRemoveVirtualNode() + throws UnsupportedOperationException, IOException { + + // 3 mount points are present /tmp, /user, /usr + compareLists(mountTable.getMountPoints("/"), + new String[] {"user", "usr", "tmp"}); + + // /usr is virtual, uses namespace 1->/ + assertEquals("1", mountTable.getDestinationForPath("/usr/testfile.txt") + .getDefaultLocation().getNameserviceId()); + + // Attempt to remove /usr + mountTable.removeEntry("/usr"); + + // Verify the remove failed + compareLists(mountTable.getMountPoints("/"), + new String[] {"user", "usr", "tmp"}); + } + + @Test + public void testRemoveLeafNode() + throws UnsupportedOperationException, IOException { + + // /user/a/demo/test/a currently points to namespace 1 + assertEquals("1", mountTable.getDestinationForPath("/user/a/demo/test/a") + .getDefaultLocation().getNameserviceId()); + + // Remove /user/a/demo/test/a + mountTable.removeEntry("/user/a/demo/test/a"); + + // Now /user/a/demo/test/a points to namespace 2 using the entry for /user/a + assertEquals("2", mountTable.getDestinationForPath("/user/a/demo/test/a") + .getDefaultLocation().getNameserviceId()); + + // Verify the virtual node at /user/a/demo still exists and was not deleted + compareLists(mountTable.getMountPoints("/user/a"), new String[] {"demo"}); + + // Verify the sibling node was unaffected and still points to ns 3 + assertEquals("3", mountTable.getDestinationForPath("/user/a/demo/test/b") + .getDefaultLocation().getNameserviceId()); + } + + @Test + public void testRefreshEntries() + throws UnsupportedOperationException, IOException { + + // Initial table loaded + testDestination(); + assertEquals(8, mountTable.getMounts("/").size()); + + // Replace table with /1 and /2 + List records = new ArrayList<>(); + Map map1 = getMountTableEntry("1", "/"); + records.add(MountTable.newInstance("/1", map1)); + Map map2 = getMountTableEntry("2", "/"); + records.add(MountTable.newInstance("/2", map2)); + mountTable.refreshEntries(records); + + // Verify addition + PathLocation destination1 = mountTable.getDestinationForPath("/1"); + RemoteLocation defaultLoc1 = destination1.getDefaultLocation(); + assertEquals("1", defaultLoc1.getNameserviceId()); + + PathLocation destination2 = mountTable.getDestinationForPath("/2"); + RemoteLocation defaultLoc2 = destination2.getDefaultLocation(); + assertEquals("2", defaultLoc2.getNameserviceId()); + + // Verify existing entries were removed + assertEquals(2, mountTable.getMounts("/").size()); + boolean assertionThrown = false; + try { + testDestination(); + fail(); + } catch (AssertionError e) { + // The / entry was removed, so it triggers an exception + assertionThrown = true; + } + assertTrue(assertionThrown); + } + + @Test + public void testMountTableScalability() throws IOException { + + List emptyList = new ArrayList<>(); + mountTable.refreshEntries(emptyList); + + // Add 100,000 entries in flat list + for (int i = 0; i < 100000; i++) { + Map map = getMountTableEntry("1", "/" + i); + MountTable record = MountTable.newInstance("/" + i, map); + mountTable.addEntry(record); + if (i % 10000 == 0) { + LOG.info("Adding flat mount record {}: {}", i, record); + } + } + + assertEquals(100000, mountTable.getMountPoints("/").size()); + assertEquals(100000, mountTable.getMounts("/").size()); + + // Add 1000 entries in deep list + mountTable.refreshEntries(emptyList); + String parent = "/"; + for (int i = 0; i < 1000; i++) { + final int index = i; + Map map = getMountTableEntry("1", "/" + index); + if (i > 0) { + parent = parent + "/"; + } + parent = parent + i; + MountTable record = MountTable.newInstance(parent, map); + mountTable.addEntry(record); + } + + assertEquals(1, mountTable.getMountPoints("/").size()); + assertEquals(1000, mountTable.getMounts("/").size()); + + // Add 100,000 entries in deep and wide tree + mountTable.refreshEntries(emptyList); + Random rand = new Random(); + parent = "/" + Integer.toString(rand.nextInt()); + int numRootTrees = 1; + for (int i = 0; i < 100000; i++) { + final int index = i; + Map map = getMountTableEntry("1", "/" + index); + parent = parent + "/" + i; + if (parent.length() > 2000) { + // Start new tree + parent = "/" + Integer.toString(rand.nextInt()); + numRootTrees++; + } + MountTable record = MountTable.newInstance(parent, map); + mountTable.addEntry(record); + } + + assertEquals(numRootTrees, mountTable.getMountPoints("/").size()); + assertEquals(100000, mountTable.getMounts("/").size()); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java index 598b9cfec9d..dbb8f3f2397 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java @@ -25,7 +25,9 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFile import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; import org.apache.hadoop.util.Time; /** @@ -234,6 +237,19 @@ public final class FederationStateStoreTestUtils { return false; } + public static List createMockMountTable( + List nameservices) throws IOException { + // create table entries + List entries = new ArrayList<>(); + for (String ns : nameservices) { + Map destMap = new HashMap<>(); + destMap.put(ns, "/target-" + ns); + MountTable entry = MountTable.newInstance("/" + ns, destMap); + entries.add(entry); + } + return entries; + } + public static MembershipState createMockRegistrationForNamenode( String nameserviceId, String namenodeId, FederationNamenodeServiceState state) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java new file mode 100644 index 00000000000..d30d6baea44 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockMountTable; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test the basic {@link StateStoreService} + * {@link MountTableStore} functionality. + */ +public class TestStateStoreMountTable extends TestStateStoreBase { + + private static List nameservices; + private static MountTableStore mountStore; + + @BeforeClass + public static void create() throws IOException { + nameservices = new ArrayList<>(); + nameservices.add(NAMESERVICES[0]); + nameservices.add(NAMESERVICES[1]); + } + + @Before + public void setup() throws IOException, InterruptedException { + mountStore = + getStateStore().getRegisteredRecordStore(MountTableStore.class); + // Clear Mount table registrations + assertTrue(clearRecords(getStateStore(), MountTable.class)); + } + + @Test + public void testStateStoreDisconnected() throws Exception { + + // Close the data store driver + getStateStore().closeDriver(); + assertFalse(getStateStore().isDriverReady()); + + // Test APIs that access the store to check they throw the correct exception + AddMountTableEntryRequest addRequest = + AddMountTableEntryRequest.newInstance(); + verifyException(mountStore, "addMountTableEntry", + StateStoreUnavailableException.class, + new Class[] {AddMountTableEntryRequest.class}, + new Object[] {addRequest}); + + UpdateMountTableEntryRequest updateRequest = + UpdateMountTableEntryRequest.newInstance(); + verifyException(mountStore, "updateMountTableEntry", + StateStoreUnavailableException.class, + new Class[] {UpdateMountTableEntryRequest.class}, + new Object[] {updateRequest}); + + RemoveMountTableEntryRequest removeRequest = + RemoveMountTableEntryRequest.newInstance(); + verifyException(mountStore, "removeMountTableEntry", + StateStoreUnavailableException.class, + new Class[] {RemoveMountTableEntryRequest.class}, + new Object[] {removeRequest}); + + GetMountTableEntriesRequest getRequest = + GetMountTableEntriesRequest.newInstance(); + mountStore.loadCache(true); + verifyException(mountStore, "getMountTableEntries", + StateStoreUnavailableException.class, + new Class[] {GetMountTableEntriesRequest.class}, + new Object[] {getRequest}); + } + + @Test + public void testSynchronizeMountTable() throws IOException { + // Synchronize and get mount table entries + List entries = createMockMountTable(nameservices); + assertTrue(synchronizeRecords(getStateStore(), entries, MountTable.class)); + for (MountTable e : entries) { + mountStore.loadCache(true); + MountTable entry = getMountTableEntry(e.getSourcePath()); + assertNotNull(entry); + assertEquals(e.getDefaultLocation().getDest(), + entry.getDefaultLocation().getDest()); + } + } + + @Test + public void testAddMountTableEntry() throws IOException { + + // Add 1 + List entries = createMockMountTable(nameservices); + List entries1 = getMountTableEntries("/").getRecords(); + assertEquals(0, entries1.size()); + MountTable entry0 = entries.get(0); + AddMountTableEntryRequest request = + AddMountTableEntryRequest.newInstance(entry0); + AddMountTableEntryResponse response = + mountStore.addMountTableEntry(request); + assertTrue(response.getStatus()); + + mountStore.loadCache(true); + List entries2 = getMountTableEntries("/").getRecords(); + assertEquals(1, entries2.size()); + } + + @Test + public void testRemoveMountTableEntry() throws IOException { + + // Add many + List entries = createMockMountTable(nameservices); + synchronizeRecords(getStateStore(), entries, MountTable.class); + mountStore.loadCache(true); + List entries1 = getMountTableEntries("/").getRecords(); + assertEquals(entries.size(), entries1.size()); + + // Remove 1 + RemoveMountTableEntryRequest request = + RemoveMountTableEntryRequest.newInstance(); + request.setSrcPath(entries.get(0).getSourcePath()); + assertTrue(mountStore.removeMountTableEntry(request).getStatus()); + + // Verify remove + mountStore.loadCache(true); + List entries2 = getMountTableEntries("/").getRecords(); + assertEquals(entries.size() - 1, entries2.size()); + } + + @Test + public void testUpdateMountTableEntry() throws IOException { + + // Add 1 + List entries = createMockMountTable(nameservices); + MountTable entry0 = entries.get(0); + String srcPath = entry0.getSourcePath(); + String nsId = entry0.getDefaultLocation().getNameserviceId(); + AddMountTableEntryRequest request = + AddMountTableEntryRequest.newInstance(entry0); + AddMountTableEntryResponse response = + mountStore.addMountTableEntry(request); + assertTrue(response.getStatus()); + + // Verify + mountStore.loadCache(true); + MountTable matchingEntry0 = getMountTableEntry(srcPath); + assertNotNull(matchingEntry0); + assertEquals(nsId, matchingEntry0.getDefaultLocation().getNameserviceId()); + + // Edit destination nameservice for source path + Map destMap = + Collections.singletonMap("testnameservice", "/"); + MountTable replacement = + MountTable.newInstance(srcPath, destMap); + UpdateMountTableEntryRequest updateRequest = + UpdateMountTableEntryRequest.newInstance(replacement); + UpdateMountTableEntryResponse updateResponse = + mountStore.updateMountTableEntry(updateRequest); + assertTrue(updateResponse.getStatus()); + + // Verify + mountStore.loadCache(true); + MountTable matchingEntry1 = getMountTableEntry(srcPath); + assertNotNull(matchingEntry1); + assertEquals("testnameservice", + matchingEntry1.getDefaultLocation().getNameserviceId()); + } + + /** + * Gets an existing mount table record in the state store. + * + * @param mount The mount point of the record to remove. + * @return The matching record if found, null if it is not found. + * @throws IOException If the state store could not be accessed. + */ + private MountTable getMountTableEntry(String mount) throws IOException { + GetMountTableEntriesRequest request = + GetMountTableEntriesRequest.newInstance(mount); + GetMountTableEntriesResponse response = + mountStore.getMountTableEntries(request); + List results = response.getEntries(); + if (results.size() > 0) { + // First result is sorted to have the shortest mount string length + return results.get(0); + } + return null; + } + + /** + * Fetch all mount table records beneath a root path. + * + * @param store FederationMountTableStore instance to commit the data. + * @param mount The root search path, enter "/" to return all mount table + * records. + * + * @return A list of all mount table records found below the root mount. + * + * @throws IOException If the state store could not be accessed. + */ + private QueryResult getMountTableEntries(String mount) + throws IOException { + if (mount == null) { + throw new IOException("Please specify a root search path"); + } + GetMountTableEntriesRequest request = + GetMountTableEntriesRequest.newInstance(); + request.setSrcPath(mount); + GetMountTableEntriesResponse response = + mountStore.getMountTableEntries(request); + List records = response.getEntries(); + long timestamp = response.getTimestamp(); + return new QueryResult(records, timestamp); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java index dc51ee966a9..8239fb125a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUt import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; import org.apache.hadoop.hdfs.server.federation.store.records.Query; import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; import org.junit.AfterClass; @@ -109,6 +111,11 @@ public class TestStateStoreDriverBase { generateRandomString(), generateRandomString(), generateRandomString(), generateRandomString(), generateRandomEnum(FederationNamenodeServiceState.class), false); + } else if (recordClass == MountTable.class) { + String src = "/" + generateRandomString(); + Map destMap = Collections.singletonMap( + generateRandomString(), "/" + generateRandomString()); + return (T) MountTable.newInstance(src, destMap); } return null; @@ -155,6 +162,7 @@ public class TestStateStoreDriverBase { public static void removeAll(StateStoreDriver driver) throws IOException { driver.removeAll(MembershipState.class); + driver.removeAll(MountTable.class); } public void testInsert( @@ -347,22 +355,26 @@ public class TestStateStoreDriverBase { public void testInsert(StateStoreDriver driver) throws IllegalArgumentException, IllegalAccessException, IOException { testInsert(driver, MembershipState.class); + testInsert(driver, MountTable.class); } public void testPut(StateStoreDriver driver) throws IllegalArgumentException, ReflectiveOperationException, IOException, SecurityException { testPut(driver, MembershipState.class); + testPut(driver, MountTable.class); } public void testRemove(StateStoreDriver driver) throws IllegalArgumentException, IllegalAccessException, IOException { testRemove(driver, MembershipState.class); + testRemove(driver, MountTable.class); } public void testFetchErrors(StateStoreDriver driver) throws IllegalArgumentException, IllegalAccessException, IOException { testFetchErrors(driver, MembershipState.class); + testFetchErrors(driver, MountTable.class); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java new file mode 100644 index 00000000000..b6f91cf8117 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.records; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.junit.Test; + +/** + * Test the Mount Table entry in the State Store. + */ +public class TestMountTable { + + private static final String SRC = "/test"; + private static final String DST_NS_0 = "ns0"; + private static final String DST_NS_1 = "ns1"; + private static final String DST_PATH_0 = "/path1"; + private static final String DST_PATH_1 = "/path/path2"; + private static final List DST = new LinkedList<>(); + static { + DST.add(new RemoteLocation(DST_NS_0, DST_PATH_0)); + DST.add(new RemoteLocation(DST_NS_1, DST_PATH_1)); + } + private static final Map DST_MAP = new LinkedHashMap<>(); + static { + DST_MAP.put(DST_NS_0, DST_PATH_0); + DST_MAP.put(DST_NS_1, DST_PATH_1); + } + + private static final long DATE_CREATED = 100; + private static final long DATE_MOD = 200; + + + @Test + public void testGetterSetter() throws IOException { + + MountTable record = MountTable.newInstance(SRC, DST_MAP); + + validateDestinations(record); + assertEquals(SRC, record.getSourcePath()); + assertEquals(DST, record.getDestinations()); + assertTrue(DATE_CREATED > 0); + assertTrue(DATE_MOD > 0); + + MountTable record2 = + MountTable.newInstance(SRC, DST_MAP, DATE_CREATED, DATE_MOD); + + validateDestinations(record2); + assertEquals(SRC, record2.getSourcePath()); + assertEquals(DST, record2.getDestinations()); + assertEquals(DATE_CREATED, record2.getDateCreated()); + assertEquals(DATE_MOD, record2.getDateModified()); + assertFalse(record.isReadOnly()); + assertEquals(DestinationOrder.HASH, record.getDestOrder()); + } + + @Test + public void testSerialization() throws IOException { + testSerialization(DestinationOrder.RANDOM); + testSerialization(DestinationOrder.HASH); + testSerialization(DestinationOrder.LOCAL); + } + + private void testSerialization(final DestinationOrder order) + throws IOException { + + MountTable record = MountTable.newInstance( + SRC, DST_MAP, DATE_CREATED, DATE_MOD); + record.setReadOnly(true); + record.setDestOrder(order); + + StateStoreSerializer serializer = StateStoreSerializer.getSerializer(); + String serializedString = serializer.serializeString(record); + MountTable record2 = + serializer.deserialize(serializedString, MountTable.class); + + validateDestinations(record2); + assertEquals(SRC, record2.getSourcePath()); + assertEquals(DST, record2.getDestinations()); + assertEquals(DATE_CREATED, record2.getDateCreated()); + assertEquals(DATE_MOD, record2.getDateModified()); + assertTrue(record2.isReadOnly()); + assertEquals(order, record2.getDestOrder()); + } + + @Test + public void testReadOnly() throws IOException { + + Map dest = new HashMap<>(); + dest.put(DST_NS_0, DST_PATH_0); + dest.put(DST_NS_1, DST_PATH_1); + MountTable record1 = MountTable.newInstance(SRC, dest); + record1.setReadOnly(true); + + validateDestinations(record1); + assertEquals(SRC, record1.getSourcePath()); + assertEquals(DST, record1.getDestinations()); + assertTrue(DATE_CREATED > 0); + assertTrue(DATE_MOD > 0); + assertTrue(record1.isReadOnly()); + + MountTable record2 = MountTable.newInstance( + SRC, DST_MAP, DATE_CREATED, DATE_MOD); + record2.setReadOnly(true); + + validateDestinations(record2); + assertEquals(SRC, record2.getSourcePath()); + assertEquals(DST, record2.getDestinations()); + assertEquals(DATE_CREATED, record2.getDateCreated()); + assertEquals(DATE_MOD, record2.getDateModified()); + assertTrue(record2.isReadOnly()); + } + + @Test + public void testOrder() throws IOException { + testOrder(DestinationOrder.HASH); + testOrder(DestinationOrder.LOCAL); + testOrder(DestinationOrder.RANDOM); + } + + private void testOrder(final DestinationOrder order) + throws IOException { + + MountTable record = MountTable.newInstance( + SRC, DST_MAP, DATE_CREATED, DATE_MOD); + record.setDestOrder(order); + + validateDestinations(record); + assertEquals(SRC, record.getSourcePath()); + assertEquals(DST, record.getDestinations()); + assertEquals(DATE_CREATED, record.getDateCreated()); + assertEquals(DATE_MOD, record.getDateModified()); + assertEquals(order, record.getDestOrder()); + } + + private void validateDestinations(MountTable record) { + + assertEquals(SRC, record.getSourcePath()); + assertEquals(2, record.getDestinations().size()); + + RemoteLocation location1 = record.getDestinations().get(0); + assertEquals(DST_NS_0, location1.getNameserviceId()); + assertEquals(DST_PATH_0, location1.getDest()); + + RemoteLocation location2 = record.getDestinations().get(1); + assertEquals(DST_NS_1, location2.getNameserviceId()); + assertEquals(DST_PATH_1, location2.getDest()); + } +} \ No newline at end of file