HDFS-11546. Federation Router RPC server. Contributed by Jason Kace and Inigo Goiri.

(cherry picked from commit 8a9cdebebf)
Inigo Goiri 2017-05-11 09:57:03 -07:00
parent 4bf877b03f
commit ca4f209b49
21 changed files with 5683 additions and 396 deletions


@ -1123,6 +1123,44 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// HDFS Router-based federation
public static final String FEDERATION_ROUTER_PREFIX =
"dfs.federation.router.";
public static final String DFS_ROUTER_DEFAULT_NAMESERVICE =
FEDERATION_ROUTER_PREFIX + "default.nameserviceId";
public static final String DFS_ROUTER_HANDLER_COUNT_KEY =
FEDERATION_ROUTER_PREFIX + "handler.count";
public static final int DFS_ROUTER_HANDLER_COUNT_DEFAULT = 10;
public static final String DFS_ROUTER_READER_QUEUE_SIZE_KEY =
FEDERATION_ROUTER_PREFIX + "reader.queue.size";
public static final int DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT = 100;
public static final String DFS_ROUTER_READER_COUNT_KEY =
FEDERATION_ROUTER_PREFIX + "reader.count";
public static final int DFS_ROUTER_READER_COUNT_DEFAULT = 1;
public static final String DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY =
FEDERATION_ROUTER_PREFIX + "handler.queue.size";
public static final int DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT = 100;
public static final String DFS_ROUTER_RPC_BIND_HOST_KEY =
FEDERATION_ROUTER_PREFIX + "rpc-bind-host";
public static final int DFS_ROUTER_RPC_PORT_DEFAULT = 8888;
public static final String DFS_ROUTER_RPC_ADDRESS_KEY =
FEDERATION_ROUTER_PREFIX + "rpc-address";
public static final String DFS_ROUTER_RPC_ADDRESS_DEFAULT =
"0.0.0.0:" + DFS_ROUTER_RPC_PORT_DEFAULT;
public static final String DFS_ROUTER_RPC_ENABLE =
FEDERATION_ROUTER_PREFIX + "rpc.enable";
public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
// HDFS Router NN client
public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE =
FEDERATION_ROUTER_PREFIX + "connection.pool-size";
public static final int DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT =
64;
public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN =
FEDERATION_ROUTER_PREFIX + "connection.pool.clean.ms";
public static final long DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN_DEFAULT =
TimeUnit.MINUTES.toMillis(1);
public static final String DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS =
FEDERATION_ROUTER_PREFIX + "connection.clean.ms";
public static final long DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT =
TimeUnit.SECONDS.toMillis(10);
// HDFS Router State Store connection
public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =
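
These keys are read through the standard Hadoop Configuration mechanism. As a minimal sketch (not part of the patch; the values are illustrative only), a deployment could tune the new Router settings like this:

  Configuration conf = new HdfsConfiguration();
  // Enable the Router RPC server and size its handler pool
  conf.setBoolean(DFSConfigKeys.DFS_ROUTER_RPC_ENABLE, true);
  conf.setInt(DFSConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY, 20);
  conf.set(DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:8888");
  // Cap the number of cached connections per user + NN pair
  conf.setInt(DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE, 64);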


@ -23,15 +23,14 @@ import org.apache.hadoop.hdfs.server.federation.router.RemoteLocationContext;
* Represents information about a single nameservice/namespace in a federated
* HDFS cluster.
*/
-public class FederationNamespaceInfo
-implements Comparable<FederationNamespaceInfo>, RemoteLocationContext {
+public class FederationNamespaceInfo extends RemoteLocationContext {
/** Block pool identifier. */
-private String blockPoolId;
+private final String blockPoolId;
/** Cluster identifier. */
-private String clusterId;
+private final String clusterId;
/** Nameservice identifier. */
-private String nameserviceId;
+private final String nameserviceId;
public FederationNamespaceInfo(String bpId, String clId, String nsId) {
this.blockPoolId = bpId;
@ -39,15 +38,16 @@ public class FederationNamespaceInfo
this.nameserviceId = nsId;
}
/**
* The HDFS nameservice id for this namespace.
*
* @return Nameservice identifier.
*/
@Override
public String getNameserviceId() {
return this.nameserviceId;
}
@Override
public String getDest() {
return this.nameserviceId;
}
/**
* The HDFS cluster id for this namespace.
*
@ -66,34 +66,8 @@ public class FederationNamespaceInfo
return this.blockPoolId;
}
-@Override
-public int hashCode() {
-return this.nameserviceId.hashCode();
-}
-@Override
-public boolean equals(Object obj) {
-if (obj == null) {
-return false;
-} else if (obj instanceof FederationNamespaceInfo) {
-return this.compareTo((FederationNamespaceInfo) obj) == 0;
-} else {
-return false;
-}
-}
-@Override
-public int compareTo(FederationNamespaceInfo info) {
-return this.nameserviceId.compareTo(info.getNameserviceId());
-}
@Override
public String toString() {
return this.nameserviceId + "->" + this.blockPoolId + ":" + this.clusterId;
}
-@Override
-public String getDest() {
-return this.nameserviceId;
-}
}


@ -17,34 +17,51 @@
*/
package org.apache.hadoop.hdfs.server.federation.resolver;
-import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.hdfs.server.federation.router.RemoteLocationContext;
/**
* A single location in a remote namespace, consisting of a nameservice ID
* and an HDFS path.
*/
-public class RemoteLocation implements RemoteLocationContext {
+public class RemoteLocation extends RemoteLocationContext {
/** Identifier of the remote namespace for this location. */
-private String nameserviceId;
+private final String nameserviceId;
/** Identifier of the namenode in the namespace for this location. */
private final String namenodeId;
/** Path in the remote location. */
-private String path;
+private final String path;
/**
* Create a new remote location.
*
* @param nsId Destination namespace.
* @param pPath Path in the destination namespace.
*/
public RemoteLocation(String nsId, String pPath) {
this(nsId, null, pPath);
}
/**
* Create a new remote location pointing to a particular namenode in the
* namespace.
*
* @param nsId Destination namespace.
* @param nnId Identifier of the namenode in the destination namespace.
* @param pPath Path in the destination namespace.
*/
-public RemoteLocation(String nsId, String pPath) {
+public RemoteLocation(String nsId, String nnId, String pPath) {
this.nameserviceId = nsId;
this.namenodeId = nnId;
this.path = pPath;
}
@Override
public String getNameserviceId() {
-return this.nameserviceId;
+String ret = this.nameserviceId;
+if (this.namenodeId != null) {
+ret += "-" + this.namenodeId;
+}
+return ret;
}
@Override
@ -54,21 +71,6 @@ public class RemoteLocation implements RemoteLocationContext {
@Override
public String toString() {
-return this.nameserviceId + "->" + this.path;
-}
-@Override
-public int hashCode() {
-return new HashCodeBuilder(17, 31)
-.append(this.nameserviceId)
-.append(this.path)
-.toHashCode();
-}
-@Override
-public boolean equals(Object obj) {
-return (obj != null &&
-obj.getClass() == this.getClass() &&
-obj.hashCode() == this.hashCode());
+return getNameserviceId() + "->" + this.path;
}
}


@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.ipc.RPC;
/**
* Context to track a connection in a {@link ConnectionPool}. When a client uses
* a connection, it increments a counter to mark it as active. Once the client
* is done with the connection, it decreases the counter. It also takes care of
* closing the connection once it is no longer active.
*/
public class ConnectionContext {
/** Client for the connection. */
private final ProxyAndInfo<ClientProtocol> client;
/** How many threads are using this connection. */
private int numThreads = 0;
/** If the connection is closed. */
private boolean closed = false;
public ConnectionContext(ProxyAndInfo<ClientProtocol> connection) {
this.client = connection;
}
/**
* Check if the connection is active.
*
* @return True if the connection is active.
*/
public synchronized boolean isActive() {
return this.numThreads > 0;
}
/**
* Check if the connection is closed.
*
* @return If the connection is closed.
*/
public synchronized boolean isClosed() {
return this.closed;
}
/**
* Check if the connection can be used. It checks if the connection is used by
* another thread or already closed.
*
* @return True if the connection can be used.
*/
public synchronized boolean isUsable() {
return !isActive() && !isClosed();
}
/**
* Get the connection client.
*
* @return Connection client.
*/
public synchronized ProxyAndInfo<ClientProtocol> getClient() {
this.numThreads++;
return this.client;
}
/**
* Release this connection. If the connection was closed, close the proxy.
* Otherwise, mark the connection as not used by us anymore.
*/
public synchronized void release() {
if (--this.numThreads == 0 && this.closed) {
close();
}
}
/**
* We will not use this connection anymore. If it's not being used, we close
* it. Otherwise, we let release() do it once we are done with it.
*/
public synchronized void close() {
this.closed = true;
if (this.numThreads == 0) {
ClientProtocol proxy = this.client.getProxy();
// Nobody should be using this anymore so it should close right away
RPC.stopProxy(proxy);
}
}
}
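
To make the reference-counting contract concrete, a hedged usage sketch (the surrounding method and path are hypothetical): getClient() marks the connection active, and release() lets the context close the underlying proxy if close() was requested in the meantime.

  void useConnection(ConnectionContext conn) throws IOException {
    ProxyAndInfo<ClientProtocol> client = conn.getClient(); // numThreads++
    try {
      client.getProxy().getFileInfo("/tmp/file"); // any ClientProtocol call
    } finally {
      conn.release(); // numThreads--; closes the proxy if close() was called
    }
  }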


@ -0,0 +1,408 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implements a pool of connections for the {@link Router} to be able to open
* many connections to many Namenodes.
*/
public class ConnectionManager {
private static final Logger LOG =
LoggerFactory.getLogger(ConnectionManager.class);
/** Number of parallel new connections to create. */
protected static final int MAX_NEW_CONNECTIONS = 100;
/** Minimum ratio of active connections: 50%. */
protected static final float MIN_ACTIVE_RATIO = 0.5f;
/** Configuration for the connection manager, pool and sockets. */
private final Configuration conf;
/** Min number of connections per user + nn. */
private final int minSize = 1;
/** Max number of connections per user + nn. */
private final int maxSize;
/** How often we close a pool for a particular user + nn. */
private final long poolCleanupPeriodMs;
/** How often we close a connection in a pool. */
private final long connectionCleanupPeriodMs;
/** Map of connection pools, one pool per user + NN. */
private final Map<ConnectionPoolId, ConnectionPool> pools;
/** Lock for accessing pools. */
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
/** Queue for creating new connections. */
private final BlockingQueue<ConnectionPool> creatorQueue =
new ArrayBlockingQueue<>(MAX_NEW_CONNECTIONS);
/** Create new connections asynchronously. */
private final ConnectionCreator creator;
/** Periodic executor to remove stale connection pools. */
private final ScheduledThreadPoolExecutor cleaner =
new ScheduledThreadPoolExecutor(1);
/** If the connection manager is running. */
private boolean running = false;
/**
* Creates a proxy client connection pool manager.
*
* @param config Configuration for the connections.
*/
public ConnectionManager(Configuration config) {
this.conf = config;
// Configure minimum and maximum connection pools
this.maxSize = this.conf.getInt(
DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE,
DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT);
// Map with the connections indexed by UGI and Namenode
this.pools = new HashMap<>();
// Create connections in a thread asynchronously
this.creator = new ConnectionCreator(creatorQueue);
this.creator.setDaemon(true);
// Cleanup periods
this.poolCleanupPeriodMs = this.conf.getLong(
DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN,
DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN_DEFAULT);
LOG.info("Cleaning connection pools every {} seconds",
TimeUnit.MILLISECONDS.toSeconds(this.poolCleanupPeriodMs));
this.connectionCleanupPeriodMs = this.conf.getLong(
DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS,
DFSConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT);
LOG.info("Cleaning connections every {} seconds",
TimeUnit.MILLISECONDS.toSeconds(this.connectionCleanupPeriodMs));
}
/**
* Start the connection manager.
*/
public void start() {
// Start the thread that creates connections asynchronously
this.creator.start();
// Schedule a task to remove stale connection pools and sockets
long recycleTimeMs = Math.min(
poolCleanupPeriodMs, connectionCleanupPeriodMs);
LOG.info("Cleaning every {} seconds",
TimeUnit.MILLISECONDS.toSeconds(recycleTimeMs));
this.cleaner.scheduleAtFixedRate(
new CleanupTask(), 0, recycleTimeMs, TimeUnit.MILLISECONDS);
// Mark the manager as running
this.running = true;
}
/**
* Stop the connection manager by closing all the pools.
*/
public void close() {
this.creator.shutdown();
this.cleaner.shutdown();
this.running = false;
writeLock.lock();
try {
for (ConnectionPool pool : this.pools.values()) {
pool.close();
}
this.pools.clear();
} finally {
writeLock.unlock();
}
}
/**
* Fetches the next available proxy client in the pool. Each client connection
* is reserved for a single user and cannot be reused until free.
*
* @param ugi User group information.
* @param nnAddress Namenode address for the connection.
* @return Proxy client to connect to nnAddress as UGI.
* @throws IOException If the connection cannot be obtained.
*/
public ConnectionContext getConnection(
UserGroupInformation ugi, String nnAddress) throws IOException {
// Check if the manager is shutdown
if (!this.running) {
LOG.error(
"Cannot get a connection to {} because the manager isn't running",
nnAddress);
return null;
}
// Try to get the pool if created
ConnectionPoolId connectionId = new ConnectionPoolId(ugi, nnAddress);
ConnectionPool pool = null;
readLock.lock();
try {
pool = this.pools.get(connectionId);
} finally {
readLock.unlock();
}
// Create the pool if not created before
if (pool == null) {
writeLock.lock();
try {
pool = this.pools.get(connectionId);
if (pool == null) {
pool = new ConnectionPool(
this.conf, nnAddress, ugi, this.minSize, this.maxSize);
this.pools.put(connectionId, pool);
}
} finally {
writeLock.unlock();
}
}
ConnectionContext conn = pool.getConnection();
// Add a new connection to the pool if it wasn't usable
if (conn == null || !conn.isUsable()) {
if (!this.creatorQueue.offer(pool)) {
LOG.error("Cannot add more than {} connections at the same time",
MAX_NEW_CONNECTIONS);
}
}
if (conn != null && conn.isClosed()) {
LOG.error("We got a closed connection from {}", pool);
conn = null;
}
return conn;
}
/**
* Get the number of connection pools.
*
* @return Number of connection pools.
*/
public int getNumConnectionPools() {
readLock.lock();
try {
return pools.size();
} finally {
readLock.unlock();
}
}
/**
* Get number of open connections.
*
* @return Number of open connections.
*/
public int getNumConnections() {
int total = 0;
readLock.lock();
try {
for (ConnectionPool pool : this.pools.values()) {
total += pool.getNumConnections();
}
} finally {
readLock.unlock();
}
return total;
}
/**
* Get number of active connections.
*
* @return Number of active connections.
*/
public int getNumActiveConnections() {
int total = 0;
readLock.lock();
try {
for (ConnectionPool pool : this.pools.values()) {
total += pool.getNumActiveConnections();
}
} finally {
readLock.unlock();
}
return total;
}
/**
* Get the number of connections to be created.
*
* @return Number of connections to be created.
*/
public int getNumCreatingConnections() {
return this.creatorQueue.size();
}
/**
* Removes stale connections not accessed recently from the pool. This is
* invoked periodically.
*/
private class CleanupTask implements Runnable {
@Override
public void run() {
long currentTime = Time.now();
List<ConnectionPoolId> toRemove = new LinkedList<>();
// Look for stale pools
readLock.lock();
try {
for (Entry<ConnectionPoolId, ConnectionPool> entry : pools.entrySet()) {
ConnectionPool pool = entry.getValue();
long lastTimeActive = pool.getLastActiveTime();
boolean isStale =
currentTime > (lastTimeActive + poolCleanupPeriodMs);
if (lastTimeActive > 0 && isStale) {
// Remove this pool
LOG.debug("Closing and removing stale pool {}", pool);
pool.close();
ConnectionPoolId poolId = entry.getKey();
toRemove.add(poolId);
} else {
// Keep this pool but clean connections inside
LOG.debug("Cleaning up {}", pool);
cleanup(pool);
}
}
} finally {
readLock.unlock();
}
// Remove stale pools
if (!toRemove.isEmpty()) {
writeLock.lock();
try {
for (ConnectionPoolId poolId : toRemove) {
pools.remove(poolId);
}
} finally {
writeLock.unlock();
}
}
}
/**
* Clean the unused connections for this pool.
*
* @param pool Connection pool to cleanup.
*/
private void cleanup(ConnectionPool pool) {
if (pool.getNumConnections() > pool.getMinSize()) {
// Check if the pool hasn't been active in a while or fewer than
// MIN_ACTIVE_RATIO of its connections are in use
long timeSinceLastActive = Time.now() - pool.getLastActiveTime();
int total = pool.getNumConnections();
int active = pool.getNumActiveConnections();
if (timeSinceLastActive > connectionCleanupPeriodMs ||
active < MIN_ACTIVE_RATIO * total) {
// Remove and close 1 connection
List<ConnectionContext> conns = pool.removeConnections(1);
for (ConnectionContext conn : conns) {
conn.close();
}
LOG.debug("Removed connection {} used {} seconds ago. " +
"Pool has {}/{} connections", pool.getConnectionPoolId(),
TimeUnit.MILLISECONDS.toSeconds(timeSinceLastActive),
pool.getNumConnections(), pool.getMaxSize());
}
}
}
}
/**
* Thread that creates connections asynchronously.
*/
private static class ConnectionCreator extends Thread {
/** If the creator is running. */
private boolean running = true;
/** Queue to push work to. */
private BlockingQueue<ConnectionPool> queue;
ConnectionCreator(BlockingQueue<ConnectionPool> blockingQueue) {
super("Connection creator");
this.queue = blockingQueue;
}
@Override
public void run() {
while (this.running) {
try {
ConnectionPool pool = this.queue.take();
try {
int total = pool.getNumConnections();
int active = pool.getNumActiveConnections();
if (pool.getNumConnections() < pool.getMaxSize() &&
active >= MIN_ACTIVE_RATIO * total) {
ConnectionContext conn = pool.newConnection();
pool.addConnection(conn);
} else {
LOG.debug("Cannot add more than {} connections to {}",
pool.getMaxSize(), pool);
}
} catch (IOException e) {
LOG.error("Cannot create a new connection", e);
}
} catch (InterruptedException e) {
LOG.error("The connection creator was interrupted");
this.running = false;
}
}
}
/**
* Stop this connection creator.
*/
public void shutdown() {
this.running = false;
this.interrupt();
}
}
}
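
A minimal lifecycle sketch for the manager (illustrative only; the NN address is hypothetical, and getConnection() can return null while a new connection is still being created asynchronously by the ConnectionCreator):

  ConnectionManager manager = new ConnectionManager(conf);
  manager.start();
  UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
  ConnectionContext conn = manager.getConnection(ugi, "nn0.example.com:8020");
  if (conn != null) {
    try {
      ClientProtocol proxy = conn.getClient().getProxy();
      // ... issue ClientProtocol calls through the proxy ...
    } finally {
      conn.release();
    }
  }
  manager.close();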


@ -0,0 +1,314 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.SocketFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Maintains a pool of connections for each user (including tokens) + NN pair.
* Because the RPC client maintains a single socket, each request is
* multiplexed across multiple sockets/connections from a pool to achieve
* throughput similar to a NN.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ConnectionPool {
private static final Logger LOG =
LoggerFactory.getLogger(ConnectionPool.class);
/** Configuration settings for the connection pool. */
private final Configuration conf;
/** Identifier for this connection pool. */
private final ConnectionPoolId connectionPoolId;
/** Namenode this pool connects to. */
private final String namenodeAddress;
/** User for these connections. */
private final UserGroupInformation ugi;
/** Pool of connections. We mimic a COW array. */
private volatile List<ConnectionContext> connections = new ArrayList<>();
/** Connection index for round-robin. */
private final AtomicInteger clientIndex = new AtomicInteger(0);
/** Min number of connections per user. */
private final int minSize;
/** Max number of connections per user. */
private final int maxSize;
/** The last time a connection was active. */
private volatile long lastActiveTime = 0;
protected ConnectionPool(Configuration config, String address,
UserGroupInformation user, int minPoolSize, int maxPoolSize)
throws IOException {
this.conf = config;
// Connection pool target
this.ugi = user;
this.namenodeAddress = address;
this.connectionPoolId =
new ConnectionPoolId(this.ugi, this.namenodeAddress);
// Set configuration parameters for the pool
this.minSize = minPoolSize;
this.maxSize = maxPoolSize;
// Add minimum connections to the pool
for (int i=0; i<this.minSize; i++) {
ConnectionContext newConnection = newConnection();
this.connections.add(newConnection);
}
LOG.debug("Created connection pool \"{}\" with {} connections",
this.connectionPoolId, this.minSize);
}
/**
* Get the maximum number of connections allowed in this pool.
*
* @return Maximum number of connections.
*/
protected int getMaxSize() {
return this.maxSize;
}
/**
* Get the minimum number of connections in this pool.
*
* @return Minimum number of connections.
*/
protected int getMinSize() {
return this.minSize;
}
/**
* Get the connection pool identifier.
*
* @return Connection pool identifier.
*/
protected ConnectionPoolId getConnectionPoolId() {
return this.connectionPoolId;
}
/**
* Return the next connection round-robin.
*
* @return Connection context.
*/
protected ConnectionContext getConnection() {
this.lastActiveTime = Time.now();
// Get a connection from the pool following round-robin
ConnectionContext conn = null;
List<ConnectionContext> tmpConnections = this.connections;
int size = tmpConnections.size();
int threadIndex = this.clientIndex.getAndIncrement();
for (int i=0; i<size; i++) {
int index = (threadIndex + i) % size;
conn = tmpConnections.get(index);
if (conn != null && conn.isUsable()) {
return conn;
}
}
// We return a connection even if it's active
return conn;
}
/**
* Add a connection to the current pool. It uses a Copy-On-Write approach.
*
* @param conn New connection to add to the pool.
*/
public synchronized void addConnection(ConnectionContext conn) {
List<ConnectionContext> tmpConnections = new ArrayList<>(this.connections);
tmpConnections.add(conn);
this.connections = tmpConnections;
this.lastActiveTime = Time.now();
}
/**
* Remove connections from the current pool.
*
* @param num Number of connections to remove.
* @return Removed connections.
*/
public synchronized List<ConnectionContext> removeConnections(int num) {
List<ConnectionContext> removed = new LinkedList<>();
// Remove and close the last connection
List<ConnectionContext> tmpConnections = new ArrayList<>();
for (int i=0; i<this.connections.size(); i++) {
ConnectionContext conn = this.connections.get(i);
if (i < this.minSize || i < this.connections.size() - num) {
tmpConnections.add(conn);
} else {
removed.add(conn);
}
}
this.connections = tmpConnections;
return removed;
}
/**
* Close the connection pool.
*/
protected synchronized void close() {
long timeSinceLastActive = TimeUnit.MILLISECONDS.toSeconds(
Time.now() - getLastActiveTime());
LOG.debug("Shutting down connection pool \"{}\" used {} seconds ago",
this.connectionPoolId, timeSinceLastActive);
for (ConnectionContext connection : this.connections) {
connection.close();
}
this.connections.clear();
}
/**
* Number of connections in the pool.
*
* @return Number of connections.
*/
protected int getNumConnections() {
return this.connections.size();
}
/**
* Number of active connections in the pool.
*
* @return Number of active connections.
*/
protected int getNumActiveConnections() {
int ret = 0;
List<ConnectionContext> tmpConnections = this.connections;
for (ConnectionContext conn : tmpConnections) {
if (conn.isActive()) {
ret++;
}
}
return ret;
}
/**
* Get the last time the connection pool was used.
*
* @return Last time the connection pool was used.
*/
protected long getLastActiveTime() {
return this.lastActiveTime;
}
@Override
public String toString() {
return this.connectionPoolId.toString();
}
/**
* Create a new proxy wrapper for a client NN connection.
* @return Proxy for the target ClientProtocol that contains the user's
* security context.
* @throws IOException If the proxy cannot be created.
*/
public ConnectionContext newConnection() throws IOException {
return newConnection(this.conf, this.namenodeAddress, this.ugi);
}
/**
* Creates a proxy wrapper for a client NN connection. Each proxy contains
* context for a single user/security context. To maximize throughput it is
* recommended to use multiple connections per user+server, allowing multiple
* writes and reads to be dispatched in parallel.
*
* @param conf Configuration for the connection.
* @param nnAddress Address of server supporting the ClientProtocol.
* @param ugi User context.
* @return Proxy for the target ClientProtocol that contains the user's
* security context.
* @throws IOException If it cannot be created.
*/
protected static ConnectionContext newConnection(Configuration conf,
String nnAddress, UserGroupInformation ugi)
throws IOException {
RPC.setProtocolEngine(
conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(
conf,
HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME);
SocketFactory factory = SocketFactory.getDefault();
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress);
final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
ClientNamenodeProtocolPB.class, version, socket, ugi, conf,
factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
ClientProtocol client = new ClientNamenodeProtocolTranslatorPB(proxy);
Text dtService = SecurityUtil.buildTokenService(socket);
ProxyAndInfo<ClientProtocol> clientProxy =
new ProxyAndInfo<ClientProtocol>(client, dtService, socket);
ConnectionContext connection = new ConnectionContext(clientProxy);
return connection;
}
}
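
The add/remove methods above rely on a copy-on-write idiom: writers synchronize, copy the list, and publish the copy through a volatile write, while readers such as getConnection() iterate a stable snapshot without locking. A generic sketch of the pattern, under those assumptions:

  private volatile List<ConnectionContext> conns = new ArrayList<>();

  public synchronized void add(ConnectionContext conn) {
    List<ConnectionContext> copy = new ArrayList<>(this.conns);
    copy.add(conn);
    this.conns = copy; // volatile write publishes the new snapshot
  }

  public int countActive() { // lock-free reader
    int active = 0;
    for (ConnectionContext c : this.conns) { // iterates one snapshot
      if (c.isActive()) {
        active++;
      }
    }
    return active;
  }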


@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
/**
* Identifier for a connection for a user to a namenode.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ConnectionPoolId implements Comparable<ConnectionPoolId> {
/** Namenode identifier. */
private final String nnId;
/** Information about the user. */
private final UserGroupInformation ugi;
/**
* New connection pool identifier.
*
* @param ugi Information of the user issuing the request.
* @param nnId Namenode address with port.
*/
public ConnectionPoolId(final UserGroupInformation ugi, final String nnId) {
this.nnId = nnId;
this.ugi = ugi;
}
@Override
public int hashCode() {
int hash = new HashCodeBuilder(17, 31)
.append(this.nnId)
.append(this.ugi.toString())
.append(this.getTokenIds())
.toHashCode();
return hash;
}
@Override
public boolean equals(Object o) {
if (o instanceof ConnectionPoolId) {
ConnectionPoolId other = (ConnectionPoolId) o;
if (!this.nnId.equals(other.nnId)) {
return false;
}
if (!this.ugi.toString().equals(other.ugi.toString())) {
return false;
}
String thisTokens = this.getTokenIds().toString();
String otherTokens = other.getTokenIds().toString();
return thisTokens.equals(otherTokens);
}
return false;
}
@Override
public String toString() {
return this.ugi + " " + this.getTokenIds() + "->" + this.nnId;
}
@Override
public int compareTo(ConnectionPoolId other) {
int ret = this.nnId.compareTo(other.nnId);
if (ret == 0) {
ret = this.ugi.toString().compareTo(other.ugi.toString());
}
if (ret == 0) {
String thisTokens = this.getTokenIds().toString();
String otherTokens = other.getTokenIds().toString();
ret = thisTokens.compareTo(otherTokens);
}
return ret;
}
/**
* Get the token identifiers for this connection.
* @return List with the token identifiers.
*/
private List<String> getTokenIds() {
List<String> tokenIds = new ArrayList<>();
Collection<Token<? extends TokenIdentifier>> tokens = this.ugi.getTokens();
for (Token<? extends TokenIdentifier> token : tokens) {
byte[] tokenIdBytes = token.getIdentifier();
String tokenId = Arrays.toString(tokenIdBytes);
tokenIds.add(tokenId);
}
Collections.sort(tokenIds);
return tokenIds;
}
}
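
Because identity includes the NN address, the UGI, and the sorted token identifiers, the same user talking to two different NNs maps to two distinct pools. Illustrative only:

  UserGroupInformation ugi = UserGroupInformation.createRemoteUser("alice");
  ConnectionPoolId id0 = new ConnectionPoolId(ugi, "nn0.example.com:8020");
  ConnectionPoolId id1 = new ConnectionPoolId(ugi, "nn1.example.com:8020");
  assert !id0.equals(id1);       // different NN -> different pool
  assert id0.compareTo(id1) < 0; // ordered first by NN address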


@ -17,22 +17,52 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;
+import org.apache.commons.lang.builder.HashCodeBuilder;
/**
-* Interface for objects that are unique to a namespace.
+* Base class for objects that are unique to a namespace.
*/
-public interface RemoteLocationContext {
+public abstract class RemoteLocationContext
+implements Comparable<RemoteLocationContext> {
/**
* Returns an identifier for a unique namespace.
*
* @return Namespace identifier.
*/
-String getNameserviceId();
+public abstract String getNameserviceId();
/**
* Destination in this location. For example the path in a remote namespace.
*
* @return Destination in this location.
*/
-String getDest();
+public abstract String getDest();
@Override
public int hashCode() {
return new HashCodeBuilder(17, 31)
.append(getNameserviceId())
.append(getDest())
.toHashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof RemoteLocationContext) {
RemoteLocationContext other = (RemoteLocationContext) obj;
return this.getNameserviceId().equals(other.getNameserviceId()) &&
this.getDest().equals(other.getDest());
}
return false;
}
@Override
public int compareTo(RemoteLocationContext info) {
int ret = this.getNameserviceId().compareTo(info.getNameserviceId());
if (ret == 0) {
ret = this.getDest().compareTo(info.getDest());
}
return ret;
}
}
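
Both subclasses in this patch (RemoteLocation and FederationNamespaceInfo) now inherit equality and ordering from the (nameserviceId, dest) pair. A small illustration of the contract:

  RemoteLocation a = new RemoteLocation("ns0", "/data");
  RemoteLocation b = new RemoteLocation("ns0", "/data");
  assert a.equals(b) && a.hashCode() == b.hashCode();
  assert a.compareTo(new RemoteLocation("ns1", "/data")) < 0;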


@ -0,0 +1,164 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Arrays;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Determines the remote client protocol method and the parameter list for a
* specific location.
*/
public class RemoteMethod {
private static final Logger LOG = LoggerFactory.getLogger(RemoteMethod.class);
/** List of parameters: static and dynamic values, matching types. */
private final Object[] params;
/** List of method parameter types; matches the parameters. */
private final Class<?>[] types;
/** String name of the ClientProtocol method. */
private final String methodName;
/**
* Create a method with no parameters.
*
* @param method The string name of the ClientProtocol method.
*/
public RemoteMethod(String method) {
this.params = null;
this.types = null;
this.methodName = method;
}
/**
* Creates a remote method generator.
*
* @param method The string name of the ClientProtocol method.
* @param pTypes A list of types to use to locate the specific method.
* @param pParams A list of parameters for the method. The order of the
* parameter list must match the order and number of the types.
* Parameters are grouped into 2 categories:
* <ul>
* <li>Static parameters that are immutable across locations.
* <li>Dynamic parameters that are determined for each location by a
* RemoteParam object. To specify a dynamic parameter, pass an
* instance of RemoteParam in place of the parameter value.
* </ul>
* @throws IOException If the types and parameter lists are not valid.
*/
public RemoteMethod(String method, Class<?>[] pTypes, Object... pParams)
throws IOException {
if (pParams.length != pTypes.length) {
throw new IOException("Invalid parameters for method " + method);
}
this.params = pParams;
this.types = pTypes;
this.methodName = method;
}
/**
* Get the represented Java method.
*
* @return The represented Java method.
* @throws IOException If the method cannot be found.
*/
public Method getMethod() throws IOException {
try {
if (types != null) {
return ClientProtocol.class.getDeclaredMethod(methodName, types);
} else {
return ClientProtocol.class.getDeclaredMethod(methodName);
}
} catch (NoSuchMethodException e) {
// Re-throw as an IOException
LOG.error("Cannot get method {} with types {}",
methodName, Arrays.toString(types), e);
throw new IOException(e);
} catch (SecurityException e) {
LOG.error("Cannot access method {} with types {}",
methodName, Arrays.toString(types), e);
throw new IOException(e);
}
}
/**
* Get the calling types for this method.
*
* @return An array of calling types.
*/
public Class<?>[] getTypes() {
return this.types;
}
/**
* Generate a list of parameters for this specific location using no context.
*
* @return A list of parameters for the method customized for the location.
*/
public Object[] getParams() {
return this.getParams(null);
}
/**
* Get the name of the method.
*
* @return Name of the method.
*/
public String getMethodName() {
return this.methodName;
}
/**
* Generate a list of parameters for this specific location. Parameters are
* grouped into 2 categories:
* <ul>
* <li>Static parameters that are immutable across locations.
* <li>Dynamic parameters that are determined for each location by a
* RemoteParam object.
* </ul>
*
* @param context The context identifying the location.
* @return A list of parameters for the method customized for the location.
*/
public Object[] getParams(RemoteLocationContext context) {
if (this.params == null) {
return new Object[] {};
}
Object[] objList = new Object[this.params.length];
for (int i = 0; i < this.params.length; i++) {
Object currentObj = this.params[i];
if (currentObj instanceof RemoteParam) {
// Map the parameter using the context
RemoteParam paramGetter = (RemoteParam) currentObj;
objList[i] = paramGetter.getParameterForContext(context);
} else {
objList[i] = currentObj;
}
}
return objList;
}
}
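
A hedged example of the two parameter categories (getFileInfo is a real ClientProtocol method; the location is made up). The RemoteParam placeholder is resolved to the location's destination when getParams() is called:

  RemoteMethod method = new RemoteMethod("getFileInfo",
      new Class<?>[] {String.class}, new RemoteParam());
  RemoteLocation loc = new RemoteLocation("ns0", "/ns0/data");
  Object[] params = method.getParams(loc); // -> {"/ns0/data"}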


@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.util.Map;
/**
* A dynamically assignable parameter that is location-specific.
* <p>
* There are 2 ways this mapping is determined:
* <ul>
* <li>Default: Uses the RemoteLocationContext's destination
* <li>Map: Uses the value of the RemoteLocationContext key provided in the
* parameter map.
* </ul>
*/
public class RemoteParam {
private final Map<? extends Object, ? extends Object> paramMap;
/**
* Constructs a default remote parameter. Always maps the value to the
* destination of the provided RemoteLocationContext.
*/
public RemoteParam() {
this.paramMap = null;
}
/**
* Constructs a map based remote parameter. Determines the value using the
* provided RemoteLocationContext as a key into the map.
*
* @param map Map with RemoteLocationContext keys.
*/
public RemoteParam(
Map<? extends RemoteLocationContext, ? extends Object> map) {
this.paramMap = map;
}
/**
* Determine the appropriate value for this parameter based on the location.
*
* @param context Context identifying the location.
* @return A parameter specific to this location.
*/
public Object getParameterForContext(RemoteLocationContext context) {
if (context == null) {
return null;
} else if (this.paramMap != null) {
return this.paramMap.get(context);
} else {
// Default case
return context.getDest();
}
}
}
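
And the map-based mode, where each location carries a pre-computed value (loc0 and loc1 are hypothetical RemoteLocation instances, e.g. the per-namespace destinations of a rename):

  Map<RemoteLocation, String> dsts = new HashMap<>();
  dsts.put(loc0, "/ns0/dst");
  dsts.put(loc1, "/ns1/dst");
  RemoteParam dstParam = new RemoteParam(dsts);
  Object v = dstParam.getParameterForContext(loc0); // -> "/ns0/dst"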


@ -22,12 +22,14 @@ import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.new
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
@ -36,6 +38,8 @@ import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Router that provides a unified view of multiple federated HDFS clusters. It
@ -60,7 +64,7 @@ import org.apache.hadoop.util.StringUtils;
@InterfaceStability.Evolving
public class Router extends CompositeService {
-private static final Log LOG = LogFactory.getLog(Router.class);
+private static final Logger LOG = LoggerFactory.getLogger(Router.class);
/** Configuration for the Router. */
@ -71,6 +75,7 @@ public class Router extends CompositeService {
/** RPC interface to the client. */
private RouterRpcServer rpcServer;
private InetSocketAddress rpcAddress;
/** Interface with the State Store. */
private StateStoreService stateStore;
@ -105,9 +110,6 @@ public class Router extends CompositeService {
protected void serviceInit(Configuration configuration) throws Exception {
this.conf = configuration;
-// TODO Interface to the State Store
-this.stateStore = null;
// Resolver to track active NNs
this.namenodeResolver = newActiveNamenodeResolver(
this.conf, this.stateStore);
@ -122,6 +124,15 @@ public class Router extends CompositeService {
throw new IOException("Cannot find subcluster resolver");
}
if (conf.getBoolean(
DFSConfigKeys.DFS_ROUTER_RPC_ENABLE,
DFSConfigKeys.DFS_ROUTER_RPC_ENABLE_DEFAULT)) {
// Create RPC server
this.rpcServer = createRpcServer();
addService(this.rpcServer);
this.setRpcServerAddress(rpcServer.getRpcAddress());
}
super.serviceInit(conf);
}
@ -171,11 +182,13 @@ public class Router extends CompositeService {
router.init(conf);
router.start();
} catch (Throwable e) {
LOG.error("Failed to start router.", e);
LOG.error("Failed to start router", e);
terminate(1, e);
}
}
/////////////////////////////////////////////////////////
// RPC Server
/////////////////////////////////////////////////////////
@ -183,7 +196,7 @@ public class Router extends CompositeService {
/**
* Create a new Router RPC server to proxy ClientProtocol requests.
*
-* @return RouterRpcServer
+* @return New Router RPC Server.
* @throws IOException If the router RPC server was not started.
*/
protected RouterRpcServer createRpcServer() throws IOException {
@ -200,6 +213,35 @@ public class Router extends CompositeService {
return this.rpcServer;
}
/**
* Set the current RPC socket for the router.
*
* @param address RPC address.
*/
protected void setRpcServerAddress(InetSocketAddress address) {
this.rpcAddress = address;
// Use the RPC address as our unique router Id
if (this.rpcAddress != null) {
try {
String hostname = InetAddress.getLocalHost().getHostName();
setRouterId(hostname + ":" + this.rpcAddress.getPort());
} catch (UnknownHostException ex) {
LOG.error("Cannot set unique router ID, address not resolvable {}",
this.rpcAddress);
}
}
}
/**
* Get the current RPC socket address for the router.
*
* @return InetSocketAddress
*/
public InetSocketAddress getRpcServerAddress() {
return this.rpcAddress;
}
/////////////////////////////////////////////////////////
// Submodule getters
/////////////////////////////////////////////////////////
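
With the new flag, the RPC server is created and registered during serviceInit() and its address becomes the router identifier. A minimal startup sketch, assuming the namenode and subcluster resolvers are configured:

  Configuration conf = new HdfsConfiguration();
  conf.setBoolean(DFSConfigKeys.DFS_ROUTER_RPC_ENABLE, true);
  Router router = new Router();
  router.init(conf);  // creates the RouterRpcServer via createRpcServer()
  router.start();
  InetSocketAddress rpcAddress = router.getRpcServerAddress();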


@ -0,0 +1,856 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* A client proxy for Router -> NN communication using the NN ClientProtocol.
* <p>
* Allows routers to invoke remote ClientProtocol methods and handles
* retries/failover.
* <ul>
* <li>invokeSingle Make a single request to a single namespace
* <li>invokeSequential Make a sequential series of requests to multiple
* ordered namespaces until a condition is met.
* <li>invokeConcurrent Make concurrent requests to multiple namespaces and
* return all of the results.
* </ul>
* Also maintains a cached pool of connections to NNs. Connections are managed
* by the ConnectionManager and are unique to each user + NN. The size of the
* connection pool can be configured. Larger pools allow for more simultaneous
* requests to a single NN from a single user.
*/
public class RouterRpcClient {
private static final Logger LOG =
LoggerFactory.getLogger(RouterRpcClient.class);
/** Router identifier. */
private final String routerId;
/** Interface to identify the active NN for a nameservice or blockpool ID. */
private final ActiveNamenodeResolver namenodeResolver;
/** Connection pool to the Namenodes per user for performance. */
private final ConnectionManager connectionManager;
/** Service to run asynchronous calls. */
private final ExecutorService executorService;
/** Retry policy for router -> NN communication. */
private final RetryPolicy retryPolicy;
/** Pattern to parse a stack trace line. */
private static final Pattern STACK_TRACE_PATTERN =
Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");
/**
* Create a router RPC client to manage remote procedure calls to NNs.
*
* @param conf HDFS configuration.
* @param identifier Unique identifier for this router.
* @param resolver A NN resolver to determine the currently active NN in HA.
*/
public RouterRpcClient(Configuration conf, String identifier,
ActiveNamenodeResolver resolver) {
this.routerId = identifier;
this.namenodeResolver = resolver;
this.connectionManager = new ConnectionManager(conf);
this.connectionManager.start();
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("RPC Router Client-%d")
.build();
this.executorService = Executors.newCachedThreadPool(threadFactory);
int maxFailoverAttempts = conf.getInt(
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY,
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT);
int maxRetryAttempts = conf.getInt(
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY,
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT);
int failoverSleepBaseMillis = conf.getInt(
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY,
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT);
int failoverSleepMaxMillis = conf.getInt(
HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY,
HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT);
this.retryPolicy = RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, maxRetryAttempts,
failoverSleepBaseMillis, failoverSleepMaxMillis);
}
/**
* Shutdown the client.
*/
public void shutdown() {
if (this.connectionManager != null) {
this.connectionManager.close();
}
if (this.executorService != null) {
this.executorService.shutdownNow();
}
}
/**
* Total number of available sockets between the router and NNs.
*
* @return Number of namenode clients.
*/
public int getNumConnections() {
return this.connectionManager.getNumConnections();
}
/**
* Total number of active sockets between the router and NNs.
*
* @return Number of active namenode clients.
*/
public int getNumActiveConnections() {
return this.connectionManager.getNumActiveConnections();
}
/**
* Total number of open connection pools to a NN. Each connection pool
* represents one user + one NN.
*
* @return Number of connection pools.
*/
public int getNumConnectionPools() {
return this.connectionManager.getNumConnectionPools();
}
/**
* Number of connections between the router and NNs whose sockets are still
* being created.
*
* @return Number of connections waiting to be created.
*/
public int getNumCreatingConnections() {
return this.connectionManager.getNumCreatingConnections();
}
/**
* Get ClientProtocol proxy client for a NameNode. Each combination of user +
* NN must use a unique proxy client. Previously created clients are cached
* and stored in a connection pool by the ConnectionManager.
*
* @param ugi User group information.
* @param nsId Nameservice identifier.
* @param rpcAddress ClientProtocol RPC server address of the NN.
* @return ConnectionContext containing a ClientProtocol proxy client for the
* NN + current user.
* @throws IOException If we cannot get a connection to the NameNode.
*/
private ConnectionContext getConnection(
UserGroupInformation ugi, String nsId, String rpcAddress)
throws IOException {
ConnectionContext connection = null;
try {
// Each proxy holds the UGI info for the current user when it is created.
// This cache does not scale very well, one entry per user per namenode,
// and may need to be adjusted and/or selectively pruned. The cache is
// important due to the excessive overhead of creating a new proxy wrapper
// for each individual request.
// TODO Add tokens from the federated UGI
connection = this.connectionManager.getConnection(ugi, rpcAddress);
LOG.debug("User {} NN {} is using connection {}",
ugi.getUserName(), rpcAddress, connection);
} catch (Exception ex) {
LOG.error("Cannot open NN client to address: {}", rpcAddress, ex);
}
if (connection == null) {
throw new IOException("Cannot get a connection to " + rpcAddress);
}
return connection;
}
/**
* Convert an exception to an IOException.
*
* For a non-IOException, wrap it with IOException. For a RemoteException,
* unwrap it. For an IOException which is not a RemoteException, return it.
*
* @param e Exception to convert into an IOException.
* @return Created IO exception.
*/
private static IOException toIOException(Exception e) {
if (e instanceof RemoteException) {
return ((RemoteException) e).unwrapRemoteException();
}
if (e instanceof IOException) {
return (IOException)e;
}
return new IOException(e);
}
/**
* Check if we should retry the RPC call.
*
* @param ioe IOException reported.
* @param retryCount Number of retries.
* @return Retry decision.
* @throws IOException Original exception if the retry policy generates one.
*/
private RetryDecision shouldRetry(final IOException ioe, final int retryCount)
throws IOException {
try {
final RetryPolicy.RetryAction a =
this.retryPolicy.shouldRetry(ioe, retryCount, 0, true);
return a.action;
} catch (Exception ex) {
LOG.error("Re-throwing API exception, no more retries", ex);
throw toIOException(ex);
}
}
/**
* Invokes a method against the ClientProtocol proxy server. If a standby
* exception is generated by the call to the client, retries using the
* alternate server.
*
* Re-throws exceptions generated by the remote RPC call as either
* RemoteException or IOException.
*
* @param ugi User group information.
* @param namenodes A prioritized list of namenodes within the same
* nameservice.
* @param method Remote ClientProtocol method to invoke.
* @param params Variable list of parameters matching the method.
* @return The result of invoking the method.
* @throws IOException
*/
private Object invokeMethod(
final UserGroupInformation ugi,
final List<? extends FederationNamenodeContext> namenodes,
final Method method, final Object... params) throws IOException {
if (namenodes == null || namenodes.isEmpty()) {
throw new IOException("No namenodes to invoke " + method.getName() +
" with params " + Arrays.toString(params) + " from " + this.routerId);
}
Object ret = null;
boolean failover = false;
Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>();
for (FederationNamenodeContext namenode : namenodes) {
ConnectionContext connection = null;
try {
String nsId = namenode.getNameserviceId();
String rpcAddress = namenode.getRpcAddress();
connection = this.getConnection(ugi, nsId, rpcAddress);
ProxyAndInfo<ClientProtocol> client = connection.getClient();
ClientProtocol proxy = client.getProxy();
ret = invoke(0, method, proxy, params);
if (failover) {
// Success on alternate server, update
InetSocketAddress address = client.getAddress();
namenodeResolver.updateActiveNamenode(nsId, address);
}
return ret;
} catch (IOException ioe) {
ioes.put(namenode, ioe);
if (ioe instanceof StandbyException) {
// Fail over indicated by retry policy and/or NN
failover = true;
} else if (ioe instanceof RemoteException) {
// RemoteException returned by NN
throw (RemoteException) ioe;
} else {
// Other communication error, this is a failure
// Communication retries are handled by the retry policy
throw ioe;
}
} finally {
if (connection != null) {
connection.release();
}
}
}
// All namenodes were unavailable or in standby
String msg = "No namenode available to invoke " + method.getName() + " " +
Arrays.toString(params);
LOG.error(msg);
for (Entry<FederationNamenodeContext, IOException> entry :
ioes.entrySet()) {
FederationNamenodeContext namenode = entry.getKey();
String nsId = namenode.getNameserviceId();
String nnId = namenode.getNamenodeId();
String addr = namenode.getRpcAddress();
IOException ioe = entry.getValue();
if (ioe instanceof StandbyException) {
LOG.error("{} {} at {} is in Standby", nsId, nnId, addr);
} else {
LOG.error("{} {} at {} error: \"{}\"",
nsId, nnId, addr, ioe.getMessage());
}
}
throw new StandbyException(msg);
}
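For reference, a hedged sketch of a call through this method from inside the class; getFileInfo(String) is a real ClientProtocol method, while the path and nameservice id are made up and checked exceptions are omitted:

// Hedged usage sketch (from within this class).
Method getFileInfo =
    ClientProtocol.class.getMethod("getFileInfo", String.class);
UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
List<? extends FederationNamenodeContext> nns =
    getNamenodesForNameservice("ns0");
HdfsFileStatus status =
    (HdfsFileStatus) invokeMethod(ugi, nns, getFileInfo, "/tmp/file");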
/**
* Invokes a method on the designated object. Catches exceptions specific to
* the invocation.
*
* Re-throws exceptions generated by the remote RPC call as either
* RemoteException or IOException.
*
* @param retryCount Current retry attempt, used by the retry policy.
* @param method Method to invoke.
* @param obj Target object for the method.
* @param params Variable parameters.
* @return Response from the remote server.
* @throws IOException
*/
private Object invoke(int retryCount, final Method method, final Object obj,
final Object... params) throws IOException {
try {
return method.invoke(obj, params);
} catch (IllegalAccessException e) {
LOG.error("Unexpected exception while proxying API", e);
return null;
} catch (IllegalArgumentException e) {
LOG.error("Unexpected exception while proxying API", e);
return null;
} catch (InvocationTargetException e) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
IOException ioe = (IOException) cause;
// Check if we should retry.
RetryDecision decision = shouldRetry(ioe, retryCount);
if (decision == RetryDecision.RETRY) {
// retry
return invoke(++retryCount, method, obj, params);
} else if (decision == RetryDecision.FAILOVER_AND_RETRY) {
// failover, invoker looks for standby exceptions for failover.
if (ioe instanceof StandbyException) {
throw ioe;
} else {
throw new StandbyException(ioe.getMessage());
}
} else {
if (ioe instanceof RemoteException) {
RemoteException re = (RemoteException) ioe;
ioe = re.unwrapRemoteException();
ioe = getCleanException(ioe);
}
throw ioe;
}
} else {
throw new IOException(e);
}
}
}
/**
* Get a clean copy of the exception. Sometimes the exceptions returned by the
* server contain the full stack trace in the message.
*
* @param ioe Exception to clean up.
* @return Copy of the original exception with a clean message.
*/
private static IOException getCleanException(IOException ioe) {
IOException ret = null;
String msg = ioe.getMessage();
Throwable cause = ioe.getCause();
StackTraceElement[] stackTrace = ioe.getStackTrace();
// Clean the message by removing the stack trace
int index = (msg == null) ? -1 : msg.indexOf("\n");
if (index > 0) {
String[] msgSplit = msg.split("\n");
msg = msgSplit[0];
// Parse stack trace from the message
List<StackTraceElement> elements = new LinkedList<>();
for (int i=1; i<msgSplit.length; i++) {
String line = msgSplit[i];
Matcher matcher = STACK_TRACE_PATTERN.matcher(line);
if (matcher.find()) {
String declaringClass = matcher.group(1);
String methodName = matcher.group(2);
String fileName = matcher.group(3);
int lineNumber = Integer.parseInt(matcher.group(4));
StackTraceElement element = new StackTraceElement(
declaringClass, methodName, fileName, lineNumber);
elements.add(element);
}
}
stackTrace = elements.toArray(new StackTraceElement[elements.size()]);
}
// Create the new output exception
if (ioe instanceof RemoteException) {
RemoteException re = (RemoteException)ioe;
ret = new RemoteException(re.getClassName(), msg);
} else {
// Try the simple constructor and initialize the fields
Class<? extends IOException> ioeClass = ioe.getClass();
try {
Constructor<? extends IOException> constructor =
ioeClass.getDeclaredConstructor(String.class);
ret = constructor.newInstance(msg);
} catch (ReflectiveOperationException e) {
// If there are errors, just use the input one
LOG.error("Could not create exception {}", ioeClass.getSimpleName(), e);
ret = ioe;
}
}
if (ret != null) {
ret.initCause(cause);
ret.setStackTrace(stackTrace);
}
return ret;
}
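To make the cleanup concrete, a hedged sketch from inside this class; it assumes STACK_TRACE_PATTERN matches standard "\tat Class.method(File.java:123)" lines, which is how the capture groups above are used:

// Hedged sketch: a message with an embedded stack trace gets split back out.
IOException dirty = new IOException(
    "Access denied\n"
    + "\tat org.example.Server.check(Server.java:42)\n"
    + "\tat org.example.Server.handle(Server.java:17)");
IOException clean = getCleanException(dirty);
// clean.getMessage()            -> "Access denied"
// clean.getStackTrace().length  -> 2, parsed from the message body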
/**
* Invokes a ClientProtocol method. Determines the target nameservice via a
* provided block.
*
* Re-throws exceptions generated by the remote RPC call as either
* RemoteException or IOException.
*
* @param block Block used to determine appropriate nameservice.
* @param method The remote method and parameters to invoke.
* @return The result of invoking the method.
* @throws IOException
*/
public Object invokeSingle(final ExtendedBlock block, RemoteMethod method)
throws IOException {
String bpId = block.getBlockPoolId();
return invokeSingleBlockPool(bpId, method);
}
/**
* Invokes a ClientProtocol method. Determines the target nameservice using
* the block pool id.
*
* Re-throws exceptions generated by the remote RPC call as either
* RemoteException or IOException.
*
* @param bpId Block pool identifier.
* @param method The remote method and parameters to invoke.
* @return The result of invoking the method.
* @throws IOException
*/
public Object invokeSingleBlockPool(final String bpId, RemoteMethod method)
throws IOException {
String nsId = getNameserviceForBlockPoolId(bpId);
return invokeSingle(nsId, method);
}
/**
* Invokes a ClientProtocol method against the specified namespace.
*
* Re-throws exceptions generated by the remote RPC call as either
* RemoteException or IOException.
*
* @param nsId Target namespace for the method.
* @param method The remote method and parameters to invoke.
* @return The result of invoking the method.
* @throws IOException
*/
public Object invokeSingle(final String nsId, RemoteMethod method)
throws IOException {
UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
List<? extends FederationNamenodeContext> nns =
getNamenodesForNameservice(nsId);
RemoteLocationContext loc = new RemoteLocation(nsId, "/");
return invokeMethod(ugi, nns, method.getMethod(), method.getParams(loc));
}
/**
* Invokes a single proxy call for a single location.
*
* Re-throws exceptions generated by the remote RPC call as either
* RemoteException or IOException.
*
* @param location RemoteLocation to invoke.
* @param remoteMethod The remote method and parameters to invoke.
* @return The result of invoking the method if successful.
* @throws IOException
*/
public Object invokeSingle(final RemoteLocationContext location,
RemoteMethod remoteMethod) throws IOException {
List<RemoteLocationContext> locations = Collections.singletonList(location);
return invokeSequential(locations, remoteMethod);
}
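As a hedged usage sketch from the caller's side (rpcClient stands for an instance of this class; getStoragePolicies() is a real no-argument ClientProtocol method, and "ns0" is an example nameservice):

// Hedged sketch: proxy a no-argument call to a single subcluster.
RemoteMethod method = new RemoteMethod("getStoragePolicies");
BlockStoragePolicy[] policies =
    (BlockStoragePolicy[]) rpcClient.invokeSingle("ns0", method);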
/**
* Invokes sequential proxy calls to different locations. Continues to invoke
* calls until a call returns without throwing a remote exception.
*
* @param locations List of locations/nameservices to call sequentially.
* @param remoteMethod The remote method and parameters to invoke.
* @return The result of the first successful call.
* @throws IOException The first exception encountered if no call succeeds.
*/
public Object invokeSequential(
final List<? extends RemoteLocationContext> locations,
final RemoteMethod remoteMethod) throws IOException {
return invokeSequential(locations, remoteMethod, null, null);
}
/**
* Invokes sequential proxy calls to different locations. Continues to invoke
* calls until the success condition is met, or until all locations have been
* attempted.
*
* The success condition may be specified by:
* <ul>
* <li>An expected result class
* <li>An expected result value
* </ul>
*
* If no expected result class/values are specified, the success condition is
* a call that does not throw a remote exception.
*
* @param locations List of locations/nameservices to call sequentially.
* @param remoteMethod The remote method and parameters to invoke.
* @param expectedResultClass In order to be considered a positive result, the
* return type must be of this class.
* @param expectedResultValue In order to be considered a positive result, the
* return value must equal the value of this object.
* @return The result of the first successful call, or if no call meets the
* success condition, the result of the first call that completed without
* throwing.
* @throws IOException The first exception encountered if the success
* condition is not met and at least one call failed.
*/
public Object invokeSequential(
final List<? extends RemoteLocationContext> locations,
final RemoteMethod remoteMethod, Class<?> expectedResultClass,
Object expectedResultValue) throws IOException {
final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
final Method m = remoteMethod.getMethod();
IOException firstThrownException = null;
IOException lastThrownException = null;
Object firstResult = null;
// Invoke in priority order
for (final RemoteLocationContext loc : locations) {
String ns = loc.getNameserviceId();
List<? extends FederationNamenodeContext> namenodes =
getNamenodesForNameservice(ns);
try {
Object[] params = remoteMethod.getParams(loc);
Object result = invokeMethod(ugi, namenodes, m, params);
// Check if the result is what we expected
if (isExpectedClass(expectedResultClass, result) &&
isExpectedValue(expectedResultValue, result)) {
// Valid result, stop here
return result;
}
if (firstResult == null) {
firstResult = result;
}
} catch (IOException ioe) {
// Record it and move on
lastThrownException = ioe;
if (firstThrownException == null) {
firstThrownException = lastThrownException;
}
} catch (Exception e) {
// Unusual error, ClientProtocol calls always use IOException (or
// RemoteException). Re-wrap in IOException for compatibility with
// ClientProtocol.
LOG.error("Unexpected exception {} proxying {} to {}",
e.getClass(), m.getName(), ns, e);
lastThrownException = new IOException(
"Unexpected exception proxying API " + e.getMessage(), e);
if (firstThrownException == null) {
firstThrownException = lastThrownException;
}
}
}
if (firstThrownException != null) {
// Re-throw the first exception encountered for compatibility
throw firstThrownException;
}
// Return the first result, whether it is the expected value or not
return firstResult;
}
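A hedged usage sketch: RemoteParam is the placeholder class from this patch that getParams(loc) replaces with each location's destination path, and Boolean.TRUE is the success condition, mirroring how a delete would be proxied (rpcClient and locations are assumed to be in scope):

// Hedged sketch: try each candidate location until one reports success.
RemoteMethod method = new RemoteMethod("delete",
    new Class<?>[] {String.class, boolean.class},
    new RemoteParam(), true);
Boolean deleted = (Boolean) rpcClient.invokeSequential(
    locations, method, Boolean.class, Boolean.TRUE);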
/**
* Checks if a result matches the required result class.
*
* @param expectedClass Required result class, null to skip the check.
* @param result The result to check.
* @return True if the result is an instance of the required class or if the
* expected class is null.
*/
private static boolean isExpectedClass(Class<?> expectedClass, Object result) {
if (expectedClass == null) {
return true;
} else if (result == null) {
return false;
} else {
return expectedClass.isInstance(result);
}
}
/**
* Checks if a result matches the expected value.
*
* @param expectedValue The expected value, null to skip the check.
* @param value The result to check.
* @return True if the result equals the expected value or if the
* expected value is null.
*/
private static boolean isExpectedValue(Object expectedValue, Object value) {
if (expectedValue == null) {
return true;
} else if (value == null) {
return false;
} else {
return value.equals(expectedValue);
}
}
/**
* Invokes multiple concurrent proxy calls to different clients. Returns a
* map of results keyed by location.
*
* Re-throws exceptions generated by the remote RPC call as either
* RemoteException or IOException.
*
* @param locations List of remote locations to call concurrently.
* @param remoteMethod The remote method and parameters to invoke.
* @param requireResponse If true an exception will be thrown if all calls do
* not complete. If false exceptions are ignored and all data results
* successfully received are returned.
* @param standby If the requests should go to the standby namenodes too.
* @return Result of invoking the method per subcluster: nsId -> result.
* @throws IOException If requireResponse=true and any of the calls throw an
* exception.
*/
@SuppressWarnings("unchecked")
public <T extends RemoteLocationContext> Map<T, Object> invokeConcurrent(
final Collection<T> locations, final RemoteMethod method,
boolean requireResponse, boolean standby) throws IOException {
final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
final Method m = method.getMethod();
if (locations.size() == 1) {
// Shortcut, just one call
T location = locations.iterator().next();
String ns = location.getNameserviceId();
final List<? extends FederationNamenodeContext> namenodes =
getNamenodesForNameservice(ns);
Object[] paramList = method.getParams(location);
Object result = invokeMethod(ugi, namenodes, m, paramList);
return Collections.singletonMap(location, result);
}
List<T> orderedLocations = new LinkedList<>();
Set<Callable<Object>> callables = new HashSet<>();
for (final T location : locations) {
String nsId = location.getNameserviceId();
final List<? extends FederationNamenodeContext> namenodes =
getNamenodesForNameservice(nsId);
final Object[] paramList = method.getParams(location);
if (standby) {
// Call the objectGetter to all NNs (including standby)
for (final FederationNamenodeContext nn : namenodes) {
String nnId = nn.getNamenodeId();
final List<FederationNamenodeContext> nnList =
Collections.singletonList(nn);
T nnLocation = location;
if (location instanceof RemoteLocation) {
nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest());
}
orderedLocations.add(nnLocation);
callables.add(new Callable<Object>() {
public Object call() throws Exception {
return invokeMethod(ugi, nnList, m, paramList);
}
});
}
} else {
// Call the objectGetter in order of nameservices in the NS list
orderedLocations.add(location);
callables.add(new Callable<Object>() {
public Object call() throws Exception {
return invokeMethod(ugi, namenodes, m, paramList);
}
});
}
}
try {
List<Future<Object>> futures = executorService.invokeAll(callables);
Map<T, Object> results = new TreeMap<>();
Map<T, IOException> exceptions = new TreeMap<>();
for (int i=0; i<futures.size(); i++) {
T location = orderedLocations.get(i);
try {
Future<Object> future = futures.get(i);
Object result = future.get();
results.put(location, result);
} catch (ExecutionException ex) {
Throwable cause = ex.getCause();
LOG.debug("Canot execute {} in {}: {}",
m.getName(), location, cause.getMessage());
// Convert into IOException if needed
IOException ioe = null;
if (cause instanceof IOException) {
ioe = (IOException) cause;
} else {
ioe = new IOException("Unhandled exception while proxying API " +
m.getName() + ": " + cause.getMessage(), cause);
}
// Response from all servers required, use this error.
if (requireResponse) {
throw ioe;
}
// Store the exceptions
exceptions.put(location, ioe);
}
}
// Throw the exception for the first location if there are no results
if (results.isEmpty()) {
T location = orderedLocations.get(0);
IOException ioe = exceptions.get(location);
if (ioe != null) {
throw ioe;
}
}
return results;
} catch (InterruptedException ex) {
LOG.error("Unexpected error while invoking API: {}", ex.getMessage());
throw new IOException(
"Unexpected error while invoking API " + ex.getMessage(), ex);
}
}
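A hedged usage sketch of a fan-out call: renewLease(String) is a real ClientProtocol method that must reach every subcluster; clientName is assumed to be in scope, and getNamespaces() is part of the resolver API:

// Hedged sketch: renew the lease on every nameservice, best effort.
RemoteMethod method = new RemoteMethod("renewLease",
    new Class<?>[] {String.class}, clientName);
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
rpcClient.invokeConcurrent(nss, method, false, false);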
/**
* Get a prioritized list of NNs that share the same nameservice ID (in the
* same namespace). NNs that are reported as ACTIVE will be first in the list.
*
* @param nsId The nameservice ID for the namespace.
* @return A prioritized list of NNs to use for communication.
* @throws IOException If a NN cannot be located for the nameservice ID.
*/
private List<? extends FederationNamenodeContext> getNamenodesForNameservice(
final String nsId) throws IOException {
final List<? extends FederationNamenodeContext> namenodes =
namenodeResolver.getNamenodesForNameserviceId(nsId);
if (namenodes == null || namenodes.isEmpty()) {
throw new IOException("Cannot locate a registered namenode for " + nsId +
" from " + this.routerId);
}
return namenodes;
}
/**
* Get a prioritized list of NNs that share the same block pool ID (in the
* same namespace). NNs that are reported as ACTIVE will be first in the list.
*
* @param bpId The block pool ID for the namespace.
* @return A prioritized list of NNs to use for communication.
* @throws IOException If a NN cannot be located for the block pool ID.
*/
private List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId(
final String bpId) throws IOException {
List<? extends FederationNamenodeContext> namenodes =
namenodeResolver.getNamenodesForBlockPoolId(bpId);
if (namenodes == null || namenodes.isEmpty()) {
throw new IOException("Cannot locate a registered namenode for " + bpId +
" from " + this.routerId);
}
return namenodes;
}
/**
* Get the nameservice identifier for a block pool.
*
* @param bpId Identifier of the block pool.
* @return Nameservice identifier.
* @throws IOException If a NN cannot be located for the block pool ID.
*/
private String getNameserviceForBlockPoolId(final String bpId)
throws IOException {
List<? extends FederationNamenodeContext> namenodes =
getNamenodesForBlockPoolId(bpId);
FederationNamenodeContext namenode = namenodes.get(0);
return namenode.getNameserviceId();
}
}
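Block-addressed calls follow the same pattern, with the block pool id selecting the nameservice. A hedged sketch (the block pool id, block id and clientName are made up; updateBlockForPipeline is a real ClientProtocol method):

// Hedged sketch: route a block-addressed call by its block pool id.
ExtendedBlock block = new ExtendedBlock("BP-1234-10.0.0.1-1", 1001L);
RemoteMethod method = new RemoteMethod("updateBlockForPipeline",
    new Class<?>[] {ExtendedBlock.class, String.class}, block, clientName);
LocatedBlock lb = (LocatedBlock) rpcClient.invokeSingle(block, method);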
View File
@ -4650,6 +4650,101 @@
</description>
</property>
<property>
<name>dfs.federation.router.default.nameserviceId</name>
<value></value>
<description>
Nameservice identifier of the default subcluster to monitor.
</description>
</property>
<property>
<name>dfs.federation.router.rpc.enable</name>
<value>true</value>
<description>
If the RPC service to handle client requests in the router is enabled.
</description>
</property>
<property>
<name>dfs.federation.router.rpc-address</name>
<value>0.0.0.0:8888</value>
<description>
RPC address that handles all client requests.
The value of this property will take the form of router-host1:rpc-port.
</description>
</property>
<property>
<name>dfs.federation.router.rpc-bind-host</name>
<value></value>
<description>
The actual address the RPC server will bind to. If this optional address is
set, it overrides only the hostname portion of
dfs.federation.router.rpc-address. This is useful for making the router
listen on all interfaces by setting it to 0.0.0.0.
</description>
</property>
<property>
<name>dfs.federation.router.handler.count</name>
<value>10</value>
<description>
The number of server threads for the router to handle RPC requests from
clients.
</description>
</property>
<property>
<name>dfs.federation.router.handler.queue.size</name>
<value>100</value>
<description>
Size of the call queue, per handler, for RPC client requests.
</description>
</property>
<property>
<name>dfs.federation.router.reader.count</name>
<value>1</value>
<description>
The number of readers for the router to handle RPC client requests.
</description>
</property>
<property>
<name>dfs.federation.router.reader.queue.size</name>
<value>100</value>
<description>
Size of the queue, per reader thread, of incoming RPC connections from
clients.
</description>
</property>
<property>
<name>dfs.federation.router.connection.pool-size</name>
<value>1</value>
<description>
Size of the pool of connections from the router to namenodes.
</description>
</property>
<property>
<name>dfs.federation.router.connection.clean.ms</name>
<value>10000</value>
<description>
Time interval, in milliseconds, to check if the connection pool should
remove unused connections.
</description>
</property>
<property>
<name>dfs.federation.router.connection.pool.clean.ms</name>
<value>60000</value>
<description>
Time interval, in milliseconds, to check if the connection manager should
remove unused connection pools.
</description>
</property>
<property>
<name>dfs.federation.router.file.resolver.client.class</name>
<value>org.apache.hadoop.hdfs.server.federation.MockResolver</value>
View File
@ -20,7 +20,7 @@ package org.apache.hadoop.hdfs.server.federation;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
@ -51,17 +52,11 @@ import org.apache.hadoop.security.AccessControlException;
*/
public final class FederationTestUtils {
public final static String NAMESERVICE1 = "ns0";
public final static String NAMESERVICE2 = "ns1";
public final static String NAMENODE1 = "nn0";
public final static String NAMENODE2 = "nn1";
public final static String NAMENODE3 = "nn2";
public final static String NAMENODE4 = "nn3";
public final static String ROUTER1 = "router0";
public final static String ROUTER2 = "router1";
public final static String ROUTER3 = "router2";
public final static String ROUTER4 = "router3";
public final static long BLOCK_SIZE_BYTES = 134217728;
public final static String[] NAMESERVICES = {"ns0", "ns1"};
public final static String[] NAMENODES = {"nn0", "nn1", "nn2", "nn3"};
public final static String[] ROUTERS =
{"router0", "router1", "router2", "router3"};
private FederationTestUtils() {
// Utility class
@ -81,7 +76,7 @@ public final class FederationTestUtils {
triggeredException = e;
}
if (exceptionClass != null) {
assertNotNull("No exception was triggered, expected exception - "
assertNotNull("No exception was triggered, expected exception"
+ exceptionClass.getName(), triggeredException);
assertEquals(exceptionClass, triggeredException.getClass());
} else {
@ -101,48 +96,43 @@ public final class FederationTestUtils {
return report;
}
report.setHAServiceState(state);
report.setNamespaceInfo(new NamespaceInfo(1, "tesclusterid", ns, 0,
"testbuildvesion", "testsoftwareversion"));
NamespaceInfo nsInfo = new NamespaceInfo(
1, "tesclusterid", ns, 0, "testbuildvesion", "testsoftwareversion");
report.setNamespaceInfo(nsInfo);
return report;
}
public static void waitNamenodeRegistered(ActiveNamenodeResolver resolver,
String nameserviceId, String namenodeId,
FederationNamenodeServiceState finalState)
String nsId, String nnId, FederationNamenodeServiceState finalState)
throws InterruptedException, IllegalStateException, IOException {
for (int loopCount = 0; loopCount < 20; loopCount++) {
if (loopCount > 0) {
Thread.sleep(1000);
}
List<? extends FederationNamenodeContext> namenodes;
namenodes =
resolver.getNamenodesForNameserviceId(nameserviceId);
List<? extends FederationNamenodeContext> namenodes =
resolver.getNamenodesForNameserviceId(nsId);
for (FederationNamenodeContext namenode : namenodes) {
if (namenodeId != null
&& !namenode.getNamenodeId().equals(namenodeId)) {
// Keep looking
continue;
// Check if this is the Namenode we are looking for
String namenodeId = namenode.getNamenodeId();
if (namenodeId == nnId ||
(namenodeId != null && namenodeId.equals(nnId))) {
if (finalState != null && !namenode.getState().equals(finalState)) {
// Wrong state, wait a bit more
break;
} else {
// Found and verified
return;
}
}
if (finalState != null && !namenode.getState().equals(finalState)) {
// Wrong state, wait a bit more
break;
}
// Found
return;
}
}
assertTrue("Failed to verify state store registration for state - "
+ finalState + " - " + " - " + nameserviceId + " - ", false);
fail("Failed to verify State Store registration of " + nsId + " " + nnId +
" for state " + finalState);
}
public static boolean verifyDate(Date d1, Date d2, long precision) {
if (Math.abs(d1.getTime() - d2.getTime()) < precision) {
return true;
}
return false;
return Math.abs(d1.getTime() - d2.getTime()) < precision;
}
public static boolean addDirectory(FileSystem context, String path)
@ -165,15 +155,14 @@ public final class FederationTestUtils {
} catch (Exception e) {
return false;
}
return false;
}
public static boolean checkForFileInDirectory(FileSystem context,
String testPath, String targetFile) throws AccessControlException,
FileNotFoundException,
UnsupportedFileSystemException, IllegalArgumentException,
IOException {
public static boolean checkForFileInDirectory(
FileSystem context, String testPath, String targetFile)
throws IOException, AccessControlException, FileNotFoundException,
UnsupportedFileSystemException, IllegalArgumentException {
FileStatus[] fileStatus = context.listStatus(new Path(testPath));
String file = null;
String verifyPath = testPath + "/" + targetFile;
@ -194,7 +183,8 @@ public final class FederationTestUtils {
public static int countContents(FileSystem context, String testPath)
throws IOException {
FileStatus[] fileStatus = context.listStatus(new Path(testPath));
Path path = new Path(testPath);
FileStatus[] fileStatus = context.listStatus(path);
return fileStatus.length;
}
@ -202,7 +192,7 @@ public final class FederationTestUtils {
throws IOException {
FsPermission permissions = new FsPermission("700");
FSDataOutputStream writeStream = fs.create(new Path(path), permissions,
true, 1000, (short) 1, BLOCK_SIZE_BYTES, null);
true, 1000, (short) 1, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null);
for (int i = 0; i < length; i++) {
writeStream.write(i);
}
View File
@ -43,7 +43,7 @@ import org.apache.hadoop.util.Time;
/**
* In-memory cache/mock of a namenode and file resolver. Stores the most
* recently updated NN information for each nameservice and block pool. Also
* recently updated NN information for each nameservice and block pool. It also
* stores a virtual mount table for resolving global namespace paths to local NN
* paths.
*/
@ -51,82 +51,93 @@ public class MockResolver
implements ActiveNamenodeResolver, FileSubclusterResolver {
private Map<String, List<? extends FederationNamenodeContext>> resolver =
new HashMap<String, List<? extends FederationNamenodeContext>>();
private Map<String, List<RemoteLocation>> locations =
new HashMap<String, List<RemoteLocation>>();
private Set<FederationNamespaceInfo> namespaces =
new HashSet<FederationNamespaceInfo>();
new HashMap<>();
private Map<String, List<RemoteLocation>> locations = new HashMap<>();
private Set<FederationNamespaceInfo> namespaces = new HashSet<>();
private String defaultNamespace = null;
public MockResolver(Configuration conf, StateStoreService store) {
this.cleanRegistrations();
}
public void addLocation(String mount, String nameservice, String location) {
RemoteLocation remoteLocation = new RemoteLocation(nameservice, location);
List<RemoteLocation> locationsList = locations.get(mount);
public void addLocation(String mount, String nsId, String location) {
List<RemoteLocation> locationsList = this.locations.get(mount);
if (locationsList == null) {
locationsList = new LinkedList<RemoteLocation>();
locations.put(mount, locationsList);
locationsList = new LinkedList<>();
this.locations.put(mount, locationsList);
}
final RemoteLocation remoteLocation = new RemoteLocation(nsId, location);
if (!locationsList.contains(remoteLocation)) {
locationsList.add(remoteLocation);
}
if (this.defaultNamespace == null) {
this.defaultNamespace = nameservice;
this.defaultNamespace = nsId;
}
}
public synchronized void cleanRegistrations() {
this.resolver =
new HashMap<String, List<? extends FederationNamenodeContext>>();
this.namespaces = new HashSet<FederationNamespaceInfo>();
this.resolver = new HashMap<>();
this.namespaces = new HashSet<>();
}
@Override
public void updateActiveNamenode(
String ns, InetSocketAddress successfulAddress) {
String nsId, InetSocketAddress successfulAddress) {
String address = successfulAddress.getHostName() + ":" +
successfulAddress.getPort();
String key = ns;
String key = nsId;
if (key != null) {
// Update the active entry
@SuppressWarnings("unchecked")
List<FederationNamenodeContext> iterator =
(List<FederationNamenodeContext>) resolver.get(key);
for (FederationNamenodeContext namenode : iterator) {
List<FederationNamenodeContext> namenodes =
(List<FederationNamenodeContext>) this.resolver.get(key);
for (FederationNamenodeContext namenode : namenodes) {
if (namenode.getRpcAddress().equals(address)) {
MockNamenodeContext nn = (MockNamenodeContext) namenode;
nn.setState(FederationNamenodeServiceState.ACTIVE);
break;
}
}
Collections.sort(iterator, new NamenodePriorityComparator());
// This operation modifies the list so we need to be careful
synchronized(namenodes) {
Collections.sort(namenodes, new NamenodePriorityComparator());
}
}
}
@Override
public List<? extends FederationNamenodeContext>
getNamenodesForNameserviceId(String nameserviceId) {
return resolver.get(nameserviceId);
// Return a copy of the list because it is updated periodically
List<? extends FederationNamenodeContext> namenodes =
this.resolver.get(nameserviceId);
return Collections.unmodifiableList(new ArrayList<>(namenodes));
}
@Override
public List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId(
String blockPoolId) {
return resolver.get(blockPoolId);
// Return a copy of the list because it is updated periodically
List<? extends FederationNamenodeContext> namenodes =
this.resolver.get(blockPoolId);
return Collections.unmodifiableList(new ArrayList<>(namenodes));
}
private static class MockNamenodeContext
implements FederationNamenodeContext {
private String namenodeId;
private String nameserviceId;
private String webAddress;
private String rpcAddress;
private String serviceAddress;
private String lifelineAddress;
private String namenodeId;
private String nameserviceId;
private FederationNamenodeServiceState state;
private long dateModified;
@ -197,6 +208,7 @@ public class MockResolver
@Override
public synchronized boolean registerNamenode(NamenodeStatusReport report)
throws IOException {
MockNamenodeContext context = new MockNamenodeContext(
report.getRpcAddress(), report.getServiceAddress(),
report.getLifelineAddress(), report.getWebAddress(),
@ -205,13 +217,14 @@ public class MockResolver
String nsId = report.getNameserviceId();
String bpId = report.getBlockPoolId();
String cId = report.getClusterId();
@SuppressWarnings("unchecked")
List<MockNamenodeContext> existingItems =
(List<MockNamenodeContext>) resolver.get(nsId);
(List<MockNamenodeContext>) this.resolver.get(nsId);
if (existingItems == null) {
existingItems = new ArrayList<MockNamenodeContext>();
resolver.put(bpId, existingItems);
resolver.put(nsId, existingItems);
existingItems = new ArrayList<>();
this.resolver.put(bpId, existingItems);
this.resolver.put(nsId, existingItems);
}
boolean added = false;
for (int i=0; i<existingItems.size() && !added; i++) {
@ -227,7 +240,7 @@ public class MockResolver
Collections.sort(existingItems, new NamenodePriorityComparator());
FederationNamespaceInfo info = new FederationNamespaceInfo(bpId, cId, nsId);
namespaces.add(info);
this.namespaces.add(info);
return true;
}
@ -238,16 +251,13 @@ public class MockResolver
@Override
public PathLocation getDestinationForPath(String path) throws IOException {
String finalPath = null;
String nameservice = null;
Set<String> namespaceSet = new HashSet<String>();
LinkedList<RemoteLocation> remoteLocations =
new LinkedList<RemoteLocation>();
for(String key : this.locations.keySet()) {
if(path.startsWith(key)) {
Set<String> namespaceSet = new HashSet<>();
List<RemoteLocation> remoteLocations = new LinkedList<>();
for (String key : this.locations.keySet()) {
if (path.startsWith(key)) {
for (RemoteLocation location : this.locations.get(key)) {
finalPath = location.getDest() + path.substring(key.length());
nameservice = location.getNameserviceId();
String finalPath = location.getDest() + path.substring(key.length());
String nameservice = location.getNameserviceId();
RemoteLocation remoteLocation =
new RemoteLocation(nameservice, finalPath);
remoteLocations.add(remoteLocation);
@ -265,7 +275,7 @@ public class MockResolver
@Override
public List<String> getMountPoints(String path) throws IOException {
List<String> mounts = new ArrayList<String>();
List<String> mounts = new ArrayList<>();
if (path.equals("/")) {
// Mounts only supported under root level
for (String mount : this.locations.keySet()) {
View File
@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.federation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
/**
* Constructs a router configuration with individual features enabled/disabled.
@ -26,15 +27,32 @@ public class RouterConfigBuilder {
private Configuration conf;
private boolean enableRpcServer = false;
public RouterConfigBuilder(Configuration configuration) {
this.conf = configuration;
}
public RouterConfigBuilder() {
this.conf = new Configuration();
this.conf = new Configuration(false);
}
public RouterConfigBuilder all() {
this.enableRpcServer = true;
return this;
}
public RouterConfigBuilder rpc(boolean enable) {
this.enableRpcServer = enable;
return this;
}
public RouterConfigBuilder rpc() {
return this.rpc(true);
}
public Configuration build() {
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_RPC_ENABLE, this.enableRpcServer);
return conf;
}
}
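A hedged usage sketch of the builder in a test:

// Hedged sketch: build a minimal router configuration with RPC enabled.
Configuration conf = new RouterConfigBuilder()
    .rpc()
    .build();
// Equivalent to setting dfs.federation.router.rpc.enable=true on an
// otherwise empty Configuration.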
View File
@ -17,27 +17,44 @@
*/
package org.apache.hadoop.hdfs.server.federation;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.addDirectory;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.waitNamenodeRegistered;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -46,16 +63,49 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
import org.apache.hadoop.hdfs.MiniDFSNNTopology.NSConf;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test utility to mimic a federated HDFS cluster with a router.
* Test utility to mimic a federated HDFS cluster with multiple routers.
*/
public class RouterDFSCluster {
private static final Logger LOG =
LoggerFactory.getLogger(RouterDFSCluster.class);
public static final String TEST_STRING = "teststring";
public static final String TEST_DIR = "testdir";
public static final String TEST_FILE = "testfile";
/** Nameservices in the federated cluster. */
private List<String> nameservices;
/** Namenodes in the federated cluster. */
private List<NamenodeContext> namenodes;
/** Routers in the federated cluster. */
private List<RouterContext> routers;
/** If the Namenodes are in high availability.*/
private boolean highAvailability;
/** Mini cluster. */
private MiniDFSCluster cluster;
/** Router configuration overrides. */
private Configuration routerOverrides;
/** Namenode configuration overrides. */
private Configuration namenodeOverrides;
/**
* Router context.
*/
@ -69,13 +119,14 @@ public class RouterDFSCluster {
private Configuration conf;
private URI fileSystemUri;
public RouterContext(Configuration conf, String ns, String nn)
public RouterContext(Configuration conf, String nsId, String nnId)
throws URISyntaxException {
this.namenodeId = nn;
this.nameserviceId = ns;
this.conf = conf;
router = new Router();
router.init(conf);
this.nameserviceId = nsId;
this.namenodeId = nnId;
this.router = new Router();
this.router.init(conf);
}
public Router getRouter() {
@ -99,18 +150,30 @@ public class RouterDFSCluster {
}
public void initRouter() throws URISyntaxException {
// Store the bound points for the router interfaces
InetSocketAddress rpcAddress = router.getRpcServerAddress();
if (rpcAddress != null) {
this.rpcPort = rpcAddress.getPort();
this.fileSystemUri =
URI.create("hdfs://" + NetUtils.getHostPortString(rpcAddress));
// Override the default FS to point to the router RPC
DistributedFileSystem.setDefaultUri(conf, fileSystemUri);
try {
this.fileContext = FileContext.getFileContext(conf);
} catch (UnsupportedFileSystemException e) {
this.fileContext = null;
}
}
}
public DistributedFileSystem getFileSystem() throws IOException {
DistributedFileSystem fs =
(DistributedFileSystem) DistributedFileSystem.get(conf);
return fs;
public FileSystem getFileSystem() throws IOException {
return DistributedFileSystem.get(conf);
}
public DFSClient getClient(UserGroupInformation user)
throws IOException, URISyntaxException, InterruptedException {
LOG.info("Connecting to router at " + fileSystemUri);
LOG.info("Connecting to router at {}", fileSystemUri);
return user.doAs(new PrivilegedExceptionAction<DFSClient>() {
@Override
public DFSClient run() throws IOException {
@ -120,9 +183,8 @@ public class RouterDFSCluster {
}
public DFSClient getClient() throws IOException, URISyntaxException {
if (client == null) {
LOG.info("Connecting to router at " + fileSystemUri);
LOG.info("Connecting to router at {}", fileSystemUri);
client = new DFSClient(fileSystemUri, conf);
}
return client;
@ -130,9 +192,10 @@ public class RouterDFSCluster {
}
/**
* Namenode context.
* Namenode context in the federated cluster.
*/
public class NamenodeContext {
private Configuration conf;
private NameNode namenode;
private String nameserviceId;
private String namenodeId;
@ -143,14 +206,13 @@ public class RouterDFSCluster {
private int httpPort;
private URI fileSystemUri;
private int index;
private Configuration conf;
private DFSClient client;
public NamenodeContext(Configuration conf, String ns, String nn,
int index) {
public NamenodeContext(
Configuration conf, String nsId, String nnId, int index) {
this.conf = conf;
this.namenodeId = nn;
this.nameserviceId = ns;
this.nameserviceId = nsId;
this.namenodeId = nnId;
this.index = index;
}
@ -170,20 +232,19 @@ public class RouterDFSCluster {
return this.fileContext;
}
public void setNamenode(NameNode n) throws URISyntaxException {
namenode = n;
public void setNamenode(NameNode nn) throws URISyntaxException {
this.namenode = nn;
// Store the bound ports and override the default FS with the local NN's
// RPC
rpcPort = n.getNameNodeAddress().getPort();
servicePort = n.getServiceRpcAddress().getPort();
lifelinePort = n.getServiceRpcAddress().getPort();
httpPort = n.getHttpAddress().getPort();
fileSystemUri = new URI("hdfs://" + namenode.getHostAndPort());
DistributedFileSystem.setDefaultUri(conf, fileSystemUri);
// Store the bound ports and override the default FS with the local NN RPC
this.rpcPort = nn.getNameNodeAddress().getPort();
this.servicePort = nn.getServiceRpcAddress().getPort();
this.lifelinePort = nn.getServiceRpcAddress().getPort();
this.httpPort = nn.getHttpAddress().getPort();
this.fileSystemUri = new URI("hdfs://" + namenode.getHostAndPort());
DistributedFileSystem.setDefaultUri(this.conf, this.fileSystemUri);
try {
this.fileContext = FileContext.getFileContext(conf);
this.fileContext = FileContext.getFileContext(this.conf);
} catch (UnsupportedFileSystemException e) {
this.fileContext = null;
}
@ -205,10 +266,8 @@ public class RouterDFSCluster {
return namenode.getHttpAddress().getHostName() + ":" + httpPort;
}
public DistributedFileSystem getFileSystem() throws IOException {
DistributedFileSystem fs =
(DistributedFileSystem) DistributedFileSystem.get(conf);
return fs;
public FileSystem getFileSystem() throws IOException {
return DistributedFileSystem.get(conf);
}
public void resetClient() {
@ -218,7 +277,7 @@ public class RouterDFSCluster {
public DFSClient getClient(UserGroupInformation user)
throws IOException, URISyntaxException, InterruptedException {
LOG.info("Connecting to namenode at " + fileSystemUri);
LOG.info("Connecting to namenode at {}", fileSystemUri);
return user.doAs(new PrivilegedExceptionAction<DFSClient>() {
@Override
public DFSClient run() throws IOException {
@ -229,7 +288,7 @@ public class RouterDFSCluster {
public DFSClient getClient() throws IOException, URISyntaxException {
if (client == null) {
LOG.info("Connecting to namenode at " + fileSystemUri);
LOG.info("Connecting to namenode at {}", fileSystemUri);
client = new DFSClient(fileSystemUri, conf);
}
return client;
@ -244,36 +303,20 @@ public class RouterDFSCluster {
}
}
public static final String NAMENODE1 = "nn0";
public static final String NAMENODE2 = "nn1";
public static final String NAMENODE3 = "nn2";
public static final String TEST_STRING = "teststring";
public static final String TEST_DIR = "testdir";
public static final String TEST_FILE = "testfile";
private List<String> nameservices;
private List<RouterContext> routers;
private List<NamenodeContext> namenodes;
private static final Log LOG = LogFactory.getLog(RouterDFSCluster.class);
private MiniDFSCluster cluster;
private boolean highAvailability;
protected static final int DEFAULT_HEARTBEAT_INTERVAL = 5;
protected static final int DEFAULT_CACHE_INTERVAL_SEC = 5;
private Configuration routerOverrides;
private Configuration namenodeOverrides;
private static final String NAMENODES = NAMENODE1 + "," + NAMENODE2;
public RouterDFSCluster(boolean ha, int numNameservices) {
this(ha, numNameservices, 2);
}
public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes) {
this.highAvailability = ha;
configureNameservices(numNameservices, numNamenodes);
}
public RouterDFSCluster(boolean ha, int numNameservices) {
this(ha, numNameservices, 2);
}
/**
* Add configuration settings to override default Router settings.
*
* @param conf Router configuration overrides.
*/
public void addRouterOverrides(Configuration conf) {
if (this.routerOverrides == null) {
this.routerOverrides = conf;
@ -282,6 +325,11 @@ public class RouterDFSCluster {
}
}
/**
* Add configuration settings to override default Namenode settings.
*
* @param conf Namenode configuration overrides.
*/
public void addNamenodeOverrides(Configuration conf) {
if (this.namenodeOverrides == null) {
this.namenodeOverrides = conf;
@ -290,124 +338,134 @@ public class RouterDFSCluster {
}
}
public Configuration generateNamenodeConfiguration(
String defaultNameserviceId) {
Configuration c = new HdfsConfiguration();
/**
* Generate the configuration for a Namenode.
*
* @param nsId Nameservice identifier.
* @return New namenode configuration.
*/
public Configuration generateNamenodeConfiguration(String nsId) {
Configuration conf = new HdfsConfiguration();
c.set(DFSConfigKeys.DFS_NAMESERVICES, getNameservicesKey());
c.set("fs.defaultFS", "hdfs://" + defaultNameserviceId);
conf.set(DFS_NAMESERVICES, getNameservicesKey());
conf.set(FS_DEFAULT_NAME_KEY, "hdfs://" + nsId);
for (String ns : nameservices) {
if (highAvailability) {
c.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, NAMENODES);
conf.set(
DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
NAMENODES[0] + "," + NAMENODES[1]);
}
for (NamenodeContext context : getNamenodes(ns)) {
String suffix = context.getConfSuffix();
c.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix,
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix,
"127.0.0.1:" + context.rpcPort);
c.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix,
conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix,
"127.0.0.1:" + context.httpPort);
c.set(DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix,
conf.set(DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix,
"0.0.0.0");
}
}
if (namenodeOverrides != null) {
c.addResource(namenodeOverrides);
if (this.namenodeOverrides != null) {
conf.addResource(this.namenodeOverrides);
}
return c;
}
public Configuration generateClientConfiguration() {
Configuration conf = new HdfsConfiguration();
conf.addResource(generateNamenodeConfiguration(getNameservices().get(0)));
return conf;
}
public Configuration generateRouterConfiguration(String localNameserviceId,
String localNamenodeId) throws IOException {
Configuration conf = new HdfsConfiguration();
conf.addResource(generateNamenodeConfiguration(localNameserviceId));
/**
* Generate the configuration for a client.
*
* @return New configuration for a client.
*/
public Configuration generateClientConfiguration() {
Configuration conf = new HdfsConfiguration(false);
String ns0 = getNameservices().get(0);
conf.addResource(generateNamenodeConfiguration(ns0));
return conf;
}
/**
* Generate the configuration for a Router.
*
* @param nsId Nameservice identifier.
* @param nnId Namenode identifier.
* @return New configuration for a Router.
*/
public Configuration generateRouterConfiguration(String nsId, String nnId) {
Configuration conf = new HdfsConfiguration(false);
conf.addResource(generateNamenodeConfiguration(nsId));
conf.setInt(DFS_ROUTER_HANDLER_COUNT_KEY, 10);
conf.set(DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0");
conf.set(DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0");
conf.set(DFS_ROUTER_DEFAULT_NAMESERVICE, nameservices.get(0));
// Use mock resolver classes
conf.set(DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
MockResolver.class.getCanonicalName());
conf.set(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
MockResolver.class.getCanonicalName());
conf.setClass(FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
MockResolver.class, ActiveNamenodeResolver.class);
conf.setClass(FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
MockResolver.class, FileSubclusterResolver.class);
// Set the nameservice ID for the default NN monitor
conf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, localNameserviceId);
if (localNamenodeId != null) {
conf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, localNamenodeId);
conf.set(DFS_NAMESERVICE_ID, nsId);
if (nnId != null) {
conf.set(DFS_HA_NAMENODE_ID_KEY, nnId);
}
StringBuilder routerBuilder = new StringBuilder();
for (String ns : nameservices) {
for (NamenodeContext context : getNamenodes(ns)) {
String suffix = context.getConfSuffix();
if (routerBuilder.length() != 0) {
routerBuilder.append(",");
}
routerBuilder.append(suffix);
// Add custom overrides if available
if (this.routerOverrides != null) {
for (Entry<String, String> entry : this.routerOverrides) {
String confKey = entry.getKey();
String confValue = entry.getValue();
conf.set(confKey, confValue);
}
}
return conf;
}
public void configureNameservices(int numNameservices, int numNamenodes) {
nameservices = new ArrayList<String>();
for (int i = 0; i < numNameservices; i++) {
nameservices.add("ns" + i);
}
namenodes = new ArrayList<NamenodeContext>();
int index = 0;
for (String ns : nameservices) {
this.nameservices = new ArrayList<>();
this.namenodes = new ArrayList<>();
NamenodeContext context = null;
int nnIndex = 0;
for (int i=0; i<numNameservices; i++) {
String ns = "ns" + i;
this.nameservices.add("ns" + i);
Configuration nnConf = generateNamenodeConfiguration(ns);
if (highAvailability) {
NamenodeContext context =
new NamenodeContext(nnConf, ns, NAMENODE1, index);
namenodes.add(context);
index++;
if (numNamenodes > 1) {
context = new NamenodeContext(nnConf, ns, NAMENODE2, index + 1);
namenodes.add(context);
index++;
}
if (numNamenodes > 2) {
context = new NamenodeContext(nnConf, ns, NAMENODE3, index + 1);
namenodes.add(context);
index++;
}
if (!highAvailability) {
context = new NamenodeContext(nnConf, ns, null, nnIndex++);
this.namenodes.add(context);
} else {
NamenodeContext context = new NamenodeContext(nnConf, ns, null, index);
namenodes.add(context);
index++;
for (int j=0; j<numNamenodes; j++) {
context = new NamenodeContext(nnConf, ns, NAMENODES[j], nnIndex++);
this.namenodes.add(context);
}
}
}
}
public String getNameservicesKey() {
StringBuilder ns = new StringBuilder();
for (int i = 0; i < nameservices.size(); i++) {
if (i > 0) {
ns.append(",");
StringBuilder sb = new StringBuilder();
for (String nsId : this.nameservices) {
if (sb.length() > 0) {
sb.append(",");
}
ns.append(nameservices.get(i));
sb.append(nsId);
}
return ns.toString();
return sb.toString();
}
public String getRandomNameservice() {
Random r = new Random();
return nameservices.get(r.nextInt(nameservices.size()));
int randIndex = r.nextInt(nameservices.size());
return nameservices.get(randIndex);
}
public List<String> getNameservices() {
@ -415,7 +473,7 @@ public class RouterDFSCluster {
}
public List<NamenodeContext> getNamenodes(String nameservice) {
ArrayList<NamenodeContext> nns = new ArrayList<NamenodeContext>();
List<NamenodeContext> nns = new ArrayList<>();
for (NamenodeContext c : namenodes) {
if (c.nameserviceId.equals(nameservice)) {
nns.add(c);
@ -426,23 +484,23 @@ public class RouterDFSCluster {
public NamenodeContext getRandomNamenode() {
Random rand = new Random();
return namenodes.get(rand.nextInt(namenodes.size()));
int i = rand.nextInt(this.namenodes.size());
return this.namenodes.get(i);
}
public List<NamenodeContext> getNamenodes() {
return namenodes;
return this.namenodes;
}
public boolean isHighAvailability() {
return highAvailability;
}
public NamenodeContext getNamenode(String nameservice,
String namenode) {
for (NamenodeContext c : namenodes) {
public NamenodeContext getNamenode(String nameservice, String namenode) {
for (NamenodeContext c : this.namenodes) {
if (c.nameserviceId.equals(nameservice)) {
if (namenode == null || c.namenodeId == null || namenode.isEmpty()
|| c.namenodeId.isEmpty()) {
if (namenode == null || namenode.isEmpty() ||
c.namenodeId == null || c.namenodeId.isEmpty()) {
return c;
} else if (c.namenodeId.equals(namenode)) {
return c;
@ -453,7 +511,7 @@ public class RouterDFSCluster {
}
public List<RouterContext> getRouters(String nameservice) {
ArrayList<RouterContext> nns = new ArrayList<RouterContext>();
List<RouterContext> nns = new ArrayList<>();
for (RouterContext c : routers) {
if (c.nameserviceId.equals(nameservice)) {
nns.add(c);
@ -462,14 +520,13 @@ public class RouterDFSCluster {
return nns;
}
public RouterContext getRouterContext(String nameservice,
String namenode) {
public RouterContext getRouterContext(String nsId, String nnId) {
for (RouterContext c : routers) {
if (namenode == null) {
if (nnId == null) {
return c;
}
if (c.namenodeId.equals(namenode)
&& c.nameserviceId.equals(nameservice)) {
if (c.namenodeId.equals(nnId) &&
c.nameserviceId.equals(nsId)) {
return c;
}
}
@ -485,10 +542,10 @@ public class RouterDFSCluster {
return routers;
}
public RouterContext buildRouter(String nameservice, String namenode)
public RouterContext buildRouter(String nsId, String nnId)
throws URISyntaxException, IOException {
Configuration config = generateRouterConfiguration(nameservice, namenode);
RouterContext rc = new RouterContext(config, nameservice, namenode);
Configuration config = generateRouterConfiguration(nsId, nnId);
RouterContext rc = new RouterContext(config, nsId, nnId);
return rc;
}
@ -500,10 +557,9 @@ public class RouterDFSCluster {
try {
MiniDFSNNTopology topology = new MiniDFSNNTopology();
for (String ns : nameservices) {
NSConf conf = new MiniDFSNNTopology.NSConf(ns);
if (highAvailability) {
for(int i = 0; i < namenodes.size()/nameservices.size(); i++) {
for (int i=0; i<namenodes.size()/nameservices.size(); i++) {
NNConf nnConf = new MiniDFSNNTopology.NNConf("nn" + i);
conf.addNN(nnConf);
}
@ -516,11 +572,15 @@ public class RouterDFSCluster {
topology.setFederation(true);
// Start mini DFS cluster
Configuration nnConf = generateNamenodeConfiguration(nameservices.get(0));
String ns0 = nameservices.get(0);
Configuration nnConf = generateNamenodeConfiguration(ns0);
if (overrideConf != null) {
nnConf.addResource(overrideConf);
}
cluster = new MiniDFSCluster.Builder(nnConf).nnTopology(topology).build();
cluster = new MiniDFSCluster.Builder(nnConf)
.numDataNodes(nameservices.size()*2)
.nnTopology(topology)
.build();
cluster.waitActive();
// Store NN pointers
@ -530,28 +590,32 @@ public class RouterDFSCluster {
}
} catch (Exception e) {
LOG.error("Cannot start Router DFS cluster: " + e.getMessage(), e);
cluster.shutdown();
LOG.error("Cannot start Router DFS cluster: {}", e.getMessage(), e);
if (cluster != null) {
cluster.shutdown();
}
}
}
public void startRouters()
throws InterruptedException, URISyntaxException, IOException {
// Create routers
routers = new ArrayList<RouterContext>();
for (String ns : nameservices) {
// Create one router per nameservice
this.routers = new ArrayList<>();
for (String ns : this.nameservices) {
for (NamenodeContext context : getNamenodes(ns)) {
routers.add(buildRouter(ns, context.namenodeId));
RouterContext router = buildRouter(ns, context.namenodeId);
this.routers.add(router);
}
}
// Start all routers
for (RouterContext router : routers) {
for (RouterContext router : this.routers) {
router.router.start();
}
// Wait until all routers are active and record their ports
for (RouterContext router : routers) {
for (RouterContext router : this.routers) {
waitActive(router);
router.initRouter();
}
@ -570,22 +634,21 @@ public class RouterDFSCluster {
}
Thread.sleep(1000);
}
assertFalse(
"Timeout waiting for " + router.router.toString() + " to activate.",
true);
fail("Timeout waiting for " + router.router + " to activate");
}
public void registerNamenodes() throws IOException {
for (RouterContext r : routers) {
for (RouterContext r : this.routers) {
ActiveNamenodeResolver resolver = r.router.getNamenodeResolver();
for (NamenodeContext nn : namenodes) {
for (NamenodeContext nn : this.namenodes) {
// Generate a report
NamenodeStatusReport report = new NamenodeStatusReport(nn.nameserviceId,
nn.namenodeId, nn.getRpcAddress(), nn.getServiceAddress(),
NamenodeStatusReport report = new NamenodeStatusReport(
nn.nameserviceId, nn.namenodeId,
nn.getRpcAddress(), nn.getServiceAddress(),
nn.getLifelineAddress(), nn.getHttpAddress());
report.setNamespaceInfo(nn.namenode.getNamesystem().getFSImage()
.getStorage().getNamespaceInfo());
FSImage fsImage = nn.namenode.getNamesystem().getFSImage();
NamespaceInfo nsInfo = fsImage.getStorage().getNamespaceInfo();
report.setNamespaceInfo(nsInfo);
// Determine HA state from nn public state string
String nnState = nn.namenode.getState();
@ -606,74 +669,97 @@ public class RouterDFSCluster {
public void waitNamenodeRegistration()
throws InterruptedException, IllegalStateException, IOException {
for (RouterContext r : routers) {
for (NamenodeContext nn : namenodes) {
FederationTestUtils.waitNamenodeRegistered(
r.router.getNamenodeResolver(), nn.nameserviceId, nn.namenodeId,
null);
for (RouterContext r : this.routers) {
Router router = r.router;
for (NamenodeContext nn : this.namenodes) {
ActiveNamenodeResolver nnResolver = router.getNamenodeResolver();
waitNamenodeRegistered(
nnResolver, nn.nameserviceId, nn.namenodeId, null);
}
}
}
public void waitRouterRegistrationQuorum(RouterContext router,
FederationNamenodeServiceState state, String nameservice, String namenode)
FederationNamenodeServiceState state, String nsId, String nnId)
throws InterruptedException, IOException {
LOG.info("Waiting for NN - " + nameservice + ":" + namenode
+ " to transition to state - " + state);
FederationTestUtils.waitNamenodeRegistered(
router.router.getNamenodeResolver(), nameservice, namenode, state);
}
public String getFederatedPathForNameservice(String ns) {
return "/" + ns;
}
public String getNamenodePathForNameservice(String ns) {
return "/target-" + ns;
LOG.info("Waiting for NN {} {} to transition to {}", nsId, nnId, state);
ActiveNamenodeResolver nnResolver = router.router.getNamenodeResolver();
waitNamenodeRegistered(nnResolver, nsId, nnId, state);
}
/**
* Get the federated path for a nameservice.
* @param nsId Nameservice identifier.
* @return Path in the Router.
*/
public String getFederatedPathForNS(String nsId) {
return "/" + nsId;
}
/**
* Get the namenode path for a nameservice.
* @param nsId Nameservice identifier.
* @return Path in the Namenode.
*/
public String getNamenodePathForNS(String nsId) {
return "/target-" + nsId;
}
/**
* Get the federated test directory for a nameservice.
* @param nsId Nameservice identifier.
* @return Example:
* <ul>
* <li>/ns0/testdir which maps to ns0->/target-ns0/testdir
* </ul>
*/
public String getFederatedTestDirectoryForNameservice(String ns) {
return getFederatedPathForNameservice(ns) + "/" + TEST_DIR;
public String getFederatedTestDirectoryForNS(String nsId) {
return getFederatedPathForNS(nsId) + "/" + TEST_DIR;
}
/**
* Get the namenode test directory for a nameservice.
* @param nsId Nameservice identifier.
* @return Example:
* <ul>
* <li>/target-ns0/testdir
* </ul>
*/
public String getNamenodeTestDirectoryForNameservice(String ns) {
return getNamenodePathForNameservice(ns) + "/" + TEST_DIR;
public String getNamenodeTestDirectoryForNS(String nsId) {
return getNamenodePathForNS(nsId) + "/" + TEST_DIR;
}
/**
* Get the federated test file for a nameservice.
* @param nsId Nameservice identifier.
* @return Example:
* <ul>
* <li>/ns0/testfile which maps to ns0->/target-ns0/testfile
* </ul>
*/
public String getFederatedTestFileForNameservice(String ns) {
return getFederatedPathForNameservice(ns) + "/" + TEST_FILE;
public String getFederatedTestFileForNS(String nsId) {
return getFederatedPathForNS(nsId) + "/" + TEST_FILE;
}
/**
* Get the namenode test file for a nameservice.
* @param nsId Nameservice identifier.
* @return Example:
* <ul>
* <li>/target-ns0/testfile
* </ul>
*/
public String getNamenodeTestFileForNameservice(String ns) {
return getNamenodePathForNameservice(ns) + "/" + TEST_FILE;
public String getNamenodeTestFileForNS(String nsId) {
return getNamenodePathForNS(nsId) + "/" + TEST_FILE;
}
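// For illustration (assuming TEST_DIR = "testdir" and TEST_FILE =
// "testfile"), the path helpers above resolve as follows for "ns0":
//   getFederatedPathForNS("ns0")          -> /ns0
//   getNamenodePathForNS("ns0")           -> /target-ns0
//   getFederatedTestDirectoryForNS("ns0") -> /ns0/testdir
//   getNamenodeTestDirectoryForNS("ns0")  -> /target-ns0/testdir
//   getFederatedTestFileForNS("ns0")      -> /ns0/testfile
//   getNamenodeTestFileForNS("ns0")       -> /target-ns0/testfile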
/**
* Stop the federated HDFS cluster.
*/
public void shutdown() {
cluster.shutdown();
if (cluster != null) {
cluster.shutdown();
}
if (routers != null) {
for (RouterContext context : routers) {
stopRouter(context);
@ -681,9 +767,12 @@ public class RouterDFSCluster {
}
}
/**
* Stop a router.
* @param router Router context.
*/
public void stopRouter(RouterContext router) {
try {
router.router.shutDown();
int loopCount = 0;
@ -691,7 +780,7 @@ public class RouterDFSCluster {
loopCount++;
Thread.sleep(1000);
if (loopCount > 20) {
LOG.error("Unable to shutdown router - " + router.rpcPort);
LOG.error("Cannot shutdown router {}", router.rpcPort);
break;
}
}
@ -714,26 +803,28 @@ public class RouterDFSCluster {
for (String ns : getNameservices()) {
NamenodeContext context = getNamenode(ns, null);
if (!createTestDirectoriesNamenode(context)) {
throw new IOException("Unable to create test directory for ns - " + ns);
throw new IOException("Cannot create test directory for ns " + ns);
}
}
}
public boolean createTestDirectoriesNamenode(NamenodeContext nn)
throws IOException {
return FederationTestUtils.addDirectory(nn.getFileSystem(),
getNamenodeTestDirectoryForNameservice(nn.nameserviceId));
FileSystem fs = nn.getFileSystem();
String testDir = getNamenodeTestDirectoryForNS(nn.nameserviceId);
return addDirectory(fs, testDir);
}
public void deleteAllFiles() throws IOException {
// Delete all files via the NNs and verify
for (NamenodeContext context : getNamenodes()) {
FileStatus[] status = context.getFileSystem().listStatus(new Path("/"));
for(int i = 0; i <status.length; i++) {
FileSystem fs = context.getFileSystem();
FileStatus[] status = fs.listStatus(new Path("/"));
for (int i = 0; i < status.length; i++) {
Path p = status[i].getPath();
context.getFileSystem().delete(p, true);
fs.delete(p, true);
}
status = context.getFileSystem().listStatus(new Path("/"));
status = fs.listStatus(new Path("/"));
assertEquals(0, status.length);
}
}
@ -754,14 +845,34 @@ public class RouterDFSCluster {
MockResolver resolver =
(MockResolver) r.router.getSubclusterResolver();
// create table entries
for (String ns : nameservices) {
for (String nsId : nameservices) {
// Direct path
resolver.addLocation(getFederatedPathForNameservice(ns), ns,
getNamenodePathForNameservice(ns));
String routerPath = getFederatedPathForNS(nsId);
String nnPath = getNamenodePathForNS(nsId);
resolver.addLocation(routerPath, nsId, nnPath);
}
// Root path goes to both NS1
resolver.addLocation("/", nameservices.get(0), "/");
// Root path points to the first nameservice
String ns0 = nameservices.get(0);
resolver.addLocation("/", ns0, "/");
}
}
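// Resulting mock mount table for two nameservices (illustrative):
//   /ns0 -> ns0:/target-ns0
//   /ns1 -> ns1:/target-ns1
//   /    -> ns0:/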
public MiniDFSCluster getCluster() {
return cluster;
}
/**
* Wait until the federated cluster is up and ready.
* @throws IOException If we cannot wait for the cluster to be up.
*/
public void waitClusterUp() throws IOException {
cluster.waitClusterUp();
registerNamenodes();
try {
waitNamenodeRegistration();
} catch (Exception e) {
throw new IOException("Cannot wait for the namenodes", e);
}
}
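// Typical bootstrap sequence for a test (a sketch; mirrors the setup in
// TestRouterRpc below):
//   RouterDFSCluster cluster = new RouterDFSCluster(false, 2);
//   cluster.startCluster();                // NNs and DNs
//   cluster.addRouterOverrides(new RouterConfigBuilder().rpc().build());
//   cluster.startRouters();                // one Router per NN
//   cluster.waitClusterUp();               // register NNs and wait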
}
View File
@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.federation.router;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import java.net.URISyntaxException;
@ -51,6 +52,10 @@ public class TestRouter {
conf.set(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
MockResolver.class.getCanonicalName());
// Bind to any available port
conf.set(DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0");
conf.set(DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0");
// Simulate a co-located NN
conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns0");
conf.set("fs.defaultFS", "hdfs://" + "ns0");
@ -90,7 +95,31 @@ public class TestRouter {
@Test
public void testRouterService() throws InterruptedException, IOException {
// Rpc only
testRouterStartup(new RouterConfigBuilder(conf).rpc().build());
// Run with all services
testRouterStartup((new RouterConfigBuilder(conf)).build());
testRouterStartup(new RouterConfigBuilder(conf).all().build());
}
@Test
public void testRouterRestartRpcService() throws IOException {
// Start
Router router = new Router();
router.init(new RouterConfigBuilder(conf).rpc().build());
router.start();
// Verify RPC server is running
assertNotNull(router.getRpcServerAddress());
RouterRpcServer rpcServer = router.getRpcServer();
assertNotNull(rpcServer);
assertEquals(STATE.STARTED, rpcServer.getServiceState());
// Stop router and RPC server
router.stop();
assertEquals(STATE.STOPPED, rpcServer.getServiceState());
router.close();
}
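// The assertions above track the Hadoop service state machine
// (org.apache.hadoop.service.Service.STATE):
//   NOTINITED -> INITED -> STARTED -> STOPPED
// Stopping the Router is expected to stop its child RPC service as well.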
}
View File
@ -0,0 +1,869 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.addDirectory;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.countContents;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.deleteFile;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileStatus;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
import static org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.TEST_STRING;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Test the RPC interface of the {@link Router} implemented by
* {@link RouterRpcServer}.
*/
public class TestRouterRpc {
/** Federated HDFS cluster. */
private static RouterDFSCluster cluster;
/** Random Router for this federated cluster. */
private RouterContext router;
/** Random nameservice in the federated cluster. */
private String ns;
/** First namenode in the nameservice. */
private NamenodeContext namenode;
/** Client interface to the Router. */
private ClientProtocol routerProtocol;
/** Client interface to the Namenode. */
private ClientProtocol nnProtocol;
/** Filesystem interface to the Router. */
private FileSystem routerFS;
/** Filesystem interface to the Namenode. */
private FileSystem nnFS;
/** File in the Router. */
private String routerFile;
/** File in the Namenode. */
private String nnFile;
@BeforeClass
public static void globalSetUp() throws Exception {
cluster = new RouterDFSCluster(false, 2);
// Start NNs and DNs and wait until ready
cluster.startCluster();
// Start routers with only an RPC service
cluster.addRouterOverrides((new RouterConfigBuilder()).rpc().build());
cluster.startRouters();
// Register and verify all NNs with all routers
cluster.registerNamenodes();
cluster.waitNamenodeRegistration();
}
@AfterClass
public static void tearDown() {
cluster.shutdown();
}
@Before
public void testSetup() throws Exception {
// Create mock locations
cluster.installMockLocations();
// Delete all files via the NNs and verify
cluster.deleteAllFiles();
// Create test fixtures on NN
cluster.createTestDirectoriesNamenode();
// Wait to ensure NN has fully created its test directories
Thread.sleep(100);
// Pick a NS, namenode and router for this test
this.router = cluster.getRandomRouter();
this.ns = cluster.getRandomNameservice();
this.namenode = cluster.getNamenode(ns, null);
// Handles to the ClientProtocol interface
this.routerProtocol = router.getClient().getNamenode();
this.nnProtocol = namenode.getClient().getNamenode();
// Handles to the filesystem client
this.nnFS = namenode.getFileSystem();
this.routerFS = router.getFileSystem();
// Create a test file on the NN
Random r = new Random();
String randomFile = "testfile-" + r.nextInt();
this.nnFile =
cluster.getNamenodeTestDirectoryForNS(ns) + "/" + randomFile;
this.routerFile =
cluster.getFederatedTestDirectoryForNS(ns) + "/" + randomFile;
createFile(nnFS, nnFile, 32);
verifyFileExists(nnFS, nnFile);
}
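// Resulting fixture (illustrative): the same 32-byte file is visible
// through both endpoints, e.g. for ns0:
//   via the Namenode: /target-ns0/testdir/testfile-<rand>
//   via the Router:   /ns0/testdir/testfile-<rand>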
@Test
public void testRpcService() throws IOException {
Router testRouter = new Router();
List<String> nss = cluster.getNameservices();
String ns0 = nss.get(0);
Configuration routerConfig = cluster.generateRouterConfiguration(ns0, null);
RouterRpcServer server = new RouterRpcServer(routerConfig, testRouter,
testRouter.getNamenodeResolver(), testRouter.getSubclusterResolver());
server.init(routerConfig);
assertEquals(STATE.INITED, server.getServiceState());
server.start();
assertEquals(STATE.STARTED, server.getServiceState());
server.stop();
assertEquals(STATE.STOPPED, server.getServiceState());
server.close();
testRouter.close();
}
protected RouterDFSCluster getCluster() {
return TestRouterRpc.cluster;
}
protected RouterContext getRouterContext() {
return this.router;
}
protected void setRouter(RouterContext r)
throws IOException, URISyntaxException {
this.router = r;
this.routerProtocol = r.getClient().getNamenode();
this.routerFS = r.getFileSystem();
}
protected FileSystem getRouterFileSystem() {
return this.routerFS;
}
protected FileSystem getNamenodeFileSystem() {
return this.nnFS;
}
protected ClientProtocol getRouterProtocol() {
return this.routerProtocol;
}
protected ClientProtocol getNamenodeProtocol() {
return this.nnProtocol;
}
protected NamenodeContext getNamenode() {
return this.namenode;
}
protected void setNamenodeFile(String filename) {
this.nnFile = filename;
}
protected String getNamenodeFile() {
return this.nnFile;
}
protected void setRouterFile(String filename) {
this.routerFile = filename;
}
protected String getRouterFile() {
return this.routerFile;
}
protected void setNamenode(NamenodeContext nn)
throws IOException, URISyntaxException {
this.namenode = nn;
this.nnProtocol = nn.getClient().getNamenode();
this.nnFS = nn.getFileSystem();
}
protected String getNs() {
return this.ns;
}
protected void setNs(String nameservice) {
this.ns = nameservice;
}
protected static void compareResponses(
ClientProtocol protocol1, ClientProtocol protocol2,
Method m, Object[] paramList) {
Object return1 = null;
Exception exception1 = null;
try {
return1 = m.invoke(protocol1, paramList);
} catch (Exception ex) {
exception1 = ex;
}
Object return2 = null;
Exception exception2 = null;
try {
return2 = m.invoke(protocol2, paramList);
} catch (Exception ex) {
exception2 = ex;
}
assertEquals(return1, return2);
if (exception1 == null && exception2 == null) {
return;
}
// If only one protocol threw, fail explicitly rather than with an NPE
assertNotNull("Exception thrown by only one protocol", exception1);
assertNotNull("Exception thrown by only one protocol", exception2);
assertEquals(
exception1.getCause().getClass(),
exception2.getCause().getClass());
}
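// Typical usage (see the tests below): invoke the same ClientProtocol
// method against the Router and the Namenode and require that both the
// return values and the failure modes match, e.g.:
//   Method m = ClientProtocol.class.getMethod(
//       "getListing", String.class, byte[].class, boolean.class);
//   compareResponses(routerProtocol, nnProtocol, m,
//       new Object[] {"/unknownlocation/unknowndir",
//           HdfsFileStatus.EMPTY_NAME, false});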
@Test
public void testProxyListFiles() throws IOException, InterruptedException,
URISyntaxException, NoSuchMethodException, SecurityException {
// Verify that the root listing is a union of the mount table destinations
// and the files stored at all nameservices mounted at the root (ns0 + ns1)
//
// / -->
// /ns0 (from mount table)
// /ns1 (from mount table)
// all items in / of ns0 (default NS)
// Collect the mount table entries from the root mount point
Set<String> requiredPaths = new TreeSet<>();
FileSubclusterResolver fileResolver =
router.getRouter().getSubclusterResolver();
for (String mount : fileResolver.getMountPoints("/")) {
requiredPaths.add(mount);
}
// Collect all files/dirs on the root path of the default NS
String defaultNs = cluster.getNameservices().get(0);
NamenodeContext nn = cluster.getNamenode(defaultNs, null);
FileStatus[] files = nn.getFileSystem().listStatus(new Path("/"));
for (FileStatus file : files) {
requiredPaths.add(file.getPath().getName());
}
// Fetch listing
DirectoryListing listing =
routerProtocol.getListing("/", HdfsFileStatus.EMPTY_NAME, false);
Iterator<String> requiredPathsIterator = requiredPaths.iterator();
// Match each path returned and verify order returned
for(HdfsFileStatus f : listing.getPartialListing()) {
String fileName = requiredPathsIterator.next();
String currentFile = f.getFullPath(new Path("/")).getName();
assertEquals(currentFile, fileName);
}
// Verify the total number of results found/matched
assertEquals(requiredPaths.size(), listing.getPartialListing().length);
// List a path that doesn't exist and validate error response with NN
// behavior.
Method m = ClientProtocol.class.getMethod(
"getListing", String.class, byte[].class, boolean.class);
String badPath = "/unknownlocation/unknowndir";
compareResponses(routerProtocol, nnProtocol, m,
new Object[] {badPath, HdfsFileStatus.EMPTY_NAME, false});
}
@Test
public void testProxyListFilesWithConflict()
throws IOException, InterruptedException {
// Add a directory to the namespace that conflicts with a mount point
NamenodeContext nn = cluster.getNamenode(ns, null);
FileSystem nnFs = nn.getFileSystem();
addDirectory(nnFs, cluster.getFederatedTestDirectoryForNS(ns));
FileSystem routerFs = router.getFileSystem();
int initialCount = countContents(routerFs, "/");
// Root file system now for NS X:
// / ->
// /ns0 (mount table)
// /ns1 (mount table)
// /target-ns0 (the target folder for ns0, which is mapped to /)
// /nsX (local directory that duplicates mount table)
int newCount = countContents(routerFs, "/");
assertEquals(initialCount, newCount);
// Verify that each root path is readable and contains one test directory
assertEquals(1, countContents(routerFs, cluster.getFederatedPathForNS(ns)));
// Verify that real folder for the ns contains a single test directory
assertEquals(1, countContents(nnFs, cluster.getNamenodePathForNS(ns)));
}
protected void testRename(RouterContext testRouter, String filename,
String renamedFile, boolean exceptionExpected) throws IOException {
createFile(testRouter.getFileSystem(), filename, 32);
// verify
verifyFileExists(testRouter.getFileSystem(), filename);
// rename
boolean exceptionThrown = false;
try {
DFSClient client = testRouter.getClient();
ClientProtocol clientProtocol = client.getNamenode();
clientProtocol.rename(filename, renamedFile);
} catch (Exception ex) {
exceptionThrown = true;
}
if (exceptionExpected) {
// Error was expected
assertTrue(exceptionThrown);
FileContext fileContext = testRouter.getFileContext();
assertTrue(fileContext.delete(new Path(filename), true));
} else {
// No error was expected
assertFalse(exceptionThrown);
// verify
assertTrue(verifyFileExists(testRouter.getFileSystem(), renamedFile));
// delete
FileContext fileContext = testRouter.getFileContext();
assertTrue(fileContext.delete(new Path(renamedFile), true));
}
}
protected void testRename2(RouterContext testRouter, String filename,
String renamedFile, boolean exceptionExpected) throws IOException {
createFile(testRouter.getFileSystem(), filename, 32);
// verify
verifyFileExists(testRouter.getFileSystem(), filename);
// rename
boolean exceptionThrown = false;
try {
DFSClient client = testRouter.getClient();
ClientProtocol clientProtocol = client.getNamenode();
clientProtocol.rename2(filename, renamedFile, new Options.Rename[] {});
} catch (Exception ex) {
exceptionThrown = true;
}
assertEquals(exceptionExpected, exceptionThrown);
if (exceptionExpected) {
// Error was expected
FileContext fileContext = testRouter.getFileContext();
assertTrue(fileContext.delete(new Path(filename), true));
} else {
// verify
assertTrue(verifyFileExists(testRouter.getFileSystem(), renamedFile));
// delete
FileContext fileContext = testRouter.getFileContext();
assertTrue(fileContext.delete(new Path(renamedFile), true));
}
}
@Test
public void testProxyRenameFiles() throws IOException, InterruptedException {
Thread.sleep(5000);
List<String> nss = cluster.getNameservices();
String ns0 = nss.get(0);
String ns1 = nss.get(1);
// Rename within the same namespace
// /ns0/testdir/testrename -> /ns0/testdir/testrename-append
String filename =
cluster.getFederatedTestDirectoryForNS(ns0) + "/testrename";
String renamedFile = filename + "-append";
testRename(router, filename, renamedFile, false);
testRename2(router, filename, renamedFile, false);
// Rename a file to a destination that is in a different namespace (fails)
filename = cluster.getFederatedTestDirectoryForNS(ns0) + "/testrename";
renamedFile = cluster.getFederatedTestDirectoryForNS(ns1) + "/testrename";
testRename(router, filename, renamedFile, true);
testRename2(router, filename, renamedFile, true);
}
@Test
public void testProxyChownFiles() throws Exception {
String newUsername = "TestUser";
String newGroup = "TestGroup";
// change owner
routerProtocol.setOwner(routerFile, newUsername, newGroup);
// Verify with NN
FileStatus file = getFileStatus(namenode.getFileSystem(), nnFile);
assertEquals(file.getOwner(), newUsername);
assertEquals(file.getGroup(), newGroup);
// Bad request and validate router response matches NN response.
Method m = ClientProtocol.class.getMethod("setOwner", String.class,
String.class, String.class);
String badPath = "/unknownlocation/unknowndir";
compareResponses(routerProtocol, nnProtocol, m,
new Object[] {badPath, newUsername, newGroup});
}
@Test
public void testProxyGetStats() throws Exception {
long[] combinedData = routerProtocol.getStats();
long[] individualData = new long[10];
for (String nameservice : cluster.getNameservices()) {
NamenodeContext n = cluster.getNamenode(nameservice, null);
DFSClient client = n.getClient();
ClientProtocol clientProtocol = client.getNamenode();
long[] data = clientProtocol.getStats();
for (int i = 0; i < data.length; i++) {
individualData[i] += data[i];
}
assertEquals(combinedData.length, data.length);
}
for (int i = 0; i < combinedData.length && i < individualData.length; i++) {
if (i == ClientProtocol.GET_STATS_REMAINING_IDX) {
// Skip available storage as this fluctuates in mini cluster
continue;
}
assertEquals(combinedData[i], individualData[i]);
}
}
@Test
public void testProxyGetDatanodeReport() throws Exception {
DatanodeInfo[] combinedData =
routerProtocol.getDatanodeReport(DatanodeReportType.ALL);
Set<Integer> individualData = new HashSet<Integer>();
for (String nameservice : cluster.getNameservices()) {
NamenodeContext n = cluster.getNamenode(nameservice, null);
DFSClient client = n.getClient();
ClientProtocol clientProtocol = client.getNamenode();
DatanodeInfo[] data =
clientProtocol.getDatanodeReport(DatanodeReportType.ALL);
for (int i = 0; i < data.length; i++) {
// Collect unique DNs based on their xfer port
DatanodeInfo info = data[i];
individualData.add(info.getXferPort());
}
}
assertEquals(combinedData.length, individualData.size());
}
@Test
public void testProxyGetDatanodeStorageReport()
throws IOException, InterruptedException, URISyntaxException {
DatanodeStorageReport[] combinedData =
routerProtocol.getDatanodeStorageReport(DatanodeReportType.ALL);
Set<String> individualData = new HashSet<>();
for (String nameservice : cluster.getNameservices()) {
NamenodeContext n = cluster.getNamenode(nameservice, null);
DFSClient client = n.getClient();
ClientProtocol clientProtocol = client.getNamenode();
DatanodeStorageReport[] data =
clientProtocol.getDatanodeStorageReport(DatanodeReportType.ALL);
for (DatanodeStorageReport report : data) {
// Determine unique DN instances
DatanodeInfo dn = report.getDatanodeInfo();
individualData.add(dn.toString());
}
}
assertEquals(combinedData.length, individualData.size());
}
@Test
public void testProxyMkdir() throws Exception {
// Check the initial folders
FileStatus[] filesInitial = routerFS.listStatus(new Path("/"));
// Create a directory via the router at the root level
String dirPath = "/testdir";
FsPermission permission = new FsPermission("705");
routerProtocol.mkdirs(dirPath, permission, false);
// Verify the root listing has the item via the router
FileStatus[] files = routerFS.listStatus(new Path("/"));
assertEquals(Arrays.toString(files) + " should be " +
Arrays.toString(filesInitial) + " + " + dirPath,
filesInitial.length + 1, files.length);
assertTrue(verifyFileExists(routerFS, dirPath));
// Verify the directory is present in only 1 Namenode
int foundCount = 0;
for (NamenodeContext n : cluster.getNamenodes()) {
if (verifyFileExists(n.getFileSystem(), dirPath)) {
foundCount++;
}
}
assertEquals(1, foundCount);
assertTrue(deleteFile(routerFS, dirPath));
// Validate router failure response matches NN failure response.
Method m = ClientProtocol.class.getMethod("mkdirs", String.class,
FsPermission.class, boolean.class);
String badPath = "/unknownlocation/unknowndir";
compareResponses(routerProtocol, nnProtocol, m,
new Object[] {badPath, permission, false});
}
@Test
public void testProxyChmodFiles() throws Exception {
FsPermission permission = new FsPermission("444");
// change permissions
routerProtocol.setPermission(routerFile, permission);
// Validate permissions NN
FileStatus file = getFileStatus(namenode.getFileSystem(), nnFile);
assertEquals(permission, file.getPermission());
// Validate router failure response matches NN failure response.
Method m = ClientProtocol.class.getMethod(
"setPermission", String.class, FsPermission.class);
String badPath = "/unknownlocation/unknowndir";
compareResponses(routerProtocol, nnProtocol, m,
new Object[] {badPath, permission});
}
@Test
public void testProxySetReplication() throws Exception {
// Check current replication via NN
FileStatus file = getFileStatus(nnFS, nnFile);
assertEquals(1, file.getReplication());
// increment replication via router
routerProtocol.setReplication(routerFile, (short) 2);
// Verify via NN
file = getFileStatus(nnFS, nnFile);
assertEquals(2, file.getReplication());
// Validate router failure response matches NN failure response.
Method m = ClientProtocol.class.getMethod(
"setReplication", String.class, short.class);
String badPath = "/unknownlocation/unknowndir";
compareResponses(routerProtocol, nnProtocol, m,
new Object[] {badPath, (short) 2});
}
@Test
public void testProxyTruncateFile() throws Exception {
// Check file size via NN
FileStatus file = getFileStatus(nnFS, nnFile);
assertTrue(file.getLen() > 0);
// Truncate to 0 bytes via router
routerProtocol.truncate(routerFile, 0, "testclient");
// Verify via NN
file = getFileStatus(nnFS, nnFile);
assertEquals(0, file.getLen());
// Validate router failure response matches NN failure response.
Method m = ClientProtocol.class.getMethod(
"truncate", String.class, long.class, String.class);
String badPath = "/unknownlocation/unknowndir";
compareResponses(routerProtocol, nnProtocol, m,
new Object[] {badPath, (long) 0, "testclient"});
}
@Test
public void testProxyGetBlockLocations() throws Exception {
// Fetch block locations via router
LocatedBlocks locations =
routerProtocol.getBlockLocations(routerFile, 0, 1024);
assertEquals(1, locations.getLocatedBlocks().size());
// Validate router failure response matches NN failure response.
Method m = ClientProtocol.class.getMethod(
"getBlockLocations", String.class, long.class, long.class);
String badPath = "/unknownlocation/unknowndir";
compareResponses(routerProtocol, nnProtocol,
m, new Object[] {badPath, (long) 0, (long) 0});
}
@Test
public void testProxyStoragePolicy() throws Exception {
// Query initial policy via NN
HdfsFileStatus status = namenode.getClient().getFileInfo(nnFile);
// Set a random policy via router
BlockStoragePolicy[] policies = namenode.getClient().getStoragePolicies();
BlockStoragePolicy policy = policies[0];
while (policy.isCopyOnCreateFile()) {
// Pick a non copy on create policy
Random rand = new Random();
int randIndex = rand.nextInt(policies.length);
policy = policies[randIndex];
}
routerProtocol.setStoragePolicy(routerFile, policy.getName());
// Verify policy via NN
HdfsFileStatus newStatus = namenode.getClient().getFileInfo(nnFile);
assertTrue(newStatus.getStoragePolicy() == policy.getId());
assertTrue(newStatus.getStoragePolicy() != status.getStoragePolicy());
// Validate router failure response matches NN failure response.
Method m = ClientProtocol.class.getMethod("setStoragePolicy", String.class,
String.class);
String badPath = "/unknownlocation/unknowndir";
compareResponses(routerProtocol, nnProtocol,
m, new Object[] {badPath, "badpolicy"});
}
@Test
public void testProxyGetPreferredBlockSize() throws Exception {
// Query via NN and Router and verify
long namenodeSize = nnProtocol.getPreferredBlockSize(nnFile);
long routerSize = routerProtocol.getPreferredBlockSize(routerFile);
assertEquals(routerSize, namenodeSize);
// Validate router failure response matches NN failure response.
Method m = ClientProtocol.class.getMethod(
"getPreferredBlockSize", String.class);
String badPath = "/unknownlocation/unknowndir";
compareResponses(
routerProtocol, nnProtocol, m, new Object[] {badPath});
}
private void testConcat(
String source, String target, boolean failureExpected) {
boolean failure = false;
try {
// Concat the test file with a full-block-length file via the router
routerProtocol.concat(target, new String[] {source});
} catch (IOException ex) {
failure = true;
}
assertEquals(failureExpected, failure);
}
@Test
public void testProxyConcatFile() throws Exception {
// Create a stub file in the primary ns
String sameNameservice = ns;
String existingFile =
cluster.getFederatedTestDirectoryForNS(sameNameservice) +
"_concatfile";
int existingFileSize = 32;
createFile(routerFS, existingFile, existingFileSize);
// Identify an alternate nameservice that doesn't match the existing file
String alternateNameservice = null;
for (String n : cluster.getNameservices()) {
if (!n.equals(sameNameservice)) {
alternateNameservice = n;
break;
}
}
// Create new files, must be a full block to use concat. One file is in the
// same namespace as the target file, the other is in a different namespace.
String altRouterFile =
cluster.getFederatedTestDirectoryForNS(alternateNameservice) +
"_newfile";
String sameRouterFile =
cluster.getFederatedTestDirectoryForNS(sameNameservice) +
"_newfile";
createFile(routerFS, altRouterFile, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
createFile(routerFS, sameRouterFile, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
// Concat in different namespaces, fails
testConcat(existingFile, altRouterFile, true);
// Concat in same namespaces, succeeds
testConcat(existingFile, sameRouterFile, false);
// Check target file length
FileStatus status = getFileStatus(routerFS, sameRouterFile);
assertEquals(
existingFileSize + DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT,
status.getLen());
// Validate router failure response matches NN failure response.
Method m = ClientProtocol.class.getMethod(
"concat", String.class, String[].class);
String badPath = "/unknownlocation/unknowndir";
compareResponses(routerProtocol, nnProtocol, m,
new Object[] {badPath, new String[] {routerFile}});
}
@Test
public void testProxyAppend() throws Exception {
// Append a test string via router
EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.APPEND);
DFSClient routerClient = getRouterContext().getClient();
HdfsDataOutputStream stream =
routerClient.append(routerFile, 1024, createFlag, null, null);
stream.writeBytes(TEST_STRING);
stream.close();
// Verify file size via NN
FileStatus status = getFileStatus(nnFS, nnFile);
assertTrue(status.getLen() > TEST_STRING.length());
// Validate router failure response matches NN failure response.
Method m = ClientProtocol.class.getMethod("append", String.class,
String.class, EnumSetWritable.class);
String badPath = "/unknownlocation/unknowndir";
EnumSetWritable<CreateFlag> createFlagWritable =
new EnumSetWritable<CreateFlag>(createFlag);
compareResponses(routerProtocol, nnProtocol, m,
new Object[] {badPath, "testClient", createFlagWritable});
}
@Test
public void testProxyGetAdditionalDatanode()
throws IOException, InterruptedException, URISyntaxException {
// Use primitive APIs to open a file, add a block, and get datanode location
EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.CREATE);
String clientName = getRouterContext().getClient().getClientName();
String newRouterFile = routerFile + "_additionalDatanode";
HdfsFileStatus status = routerProtocol.create(
newRouterFile, new FsPermission("777"), clientName,
new EnumSetWritable<CreateFlag>(createFlag), true, (short) 1,
(long) 1024, CryptoProtocolVersion.supported(), null);
// Add a block via router (requires client to have same lease)
LocatedBlock block = routerProtocol.addBlock(
newRouterFile, clientName, null, null,
status.getFileId(), null, null);
DatanodeInfo[] exclusions = new DatanodeInfo[0];
LocatedBlock newBlock = routerProtocol.getAdditionalDatanode(
newRouterFile, status.getFileId(), block.getBlock(),
block.getLocations(), block.getStorageIDs(), exclusions, 1, clientName);
assertNotNull(newBlock);
}
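// The low-level write sequence exercised above, for reference:
//   create()                -> allocates the file under the client lease
//   addBlock()              -> allocates the first block of the file
//   getAdditionalDatanode() -> requests one extra replica location for
//                              that block, excluding no datanodes
// All three calls need to reach the same namespace for the lease and
// file ID to remain valid.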
@Test
public void testProxyCreateFileAlternateUser()
throws IOException, URISyntaxException, InterruptedException {
// Create via Router
String routerDir = cluster.getFederatedTestDirectoryForNS(ns);
String namenodeDir = cluster.getNamenodeTestDirectoryForNS(ns);
String newRouterFile = routerDir + "/unknownuser";
String newNamenodeFile = namenodeDir + "/unknownuser";
String username = "unknownuser";
// Allow all user access to dir
namenode.getFileContext().setPermission(
new Path(namenodeDir), new FsPermission("777"));
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(username);
DFSClient client = getRouterContext().getClient(ugi);
client.create(newRouterFile, true);
// Fetch via NN and check user
FileStatus status = getFileStatus(nnFS, newNamenodeFile);
assertEquals(status.getOwner(), username);
}
@Test
public void testProxyGetFileInfoAccessException() throws IOException {
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser("unknownuser");
// List files from the NN and trap the exception
Exception nnFailure = null;
try {
String testFile = cluster.getNamenodeTestFileForNS(ns);
namenode.getClient(ugi).getLocatedBlocks(testFile, 0);
} catch (Exception e) {
nnFailure = e;
}
assertNotNull(nnFailure);
// List files from the router and trap the exception
Exception routerFailure = null;
try {
String testFile = cluster.getFederatedTestFileForNS(ns);
getRouterContext().getClient(ugi).getLocatedBlocks(testFile, 0);
} catch (Exception e) {
routerFailure = e;
}
assertNotNull(routerFailure);
assertEquals(routerFailure.getClass(), nnFailure.getClass());
}
}
View File
@ -0,0 +1,216 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
/**
* Test the RPC interface of the {@link Router} implemented by
* {@link RouterRpcServer} when mount points have multiple destinations.
*/
public class TestRouterRpcMultiDestination extends TestRouterRpc {
@Override
public void testSetup() throws Exception {
RouterDFSCluster cluster = getCluster();
// Create mock locations
getCluster().installMockLocations();
List<RouterContext> routers = cluster.getRouters();
// Add extra location to the root mount / such that the root mount points:
// /
// ns0 -> /
// ns1 -> /
for (RouterContext rc : routers) {
Router router = rc.getRouter();
MockResolver resolver = (MockResolver) router.getSubclusterResolver();
resolver.addLocation("/", cluster.getNameservices().get(1), "/");
}
// Create a mount that points to 2 dirs in the same ns:
// /same
// ns0 -> /
// ns0 -> /target-ns0
for (RouterContext rc : routers) {
Router router = rc.getRouter();
MockResolver resolver = (MockResolver) router.getSubclusterResolver();
List<String> nss = cluster.getNameservices();
String ns0 = nss.get(0);
resolver.addLocation("/same", ns0, "/");
resolver.addLocation("/same", ns0, cluster.getNamenodePathForNS(ns0));
}
// Delete all files via the NNs and verify
cluster.deleteAllFiles();
// Create test fixtures on NN
cluster.createTestDirectoriesNamenode();
// Wait to ensure NN has fully created its test directories
Thread.sleep(100);
// Pick a NS, namenode and router for this test
RouterContext router = cluster.getRandomRouter();
this.setRouter(router);
String ns = cluster.getRandomNameservice();
this.setNs(ns);
this.setNamenode(cluster.getNamenode(ns, null));
// Create a test file on a single NN that is accessed via a router path
// with 2 destinations. All tests should failover to the alternate
// destination if the wrong NN is attempted first.
Random r = new Random();
String randomString = "testfile-" + r.nextInt();
setNamenodeFile("/" + randomString);
setRouterFile("/" + randomString);
FileSystem nnFs = getNamenodeFileSystem();
FileSystem routerFs = getRouterFileSystem();
createFile(nnFs, getNamenodeFile(), 32);
verifyFileExists(nnFs, getNamenodeFile());
verifyFileExists(routerFs, getRouterFile());
}
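// Resulting mock mount table for this subclass (illustrative):
//   /     -> ns0:/ and ns1:/           (two destinations, two namespaces)
//   /ns0  -> ns0:/target-ns0
//   /ns1  -> ns1:/target-ns1
//   /same -> ns0:/ and ns0:/target-ns0 (two destinations, one namespace)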
private void testListing(String path) throws IOException {
// Collect the mount table entries for this path
Set<String> requiredPaths = new TreeSet<>();
RouterContext rc = getRouterContext();
Router router = rc.getRouter();
FileSubclusterResolver subclusterResolver = router.getSubclusterResolver();
for (String mount : subclusterResolver.getMountPoints(path)) {
requiredPaths.add(mount);
}
// Get files/dirs from the Namenodes
PathLocation location = subclusterResolver.getDestinationForPath(path);
for (RemoteLocation loc : location.getDestinations()) {
String nsId = loc.getNameserviceId();
String dest = loc.getDest();
NamenodeContext nn = getCluster().getNamenode(nsId, null);
FileSystem fs = nn.getFileSystem();
FileStatus[] files = fs.listStatus(new Path(dest));
for (FileStatus file : files) {
String pathName = file.getPath().getName();
requiredPaths.add(pathName);
}
}
// Get files/dirs from the Router
DirectoryListing listing =
getRouterProtocol().getListing(path, HdfsFileStatus.EMPTY_NAME, false);
Iterator<String> requiredPathsIterator = requiredPaths.iterator();
// Match each path returned and verify order returned
HdfsFileStatus[] partialListing = listing.getPartialListing();
for (HdfsFileStatus fileStatus : partialListing) {
String fileName = requiredPathsIterator.next();
String currentFile = fileStatus.getFullPath(new Path(path)).getName();
assertEquals(currentFile, fileName);
}
// Verify the total number of results found/matched
assertEquals(
requiredPaths + " doesn't match " + Arrays.toString(partialListing),
requiredPaths.size(), partialListing.length);
}
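// Note: requiredPaths is a TreeSet, so the loop above also asserts that
// the Router returns the merged listing in sorted order across all
// destinations of the mount point.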
@Override
public void testProxyListFiles() throws IOException, InterruptedException,
URISyntaxException, NoSuchMethodException, SecurityException {
// Verify that the root listing is a union of the mount table destinations
// and the files stored at all nameservices mounted at the root (ns0 + ns1)
// / -->
// /ns0 (from mount table)
// /ns1 (from mount table)
// /same (from the mount table)
// all items in / of ns0 (from the mapping / -> ns0:/)
// all items in / of ns1 (from the mapping / -> ns1:/)
testListing("/");
// Verify that the "/same" mount point lists the contents of both dirs in
// the same ns
// /same -->
// /target-ns0 (from root of ns0)
// /testdir (from contents of /target-ns0)
testListing("/same");
// List a non-existing path and validate error response with NN behavior
ClientProtocol namenodeProtocol =
getCluster().getRandomNamenode().getClient().getNamenode();
Method m = ClientProtocol.class.getMethod(
"getListing", String.class, byte[].class, boolean.class);
String badPath = "/unknownlocation/unknowndir";
compareResponses(getRouterProtocol(), namenodeProtocol, m,
new Object[] {badPath, HdfsFileStatus.EMPTY_NAME, false});
}
@Override
public void testProxyRenameFiles() throws IOException, InterruptedException {
super.testProxyRenameFiles();
List<String> nss = getCluster().getNameservices();
String ns0 = nss.get(0);
String ns1 = nss.get(1);
// Rename a file from ns0 into the root (mapped to both ns0 and ns1)
String testDir0 = getCluster().getFederatedTestDirectoryForNS(ns0);
String filename0 = testDir0 + "/testrename";
String renamedFile = "/testrename";
testRename(getRouterContext(), filename0, renamedFile, false);
testRename2(getRouterContext(), filename0, renamedFile, false);
// Rename a file from ns1 into the root (mapped to both ns0 and ns1)
String testDir1 = getCluster().getFederatedTestDirectoryForNS(ns1);
String filename1 = testDir1 + "/testrename";
testRename(getRouterContext(), filename1, renamedFile, false);
testRename2(getRouterContext(), filename1, renamedFile, false);
}
}