HDFS-6874. Add GETFILEBLOCKLOCATIONS operation to HttpFS. Contributed by Weiwei Yang

This commit is contained in:
Tsz-Wo Nicholas Sze 2017-07-12 13:45:41 -07:00
parent 655110393b
commit 931a49800e
5 changed files with 188 additions and 4 deletions

View File

@ -23,9 +23,12 @@
import java.util.EnumSet;
import java.util.List;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.MapType;
import com.google.common.base.Charsets;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.DelegationTokenRenewer;
import org.apache.hadoop.fs.FSDataInputStream;
@ -119,6 +122,8 @@ public class HttpFSFileSystem extends FileSystem
public static final String NEW_LENGTH_PARAM = "newlength";
public static final String START_AFTER_PARAM = "startAfter";
public static final String POLICY_NAME_PARAM = "storagepolicy";
public static final String OFFSET_PARAM = "offset";
public static final String LENGTH_PARAM = "length";
public static final Short DEFAULT_PERMISSION = 0755;
public static final String ACLSPEC_DEFAULT = "";
@ -201,6 +206,7 @@ public static FILE_TYPE getType(FileStatus fileStatus) {
public static final String STORAGE_POLICIES_JSON = "BlockStoragePolicies";
public static final String STORAGE_POLICY_JSON = "BlockStoragePolicy";
public static final String BLOCK_LOCATIONS_JSON = "BlockLocations";
public static final int HTTP_TEMPORARY_REDIRECT = 307;
@ -1358,6 +1364,42 @@ public BlockStoragePolicy getStoragePolicy(Path src) throws IOException {
return createStoragePolicy((JSONObject) json.get(STORAGE_POLICY_JSON));
}
@Override
public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
long len) throws IOException {
Map<String, String> params = new HashMap<String, String>();
params.put(OP_PARAM, Operation.GETFILEBLOCKLOCATIONS.toString());
params.put(OFFSET_PARAM, Long.toString(start));
params.put(LENGTH_PARAM, Long.toString(len));
HttpURLConnection conn =
getConnection(Operation.GETFILEBLOCKLOCATIONS.getMethod(), params,
file.getPath(), true);
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
return toBlockLocations(json);
}
private BlockLocation[] toBlockLocations(JSONObject json)
throws IOException {
ObjectMapper mapper = new ObjectMapper();
MapType subType = mapper.getTypeFactory().constructMapType(
Map.class,
String.class,
BlockLocation[].class);
MapType rootType = mapper.getTypeFactory().constructMapType(
Map.class,
mapper.constructType(String.class),
mapper.constructType(subType));
Map<String, Map<String, BlockLocation[]>> jsonMap = mapper
.readValue(json.toJSONString(), rootType);
Map<String, BlockLocation[]> locationMap = jsonMap
.get(BLOCK_LOCATIONS_JSON);
BlockLocation[] locationArray = locationMap.get(
BlockLocation.class.getSimpleName());
return locationArray;
}
private BlockStoragePolicy createStoragePolicy(JSONObject policyJson)
throws IOException {
byte id = ((Number) policyJson.get("id")).byteValue();

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.http.server;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileChecksum;
@ -35,6 +36,7 @@
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.lib.service.FileSystemAccess;
import org.apache.hadoop.util.StringUtils;
@ -1452,4 +1454,39 @@ public Void execute(FileSystem fs) throws IOException {
return null;
}
}
/**
* Executor that performs a getFileBlockLocations FileSystemAccess
* file system operation.
*/
@InterfaceAudience.Private
@SuppressWarnings("rawtypes")
public static class FSFileBlockLocations implements
FileSystemAccess.FileSystemExecutor<Map> {
private Path path;
private long offsetValue;
private long lengthValue;
/**
* Creates a file-block-locations executor.
*
* @param path the path to retrieve the location
* @param offsetValue offset into the given file
* @param lengthValue length for which to get locations for
*/
public FSFileBlockLocations(String path, long offsetValue,
long lengthValue) {
this.path = new Path(path);
this.offsetValue = offsetValue;
this.lengthValue = lengthValue;
}
@Override
public Map execute(FileSystem fs) throws IOException {
BlockLocation[] locations =
fs.getFileBlockLocations(this.path, this.offsetValue,
this.lengthValue);
return JsonUtil.toJsonMap(locations);
}
}
}

View File

