HDFS-13364. RBF: Support NamenodeProtocol in the Router. Contributed by Inigo Goiri.

This commit is contained in:
Yiqun Lin 2018-04-03 15:08:40 +08:00
parent 1077392eaa
commit 2be64eb201
11 changed files with 702 additions and 69 deletions

View File

@ -17,8 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.net.InetSocketAddress;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.ipc.RPC;
/**
@ -26,18 +27,24 @@
* a connection, it increments a counter to mark it as active. Once the client
* is done with the connection, it decreases the counter. It also takes care of
* closing the connection once is not active.
*
* The protocols currently used are:
* <ul>
* <li>{@link org.apache.hadoop.hdfs.protocol.ClientProtocol}
* <li>{@link org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol}
* </ul>
*/
public class ConnectionContext {
/** Client for the connection. */
private final ProxyAndInfo<ClientProtocol> client;
private final ProxyAndInfo<?> client;
/** How many threads are using this connection. */
private int numThreads = 0;
/** If the connection is closed. */
private boolean closed = false;
public ConnectionContext(ProxyAndInfo<ClientProtocol> connection) {
public ConnectionContext(ProxyAndInfo<?> connection) {
this.client = connection;
}
@ -74,7 +81,7 @@ public synchronized boolean isUsable() {
*
* @return Connection client.
*/
public synchronized ProxyAndInfo<ClientProtocol> getClient() {
public synchronized ProxyAndInfo<?> getClient() {
this.numThreads++;
return this.client;
}
@ -96,9 +103,27 @@ public synchronized void release() {
public synchronized void close() {
this.closed = true;
if (this.numThreads == 0) {
ClientProtocol proxy = this.client.getProxy();
Object proxy = this.client.getProxy();
// Nobody should be using this anymore so it should close right away
RPC.stopProxy(proxy);
}
}
@Override
public String toString() {
InetSocketAddress addr = this.client.getAddress();
Object proxy = this.client.getProxy();
Class<?> clazz = proxy.getClass();
StringBuilder sb = new StringBuilder();
sb.append(clazz.getSimpleName());
sb.append("@");
sb.append(addr);
sb.append("x");
sb.append(numThreads);
if (closed) {
sb.append("[CLOSED]");
}
return sb.toString();
}
}

View File

@ -166,11 +166,12 @@ public void close() {
*
* @param ugi User group information.
* @param nnAddress Namenode address for the connection.
* @param protocol Protocol for the connection.
* @return Proxy client to connect to nnId as UGI.
* @throws IOException If the connection cannot be obtained.
*/
public ConnectionContext getConnection(
UserGroupInformation ugi, String nnAddress) throws IOException {
public ConnectionContext getConnection(UserGroupInformation ugi,
String nnAddress, Class<?> protocol) throws IOException {
// Check if the manager is shutdown
if (!this.running) {
@ -181,7 +182,8 @@ public ConnectionContext getConnection(
}
// Try to get the pool if created
ConnectionPoolId connectionId = new ConnectionPoolId(ugi, nnAddress);
ConnectionPoolId connectionId =
new ConnectionPoolId(ugi, nnAddress, protocol);
ConnectionPool pool = null;
readLock.lock();
try {
@ -197,7 +199,7 @@ public ConnectionContext getConnection(
pool = this.pools.get(connectionId);
if (pool == null) {
pool = new ConnectionPool(
this.conf, nnAddress, ugi, this.minSize, this.maxSize);
this.conf, nnAddress, ugi, this.minSize, this.maxSize, protocol);
this.pools.put(connectionId, pool);
}
} finally {

View File

@ -38,6 +38,9 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryUtils;
@ -75,6 +78,8 @@ public class ConnectionPool {
private final String namenodeAddress;
/** User for this connections. */
private final UserGroupInformation ugi;
/** Class of the protocol. */
private final Class<?> protocol;
/** Pool of connections. We mimic a COW array. */
private volatile List<ConnectionContext> connections = new ArrayList<>();
@ -91,16 +96,17 @@ public class ConnectionPool {
protected ConnectionPool(Configuration config, String address,
UserGroupInformation user, int minPoolSize, int maxPoolSize)
throws IOException {
UserGroupInformation user, int minPoolSize, int maxPoolSize,
Class<?> proto) throws IOException {
this.conf = config;
// Connection pool target
this.ugi = user;
this.namenodeAddress = address;
this.protocol = proto;
this.connectionPoolId =
new ConnectionPoolId(this.ugi, this.namenodeAddress);
new ConnectionPoolId(this.ugi, this.namenodeAddress, this.protocol);
// Set configuration parameters for the pool
this.minSize = minPoolSize;
@ -287,7 +293,8 @@ public String getJSON() {
* @throws IOException
*/
public ConnectionContext newConnection() throws IOException {
return newConnection(this.conf, this.namenodeAddress, this.ugi);
return newConnection(
this.conf, this.namenodeAddress, this.ugi, this.protocol);
}
/**
@ -299,12 +306,46 @@ public ConnectionContext newConnection() throws IOException {
* @param conf Configuration for the connection.
* @param nnAddress Address of server supporting the ClientProtocol.
* @param ugi User context.
* @return Proxy for the target ClientProtocol that contains the user's
* @param proto Interface of the protocol.
* @return proto for the target ClientProtocol that contains the user's
* security context.
* @throws IOException If it cannot be created.
*/
protected static ConnectionContext newConnection(Configuration conf,
String nnAddress, UserGroupInformation ugi)
String nnAddress, UserGroupInformation ugi, Class<?> proto)
throws IOException {
ConnectionContext ret;
if (proto == ClientProtocol.class) {
ret = newClientConnection(conf, nnAddress, ugi);
} else if (proto == NamenodeProtocol.class) {
ret = newNamenodeConnection(conf, nnAddress, ugi);
} else {
String msg = "Unsupported protocol for connection to NameNode: " +
((proto != null) ? proto.getClass().getName() : "null");
LOG.error(msg);
throw new IllegalStateException(msg);
}
return ret;
}
/**
* Creates a proxy wrapper for a client NN connection. Each proxy contains
* context for a single user/security context. To maximize throughput it is
* recommended to use multiple connection per user+server, allowing multiple
* writes and reads to be dispatched in parallel.
*
* Mostly based on NameNodeProxies#createNonHAProxy() but it needs the
* connection identifier.
*
* @param conf Configuration for the connection.
* @param nnAddress Address of server supporting the ClientProtocol.
* @param ugi User context.
* @return Proxy for the target ClientProtocol that contains the user's
* security context.
* @throws IOException If it cannot be created.
*/
private static ConnectionContext newClientConnection(
Configuration conf, String nnAddress, UserGroupInformation ugi)
throws IOException {
RPC.setProtocolEngine(
conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
@ -334,4 +375,49 @@ protected static ConnectionContext newConnection(Configuration conf,
ConnectionContext connection = new ConnectionContext(clientProxy);
return connection;
}
/**
* Creates a proxy wrapper for a NN connection. Each proxy contains context
* for a single user/security context. To maximize throughput it is
* recommended to use multiple connection per user+server, allowing multiple
* writes and reads to be dispatched in parallel.
*
* @param conf Configuration for the connection.
* @param nnAddress Address of server supporting the ClientProtocol.
* @param ugi User context.
* @return Proxy for the target NamenodeProtocol that contains the user's
* security context.
* @throws IOException If it cannot be created.
*/
private static ConnectionContext newNamenodeConnection(
Configuration conf, String nnAddress, UserGroupInformation ugi)
throws IOException {
RPC.setProtocolEngine(
conf, NamenodeProtocolPB.class, ProtobufRpcEngine.class);
final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(
conf,
HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME);
SocketFactory factory = SocketFactory.getDefault();
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress);
final long version = RPC.getProtocolVersion(NamenodeProtocolPB.class);
NamenodeProtocolPB proxy = RPC.getProtocolProxy(NamenodeProtocolPB.class,
version, socket, ugi, conf,
factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
NamenodeProtocol client = new NamenodeProtocolTranslatorPB(proxy);
Text dtService = SecurityUtil.buildTokenService(socket);
ProxyAndInfo<NamenodeProtocol> clientProxy =
new ProxyAndInfo<NamenodeProtocol>(client, dtService, socket);
ConnectionContext connection = new ConnectionContext(clientProxy);
return connection;
}
}

View File

@ -42,16 +42,21 @@ public class ConnectionPoolId implements Comparable<ConnectionPoolId> {
private final String nnId;
/** Information about the user. */
private final UserGroupInformation ugi;
/** Protocol for the connection. */
private final Class<?> protocol;
/**
* New connection pool identifier.
*
* @param ugi Information of the user issuing the request.
* @param nnId Namenode address with port.
* @param proto Protocol of the connection.
*/
public ConnectionPoolId(final UserGroupInformation ugi, final String nnId) {
public ConnectionPoolId(final UserGroupInformation ugi, final String nnId,
final Class<?> proto) {
this.nnId = nnId;
this.ugi = ugi;
this.protocol = proto;
}
@Override
@ -60,6 +65,7 @@ public int hashCode() {
.append(this.nnId)
.append(this.ugi.toString())
.append(this.getTokenIds())
.append(this.protocol)
.toHashCode();
return hash;
}
@ -76,14 +82,18 @@ public boolean equals(Object o) {
}
String thisTokens = this.getTokenIds().toString();
String otherTokens = other.getTokenIds().toString();
return thisTokens.equals(otherTokens);
if (!thisTokens.equals(otherTokens)) {
return false;
}
return this.protocol.equals(other.protocol);
}
return false;
}
@Override
public String toString() {
return this.ugi + " " + this.getTokenIds() + "->" + this.nnId;
return this.ugi + " " + this.getTokenIds() + "->" + this.nnId + " [" +
this.protocol.getSimpleName() + "]";
}
@Override
@ -97,6 +107,9 @@ public int compareTo(ConnectionPoolId other) {
String otherTokens = other.getTokenIds().toString();
ret = thisTokens.compareTo(otherTokens);
}
if (ret == 0) {
ret = this.protocol.toString().compareTo(other.protocol.toString());
}
return ret;
}

View File

@ -38,22 +38,35 @@ public class RemoteMethod {
private final Object[] params;
/** List of method parameters types, matches parameters. */
private final Class<?>[] types;
/** Class of the protocol for the method. */
private final Class<?> protocol;
/** String name of the ClientProtocol method. */
private final String methodName;
/**
* Create a method with no parameters.
* Create a remote method generator for the ClientProtocol with no parameters.
*
* @param method The string name of the ClientProtocol method.
* @param method The string name of the protocol method.
*/
public RemoteMethod(String method) {
this.params = null;
this.types = null;
this.methodName = method;
this(ClientProtocol.class, method);
}
/**
* Creates a remote method generator.
* Create a method with no parameters.
*
* @param proto Protocol of the method.
* @param method The string name of the ClientProtocol method.
*/
public RemoteMethod(Class<?> proto, String method) {
this.params = null;
this.types = null;
this.methodName = method;
this.protocol = proto;
}
/**
* Create a remote method generator for the ClientProtocol.
*
* @param method The string name of the ClientProtocol method.
* @param pTypes A list of types to use to locate the specific method.
@ -70,16 +83,49 @@ public RemoteMethod(String method) {
*/
public RemoteMethod(String method, Class<?>[] pTypes, Object... pParams)
throws IOException {
this(ClientProtocol.class, method, pTypes, pParams);
}
/**
* Creates a remote method generator.
*
* @param proto Protocol of the method.
* @param method The string name of the ClientProtocol method.
* @param pTypes A list of types to use to locate the specific method.
* @param pParams A list of parameters for the method. The order of the
* parameter list must match the order and number of the types.
* Parameters are grouped into 2 categories:
* <ul>
* <li>Static parameters that are immutable across locations.
* <li>Dynamic parameters that are determined for each location by a
* RemoteParam object. To specify a dynamic parameter, pass an
* instance of RemoteParam in place of the parameter value.
* </ul>
* @throws IOException If the types and parameter lists are not valid.
*/
public RemoteMethod(Class<?> proto, String method, Class<?>[] pTypes,
Object... pParams) throws IOException {
if (pParams.length != pTypes.length) {
throw new IOException("Invalid parameters for method " + method);
}
this.protocol = proto;
this.params = pParams;
this.types = Arrays.copyOf(pTypes, pTypes.length);
this.methodName = method;
}
/**
* Get the interface/protocol for this method. For example, ClientProtocol or
* NamenodeProtocol.
*
* @return Protocol for this method.
*/
public Class<?> getProtocol() {
return this.protocol;
}
/**
* Get the represented java method.
*
@ -89,18 +135,18 @@ public RemoteMethod(String method, Class<?>[] pTypes, Object... pParams)
public Method getMethod() throws IOException {
try {
if (types != null) {
return ClientProtocol.class.getDeclaredMethod(methodName, types);
return protocol.getDeclaredMethod(methodName, types);
} else {
return ClientProtocol.class.getDeclaredMethod(methodName);
return protocol.getDeclaredMethod(methodName);
}
} catch (NoSuchMethodException e) {
// Re-throw as an IOException
LOG.error("Cannot get method {} with types {}",
methodName, Arrays.toString(types), e);
LOG.error("Cannot get method {} with types {} from {}",
methodName, Arrays.toString(types), protocol.getSimpleName(), e);
throw new IOException(e);
} catch (SecurityException e) {
LOG.error("Cannot access method {} with types {}",
methodName, Arrays.toString(types), e);
LOG.error("Cannot access method {} with types {} from {}",
methodName, Arrays.toString(types), protocol.getSimpleName(), e);
throw new IOException(e);
}
}
@ -161,4 +207,10 @@ public Object[] getParams(RemoteLocationContext context) {
}
return objList;
}
@Override
public String toString() {
return this.protocol.getSimpleName() + "#" + this.methodName + " " +
Arrays.toString(this.params);
}
}

View File

@ -0,0 +1,187 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
/**
* Module that implements all the RPC calls in {@link NamenodeProtocol} in the
* {@link RouterRpcServer}.
*/
public class RouterNamenodeProtocol implements NamenodeProtocol {
/** RPC server to receive client calls. */
private final RouterRpcServer rpcServer;
/** RPC clients to connect to the Namenodes. */
private final RouterRpcClient rpcClient;
/** Interface to map global name space to HDFS subcluster name spaces. */
private final FileSubclusterResolver subclusterResolver;
public RouterNamenodeProtocol(RouterRpcServer server) {
this.rpcServer = server;
this.rpcClient = this.rpcServer.getRPCClient();
this.subclusterResolver = this.rpcServer.getSubclusterResolver();
}
@Override
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
long minBlockSize) throws IOException {
rpcServer.checkOperation(OperationCategory.READ);
// Get the namespace where the datanode is located
Map<String, DatanodeStorageReport[]> map =
rpcServer.getDatanodeStorageReportMap(DatanodeReportType.ALL);
String nsId = null;
for (Entry<String, DatanodeStorageReport[]> entry : map.entrySet()) {
DatanodeStorageReport[] dns = entry.getValue();
for (DatanodeStorageReport dn : dns) {
DatanodeInfo dnInfo = dn.getDatanodeInfo();
if (dnInfo.getDatanodeUuid().equals(datanode.getDatanodeUuid())) {
nsId = entry.getKey();
break;
}
}
// Break the loop if already found
if (nsId != null) {
break;
}
}
// Forward to the proper namenode
if (nsId != null) {
RemoteMethod method = new RemoteMethod(
NamenodeProtocol.class, "getBlocks",
new Class<?>[] {DatanodeInfo.class, long.class, long.class},
datanode, size, minBlockSize);
return rpcClient.invokeSingle(nsId, method, BlocksWithLocations.class);
}
return null;
}
@Override
public ExportedBlockKeys getBlockKeys() throws IOException {
rpcServer.checkOperation(OperationCategory.READ);
// We return the information from the default name space
String defaultNsId = subclusterResolver.getDefaultNamespace();
RemoteMethod method =
new RemoteMethod(NamenodeProtocol.class, "getBlockKeys");
return rpcClient.invokeSingle(defaultNsId, method, ExportedBlockKeys.class);
}
@Override
public long getTransactionID() throws IOException {
rpcServer.checkOperation(OperationCategory.READ);
// We return the information from the default name space
String defaultNsId = subclusterResolver.getDefaultNamespace();
RemoteMethod method =
new RemoteMethod(NamenodeProtocol.class, "getTransactionID");
return rpcClient.invokeSingle(defaultNsId, method, long.class);
}
@Override
public long getMostRecentCheckpointTxId() throws IOException {
rpcServer.checkOperation(OperationCategory.READ);
// We return the information from the default name space
String defaultNsId = subclusterResolver.getDefaultNamespace();
RemoteMethod method =
new RemoteMethod(NamenodeProtocol.class, "getMostRecentCheckpointTxId");
return rpcClient.invokeSingle(defaultNsId, method, long.class);
}
@Override
public CheckpointSignature rollEditLog() throws IOException {
rpcServer.checkOperation(OperationCategory.WRITE, false);
return null;
}
@Override
public NamespaceInfo versionRequest() throws IOException {
rpcServer.checkOperation(OperationCategory.READ);
// We return the information from the default name space
String defaultNsId = subclusterResolver.getDefaultNamespace();
RemoteMethod method =
new RemoteMethod(NamenodeProtocol.class, "versionRequest");
return rpcClient.invokeSingle(defaultNsId, method, NamespaceInfo.class);
}
@Override
public void errorReport(NamenodeRegistration registration, int errorCode,
String msg) throws IOException {
rpcServer.checkOperation(OperationCategory.UNCHECKED, false);
}
@Override
public NamenodeRegistration registerSubordinateNamenode(
NamenodeRegistration registration) throws IOException {
rpcServer.checkOperation(OperationCategory.WRITE, false);
return null;
}
@Override
public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
throws IOException {
rpcServer.checkOperation(OperationCategory.WRITE, false);
return null;
}
@Override
public void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
rpcServer.checkOperation(OperationCategory.WRITE, false);
}
@Override
public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
throws IOException {
rpcServer.checkOperation(OperationCategory.READ, false);
return null;
}
@Override
public boolean isUpgradeFinalized() throws IOException {
rpcServer.checkOperation(OperationCategory.READ, false);
return false;
}
@Override
public boolean isRollingUpgrade() throws IOException {
rpcServer.checkOperation(OperationCategory.READ, false);
return false;
}
}

View File

@ -48,7 +48,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
@ -225,14 +224,14 @@ public String getJSON() {
*
* @param ugi User group information.
* @param nsId Nameservice identifier.
* @param rpcAddress ClientProtocol RPC server address of the NN.
* @param rpcAddress RPC server address of the NN.
* @param proto Protocol of the connection.
* @return ConnectionContext containing a ClientProtocol proxy client for the
* NN + current user.
* @throws IOException If we cannot get a connection to the NameNode.
*/
private ConnectionContext getConnection(
UserGroupInformation ugi, String nsId, String rpcAddress)
throws IOException {
private ConnectionContext getConnection(UserGroupInformation ugi, String nsId,
String rpcAddress, Class<?> proto) throws IOException {
ConnectionContext connection = null;
try {
// Each proxy holds the UGI info for the current user when it is created.
@ -242,7 +241,7 @@ private ConnectionContext getConnection(
// for each individual request.
// TODO Add tokens from the federated UGI
connection = this.connectionManager.getConnection(ugi, rpcAddress);
connection = this.connectionManager.getConnection(ugi, rpcAddress, proto);
LOG.debug("User {} NN {} is using connection {}",
ugi.getUserName(), rpcAddress, connection);
} catch (Exception ex) {
@ -326,7 +325,8 @@ private RetryDecision shouldRetry(final IOException ioe, final int retryCount,
private Object invokeMethod(
final UserGroupInformation ugi,
final List<? extends FederationNamenodeContext> namenodes,
final Method method, final Object... params) throws IOException {
final Class<?> protocol, final Method method, final Object... params)
throws IOException {
if (namenodes == null || namenodes.isEmpty()) {
throw new IOException("No namenodes to invoke " + method.getName() +
@ -344,9 +344,10 @@ private Object invokeMethod(
try {
String nsId = namenode.getNameserviceId();
String rpcAddress = namenode.getRpcAddress();
connection = this.getConnection(ugi, nsId, rpcAddress);
ProxyAndInfo<ClientProtocol> client = connection.getClient();
ClientProtocol proxy = client.getProxy();
connection = this.getConnection(ugi, nsId, rpcAddress, protocol);
ProxyAndInfo<?> client = connection.getClient();
final Object proxy = client.getProxy();
ret = invoke(nsId, 0, method, proxy, params);
if (failover) {
// Success on alternate server, update
@ -611,7 +612,29 @@ public Object invokeSingle(final String nsId, RemoteMethod method)
List<? extends FederationNamenodeContext> nns =
getNamenodesForNameservice(nsId);
RemoteLocationContext loc = new RemoteLocation(nsId, "/");
return invokeMethod(ugi, nns, method.getMethod(), method.getParams(loc));
Class<?> proto = method.getProtocol();
Method m = method.getMethod();
Object[] params = method.getParams(loc);
return invokeMethod(ugi, nns, proto, m, params);
}
/**
* Invokes a remote method against the specified namespace.
*
* Re-throws exceptions generated by the remote RPC call as either
* RemoteException or IOException.
*
* @param nsId Target namespace for the method.
* @param method The remote method and parameters to invoke.
* @param clazz Class for the return type.
* @return The result of invoking the method.
* @throws IOException If the invoke generated an error.
*/
public <T> T invokeSingle(final String nsId, RemoteMethod method,
Class<T> clazz) throws IOException {
@SuppressWarnings("unchecked")
T ret = (T)invokeSingle(nsId, method);
return ret;
}
/**
@ -689,8 +712,9 @@ public <T> T invokeSequential(
List<? extends FederationNamenodeContext> namenodes =
getNamenodesForNameservice(ns);
try {
Class<?> proto = remoteMethod.getProtocol();
Object[] params = remoteMethod.getParams(loc);
Object result = invokeMethod(ugi, namenodes, m, params);
Object result = invokeMethod(ugi, namenodes, proto, m, params);
// Check if the result is what we expected
if (isExpectedClass(expectedResultClass, result) &&
isExpectedValue(expectedResultValue, result)) {
@ -914,8 +938,9 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
String ns = location.getNameserviceId();
final List<? extends FederationNamenodeContext> namenodes =
getNamenodesForNameservice(ns);
Class<?> proto = method.getProtocol();
Object[] paramList = method.getParams(location);
Object result = invokeMethod(ugi, namenodes, m, paramList);
Object result = invokeMethod(ugi, namenodes, proto, m, paramList);
return Collections.singletonMap(location, clazz.cast(result));
}
@ -925,6 +950,7 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
String nsId = location.getNameserviceId();
final List<? extends FederationNamenodeContext> namenodes =
getNamenodesForNameservice(nsId);
final Class<?> proto = method.getProtocol();
final Object[] paramList = method.getParams(location);
if (standby) {
// Call the objectGetter to all NNs (including standby)
@ -939,7 +965,7 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
orderedLocations.add(nnLocation);
callables.add(new Callable<Object>() {
public Object call() throws Exception {
return invokeMethod(ugi, nnList, m, paramList);
return invokeMethod(ugi, nnList, proto, m, paramList);
}
});
}
@ -948,7 +974,7 @@ public Object call() throws Exception {
orderedLocations.add(location);
callables.add(new Callable<Object>() {
public Object call() throws Exception {
return invokeMethod(ugi, namenodes, m, paramList);
return invokeMethod(ugi, namenodes, proto, m, paramList);
}
});
}

View File

@ -101,9 +101,13 @@
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
@ -113,11 +117,18 @@
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@ -145,7 +156,8 @@
* the requests to the active
* {@link org.apache.hadoop.hdfs.server.namenode.NameNode NameNode}.
*/
public class RouterRpcServer extends AbstractService implements ClientProtocol {
public class RouterRpcServer extends AbstractService
implements ClientProtocol, NamenodeProtocol {
private static final Logger LOG =
LoggerFactory.getLogger(RouterRpcServer.class);
@ -191,6 +203,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
private final Quota quotaCall;
/** Erasure coding calls. */
private final ErasureCoding erasureCoding;
/** NamenodeProtocol calls. */
private final RouterNamenodeProtocol nnProto;
/**
@ -243,6 +257,11 @@ public RouterRpcServer(Configuration configuration, Router router,
BlockingService clientNNPbService = ClientNamenodeProtocol
.newReflectiveBlockingService(clientProtocolServerTranslator);
NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
new NamenodeProtocolServerSideTranslatorPB(this);
BlockingService nnPbService = NamenodeProtocolService
.newReflectiveBlockingService(namenodeProtocolXlator);
InetSocketAddress confRpcAddress = conf.getSocketAddr(
RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY,
RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY,
@ -261,6 +280,11 @@ public RouterRpcServer(Configuration configuration, Router router,
.setQueueSizePerHandler(handlerQueueSize)
.setVerbose(false)
.build();
// Add all the RPC protocols that the Router implements
DFSUtil.addPBProtocol(
conf, NamenodeProtocolPB.class, nnPbService, this.rpcServer);
// We don't want the server to log the full stack trace for some exceptions
this.rpcServer.addTerseExceptions(
RemoteException.class,
@ -292,6 +316,7 @@ public RouterRpcServer(Configuration configuration, Router router,
// Initialize modules
this.quotaCall = new Quota(this.router, this);
this.erasureCoding = new ErasureCoding(this);
this.nnProto = new RouterNamenodeProtocol(this);
}
@Override
@ -336,6 +361,15 @@ public RouterRpcClient getRPCClient() {
return rpcClient;
}
/**
* Get the subcluster resolver.
*
* @return Subcluster resolver.
*/
public FileSubclusterResolver getSubclusterResolver() {
return subclusterResolver;
}
/**
* Get the RPC monitor and metrics.
*
@ -1349,7 +1383,7 @@ public boolean setSafeMode(SafeModeAction action, boolean isChecked)
action, isChecked);
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
Map<FederationNamespaceInfo, Boolean> results =
rpcClient.invokeConcurrent(nss, method, true, true, boolean.class);
rpcClient.invokeConcurrent(nss, method, true, true, Boolean.class);
// We only report true if all the name space are in safe mode
int numSafemode = 0;
@ -1369,7 +1403,7 @@ public boolean restoreFailedStorage(String arg) throws IOException {
new Class<?>[] {String.class}, arg);
final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
Map<FederationNamespaceInfo, Boolean> ret =
rpcClient.invokeConcurrent(nss, method, true, false, boolean.class);
rpcClient.invokeConcurrent(nss, method, true, false, Boolean.class);
boolean success = true;
for (boolean s : ret.values()) {
@ -2070,6 +2104,77 @@ public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
return null;
}
@Override // NamenodeProtocol
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
long minBlockSize) throws IOException {
return nnProto.getBlocks(datanode, size, minBlockSize);
}
@Override // NamenodeProtocol
public ExportedBlockKeys getBlockKeys() throws IOException {
return nnProto.getBlockKeys();
}
@Override // NamenodeProtocol
public long getTransactionID() throws IOException {
return nnProto.getTransactionID();
}
@Override // NamenodeProtocol
public long getMostRecentCheckpointTxId() throws IOException {
return nnProto.getMostRecentCheckpointTxId();
}
@Override // NamenodeProtocol
public CheckpointSignature rollEditLog() throws IOException {
return nnProto.rollEditLog();
}
@Override // NamenodeProtocol
public NamespaceInfo versionRequest() throws IOException {
return nnProto.versionRequest();
}
@Override // NamenodeProtocol
public void errorReport(NamenodeRegistration registration, int errorCode,
String msg) throws IOException {
nnProto.errorReport(registration, errorCode, msg);
}
@Override // NamenodeProtocol
public NamenodeRegistration registerSubordinateNamenode(
NamenodeRegistration registration) throws IOException {
return nnProto.registerSubordinateNamenode(registration);
}
@Override // NamenodeProtocol
public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
throws IOException {
return nnProto.startCheckpoint(registration);
}
@Override // NamenodeProtocol
public void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
nnProto.endCheckpoint(registration, sig);
}
@Override // NamenodeProtocol
public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
throws IOException {
return nnProto.getEditLogManifest(sinceTxId);
}
@Override // NamenodeProtocol
public boolean isUpgradeFinalized() throws IOException {
return nnProto.isUpgradeFinalized();
}
@Override // NamenodeProtocol
public boolean isRollingUpgrade() throws IOException {
return nnProto.isRollingUpgrade();
}
/**
* Locate the location with the matching block pool id.
*

View File

@ -239,6 +239,10 @@ public DFSClient getClient() throws IOException, URISyntaxException {
}
return client;
}
public Configuration getConf() {
return conf;
}
}
/**
@ -351,6 +355,10 @@ public String getConfSuffix() {
}
return suffix;
}
public Configuration getConf() {
return conf;
}
}
public MiniRouterDFSCluster(

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
@ -68,14 +70,18 @@ public void testCleanup() throws Exception {
Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
ConnectionPool pool1 = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10);
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, ClientProtocol.class);
addConnectionsToPool(pool1, 9, 4);
poolMap.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS), pool1);
poolMap.put(
new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
pool1);
ConnectionPool pool2 = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10);
conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, ClientProtocol.class);
addConnectionsToPool(pool2, 10, 10);
poolMap.put(new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS), pool2);
poolMap.put(
new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS, ClientProtocol.class),
pool2);
checkPoolConnections(TEST_USER1, 9, 4);
checkPoolConnections(TEST_USER2, 10, 10);
@ -94,9 +100,11 @@ public void testCleanup() throws Exception {
// Make sure the number of connections doesn't go below minSize
ConnectionPool pool3 = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10);
conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, ClientProtocol.class);
addConnectionsToPool(pool3, 8, 0);
poolMap.put(new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS), pool3);
poolMap.put(
new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS, ClientProtocol.class),
pool3);
checkPoolConnections(TEST_USER3, 10, 0);
for (int i = 0; i < 10; i++) {
connManager.cleanup(pool3);
@ -119,9 +127,41 @@ public void testGetConnection() throws Exception {
int activeConns = 5;
ConnectionPool pool = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10);
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, ClientProtocol.class);
addConnectionsToPool(pool, totalConns, activeConns);
poolMap.put(new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS), pool);
poolMap.put(
new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
pool);
// All remaining connections should be usable
final int remainingSlots = totalConns - activeConns;
for (int i = 0; i < remainingSlots; i++) {
ConnectionContext cc = pool.getConnection();
assertTrue(cc.isUsable());
cc.getClient();
activeConns++;
}
checkPoolConnections(TEST_USER1, totalConns, activeConns);
// Ask for more and this returns an active connection
ConnectionContext cc = pool.getConnection();
assertTrue(cc.isActive());
}
@Test
public void getGetConnectionNamenodeProtocol() throws Exception {
Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
final int totalConns = 10;
int activeConns = 5;
ConnectionPool pool = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, NamenodeProtocol.class);
addConnectionsToPool(pool, totalConns, activeConns);
poolMap.put(
new ConnectionPoolId(
TEST_USER1, TEST_NN_ADDRESS, NamenodeProtocol.class),
pool);
// All remaining connections should be usable
final int remainingSlots = totalConns - activeConns;

View File

@ -59,6 +59,7 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
@ -70,16 +71,22 @@
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
@ -133,6 +140,11 @@ public int compare(
/** Client interface to the Namenode. */
private ClientProtocol nnProtocol;
/** NameNodeProtocol interface to the Router. */
private NamenodeProtocol routerNamenodeProtocol;
/** NameNodeProtocol interface to the Namenode. */
private NamenodeProtocol nnNamenodeProtocol;
/** Filesystem interface to the Router. */
private FileSystem routerFS;
/** Filesystem interface to the Namenode. */
@ -189,22 +201,18 @@ public void testSetup() throws Exception {
// Wait to ensure NN has fully created its test directories
Thread.sleep(100);
// Default namenode and random router for this test
this.router = cluster.getRandomRouter();
this.ns = cluster.getNameservices().get(0);
this.namenode = cluster.getNamenode(ns, null);
// Random router for this test
RouterContext rndRouter = cluster.getRandomRouter();
this.setRouter(rndRouter);
// Handles to the ClientProtocol interface
this.routerProtocol = router.getClient().getNamenode();
this.nnProtocol = namenode.getClient().getNamenode();
// Handles to the filesystem client
this.nnFS = namenode.getFileSystem();
this.routerFS = router.getFileSystem();
// Pick a namenode for this test
String ns0 = cluster.getNameservices().get(0);
this.setNs(ns0);
this.setNamenode(cluster.getNamenode(ns0, null));
// Create a test file on the NN
Random r = new Random();
String randomFile = "testfile-" + r.nextInt();
Random rnd = new Random();
String randomFile = "testfile-" + rnd.nextInt();
this.nnFile =
cluster.getNamenodeTestDirectoryForNS(ns) + "/" + randomFile;
this.routerFile =
@ -245,6 +253,8 @@ protected void setRouter(RouterContext r)
this.router = r;
this.routerProtocol = r.getClient().getNamenode();
this.routerFS = r.getFileSystem();
this.routerNamenodeProtocol = NameNodeProxies.createProxy(router.getConf(),
router.getFileSystem().getUri(), NamenodeProtocol.class).getProxy();
}
protected FileSystem getRouterFileSystem() {
@ -288,6 +298,12 @@ protected void setNamenode(NamenodeContext nn)
this.namenode = nn;
this.nnProtocol = nn.getClient().getNamenode();
this.nnFS = nn.getFileSystem();
// Namenode from the default namespace
String ns0 = cluster.getNameservices().get(0);
NamenodeContext nn0 = cluster.getNamenode(ns0, null);
this.nnNamenodeProtocol = NameNodeProxies.createProxy(nn0.getConf(),
nn0.getFileSystem().getUri(), NamenodeProtocol.class).getProxy();
}
protected String getNs() {
@ -932,6 +948,79 @@ public void testProxyGetFileInfoAcessException() throws IOException {
assertEquals(routerFailure.getClass(), nnFailure.getClass());
}
@Test
public void testProxyVersionRequest() throws Exception {
NamespaceInfo rVersion = routerNamenodeProtocol.versionRequest();
NamespaceInfo nnVersion = nnNamenodeProtocol.versionRequest();
assertEquals(nnVersion.getBlockPoolID(), rVersion.getBlockPoolID());
assertEquals(nnVersion.getNamespaceID(), rVersion.getNamespaceID());
assertEquals(nnVersion.getClusterID(), rVersion.getClusterID());
assertEquals(nnVersion.getLayoutVersion(), rVersion.getLayoutVersion());
assertEquals(nnVersion.getCTime(), rVersion.getCTime());
}
@Test
public void testProxyGetBlockKeys() throws Exception {
ExportedBlockKeys rKeys = routerNamenodeProtocol.getBlockKeys();
ExportedBlockKeys nnKeys = nnNamenodeProtocol.getBlockKeys();
assertEquals(nnKeys.getCurrentKey(), rKeys.getCurrentKey());
assertEquals(nnKeys.getKeyUpdateInterval(), rKeys.getKeyUpdateInterval());
assertEquals(nnKeys.getTokenLifetime(), rKeys.getTokenLifetime());
}
@Test
public void testProxyGetBlocks() throws Exception {
// Get datanodes
DatanodeInfo[] dns =
routerProtocol.getDatanodeReport(DatanodeReportType.ALL);
DatanodeInfo dn0 = dns[0];
// Verify that checking that datanode works
BlocksWithLocations routerBlockLocations =
routerNamenodeProtocol.getBlocks(dn0, 1024, 0);
BlocksWithLocations nnBlockLocations =
nnNamenodeProtocol.getBlocks(dn0, 1024, 0);
BlockWithLocations[] routerBlocks = routerBlockLocations.getBlocks();
BlockWithLocations[] nnBlocks = nnBlockLocations.getBlocks();
assertEquals(nnBlocks.length, routerBlocks.length);
for (int i = 0; i < routerBlocks.length; i++) {
assertEquals(
nnBlocks[i].getBlock().getBlockId(),
routerBlocks[i].getBlock().getBlockId());
}
}
@Test
public void testProxyGetTransactionID() throws IOException {
long routerTransactionID = routerNamenodeProtocol.getTransactionID();
long nnTransactionID = nnNamenodeProtocol.getTransactionID();
assertEquals(nnTransactionID, routerTransactionID);
}
@Test
public void testProxyGetMostRecentCheckpointTxId() throws IOException {
long routerCheckPointId =
routerNamenodeProtocol.getMostRecentCheckpointTxId();
long nnCheckPointId = nnNamenodeProtocol.getMostRecentCheckpointTxId();
assertEquals(nnCheckPointId, routerCheckPointId);
}
@Test
public void testProxySetSafemode() throws Exception {
boolean routerSafemode =
routerProtocol.setSafeMode(SafeModeAction.SAFEMODE_GET, false);
boolean nnSafemode =
nnProtocol.setSafeMode(SafeModeAction.SAFEMODE_GET, false);
assertEquals(nnSafemode, routerSafemode);
}
@Test
public void testProxyRestoreFailedStorage() throws Exception {
boolean routerSuccess = routerProtocol.restoreFailedStorage("check");
boolean nnSuccess = nnProtocol.restoreFailedStorage("check");
assertEquals(nnSuccess, routerSuccess);
}
@Test
public void testErasureCoding() throws IOException {