HDFS-16978. RBF: Admin command to support bulk add of mount points (#5554). Contributed by Viraj Jasani.

Reviewed-by: Inigo Goiri <inigoiri@apache.org>
Reviewed-by: Simbarashe Dzinamarira <sdzinamarira@linkedin.com>
Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
This commit is contained in:
Viraj Jasani 2023-05-10 17:15:34 -10:00 committed by GitHub
parent 5084e881ef
commit fe61d8f073
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 844 additions and 134 deletions

View File

@ -19,4 +19,16 @@
<Match>
<Package name="org.apache.hadoop.hdfs.federation.protocol.proto" />
</Match>
<!-- Only to be used by Router Admin while preparing for bulk add request -->
<Match>
<Class name="org.apache.hadoop.hdfs.tools.federation.AddMountAttributes" />
<Method name="getNss" />
<Bug pattern="EI_EXPOSE_REP" />
</Match>
<!-- Only to be used by Router Admin while preparing for bulk add request -->
<Match>
<Class name="org.apache.hadoop.hdfs.tools.federation.AddMountAttributes" />
<Method name="setNss" />
<Bug pattern="EI_EXPOSE_REP2" />
</Match>
</FindBugsFilter>

View File

@ -301,6 +301,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<configuration>
<excludePackageNames>
org.apache.hadoop.hdfs.federation.protocol.proto:org.apache.hadoop.hdfs.protocol.proto
</excludePackageNames>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>

View File

@ -21,6 +21,8 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntriesRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntriesResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.DisableNameserviceRequestProto;
@ -48,6 +50,8 @@
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshSuperUserGroupsConfigurationRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RefreshSuperUserGroupsConfigurationResponseProto;
import org.apache.hadoop.hdfs.server.federation.router.RouterAdminServer;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.DisableNameserviceRequest;
@ -73,6 +77,8 @@
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntriesRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntriesResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntryRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntryResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.DisableNameserviceRequestPBImpl;
@ -162,6 +168,20 @@ public RemoveMountTableEntryResponseProto removeMountTableEntry(
}
}
@Override
public AddMountTableEntriesResponseProto addMountTableEntries(RpcController controller,
AddMountTableEntriesRequestProto request) throws ServiceException {
try {
AddMountTableEntriesRequest req = new AddMountTableEntriesRequestPBImpl(request);
AddMountTableEntriesResponse response = server.addMountTableEntries(req);
AddMountTableEntriesResponsePBImpl responsePB =
(AddMountTableEntriesResponsePBImpl) response;
return responsePB.getProto();
} catch (IOException e) {
throw new ServiceException(e);
}
}
/**
* Get matching mount table entries.
*/

View File

