diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
index 59229586a20..1ab890f3f50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
@@ -23,9 +23,12 @@
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
 import com.google.common.base.Charsets;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.DelegationTokenRenewer;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -119,6 +122,8 @@ public class HttpFSFileSystem extends FileSystem
   public static final String NEW_LENGTH_PARAM = "newlength";
   public static final String START_AFTER_PARAM = "startAfter";
   public static final String POLICY_NAME_PARAM = "storagepolicy";
+  public static final String OFFSET_PARAM = "offset";
+  public static final String LENGTH_PARAM = "length";
 
   public static final Short DEFAULT_PERMISSION = 0755;
   public static final String ACLSPEC_DEFAULT = "";
@@ -201,6 +206,7 @@ public class HttpFSFileSystem extends FileSystem
 
   public static final String STORAGE_POLICIES_JSON = "BlockStoragePolicies";
   public static final String STORAGE_POLICY_JSON = "BlockStoragePolicy";
+  public static final String BLOCK_LOCATIONS_JSON = "BlockLocations";
 
   public static final int HTTP_TEMPORARY_REDIRECT = 307;
 
@@ -1358,6 +1364,42 @@ public class HttpFSFileSystem extends FileSystem
     return createStoragePolicy((JSONObject) json.get(STORAGE_POLICY_JSON));
   }
 
+  @Override
+  public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
+      long len) throws IOException {
+    Map<String, String> params = new HashMap<String, String>();
+    params.put(OP_PARAM, Operation.GETFILEBLOCKLOCATIONS.toString());
+    params.put(OFFSET_PARAM, Long.toString(start));
+    params.put(LENGTH_PARAM, Long.toString(len));
+    HttpURLConnection conn =
+        getConnection(Operation.GETFILEBLOCKLOCATIONS.getMethod(), params,
+            file.getPath(), true);
+    HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
+    JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
+    return toBlockLocations(json);
+  }
+
+  private BlockLocation[] toBlockLocations(JSONObject json)
+      throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    MapType subType = mapper.getTypeFactory().constructMapType(
+        Map.class,
+        String.class,
+        BlockLocation[].class);
+    MapType rootType = mapper.getTypeFactory().constructMapType(
+        Map.class,
+        mapper.constructType(String.class),
+        mapper.constructType(subType));
+
+    Map<String, Map<String, BlockLocation[]>> jsonMap = mapper
+        .readValue(json.toJSONString(), rootType);
+    Map<String, BlockLocation[]> locationMap = jsonMap
+        .get(BLOCK_LOCATIONS_JSON);
+    BlockLocation[] locationArray = locationMap.get(
+        BlockLocation.class.getSimpleName());
+    return locationArray;
+  }
+
   private BlockStoragePolicy createStoragePolicy(JSONObject policyJson)
       throws IOException {
     byte id = ((Number) policyJson.get("id")).byteValue();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
index 3373582453d..0fb665a2bea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.fs.http.server;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStoragePolicySpi;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileChecksum;
@@ -35,6 +36,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.web.JsonUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.lib.service.FileSystemAccess;
 import org.apache.hadoop.util.StringUtils;
@@ -1452,4 +1454,39 @@ public class FSOperations {
       return null;
     }
   }
+
+  /**
+   * Executor that performs a getFileBlockLocations FileSystemAccess
+   * file system operation.
+   */
+  @InterfaceAudience.Private
+  @SuppressWarnings("rawtypes")
+  public static class FSFileBlockLocations implements
+      FileSystemAccess.FileSystemExecutor<Map> {
+    private Path path;
+    private long offsetValue;
+    private long lengthValue;
+
+    /**
+     * Creates a file-block-locations executor.
+     *
+     * @param path the path to retrieve the location
+     * @param offsetValue offset into the given file
+     * @param lengthValue length for which to get locations for
+     */
+    public FSFileBlockLocations(String path, long offsetValue,
+        long lengthValue) {
+      this.path = new Path(path);
+      this.offsetValue = offsetValue;
+      this.lengthValue = lengthValue;
+    }
+
+    @Override
+    public Map execute(FileSystem fs) throws IOException {
+      BlockLocation[] locations =
+          fs.getFileBlockLocations(this.path, this.offsetValue,
+              this.lengthValue);
+      return JsonUtil.toJsonMap(locations);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
index a9d350a7316..347a74750dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSParametersProvider.java
@@ -58,7 +58,8 @@ public class HttpFSParametersProvider extends ParametersProvider {
     PARAMS_DEF.put(Operation.GETHOMEDIRECTORY, new Class[]{});
     PARAMS_DEF.put(Operation.GETCONTENTSUMMARY, new Class[]{});
     PARAMS_DEF.put(Operation.GETFILECHECKSUM, new Class[]{});
-    PARAMS_DEF.put(Operation.GETFILEBLOCKLOCATIONS, new Class[]{});
+    PARAMS_DEF.put(Operation.GETFILEBLOCKLOCATIONS,
+        new Class[] {OffsetParam.class, LenParam.class});
     PARAMS_DEF.put(Operation.GETACLSTATUS, new Class[]{});
     PARAMS_DEF.put(Operation.GETTRASHROOT, new Class[]{});
     PARAMS_DEF.put(Operation.INSTRUMENTATION, new Class[]{});
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
index f526053dfad..5c0c9b5f967 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/HttpFSServer.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrEncodingPa
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrNameParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrSetFlagParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.XAttrValueParam;
+import org.apache.hadoop.hdfs.web.JsonUtil;
 import org.apache.hadoop.http.JettyUtils;
 import org.apache.hadoop.lib.service.FileSystemAccess;
 import org.apache.hadoop.lib.service.FileSystemAccessException;
@@ -296,7 +297,25 @@ public class HttpFSServer {
       break;
     }
     case GETFILEBLOCKLOCATIONS: {
-      response = Response.status(Response.Status.BAD_REQUEST).build();
+      long offset = 0;
+      // In case length is not given, reset to max long
+      // in order to retrieve all file block locations
+      long len = Long.MAX_VALUE;
+      Long offsetParam = params.get(OffsetParam.NAME, OffsetParam.class);
+      Long lenParam = params.get(LenParam.NAME, LenParam.class);
+      AUDIT_LOG.info("[{}] offset [{}] len [{}]",
+          new Object[] {path, offsetParam, lenParam});
+      if (offsetParam != null && offsetParam.longValue() > 0) {
+        offset = offsetParam.longValue();
+      }
+      if (lenParam != null && lenParam.longValue() > 0) {
+        len = lenParam.longValue();
+      }
+      FSOperations.FSFileBlockLocations command =
+          new FSOperations.FSFileBlockLocations(path, offset, len);
+      @SuppressWarnings("rawtypes") Map locations = fsExecute(user, command);
+      final String json = JsonUtil.toJsonString("BlockLocations", locations);
+      response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
       break;
     }
     case GETACLSTATUS: {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
index 36d0ad98abc..0fd3f914249 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/client/BaseTestHttpFSWith.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.http.client;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockStoragePolicySpi;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileChecksum;
@@ -1009,10 +1010,10 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
 
   protected enum Operation {
     GET, OPEN, CREATE, APPEND, TRUNCATE, CONCAT, RENAME, DELETE, LIST_STATUS,
-    WORKING_DIRECTORY, MKDIRS, SET_TIMES, SET_PERMISSION, SET_OWNER, 
+    WORKING_DIRECTORY, MKDIRS, SET_TIMES, SET_PERMISSION, SET_OWNER,
     SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY, FILEACLS, DIRACLS, SET_XATTR,
     GET_XATTRS, REMOVE_XATTR, LIST_XATTRS, ENCRYPTION, LIST_STATUS_BATCH,
-    GETTRASHROOT, STORAGEPOLICY, ERASURE_CODING
+    GETTRASHROOT, STORAGEPOLICY, ERASURE_CODING, GETFILEBLOCKLOCATIONS
   }
 
   private void operation(Operation op) throws Exception {
@@ -1101,6 +1102,9 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
     case ERASURE_CODING:
       testErasureCoding();
       break;
+    case GETFILEBLOCKLOCATIONS:
+      testGetFileBlockLocations();
+      break;
     }
   }
 
@@ -1147,4 +1151,85 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
     });
   }
 
+  private void testGetFileBlockLocations() throws Exception {
+    BlockLocation[] locations1, locations2, locations11, locations21 = null;
+    Path testFile = null;
+
+    // Test single block file block locations.
+    try (FileSystem fs = FileSystem.get(getProxiedFSConf())) {
+      testFile = new Path(getProxiedFSTestDir(), "singleBlock.txt");
+      DFSTestUtil.createFile(fs, testFile, (long) 1, (short) 1, 0L);
+      locations1 = fs.getFileBlockLocations(testFile, 0, 1);
+      Assert.assertNotNull(locations1);
+    }
+
+    try (FileSystem fs = getHttpFSFileSystem()) {
+      locations2 = fs.getFileBlockLocations(testFile, 0, 1);
+      Assert.assertNotNull(locations2);
+    }
+
+    verifyBlockLocations(locations1, locations2);
+
+    // Test multi-block single replica file block locations.
+    try (FileSystem fs = FileSystem.get(getProxiedFSConf())) {
+      testFile = new Path(getProxiedFSTestDir(), "multipleBlocks.txt");
+      DFSTestUtil.createFile(fs, testFile, 512, (short) 2048,
+          (long) 512, (short) 1, 0L);
+      locations1 = fs.getFileBlockLocations(testFile, 0, 1024);
+      locations11 = fs.getFileBlockLocations(testFile, 1024, 2048);
+      Assert.assertNotNull(locations1);
+      Assert.assertNotNull(locations11);
+    }
+
+    try (FileSystem fs = getHttpFSFileSystem()) {
+      locations2 = fs.getFileBlockLocations(testFile, 0, 1024);
+      locations21 = fs.getFileBlockLocations(testFile, 1024, 2048);
+      Assert.assertNotNull(locations2);
+      Assert.assertNotNull(locations21);
+    }
+
+    verifyBlockLocations(locations1, locations2);
+    verifyBlockLocations(locations11, locations21);
+
+    // Test multi-block multi-replica file block locations.
+    try (FileSystem fs = FileSystem.get(getProxiedFSConf())) {
+      testFile = new Path(getProxiedFSTestDir(), "multipleBlocks.txt");
+      DFSTestUtil.createFile(fs, testFile, 512, (short) 2048,
+          (long) 512, (short) 3, 0L);
+      locations1 = fs.getFileBlockLocations(testFile, 0, 2048);
+      Assert.assertNotNull(locations1);
+    }
+
+    try (FileSystem fs = getHttpFSFileSystem()) {
+      locations2 = fs.getFileBlockLocations(testFile, 0, 2048);
+      Assert.assertNotNull(locations2);
+    }
+
+    verifyBlockLocations(locations1, locations2);
+  }
+
+  private void verifyBlockLocations(BlockLocation[] locations1,
+      BlockLocation[] locations2) throws IOException {
+    Assert.assertEquals(locations1.length, locations2.length);
+    for (int i = 0; i < locations1.length; i++) {
+      BlockLocation location1 = locations1[i];
+      BlockLocation location2 = locations2[i];
+
+      Assert.assertEquals(location1.isCorrupt(), location2.isCorrupt());
+      Assert.assertEquals(location1.getOffset(), location2.getOffset());
+      Assert.assertEquals(location1.getLength(), location2.getLength());
+
+      Arrays.sort(location1.getHosts());
+      Arrays.sort(location2.getHosts());
+      Arrays.sort(location1.getNames());
+      Arrays.sort(location2.getNames());
+      Arrays.sort(location1.getTopologyPaths());
+      Arrays.sort(location2.getTopologyPaths());
+
+      Assert.assertArrayEquals(location1.getHosts(), location2.getHosts());
+      Assert.assertArrayEquals(location1.getNames(), location2.getNames());
+      Assert.assertArrayEquals(location1.getTopologyPaths(),
+          location2.getTopologyPaths());
+    }
+  }
 }
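For context, a rough sketch (not part of the patch) of how a client might exercise the new operation end to end. The gateway URI, file path, and class name below are made up for illustration; the REST parameters and JSON shape are the ones this patch introduces (op=GETFILEBLOCKLOCATIONS with offset/length query parameters, response wrapped as BlockLocations/BlockLocation), and it assumes the webhdfs:// scheme is resolved to the HttpFS gateway.

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GetBlockLocationsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical HttpFS endpoint; adjust to the actual gateway address.
    try (FileSystem fs = FileSystem.get(
        URI.create("webhdfs://httpfs.example.com:14000"), conf)) {
      Path file = new Path("/user/alice/data.txt");
      FileStatus status = fs.getFileStatus(file);
      // Issues GET /webhdfs/v1/user/alice/data.txt?op=GETFILEBLOCKLOCATIONS
      // &offset=0&length=<fileLength>; the server answers with a
      // {"BlockLocations":{"BlockLocation":[...]}} JSON document that the
      // client side of this patch maps back to BlockLocation[].
      BlockLocation[] locations =
          fs.getFileBlockLocations(status, 0, status.getLen());
      for (BlockLocation location : locations) {
        System.out.println(location);
      }
    }
  }
}
```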