HDFS-14249. RBF: Tooling to identify the subcluster location of a file. Contributed by Inigo Goiri.

This commit is contained in:
Giovanni Matteo Fumarola 2019-02-20 11:08:55 -08:00 committed by Brahma Reddy Battula
parent 50aee18a84
commit 8b8ff5ccbc
17 changed files with 628 additions and 5 deletions

View File

@ -31,6 +31,8 @@
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.EnterSafeModeResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetDisabledNameservicesRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetDisabledNameservicesResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetDestinationRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetDestinationResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetSafeModeRequestProto;
@ -54,6 +56,8 @@
import org.apache.hadoop.hdfs.server.federation.store.protocol.EnterSafeModeResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDisabledNameservicesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDisabledNameservicesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeRequest;
@ -76,6 +80,8 @@
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.EnterSafeModeResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetDisabledNameservicesRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetDisabledNameservicesResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetDestinationRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetDestinationResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetSafeModeRequestPBImpl;
@ -298,4 +304,20 @@ public RefreshMountTableEntriesResponseProto refreshMountTableEntries(
throw new ServiceException(e);
}
}
@Override
public GetDestinationResponseProto getDestination(
RpcController controller, GetDestinationRequestProto request)
throws ServiceException {
try {
GetDestinationRequest req =
new GetDestinationRequestPBImpl(request);
GetDestinationResponse response = server.getDestination(req);
GetDestinationResponsePBImpl responsePB =
(GetDestinationResponsePBImpl)response;
return responsePB.getProto();
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -32,6 +32,8 @@
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.EnterSafeModeResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetDisabledNameservicesRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetDisabledNameservicesResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetDestinationRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetDestinationResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetMountTableEntriesResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetSafeModeRequestProto;
@ -57,6 +59,8 @@
import org.apache.hadoop.hdfs.server.federation.store.protocol.EnterSafeModeResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDisabledNameservicesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDisabledNameservicesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeRequest;
@ -77,6 +81,8 @@
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.EnableNameserviceResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.EnterSafeModeResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetDisabledNameservicesResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetDestinationRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetDestinationResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetMountTableEntriesResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.GetSafeModeResponsePBImpl;
@ -288,4 +294,19 @@ public RefreshMountTableEntriesResponse refreshMountTableEntries(
throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
}
}
@Override
public GetDestinationResponse getDestination(
GetDestinationRequest request) throws IOException {
GetDestinationRequestPBImpl requestPB =
(GetDestinationRequestPBImpl) request;
GetDestinationRequestProto proto = requestPB.getProto();
try {
GetDestinationResponseProto response =
rpcProxy.getDestination(null, proto);
return new GetDestinationResponsePBImpl(response);
} catch (ServiceException e) {
throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
}
}
}

View File