@ -58,7 +58,8 @@ public class HttpFSParametersProvider extends ParametersProvider {
PARAMS_DEF.put(Operation.GETHOMEDIRECTORY, new Class[]{});
PARAMS_DEF.put(Operation.GETCONTENTSUMMARY, new Class[]{});
PARAMS_DEF.put(Operation.GETFILECHECKSUM, new Class[]{});
PARAMS_DEF.put(Operation.GETFILEBLOCKLOCATIONS, new Class[]{});
PARAMS_DEF.put(Operation.GETFILEBLOCKLOCATIONS,
new Class[] {OffsetParam.class, LenParam.class});
PARAMS_DEF.put(Operation.GETACLSTATUS, new Class[]{});
PARAMS_DEF.put(Operation.GETTRASHROOT, new Class[]{});
PARAMS_DEF.put(Operation.INSTRUMENTATION, new Class[]{});

View File

@ -49,6 +49,7 @@
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrNameParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrSetFlagParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrValueParam;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.lib.service.FileSystemAccess;
import org.apache.hadoop.lib.service.FileSystemAccessException;
@ -296,7 +297,25 @@ public InputStream run() throws Exception {
break;
}
case GETFILEBLOCKLOCATIONS: {
response = Response.status(Response.Status.BAD_REQUEST).build();
long offset = 0;
// In case length is not given, reset to max long
// in order to retrieve all file block locations
long len = Long.MAX_VALUE;
Long offsetParam = params.get(OffsetParam.NAME, OffsetParam.class);
Long lenParam = params.get(LenParam.NAME, LenParam.class);
AUDIT_LOG.info("[{}] offset [{}] len [{}]",
new Object[] {path, offsetParam, lenParam});
if (offsetParam != null && offsetParam.longValue() > 0) {
offset = offsetParam.longValue();
}
if (lenParam != null && lenParam.longValue() > 0) {
len = lenParam.longValue();
}
FSOperations.FSFileBlockLocations command =
new FSOperations.FSFileBlockLocations(path, offset, len);
@SuppressWarnings("rawtypes") Map locations = fsExecute(user, command);
final String json = JsonUtil.toJsonString("BlockLocations", locations);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case GETACLSTATUS: {

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileChecksum;
@ -1009,10 +1010,10 @@ private void testStoragePolicy() throws Exception {
protected enum Operation {
GET, OPEN, CREATE, APPEND, TRUNCATE, CONCAT, RENAME, DELETE, LIST_STATUS,
WORKING_DIRECTORY, MKDIRS, SET_TIMES, SET_PERMISSION, SET_OWNER,
WORKING_DIRECTORY, MKDIRS, SET_TIMES, SET_PERMISSION, SET_OWNER,
SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY, FILEACLS, DIRACLS, SET_XATTR,
GET_XATTRS, REMOVE_XATTR, LIST_XATTRS, ENCRYPTION, LIST_STATUS_BATCH,
GETTRASHROOT, STORAGEPOLICY, ERASURE_CODING
GETTRASHROOT, STORAGEPOLICY, ERASURE_CODING, GETFILEBLOCKLOCATIONS
}
private void operation(Operation op) throws Exception {
@ -1101,6 +1102,9 @@ private void operation(Operation op) throws Exception {
case ERASURE_CODING:
testErasureCoding();
break;
case GETFILEBLOCKLOCATIONS:
testGetFileBlockLocations();
break;
}
}
@ -1147,4 +1151,85 @@ public Void run() throws Exception {
});
}
private void testGetFileBlockLocations() throws Exception {
BlockLocation[] locations1, locations2, locations11, locations21 = null;
Path testFile = null;
// Test single block file block locations.
try (FileSystem fs = FileSystem.get(getProxiedFSConf())) {
testFile = new Path(getProxiedFSTestDir(), "singleBlock.txt");
DFSTestUtil.createFile(fs, testFile, (long) 1, (short) 1, 0L);
locations1 = fs.getFileBlockLocations(testFile, 0, 1);
Assert.assertNotNull(locations1);
}
try (FileSystem fs = getHttpFSFileSystem()) {
locations2 = fs.getFileBlockLocations(testFile, 0, 1);
Assert.assertNotNull(locations2);
}
verifyBlockLocations(locations1, locations2);
// Test multi-block single replica file block locations.
try (FileSystem fs = FileSystem.get(getProxiedFSConf())) {
testFile = new Path(getProxiedFSTestDir(), "multipleBlocks.txt");
DFSTestUtil.createFile(fs, testFile, 512, (short) 2048,
(long) 512, (short) 1, 0L);
locations1 = fs.getFileBlockLocations(testFile, 0, 1024);
locations11 = fs.getFileBlockLocations(testFile, 1024, 2048);
Assert.assertNotNull(locations1);
Assert.assertNotNull(locations11);
}
try (FileSystem fs = getHttpFSFileSystem()) {
locations2 = fs.getFileBlockLocations(testFile, 0, 1024);
locations21 = fs.getFileBlockLocations(testFile, 1024, 2048);
Assert.assertNotNull(locations2);
Assert.assertNotNull(locations21);
}
verifyBlockLocations(locations1, locations2);
verifyBlockLocations(locations11, locations21);
// Test multi-block multi-replica file block locations.
try (FileSystem fs = FileSystem.get(getProxiedFSConf())) {
testFile = new Path(getProxiedFSTestDir(), "multipleBlocks.txt");
DFSTestUtil.createFile(fs, testFile, 512, (short) 2048,
(long) 512, (short) 3, 0L);
locations1 = fs.getFileBlockLocations(testFile, 0, 2048);
Assert.assertNotNull(locations1);
}
try (FileSystem fs = getHttpFSFileSystem()) {
locations2 = fs.getFileBlockLocations(testFile, 0, 2048);
Assert.assertNotNull(locations2);
}
verifyBlockLocations(locations1, locations2);
}
private void verifyBlockLocations(BlockLocation[] locations1,
BlockLocation[] locations2) throws IOException {
Assert.assertEquals(locations1.length, locations2.length);
for (int i = 0; i < locations1.length; i++) {
BlockLocation location1 = locations1[i];
BlockLocation location2 = locations2[i];
Assert.assertEquals(location1.isCorrupt(), location2.isCorrupt());
Assert.assertEquals(location1.getOffset(), location2.getOffset());
Assert.assertEquals(location1.getLength(), location2.getLength());
Arrays.sort(location1.getHosts());
Arrays.sort(location2.getHosts());
Arrays.sort(location1.getNames());
Arrays.sort(location2.getNames());
Arrays.sort(location1.getTopologyPaths());
Arrays.sort(location2.getTopologyPaths());
Assert.assertArrayEquals(location1.getHosts(), location2.getHosts());
Assert.assertArrayEquals(location1.getNames(), location2.getNames());
Assert.assertArrayEquals(location1.getTopologyPaths(),
location2.getTopologyPaths());
}
}
}