@ -22,6 +22,8 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntriesRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntriesResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntryResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.DisableNameserviceRequestProto;
@ -52,6 +54,8 @@
import org.apache.hadoop.hdfs.server.federation.resolver.RouterGenericManager;
import org.apache.hadoop.hdfs.server.federation.router.NameserviceManager;
import org.apache.hadoop.hdfs.server.federation.router.RouterStateManager;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.DisableNameserviceRequest;
@ -76,6 +80,8 @@
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntriesRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntriesResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntryRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.AddMountTableEntryResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.DisableNameserviceRequestPBImpl;
@ -154,6 +160,19 @@ public AddMountTableEntryResponse addMountTableEntry(
}
}
@Override
public AddMountTableEntriesResponse addMountTableEntries(AddMountTableEntriesRequest request)
throws IOException {
AddMountTableEntriesRequestPBImpl requestPB = (AddMountTableEntriesRequestPBImpl) request;
AddMountTableEntriesRequestProto proto = requestPB.getProto();
try {
AddMountTableEntriesResponseProto response = rpcProxy.addMountTableEntries(null, proto);
return new AddMountTableEntriesResponsePBImpl(response);
} catch (ServiceException e) {
throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
}
}
@Override
public UpdateMountTableEntryResponse updateMountTableEntry(
UpdateMountTableEntryRequest request) throws IOException {

View File

@ -19,6 +19,8 @@
import java.io.IOException;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationRequest;
@ -48,6 +50,17 @@ public interface MountTableManager {
AddMountTableEntryResponse addMountTableEntry(
AddMountTableEntryRequest request) throws IOException;
/**
* Add multiple entries to the mount table.
*
* @param request Request object with fully populated list of mount point entries.
* @return True if all the mount table entries were successfully committed to the
* data store.
* @throws IOException Throws exception if the data store is not initialized.
*/
AddMountTableEntriesResponse addMountTableEntries(
AddMountTableEntriesRequest request) throws IOException;
/**
* Updates an existing entry in the mount table.
*

View File

@ -31,6 +31,8 @@
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntriesResponse;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
@ -349,15 +351,26 @@ public AddMountTableEntryResponse addMountTableEntry(
MountTable mountTable = request.getEntry();
verifyMaxComponentLength(mountTable);
if (this.mountTableCheckDestination) {
List<String> nsIds = verifyFileInDestinations(mountTable);
if (!nsIds.isEmpty()) {
throw new IllegalArgumentException("File not found in downstream " +
"nameservices: " + StringUtils.join(",", nsIds));
}
verifyFileExistenceInDest(mountTable);
}
return getMountTableStore().addMountTableEntry(request);
}
@Override
public AddMountTableEntriesResponse addMountTableEntries(AddMountTableEntriesRequest request)
throws IOException {
List<MountTable> mountTables = request.getEntries();
for (MountTable mountTable : mountTables) {
verifyMaxComponentLength(mountTable);
}
if (this.mountTableCheckDestination) {
for (MountTable mountTable : mountTables) {
verifyFileExistenceInDest(mountTable);
}
}
return getMountTableStore().addMountTableEntries(request);
}
@Override
public UpdateMountTableEntryResponse updateMountTableEntry(
UpdateMountTableEntryRequest request) throws IOException {
@ -366,11 +379,7 @@ public UpdateMountTableEntryResponse updateMountTableEntry(
// Checks max component length limit.
verifyMaxComponentLength(updateEntry);
if (this.mountTableCheckDestination) {
List<String> nsIds = verifyFileInDestinations(updateEntry);
if (!nsIds.isEmpty()) {
throw new IllegalArgumentException("File not found in downstream " +
"nameservices: " + StringUtils.join(",", nsIds));
}
verifyFileExistenceInDest(updateEntry);
}
if (this.router.getSubclusterResolver() instanceof MountTableResolver) {
MountTableResolver mResolver =
@ -408,6 +417,14 @@ public UpdateMountTableEntryResponse updateMountTableEntry(
return response;
}
private void verifyFileExistenceInDest(MountTable mountTable) throws IOException {
List<String> nsIds = verifyFileInDestinations(mountTable);
if (!nsIds.isEmpty()) {
throw new IllegalArgumentException(
"File not found in downstream nameservices: " + StringUtils.join(",", nsIds));
}
}
/**
* Checks whether quota needs to be synchronized with namespace or not. Quota
* needs to be synchronized either if there is change in mount entry quota or

View File

@ -33,6 +33,8 @@
import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationRequest;
@ -129,6 +131,29 @@ public AddMountTableEntryResponse addMountTableEntry(
}
}
@Override
public AddMountTableEntriesResponse addMountTableEntries(AddMountTableEntriesRequest request)
throws IOException {
List<MountTable> mountTables = request.getEntries();
if (mountTables == null || mountTables.size() == 0) {
AddMountTableEntriesResponse response = AddMountTableEntriesResponse.newInstance();
response.setStatus(false);
return response;
}
for (MountTable mountTable : mountTables) {
mountTable.validate();
final String src = mountTable.getSourcePath();
checkMountTablePermission(src);
}
boolean status = getDriver().putAll(mountTables, false, true);
AddMountTableEntriesResponse response = AddMountTableEntriesResponse.newInstance();
response.setStatus(status);
if (status) {
updateCacheAllRouters();
}
return response;
}
@Override
public UpdateMountTableEntryResponse updateMountTableEntry(
UpdateMountTableEntryRequest request) throws IOException {

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
/**
* API request for adding all mount table entries to the state store.
*/
public abstract class AddMountTableEntriesRequest {
public static AddMountTableEntriesRequest newInstance() {
return StateStoreSerializer.newRecord(AddMountTableEntriesRequest.class);
}
public static AddMountTableEntriesRequest newInstance(List<MountTable> newEntry) {
AddMountTableEntriesRequest request = newInstance();
request.setEntries(newEntry);
return request;
}
@InterfaceAudience.Public
@InterfaceStability.Unstable
public abstract List<MountTable> getEntries();
@InterfaceAudience.Public
@InterfaceStability.Unstable
public abstract void setEntries(List<MountTable> mount);
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
/**
* API response for adding multiple mount table entries to the state store.
*/
public abstract class AddMountTableEntriesResponse {
public static AddMountTableEntriesResponse newInstance() throws IOException {
return StateStoreSerializer.newRecord(AddMountTableEntriesResponse.class);
}
@Public
@Unstable
public abstract boolean getStatus();
@Public
@Unstable
public abstract void setStatus(boolean result);
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.thirdparty.protobuf.Message;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntriesRequestProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntriesRequestProtoOrBuilder;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.MountTableRecordProto;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.MountTablePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
/**
* Protobuf implementation of the state store API object addMountTableEntriesRequest.
*/
public class AddMountTableEntriesRequestPBImpl
extends AddMountTableEntriesRequest implements PBRecord {
private final FederationProtocolPBTranslator<AddMountTableEntriesRequestProto,
AddMountTableEntriesRequestProto.Builder,
AddMountTableEntriesRequestProtoOrBuilder> translator =
new FederationProtocolPBTranslator<>(AddMountTableEntriesRequestProto.class);
public AddMountTableEntriesRequestPBImpl() {
}
public AddMountTableEntriesRequestPBImpl(AddMountTableEntriesRequestProto proto) {
this.translator.setProto(proto);
}
@Override
public AddMountTableEntriesRequestProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public List<MountTable> getEntries() {
List<MountTableRecordProto> entryProto = this.translator.getProtoOrBuilder().getEntryList();
if (entryProto == null) {
return null;
}
List<MountTable> mountTables = new ArrayList<>();
entryProto.forEach(e -> mountTables.add(new MountTablePBImpl(e)));
return mountTables;
}
@Override
public void setEntries(List<MountTable> mountTables) {
for (MountTable mountTable : mountTables) {
if (mountTable instanceof MountTablePBImpl) {
MountTablePBImpl mountPB = (MountTablePBImpl) mountTable;
MountTableRecordProto mountProto = mountPB.getProto();
translator.getBuilder().addEntry(mountProto);
}
}
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb;
import java.io.IOException;
import org.apache.hadoop.thirdparty.protobuf.Message;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntriesResponseProto;
import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.AddMountTableEntriesResponseProtoOrBuilder;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord;
/**
* Protobuf implementation of the state store API object addMountTableEntriesResponse.
*/
public class AddMountTableEntriesResponsePBImpl
extends AddMountTableEntriesResponse implements PBRecord {
private final FederationProtocolPBTranslator<AddMountTableEntriesResponseProto,
AddMountTableEntriesResponseProto.Builder,
AddMountTableEntriesResponseProtoOrBuilder> translator =
new FederationProtocolPBTranslator<>(AddMountTableEntriesResponseProto.class);
public AddMountTableEntriesResponsePBImpl() {
}
public AddMountTableEntriesResponsePBImpl(
AddMountTableEntriesResponseProto proto) {
this.translator.setProto(proto);
}
@Override
public AddMountTableEntriesResponseProto getProto() {
return this.translator.build();
}
@Override
public void setProto(Message proto) {
this.translator.setProto(proto);
}
@Override
public void readInstance(String base64String) throws IOException {
this.translator.readInstance(base64String);
}
@Override
public boolean getStatus() {
return this.translator.getProtoOrBuilder().getStatus();
}
@Override
public void setStatus(boolean result) {
this.translator.getBuilder().setStatus(result);
}
}

View File

@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.tools.federation;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
/**
* Add mount entry attributes to be used by Router admin.
*/
public class AddMountAttributes {
private String mount;
private String[] nss;
private String dest;
private boolean readonly;
private boolean faultTolerant;
private DestinationOrder order;
private RouterAdmin.ACLEntity aclInfo;
private int paramIndex;
public String getMount() {
return mount;
}
public void setMount(String mount) {
this.mount = mount;
}
public String[] getNss() {
return nss;
}
public void setNss(String[] nss) {
this.nss = nss;
}
public String getDest() {
return dest;
}
public void setDest(String dest) {
this.dest = dest;
}
public boolean isReadonly() {
return readonly;
}
public void setReadonly(boolean readonly) {
this.readonly = readonly;
}
public boolean isFaultTolerant() {
return faultTolerant;
}
public void setFaultTolerant(boolean faultTolerant) {
this.faultTolerant = faultTolerant;
}
public DestinationOrder getOrder() {
return order;
}
public void setOrder(DestinationOrder order) {
this.order = order;
}
public RouterAdmin.ACLEntity getAclInfo() {
return aclInfo;
}
public void setAclInfo(RouterAdmin.ACLEntity aclInfo) {
this.aclInfo = aclInfo;
}
public int getParamIndex() {
return paramIndex;
}
public void setParamIndex(int paramIndex) {
this.paramIndex = paramIndex;
}
/**
* Retrieve mount table object with all attributes derived from this object.
*
* @return MountTable object with updated attributes.
* @throws IOException If mount table instantiation fails.
*/
public MountTable getMountTableEntryWithAttributes() throws IOException {
String normalizedMount = RouterAdmin.normalizeFileSystemPath(this.getMount());
return getMountTableForAddRequest(normalizedMount);
}
/**
* Retrieve mount table object with all attributes derived from this object.
* The returned mount table could be either new or existing one with updated attributes.
*
* @param existingEntry Existing mount table entry. If null, new mount table object is created,
* otherwise the existing mount table object is updated.
* @return MountTable object with updated attributes.
* @throws IOException If mount table instantiation fails.
*/
public MountTable getNewOrUpdatedMountTableEntryWithAttributes(MountTable existingEntry)
throws IOException {
if (existingEntry == null) {
return getMountTableForAddRequest(this.mount);
} else {
// Update the existing entry if it exists
for (String nsId : this.getNss()) {
if (!existingEntry.addDestination(nsId, this.getDest())) {
System.err.println("Cannot add destination at " + nsId + " " + this.getDest());
return null;
}
}
updateCommonAttributes(existingEntry);
return existingEntry;
}
}
/**
* Create a new mount table object from the given mount point and update its attributes.
*
* @param mountSrc mount point src.
* @return MountTable object with updated attributes.
* @throws IOException If mount table instantiation fails.
*/
private MountTable getMountTableForAddRequest(String mountSrc) throws IOException {
Map<String, String> destMap = new LinkedHashMap<>();
for (String ns : this.getNss()) {
destMap.put(ns, this.getDest());
}
MountTable newEntry = MountTable.newInstance(mountSrc, destMap);
updateCommonAttributes(newEntry);
return newEntry;
}
/**
* Common attributes like read-only, fault-tolerant, dest order, owner, group, mode etc are
* updated for the given mount table object.
*
* @param existingEntry Mount table object.
*/
private void updateCommonAttributes(MountTable existingEntry) {
if (this.isReadonly()) {
existingEntry.setReadOnly(true);
}
if (this.isFaultTolerant()) {
existingEntry.setFaultTolerant(true);
}
if (this.getOrder() != null) {
existingEntry.setDestOrder(this.getOrder());
}
RouterAdmin.ACLEntity mountAclInfo = this.getAclInfo();
// Update ACL info of mount table entry
if (mountAclInfo.getOwner() != null) {
existingEntry.setOwnerName(mountAclInfo.getOwner());
}
if (mountAclInfo.getGroup() != null) {
existingEntry.setGroupName(mountAclInfo.getGroup());
}
if (mountAclInfo.getMode() != null) {
existingEntry.setMode(mountAclInfo.getMode());
}
existingEntry.validate();
}
}

View File

@ -20,13 +20,16 @@
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Pattern;
@ -52,6 +55,8 @@
import org.apache.hadoop.hdfs.server.federation.store.RecordStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.DisableNameserviceRequest;
@ -106,6 +111,7 @@ public class RouterAdmin extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(RouterAdmin.class);
private static final String DUMP_COMMAND = "-dumpState";
private static final String ADD_ALL_COMMAND = "-addAll";
private RouterClient client;
@ -132,15 +138,15 @@ public void printUsage() {
System.out.println(usage);
}
private void printUsage(String cmd) {
private static void printUsage(String cmd) {
String usage = getUsage(cmd);
System.out.println(usage);
}
private String getUsage(String cmd) {
private static String getUsage(String cmd) {
if (cmd == null) {
String[] commands =
{"-add", "-update", "-rm", "-ls", "-getDestination", "-setQuota",
{"-add", ADD_ALL_COMMAND, "-update", "-rm", "-ls", "-getDestination", "-setQuota",
"-setStorageTypeQuota", "-clrQuota", "-clrStorageTypeQuota",
DUMP_COMMAND, "-safemode", "-nameservice", "-getDisabledNameservices",
"-refresh", "-refreshRouterArgs",
@ -160,6 +166,16 @@ private String getUsage(String cmd) {
+ "[-readonly] [-faulttolerant] "
+ "[-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE] "
+ "-owner <owner> -group <group> -mode <mode>]";
} else if (cmd.equals(ADD_ALL_COMMAND)) {
return "\t[" + ADD_ALL_COMMAND + " "
+ "<source1> <nameservice1,nameservice2,...> <destination1> "
+ "[-readonly] [-faulttolerant] " + "[-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE] "
+ "-owner <owner1> -group <group1> -mode <mode1>"
+ " , "
+ "<source2> <nameservice1,nameservice2,...> <destination2> "
+ "[-readonly] [-faulttolerant] " + "[-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE] "
+ "-owner <owner2> -group <group2> -mode <mode2>"
+ " , ...]";
} else if (cmd.equals("-update")) {
return "\t[-update <source>"
+ " [<nameservice1, nameservice2, ...> <destination>] "
@ -423,6 +439,12 @@ public int run(String[] argv) throws Exception {
exitCode = refreshSuperUserGroupsConfiguration();
} else if ("-refreshCallQueue".equals(cmd)) {
exitCode = refreshCallQueue();
} else if (ADD_ALL_COMMAND.equals(cmd)) {
if (addAllMount(argv, i)) {
System.out.println("Successfully added all mount points ");
} else {
exitCode = -1;
}
} else {
throw new IllegalArgumentException("Unknown Command: " + cmd);
}
@ -462,6 +484,152 @@ public int run(String[] argv) throws Exception {
return exitCode;
}
/**
* Add all mount point entries provided in the request.
*
* @param parameters Parameters for the mount points.
* @param i Current index on the parameters array.
* @return True if adding all mount points was successful, False otherwise.
* @throws IOException If the RPC call to add the mount points fail.
*/
private boolean addAllMount(String[] parameters, int i) throws IOException {
List<AddMountAttributes> addMountAttributesList = new ArrayList<>();
Set<String> mounts = new HashSet<>();
while (i < parameters.length) {
AddMountAttributes addMountAttributes = getAddMountAttributes(parameters, i, true);
if (addMountAttributes == null) {
return false;
}
if (!mounts.add(addMountAttributes.getMount())) {
System.err.println("Multiple inputs for mount: " + addMountAttributes.getMount());
return false;
}
i = addMountAttributes.getParamIndex();
addMountAttributesList.add(addMountAttributes);
}
List<MountTable> addEntries = getMountTablesFromAddAllAttributes(addMountAttributesList);
AddMountTableEntriesRequest request =
AddMountTableEntriesRequest.newInstance(addEntries);
MountTableManager mountTable = client.getMountTableManager();
AddMountTableEntriesResponse addResponse =
mountTable.addMountTableEntries(request);
boolean added = addResponse.getStatus();
if (!added) {
System.err.println("Cannot add some or all mount points");
}
return added;
}
/**
* From the given params, form and retrieve AddMountAttributes object. This object is meant
* to be used while adding single or multiple mount points with their own specific attributes.
*
* @param parameters Parameters for the mount point.
* @param i Current index on the parameters array.
* @param isMultipleAdd True if multiple mount points are to be added, False if single mount
* point is to be added.
* @return AddMountAttributes object.
*/
private static AddMountAttributes getAddMountAttributes(String[] parameters, int i,
boolean isMultipleAdd) {
// Mandatory parameters
String mount = parameters[i++];
String[] nss = parameters[i++].split(",");
String destination = parameters[i++];
if (isMultipleAdd) {
String[] destinations = destination.split(",");
if (nss.length != destinations.length && destinations.length > 1) {
String message =
"Invalid namespaces and destinations. The number of destinations " + destinations.length
+ " is not matched with the number of namespaces " + nss.length;
System.err.println(message);
return null;
}
}
// Optional parameters
boolean readOnly = false;
boolean faultTolerant = false;
String owner = null;
String group = null;
FsPermission mode = null;
DestinationOrder order = DestinationOrder.HASH;
while (i < parameters.length) {
if (isMultipleAdd && ",".equals(parameters[i])) {
i++;
break;
}
switch (parameters[i]) {
case "-readonly": {
readOnly = true;
break;
}
case "-faulttolerant": {
faultTolerant = true;
break;
}
case "-order": {
i++;
try {
order = DestinationOrder.valueOf(parameters[i]);
} catch (Exception e) {
System.err.println("Cannot parse order: " + parameters[i]);
}
break;
}
case "-owner": {
i++;
owner = parameters[i];
break;
}
case "-group": {
i++;
group = parameters[i];
break;
}
case "-mode": {
i++;
short modeValue = Short.parseShort(parameters[i], 8);
mode = new FsPermission(modeValue);
break;
}
default: {
printUsage(isMultipleAdd ? ADD_ALL_COMMAND : "-add");
return null;
}
}
i++;
}
AddMountAttributes addMountAttributes = new AddMountAttributes();
addMountAttributes.setMount(mount);
addMountAttributes.setNss(nss);
addMountAttributes.setDest(destination);
addMountAttributes.setReadonly(readOnly);
addMountAttributes.setFaultTolerant(faultTolerant);
addMountAttributes.setOrder(order);
addMountAttributes.setAclInfo(new ACLEntity(owner, group, mode));
addMountAttributes.setParamIndex(i);
return addMountAttributes;
}
/**
* Prepare and return the list of mount table objects from the given list of
* AddMountAttributes objects.
*
* @param addMountAttributesList The list of AddMountAttributes objects.
* @return The list of MountTable objects.
* @throws IOException If the creation of the mount table objects fail.
*/
private List<MountTable> getMountTablesFromAddAllAttributes(
List<AddMountAttributes> addMountAttributesList) throws IOException {
List<MountTable> mountTables = new ArrayList<>();
for (AddMountAttributes addMountAttributes : addMountAttributesList) {
mountTables.add(addMountAttributes.getMountTableEntryWithAttributes());
}
return mountTables;
}
/**
* Refresh superuser proxy groups mappings on Router.
*
@ -511,149 +679,44 @@ private boolean refreshRouterCache() throws IOException {
* @throws IOException If it cannot add the mount point.
*/
public boolean addMount(String[] parameters, int i) throws IOException {
// Mandatory parameters
String mount = parameters[i++];
String[] nss = parameters[i++].split(",");
String dest = parameters[i++];
// Optional parameters
boolean readOnly = false;
boolean faultTolerant = false;
String owner = null;
String group = null;
FsPermission mode = null;
DestinationOrder order = DestinationOrder.HASH;
while (i < parameters.length) {
if (parameters[i].equals("-readonly")) {
readOnly = true;
} else if (parameters[i].equals("-faulttolerant")) {
faultTolerant = true;
} else if (parameters[i].equals("-order")) {
i++;
try {
order = DestinationOrder.valueOf(parameters[i]);
} catch(Exception e) {
System.err.println("Cannot parse order: " + parameters[i]);
}
} else if (parameters[i].equals("-owner")) {
i++;
owner = parameters[i];
} else if (parameters[i].equals("-group")) {
i++;
group = parameters[i];
} else if (parameters[i].equals("-mode")) {
i++;
short modeValue = Short.parseShort(parameters[i], 8);
mode = new FsPermission(modeValue);
} else {
printUsage("-add");
AddMountAttributes addMountAttributes = getAddMountAttributes(parameters, i, false);
if (addMountAttributes == null) {
return false;
}
i++;
}
return addMount(mount, nss, dest, readOnly, faultTolerant, order,
new ACLEntity(owner, group, mode));
return addMount(addMountAttributes);
}
/**
* Add a mount table entry or update if it exists.
*
* @param mount Mount point.
* @param nss Namespaces where this is mounted to.
* @param dest Destination path.
* @param readonly If the mount point is read only.
* @param order Order of the destination locations.
* @param aclInfo the ACL info for mount point.
* @param addMountAttributes attributes associated with add mount point request.
* @return If the mount point was added.
* @throws IOException Error adding the mount point.
*/
public boolean addMount(String mount, String[] nss, String dest,
boolean readonly, boolean faultTolerant, DestinationOrder order,
ACLEntity aclInfo)
public boolean addMount(AddMountAttributes addMountAttributes)
throws IOException {
mount = normalizeFileSystemPath(mount);
String mount = normalizeFileSystemPath(addMountAttributes.getMount());
// Get the existing entry
MountTableManager mountTable = client.getMountTableManager();
MountTable existingEntry = getMountEntry(mount, mountTable);
MountTable existingOrNewEntry =
addMountAttributes.getNewOrUpdatedMountTableEntryWithAttributes(existingEntry);
if (existingOrNewEntry == null) {
return false;
}
if (existingEntry == null) {
// Create and add the entry if it doesn't exist
Map<String, String> destMap = new LinkedHashMap<>();
for (String ns : nss) {
destMap.put(ns, dest);
}
MountTable newEntry = MountTable.newInstance(mount, destMap);
if (readonly) {
newEntry.setReadOnly(true);
}
if (faultTolerant) {
newEntry.setFaultTolerant(true);
}
if (order != null) {
newEntry.setDestOrder(order);
}
// Set ACL info for mount table entry
if (aclInfo.getOwner() != null) {
newEntry.setOwnerName(aclInfo.getOwner());
}
if (aclInfo.getGroup() != null) {
newEntry.setGroupName(aclInfo.getGroup());
}
if (aclInfo.getMode() != null) {
newEntry.setMode(aclInfo.getMode());
}
newEntry.validate();
AddMountTableEntryRequest request =
AddMountTableEntryRequest.newInstance(newEntry);
AddMountTableEntryResponse addResponse =
mountTable.addMountTableEntry(request);
AddMountTableEntryRequest request = AddMountTableEntryRequest
.newInstance(existingOrNewEntry);
AddMountTableEntryResponse addResponse = mountTable.addMountTableEntry(request);
boolean added = addResponse.getStatus();
if (!added) {
System.err.println("Cannot add mount point " + mount);
}
return added;
} else {
// Update the existing entry if it exists
for (String nsId : nss) {
if (!existingEntry.addDestination(nsId, dest)) {
System.err.println("Cannot add destination at " + nsId + " " + dest);
return false;
}
}
if (readonly) {
existingEntry.setReadOnly(true);
}
if (faultTolerant) {
existingEntry.setFaultTolerant(true);
}
if (order != null) {
existingEntry.setDestOrder(order);
}
// Update ACL info of mount table entry
if (aclInfo.getOwner() != null) {
existingEntry.setOwnerName(aclInfo.getOwner());
}
if (aclInfo.getGroup() != null) {
existingEntry.setGroupName(aclInfo.getGroup());
}
if (aclInfo.getMode() != null) {
existingEntry.setMode(aclInfo.getMode());
}
existingEntry.validate();
UpdateMountTableEntryRequest updateRequest =
UpdateMountTableEntryRequest.newInstance(existingEntry);
UpdateMountTableEntryRequest.newInstance(existingOrNewEntry);
UpdateMountTableEntryResponse updateResponse =
mountTable.updateMountTableEntry(updateRequest);
boolean updated = updateResponse.getStatus();

View File

@ -162,6 +162,14 @@ message AddMountTableEntryResponseProto {
optional bool status = 1;
}
message AddMountTableEntriesRequestProto {
repeated MountTableRecordProto entry = 1;
}
message AddMountTableEntriesResponseProto {
optional bool status = 1;
}
message UpdateMountTableEntryRequestProto {
optional MountTableRecordProto entry = 1;
}

View File

@ -40,6 +40,11 @@ service RouterAdminProtocolService {
*/
rpc removeMountTableEntry(RemoveMountTableEntryRequestProto) returns(RemoveMountTableEntryResponseProto);
/**
* Add all mount table entries that are present in the request.
*/
rpc addMountTableEntries(AddMountTableEntriesRequestProto) returns(AddMountTableEntriesResponseProto);
/**
* Get matching mount entries
*/

View File

@ -20,6 +20,7 @@
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createNamenodeReport;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
@ -853,6 +854,12 @@ public void testInvalidArgumentMessage() throws Exception {
+ "[-readonly] [-faulttolerant] "
+ "[-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE] "
+ "-owner <owner> -group <group> -mode <mode>]\n"
+ "\t[-addAll <source1> <nameservice1,nameservice2,...> <destination1> "
+ "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE] "
+ "-owner <owner1> -group <group1> -mode <mode1>"
+ " , <source2> <nameservice1,nameservice2,...> <destination2> "
+ "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL|SPACE] "
+ "-owner <owner2> -group <group2> -mode <mode2> , ...]\n"
+ "\t[-update <source> [<nameservice1, nameservice2, ...> "
+ "<destination>] [-readonly true|false]"
+ " [-faulttolerant true|false] "
@ -1840,6 +1847,79 @@ public void testDumpState() throws Exception {
buffer.toString().trim().replaceAll("[0-9]{4,}+", "XXX"));
}
@Test
public void testAddMultipleMountPointsSuccess() throws Exception {
String[] argv =
new String[] {"-addAll", "/testAddMultipleMountPoints-01", "ns01", "/dest01", ",",
"/testAddMultipleMountPoints-02", "ns02,ns03", "/dest02", "-order", "HASH_ALL",
"-faulttolerant", ",", "/testAddMultipleMountPoints-03", "ns03", "/dest03"};
assertEquals(0, ToolRunner.run(admin, argv));
stateStore.loadCache(MountTableStoreImpl.class, true);
validateMountEntry("/testAddMultipleMountPoints-01", 1, new String[] {"/dest01"},
new String[] {"ns01"});
validateMountEntry("/testAddMultipleMountPoints-02", 2, new String[] {"/dest02", "/dest02"},
new String[] {"ns02", "ns03"});
validateMountEntry("/testAddMultipleMountPoints-03", 1, new String[] {"/dest03"},
new String[] {"ns03"});
}
private static void validateMountEntry(String mountName, int numDest, String[] dest, String[] nss)
throws IOException {
GetMountTableEntriesRequest request = GetMountTableEntriesRequest.newInstance(mountName);
GetMountTableEntriesResponse response =
client.getMountTableManager().getMountTableEntries(request);
assertEquals(1, response.getEntries().size());
List<RemoteLocation> destinations = response.getEntries().get(0).getDestinations();
assertEquals(numDest, destinations.size());
for (int i = 0; i < numDest; i++) {
assertEquals(mountName, destinations.get(i).getSrc());
assertEquals(dest[i], destinations.get(i).getDest());
assertEquals(nss[i], destinations.get(i).getNameserviceId());
}
}
@Test
public void testAddMultipleMountPointsFailure() throws Exception {
String[] argv =
new String[] {"-addAll", "/testAddMultiMountPoints-01", "ns01", ",", "/dest01", ",",
"/testAddMultiMountPoints-02", "ns02,ns03", "/dest02", "-order", "HASH_ALL",
"-faulttolerant", ",", "/testAddMultiMountPoints-03", "ns03", "/dest03", ",",
"/testAddMultiMountPoints-01", "ns02", "/dest02"};
// syntax issue
assertNotEquals(0, ToolRunner.run(admin, argv));
argv =
new String[] {"-addAll", "/testAddMultiMountPoints-01", "ns01", "/dest01", ",",
"/testAddMultiMountPoints-02", "ns02,ns03", "/dest02", "-order", "HASH_ALL",
"-faulttolerant", ",", "/testAddMultiMountPoints-03", "ns03", "/dest03", ",",
"/testAddMultiMountPoints-01", "ns02", "/dest02"};
// multiple inputs with same mount
assertNotEquals(0, ToolRunner.run(admin, argv));
argv =
new String[] {"-addAll", "/testAddMultiMountPoints-01", "ns01", "/dest01,/dest02", ",",
"/testAddMultiMountPoints-02", "ns02,ns03", "/dest02", "-order", "HASH_ALL",
"-faulttolerant"};
// multiple dest entries
assertNotEquals(0, ToolRunner.run(admin, argv));
argv =
new String[] {"-addAll", "/testAddMultiMountPoints-01", "ns01", "/dest01", ",",
"/testAddMultiMountPoints-02", "ns02,ns03", "/dest02", "-order", "HASH_ALL",
"-faulttolerant"};
// success
assertEquals(0, ToolRunner.run(admin, argv));
argv =
new String[] {"-addAll", "/testAddMultiMountPoints-01", "ns01", "/dest01", ",",
"/testAddMultiMountPoints-02", "ns02,ns03", "/dest02", "-order", "HASH_ALL",
"-faulttolerant"};
// mount points were already added
assertNotEquals(0, ToolRunner.run(admin, argv));
}
private void addMountTable(String src, String nsId, String dst)
throws Exception {
String[] argv = new String[] {"-add", src, nsId, dst};