@ -129,7 +129,7 @@ public void startOp() {
public long proxyOp() {
PROXY_TIME.set(monotonicNow());
long processingTime = getProcessingTime();
if (processingTime >= 0) {
if (metrics != null && processingTime >= 0) {
metrics.addProcessingTime(processingTime);
}
return Thread.currentThread().getId();
@ -139,7 +139,7 @@ public long proxyOp() {
public void proxyOpComplete(boolean success) {
if (success) {
long proxyTime = getProxyTime();
if (proxyTime >= 0) {
if (metrics != null && proxyTime >= 0) {
metrics.addProxyTime(proxyTime);
}
}
@ -147,7 +147,9 @@ public void proxyOpComplete(boolean success) {
@Override
public void proxyOpFailureStandby() {
metrics.incrProxyOpFailureStandby();
if (metrics != null) {
metrics.incrProxyOpFailureStandby();
}
}
@Override

View File

@ -21,6 +21,8 @@
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
@ -93,4 +95,14 @@ GetMountTableEntriesResponse getMountTableEntries(
*/
RefreshMountTableEntriesResponse refreshMountTableEntries(
RefreshMountTableEntriesRequest request) throws IOException;
/**
* Get the destination subcluster (namespace) of a file/directory.
*
* @param request Fully populated request object including the file to check.
* @return The response including the subcluster where the input file is.
* @throws IOException Throws exception if the data store is not initialized.
*/
GetDestinationResponse getDestination(
GetDestinationRequest request) throws IOException;
}

View File

@ -23,7 +23,10 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.base.Preconditions;
@ -39,6 +42,7 @@
import org.apache.hadoop.hdfs.protocolPB.RouterPolicyProvider;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.store.DisabledNameserviceStore;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreCache;
@ -52,6 +56,8 @@
import org.apache.hadoop.hdfs.server.federation.store.protocol.EnterSafeModeResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDisabledNameservicesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDisabledNameservicesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeRequest;
@ -378,6 +384,36 @@ public RefreshMountTableEntriesResponse refreshMountTableEntries(
}
}
@Override
public GetDestinationResponse getDestination(
GetDestinationRequest request) throws IOException {
final String src = request.getSrcPath();
final List<String> nsIds = new ArrayList<>();
RouterRpcServer rpcServer = this.router.getRpcServer();
List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, false);
RouterRpcClient rpcClient = rpcServer.getRPCClient();
RemoteMethod method = new RemoteMethod("getFileInfo",
new Class<?>[] {String.class}, new RemoteParam());
try {
Map<RemoteLocation, HdfsFileStatus> responses =
rpcClient.invokeConcurrent(
locations, method, false, false, HdfsFileStatus.class);
for (RemoteLocation location : locations) {
if (responses.get(location) != null) {
nsIds.add(location.getNameserviceId());
}
}
} catch (IOException ioe) {
LOG.error("Cannot get location for {}: {}",
src, ioe.getMessage());
}
if (nsIds.isEmpty() && !locations.isEmpty()) {
String nsId = locations.get(0).getNameserviceId();
nsIds.add(nsId);
}
return GetDestinationResponse.newInstance(nsIds);
}
/**
* Verify if Router set safe mode state correctly.
* @param isInSafeMode Expected state to be set.

View File

@ -31,6 +31,8 @@
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest;
@ -169,4 +171,9 @@ public RefreshMountTableEntriesResponse refreshMountTableEntries(
return response;
}
@Override
public GetDestinationResponse getDestination(
GetDestinationRequest request) throws IOException {
throw new UnsupportedOperationException("Requires the RouterRpcServer");
}
}

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
/**
* API request for getting the destination subcluster of a file.
*/
public abstract class GetDestinationRequest {
public static GetDestinationRequest newInstance()
throws IOException {
return StateStoreSerializer
.newRecord(GetDestinationRequest.class);
}
public static GetDestinationRequest newInstance(String srcPath)
throws IOException {
GetDestinationRequest request = newInstance();
request.setSrcPath(srcPath);
return request;
}
public static GetDestinationRequest newInstance(Path srcPath)
throws IOException {
return newInstance(srcPath.toString());
}
@Public
@Unstable
public abstract String getSrcPath();
@Public
@Unstable
public abstract void setSrcPath(String srcPath);
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
/**
* API response for getting the destination subcluster of a file.
*/
public abstract class GetDestinationResponse {
public static GetDestinationResponse newInstance()
throws IOException {
return StateStoreSerializer
.newRecord(GetDestinationResponse.class);
}
public static GetDestinationResponse newInstance(
Collection<String> nsIds) throws IOException {
GetDestinationResponse request = newInstance();
request.setDestinations(nsIds);
return request;
}
@Public
@Unstable
public abstract Collection<String> getDestinations();
@Public
@Unstable
public void setDestination(String nsId) {
setDestinations(Collections.singletonList(nsId));
}
@Public
@Unstable
public abstract void setDestinations(Collection<String> nsIds);
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
import java.io.IOException;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetDestinationRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetDestinationRequestProtoOrBuilder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetDestinationRequestProto.Builder;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the state store API object
* GetDestinationRequest.
*/
public class GetDestinationRequestPBImpl extends GetDestinationRequest
implements PBRecord {
private FederationProtocolPBTranslator<GetDestinationRequestProto,
Builder, GetDestinationRequestProtoOrBuilder> translator =
new FederationProtocolPBTranslator<>(
GetDestinationRequestProto.class);
public GetDestinationRequestPBImpl() {
}
public GetDestinationRequestPBImpl(GetDestinationRequestProto proto) {
this.translator.setProto(proto);
}
@Override
public GetDestinationRequestProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public String getSrcPath() {
return this.translator.getProtoOrBuilder().getSrcPath();
}
@Override
public void setSrcPath(String path) {
this.translator.getBuilder().setSrcPath(path);
}
}

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetDestinationResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetDestinationResponseProto.Builder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.GetDestinationResponseProtoOrBuilder;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
import com.google.protobuf.Message;
/**
* Protobuf implementation of the state store API object
* GetDestinationResponse.
*/
public class GetDestinationResponsePBImpl
extends GetDestinationResponse implements PBRecord {
private FederationProtocolPBTranslator<GetDestinationResponseProto,
Builder, GetDestinationResponseProtoOrBuilder> translator =
new FederationProtocolPBTranslator<>(
GetDestinationResponseProto.class);
public GetDestinationResponsePBImpl() {
}
public GetDestinationResponsePBImpl(
GetDestinationResponseProto proto) {
this.translator.setProto(proto);
}
@Override
public GetDestinationResponseProto getProto() {
// if builder is null build() returns null, calling getBuilder() to
// instantiate builder
this.translator.getBuilder();
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public Collection<String> getDestinations() {
return new ArrayList<>(
this.translator.getProtoOrBuilder().getDestinationsList());
}
@Override
public void setDestinations(Collection<String> nsIds) {
this.translator.getBuilder().clearDestinations();
for (String nsId : nsIds) {
this.translator.getBuilder().addDestinations(nsId);
}
}
}

View File

@ -52,6 +52,8 @@
import org.apache.hadoop.hdfs.server.federation.store.protocol.EnterSafeModeResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDisabledNameservicesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDisabledNameservicesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeRequest;
@ -117,7 +119,8 @@ private void printUsage(String cmd) {
private String getUsage(String cmd) {
if (cmd == null) {
String[] commands =
{"-add", "-update", "-rm", "-ls", "-setQuota", "-clrQuota",
{"-add", "-update", "-rm", "-ls", "-getDestination",
"-setQuota", "-clrQuota",
"-safemode", "-nameservice", "-getDisabledNameservices",
"-refresh"};
StringBuilder usage = new StringBuilder();
@ -143,6 +146,8 @@ private String getUsage(String cmd) {
return "\t[-rm <source>]";
} else if (cmd.equals("-ls")) {
return "\t[-ls <path>]";
} else if (cmd.equals("-getDestination")) {
return "\t[-getDestination <path>]";
} else if (cmd.equals("-setQuota")) {
return "\t[-setQuota <path> -nsQuota <nsQuota> -ssQuota "
+ "<quota in bytes or quota size string>]";
@ -172,6 +177,11 @@ private void validateMax(String[] arg) {
throw new IllegalArgumentException(
"Too many arguments, Max=1 argument allowed");
}
} else if (arg[0].equals("-getDestination")) {
if (arg.length > 2) {
throw new IllegalArgumentException(
"Too many arguments, Max=1 argument allowed only");
}
} else if (arg[0].equals("-safemode")) {
if (arg.length > 2) {
throw new IllegalArgumentException(
@ -208,6 +218,10 @@ private boolean validateMin(String[] argv) {
if (argv.length < 2) {
return false;
}
} else if ("-getDestination".equals(cmd)) {
if (argv.length < 2) {
return false;
}
} else if ("-setQuota".equals(cmd)) {
if (argv.length < 4) {
return false;
@ -302,6 +316,8 @@ public int run(String[] argv) throws Exception {
} else {
listMounts("/");
}
} else if ("-getDestination".equals(cmd)) {
getDestination(argv[i]);
} else if ("-setQuota".equals(cmd)) {
if (setQuota(argv, i)) {
System.out.println(
@ -709,6 +725,16 @@ private static void printMounts(List<MountTable> entries) {
}
}
private void getDestination(String path) throws IOException {
path = normalizeFileSystemPath(path);
MountTableManager mountTable = client.getMountTableManager();
GetDestinationRequest request =
GetDestinationRequest.newInstance(path);
GetDestinationResponse response = mountTable.getDestination(request);
System.out.println("Destination: " +
StringUtils.join(",", response.getDestinations()));
}
/**
* Set quota for a mount table entry.
*

View File

@ -175,6 +175,14 @@ message GetMountTableEntriesResponseProto {
optional uint64 timestamp = 2;
}
message GetDestinationRequestProto {
optional string srcPath = 1;
}
message GetDestinationResponseProto {
repeated string destinations = 1;
}
/////////////////////////////////////////////////
// Routers

View File

@ -79,4 +79,9 @@ service RouterAdminProtocolService {
* Refresh mount entries
*/
rpc refreshMountTableEntries(RefreshMountTableEntriesRequestProto) returns(RefreshMountTableEntriesResponseProto);
/**
* Get the destination of a file/directory in the federation.
*/
rpc getDestination(GetDestinationRequestProto) returns (GetDestinationResponseProto);
}

View File

@ -261,6 +261,10 @@ RANDOM can be used for reading and writing data from/into different subclusters.
The common use for this approach is to have the same data in multiple subclusters and balance the reads across subclusters.
For example, if thousands of containers need to read the same data (e.g., a library), one can use RANDOM to read the data from any of the subclusters.
To determine which subcluster contains a file:
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -getDestination /user/user1/file.txt
Note that consistency of the data across subclusters is not guaranteed by the Router.
### Disabling nameservices

View File

@ -26,6 +26,11 @@
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@ -36,6 +41,8 @@
import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
@ -78,7 +85,8 @@ public class TestRouterAdminCLI {
@BeforeClass
public static void globalSetUp() throws Exception {
cluster = new StateStoreDFSCluster(false, 1);
cluster = new StateStoreDFSCluster(false, 1,
MultipleDestinationMountTableResolver.class);
// Build and start a router with State Store + admin + RPC
Configuration conf = new RouterConfigBuilder()
.stateStore()
@ -550,6 +558,11 @@ public void testInvalidArgumentMessage() throws Exception {
.contains("\t[-nameservice enable | disable <nameservice>]"));
out.reset();
argv = new String[] {"-getDestination"};
assertEquals(-1, ToolRunner.run(admin, argv));
assertTrue(out.toString().contains("\t[-getDestination <path>]"));
out.reset();
argv = new String[] {"-Random"};
assertEquals(-1, ToolRunner.run(admin, argv));
String expected = "Usage: hdfs dfsrouteradmin :\n"
@ -560,6 +573,7 @@ public void testInvalidArgumentMessage() throws Exception {
+ "<destination> " + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ "-owner <owner> -group <group> -mode <mode>]\n" + "\t[-rm <source>]\n"
+ "\t[-ls <path>]\n"
+ "\t[-getDestination <path>]\n"
+ "\t[-setQuota <path> -nsQuota <nsQuota> -ssQuota "
+ "<quota in bytes or quota size string>]\n" + "\t[-clrQuota <path>]\n"
+ "\t[-safemode enter | leave | get]\n"
@ -1091,4 +1105,52 @@ private void testUpdateOrderMountTable(DestinationOrder order)
assertEquals(dest, mountTable.getDestinations().get(0).getDest());
assertEquals(order, mountTable.getDestOrder());
}
@Test
public void testGetDestination() throws Exception {
// Test the basic destination feature
System.setOut(new PrintStream(out));
String[] argv = new String[] {"-getDestination", "/file.txt"};
assertEquals(0, ToolRunner.run(admin, argv));
assertEquals("Destination: ns0" + System.lineSeparator(), out.toString());
// Add a HASH_ALL entry to check the destination changing
argv = new String[] {"-add", "/testGetDest", "ns0,ns1",
"/testGetDestination",
"-order", DestinationOrder.HASH_ALL.toString()};
assertEquals(0, ToolRunner.run(admin, argv));
stateStore.loadCache(MountTableStoreImpl.class, true);
MountTableResolver resolver =
(MountTableResolver) router.getSubclusterResolver();
resolver.loadCache(true);
// Files should be distributed across ns0 and ns1
Map<String, AtomicInteger> counter = new TreeMap<>();
final Pattern p = Pattern.compile("Destination: (.*)");
for (int i = 0; i < 10; i++) {
out.reset();
String filename = "file" + i+ ".txt";
argv = new String[] {"-getDestination", "/testGetDest/" + filename};
assertEquals(0, ToolRunner.run(admin, argv));
String outLine = out.toString();
Matcher m = p.matcher(outLine);
assertTrue(m.find());
String nsId = m.group(1);
if (counter.containsKey(nsId)) {
counter.get(nsId).getAndIncrement();
} else {
counter.put(nsId, new AtomicInteger(1));
}
}
assertEquals("Wrong counter size: " + counter, 2, counter.size());
assertTrue(counter + " should contain ns0", counter.containsKey("ns0"));
assertTrue(counter + " should contain ns1", counter.containsKey("ns1"));
// Bad cases
argv = new String[] {"-getDestination"};
assertEquals(-1, ToolRunner.run(admin, argv));
argv = new String[] {"-getDestination /file1.txt /file2.txt"};
assertEquals(-1, ToolRunner.run(admin, argv));
}
}

View File

@ -23,11 +23,19 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSTestUtil;
@ -41,8 +49,11 @@
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -52,6 +63,8 @@
* Tests router rpc with multiple destination mount table resolver.
*/
public class TestRouterRPCMultipleDestinationMountTableResolver {
private static final List<String> NS_IDS = Arrays.asList("ns0", "ns1");
private static StateStoreDFSCluster cluster;
private static RouterContext routerContext;
private static MountTableResolver resolver;
@ -391,4 +404,135 @@ private boolean addMountTable(final MountTable entry) throws IOException {
return addResponse.getStatus();
}
@Test
public void testGetDestinationHashAll() throws Exception {
testGetDestination(DestinationOrder.HASH_ALL,
Arrays.asList("ns1"),
Arrays.asList("ns1"),
Arrays.asList("ns1", "ns0"));
}
@Test
public void testGetDestinationHash() throws Exception {
testGetDestination(DestinationOrder.HASH,
Arrays.asList("ns1"),
Arrays.asList("ns1"),
Arrays.asList("ns1"));
}
@Test
public void testGetDestinationRandom() throws Exception {
testGetDestination(DestinationOrder.RANDOM,
null, null, Arrays.asList("ns0", "ns1"));
}
/**
* Generic test for getting the destination subcluster.
* @param order DestinationOrder of the mount point.
* @param expectFileLocation Expected subclusters of a file. null for any.
* @param expectNoFileLocation Expected subclusters of a non-existing file.
* @param expectDirLocation Expected subclusters of a nested directory.
* @throws Exception If the test cannot run.
*/
private void testGetDestination(DestinationOrder order,
List<String> expectFileLocation,
List<String> expectNoFileLocation,
List<String> expectDirLocation) throws Exception {
setupOrderMountPath(order);
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTableManager = client.getMountTableManager();
// If the file exists, it should be in the expected subcluster
final String pathFile = "dir/file";
final Path pathRouterFile = new Path("/mount", pathFile);
final Path pathLocalFile = new Path("/tmp", pathFile);
FileStatus fileStatus = routerFs.getFileStatus(pathRouterFile);
assertTrue(fileStatus + " should be a file", fileStatus.isFile());
GetDestinationResponse respFile = mountTableManager.getDestination(
GetDestinationRequest.newInstance(pathRouterFile));
if (expectFileLocation != null) {
assertEquals(expectFileLocation, respFile.getDestinations());
assertPathStatus(expectFileLocation, pathLocalFile, false);
} else {
Collection<String> dests = respFile.getDestinations();
assertPathStatus(dests, pathLocalFile, false);
}
// If the file does not exist, it should give us the expected subclusters
final String pathNoFile = "dir/no-file";
final Path pathRouterNoFile = new Path("/mount", pathNoFile);
final Path pathLocalNoFile = new Path("/tmp", pathNoFile);
LambdaTestUtils.intercept(FileNotFoundException.class,
() -> routerFs.getFileStatus(pathRouterNoFile));
GetDestinationResponse respNoFile = mountTableManager.getDestination(
GetDestinationRequest.newInstance(pathRouterNoFile));
if (expectNoFileLocation != null) {
assertEquals(expectNoFileLocation, respNoFile.getDestinations());
}
assertPathStatus(Collections.emptyList(), pathLocalNoFile, false);
// If the folder exists, it should be in the expected subcluster
final String pathNestedDir = "dir/dir";
final Path pathRouterNestedDir = new Path("/mount", pathNestedDir);
final Path pathLocalNestedDir = new Path("/tmp", pathNestedDir);
FileStatus dirStatus = routerFs.getFileStatus(pathRouterNestedDir);
assertTrue(dirStatus + " should be a directory", dirStatus.isDirectory());
GetDestinationResponse respDir = mountTableManager.getDestination(
GetDestinationRequest.newInstance(pathRouterNestedDir));
assertEqualsCollection(expectDirLocation, respDir.getDestinations());
assertPathStatus(expectDirLocation, pathLocalNestedDir, true);
}
/**
* Assert that the status of a file in the subcluster is the expected one.
* @param expectedLocations Subclusters where the file is expected to exist.
* @param path Path of the file/directory to check.
* @param isDir If the path is expected to be a directory.
* @throws Exception If the file cannot be checked.
*/
private void assertPathStatus(Collection<String> expectedLocations,
Path path, boolean isDir) throws Exception {
for (String nsId : NS_IDS) {
final FileSystem fs = getFileSystem(nsId);
if (expectedLocations.contains(nsId)) {
assertTrue(path + " should exist in " + nsId, fs.exists(path));
final FileStatus status = fs.getFileStatus(path);
if (isDir) {
assertTrue(path + " should be a directory", status.isDirectory());
} else {
assertTrue(path + " should be a file", status.isFile());
}
} else {
assertFalse(path + " should not exist in " + nsId, fs.exists(path));
}
}
}
/**
* Assert if two collections are equal without checking the order.
* @param col1 First collection to compare.
* @param col2 Second collection to compare.
*/
private static void assertEqualsCollection(
Collection<String> col1, Collection<String> col2) {
assertEquals(new TreeSet<>(col1), new TreeSet<>(col2));
}
/**
* Get the filesystem for each subcluster.
* @param nsId Identifier of the name space (subcluster).
* @return The FileSystem for
*/
private static FileSystem getFileSystem(final String nsId) {
if (nsId.equals("ns0")) {
return nnFs0;
}
if (nsId.equals("ns1")) {
return nnFs1;
}
return null;
}
}

View File

@ -432,6 +432,7 @@ Usage:
[-update <source> <nameservice1, nameservice2, ...> <destination> [-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] -owner <owner> -group <group> -mode <mode>]
[-rm <source>]
[-ls <path>]
[-getDestination <path>]
[-setQuota <path> -nsQuota <nsQuota> -ssQuota <quota in bytes or quota size string>]
[-clrQuota <path>]
[-safemode enter | leave | get]
@ -446,6 +447,7 @@ Usage:
| `-update` *source* *nameservices* *destination* | Update a mount table entry or create one if it does not exist. |
| `-rm` *source* | Remove mount point of specified path. |
| `-ls` *path* | List mount points under specified path. |
| `-getDestination` *path* | Get the subcluster where a file is or should be created. |
| `-setQuota` *path* `-nsQuota` *nsQuota* `-ssQuota` *ssQuota* | Set quota for specified path. See [HDFS Quotas Guide](./HdfsQuotaAdminGuide.html) for the quota detail. |
| `-clrQuota` *path* | Clear quota of given mount point. See [HDFS Quotas Guide](./HdfsQuotaAdminGuide.html) for the quota detail. |
| `-safemode` `enter` `leave` `get` | Manually set the Router entering or leaving safe mode. The option *get* will be used for verifying if the Router is in safe mode state. |