HDFS-11156. Add new op GETFILEBLOCKLOCATIONS to WebHDFS REST API. Contributed by Weiwei Yang.

(cherry picked from commit 7fcc73fc0d)

 Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md
This commit is contained in:
Andrew Wang 2017-01-03 09:58:00 -08:00
parent 029b6fbe71
commit 80e6640848
7 changed files with 640 additions and 7 deletions

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.web;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.ContentSummary.Builder; import org.apache.hadoop.fs.ContentSummary.Builder;
import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileChecksum;
@ -637,4 +638,56 @@ class JsonUtilClient {
} }
} }
static BlockLocation[] toBlockLocationArray(Map<?, ?> json)
throws IOException{
final Map<?, ?> rootmap =
(Map<?, ?>)json.get(BlockLocation.class.getSimpleName() + "s");
final List<?> array = JsonUtilClient.getList(rootmap,
BlockLocation.class.getSimpleName());
Preconditions.checkNotNull(array);
final BlockLocation[] locations = new BlockLocation[array.size()];
int i = 0;
for (Object object : array) {
final Map<?, ?> m = (Map<?, ?>) object;
locations[i++] = JsonUtilClient.toBlockLocation(m);
}
return locations;
}
/** Convert a Json map to BlockLocation. **/
static BlockLocation toBlockLocation(Map<?, ?> m)
throws IOException{
if(m == null) {
return null;
}
long length = ((Number) m.get("length")).longValue();
long offset = ((Number) m.get("offset")).longValue();
boolean corrupt = Boolean.
getBoolean(m.get("corrupt").toString());
String[] storageIds = toStringArray(getList(m, "storageIds"));
String[] cachedHosts = toStringArray(getList(m, "cachedHosts"));
String[] hosts = toStringArray(getList(m, "hosts"));
String[] names = toStringArray(getList(m, "names"));
String[] topologyPaths = toStringArray(getList(m, "topologyPaths"));
StorageType[] storageTypes = toStorageTypeArray(
getList(m, "storageTypes"));
return new BlockLocation(names, hosts, cachedHosts,
topologyPaths, storageIds, storageTypes,
offset, length, corrupt);
}
static String[] toStringArray(List<?> list) {
if (list == null) {
return null;
} else {
final String[] array = new String[list.size()];
int i = 0;
for (Object object : list) {
array[i++] = object.toString();
}
return array;
}
}
} }

View File

@ -1610,14 +1610,68 @@ public class WebHdfsFileSystem extends FileSystem
final long offset, final long length) throws IOException { final long offset, final long length) throws IOException {
statistics.incrementReadOps(1); statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS); storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS);
BlockLocation[] locations = null;
try {
locations = getFileBlockLocations(
GetOpParam.Op.GETFILEBLOCKLOCATIONS,
p, offset, length);
} catch (RemoteException e) {
// See the error message from ExceptionHandle
if(e.getMessage() != null &&
e.getMessage().contains(
"Invalid value for webhdfs parameter") &&
e.getMessage().contains(
GetOpParam.Op.GETFILEBLOCKLOCATIONS.toString())) {
// Old webhdfs server doesn't support GETFILEBLOCKLOCATIONS
// operation, fall back to query again using old API
// GET_BLOCK_LOCATIONS.
LOG.info("Invalid webhdfs operation parameter "
+ GetOpParam.Op.GETFILEBLOCKLOCATIONS + ". Fallback to use "
+ GetOpParam.Op.GET_BLOCK_LOCATIONS + " instead.");
locations = getFileBlockLocations(
GetOpParam.Op.GET_BLOCK_LOCATIONS,
p, offset, length);
}
}
return locations;
}
final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS; /**
return new FsPathResponseRunner<BlockLocation[]>(op, p, * Get file block locations implementation. Provide a operation
* parameter to determine how to get block locations from a webhdfs
* server. Older server only supports <b>GET_BLOCK_LOCATIONS</b> but
* not <b>GETFILEBLOCKLOCATIONS</b>.
*
* @param path path to the file
* @param offset start offset in the given file
* @param length of the file to get locations for
* @param operation
* Valid operation is either
* {@link org.apache.hadoop.hdfs.web.resources.GetOpParam.Op
* #GET_BLOCK_LOCATIONS} or
* {@link org.apache.hadoop.hdfs.web.resources.GetOpParam.Op
* #GET_BLOCK_LOCATIONS}
* @throws IOException
* Http connection error, decoding error or given
* operation is not valid
*/
@VisibleForTesting
protected BlockLocation[] getFileBlockLocations(
GetOpParam.Op operation, final Path path,
final long offset, final long length) throws IOException {
return new FsPathResponseRunner<BlockLocation[]>(operation, path,
new OffsetParam(offset), new LengthParam(length)) { new OffsetParam(offset), new LengthParam(length)) {
@Override @Override
BlockLocation[] decodeResponse(Map<?,?> json) throws IOException { BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
return DFSUtilClient.locatedBlocks2Locations( switch(operation) {
JsonUtilClient.toLocatedBlocks(json)); case GETFILEBLOCKLOCATIONS:
return JsonUtilClient.toBlockLocationArray(json);
case GET_BLOCK_LOCATIONS:
return DFSUtilClient.locatedBlocks2Locations(
JsonUtilClient.toLocatedBlocks(json));
default :
throw new IOException("Unknown operation " + operation.name());
}
} }
}.run(); }.run();
} }

View File

@ -33,8 +33,18 @@ public class GetOpParam extends HttpOpParam<GetOpParam.Op> {
GETHOMEDIRECTORY(false, HttpURLConnection.HTTP_OK), GETHOMEDIRECTORY(false, HttpURLConnection.HTTP_OK),
GETDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK, true), GETDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK, true),
/** GET_BLOCK_LOCATIONS is a private unstable op. */ /**
* GET_BLOCK_LOCATIONS is a private/stable API op. It returns a
* {@link org.apache.hadoop.hdfs.protocol.LocatedBlocks}
* json object.
*/
GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK), GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK),
/**
* GETFILEBLOCKLOCATIONS is the public op that complies with
* {@link org.apache.hadoop.fs.FileSystem#getFileBlockLocations}
* interface.
*/
GETFILEBLOCKLOCATIONS(false, HttpURLConnection.HTTP_OK),
GETACLSTATUS(false, HttpURLConnection.HTTP_OK), GETACLSTATUS(false, HttpURLConnection.HTTP_OK),
GETXATTRS(false, HttpURLConnection.HTTP_OK), GETXATTRS(false, HttpURLConnection.HTTP_OK),
GETTRASHROOT(false, HttpURLConnection.HTTP_OK), GETTRASHROOT(false, HttpURLConnection.HTTP_OK),

View File

@ -54,6 +54,7 @@ import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -948,6 +949,21 @@ public class NamenodeWebHdfsMethods {
return Response.ok(js).type(MediaType.APPLICATION_JSON).build(); return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
} }
} }
case GETFILEBLOCKLOCATIONS:
{
final long offsetValue = offset.getValue();
final Long lengthValue = length.getValue();
FileSystem fs = FileSystem.get(conf != null ?
conf : new Configuration());
BlockLocation[] locations = fs.getFileBlockLocations(
new org.apache.hadoop.fs.Path(fullpath),
offsetValue,
lengthValue != null? lengthValue: Long.MAX_VALUE);
final String js = JsonUtil.toJsonString("BlockLocations",
JsonUtil.toJsonMap(locations));
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
case GET_BLOCK_LOCATIONS: case GET_BLOCK_LOCATIONS:
{ {
final long offsetValue = offset.getValue(); final long offsetValue = offset.getValue();

View File

@ -463,4 +463,37 @@ public class JsonUtil {
public static String toJsonString(BlockStoragePolicy storagePolicy) { public static String toJsonString(BlockStoragePolicy storagePolicy) {
return toJsonString(BlockStoragePolicy.class, toJsonMap(storagePolicy)); return toJsonString(BlockStoragePolicy.class, toJsonMap(storagePolicy));
} }
public static Map<String, Object> toJsonMap(BlockLocation[] locations)
throws IOException {
if(locations == null) {
return null;
}
final Map<String, Object> m = new TreeMap<String, Object>();
Object[] blockLocations = new Object[locations.length];
for(int i=0; i<locations.length; i++) {
blockLocations[i] = toJsonMap(locations[i]);
}
m.put(BlockLocation.class.getSimpleName(), blockLocations);
return m;
}
public static Map<String, Object> toJsonMap(
final BlockLocation blockLocation) throws IOException {
if (blockLocation == null) {
return null;
}
final Map<String, Object> m = new TreeMap<String, Object>();
m.put("length", blockLocation.getLength());
m.put("offset", blockLocation.getOffset());
m.put("corrupt", blockLocation.isCorrupt());
m.put("storageTypes", toJsonArray(blockLocation.getStorageTypes()));
m.put("storageIds", blockLocation.getStorageIds());
m.put("cachedHosts", blockLocation.getCachedHosts());
m.put("hosts", blockLocation.getHosts());
m.put("names", blockLocation.getNames());
m.put("topologyPaths", blockLocation.getTopologyPaths());
return m;
}
} }

View File

@ -38,6 +38,7 @@ WebHDFS REST API
* [Status of a File/Directory](#Status_of_a_FileDirectory) * [Status of a File/Directory](#Status_of_a_FileDirectory)
* [List a Directory](#List_a_Directory) * [List a Directory](#List_a_Directory)
* [Iteratively List a Directory](#Iteratively_List_a_Directory) * [Iteratively List a Directory](#Iteratively_List_a_Directory)
* [Get File Block Locations](#Get_File_Block_Locations)
* [Other File System Operations](#Other_File_System_Operations) * [Other File System Operations](#Other_File_System_Operations)
* [Get Content Summary of a Directory](#Get_Content_Summary_of_a_Directory) * [Get Content Summary of a Directory](#Get_Content_Summary_of_a_Directory)
* [Get File Checksum](#Get_File_Checksum) * [Get File Checksum](#Get_File_Checksum)
@ -97,6 +98,9 @@ WebHDFS REST API
* [BlockStoragePolicy JSON Schema](#BlockStoragePolicy_JSON_Schema) * [BlockStoragePolicy JSON Schema](#BlockStoragePolicy_JSON_Schema)
* [BlockStoragePolicy Properties](#BlockStoragePolicy_Properties) * [BlockStoragePolicy Properties](#BlockStoragePolicy_Properties)
* [BlockStoragePolicies JSON Schema](#BlockStoragePolicies_JSON_Schema) * [BlockStoragePolicies JSON Schema](#BlockStoragePolicies_JSON_Schema)
* [BlockLocation JSON Schema](#BlockLocation_JSON_Schema)
* [BlockLocation Properties](#BlockLocation_Properties)
* [BlockLocations JSON Schema](#BlockLocations_JSON_Schema)
* [HTTP Query Parameter Dictionary](#HTTP_Query_Parameter_Dictionary) * [HTTP Query Parameter Dictionary](#HTTP_Query_Parameter_Dictionary)
* [ACL Spec](#ACL_Spec) * [ACL Spec](#ACL_Spec)
* [XAttr Name](#XAttr_Name) * [XAttr Name](#XAttr_Name)
@ -167,6 +171,7 @@ The HTTP REST API supports the complete [FileSystem](../../api/org/apache/hadoop
* [`CHECKACCESS`](#Check_access) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).access) * [`CHECKACCESS`](#Check_access) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).access)
* [`GETALLSTORAGEPOLICY`](#Get_all_Storage_Policies) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getAllStoragePolicies) * [`GETALLSTORAGEPOLICY`](#Get_all_Storage_Policies) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getAllStoragePolicies)
* [`GETSTORAGEPOLICY`](#Get_Storage_Policy) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStoragePolicy) * [`GETSTORAGEPOLICY`](#Get_Storage_Policy) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStoragePolicy)
* [`GETFILEBLOCKLOCATIONS`](#Get_File_Block_Locations) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileBlockLocations)
* HTTP PUT * HTTP PUT
* [`CREATE`](#Create_and_Write_to_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).create) * [`CREATE`](#Create_and_Write_to_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).create)
* [`MKDIRS`](#Make_a_Directory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).mkdirs) * [`MKDIRS`](#Make_a_Directory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).mkdirs)
@ -1142,7 +1147,7 @@ See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).unsetStor
{ {
"BlockStoragePolicy": { "BlockStoragePolicy": {
"copyOnCreateFile": false, "copyOnCreateFile": false,
"creationFallbacks": [], "creationFallbacks": [],
"id":7, "id":7,
"name":"HOT", "name":"HOT",
"replicationFallbacks":["ARCHIVE"], "replicationFallbacks":["ARCHIVE"],
@ -1152,6 +1157,51 @@ See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).unsetStor
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStoragePolicy See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getStoragePolicy
### Get File Block Locations
* Submit a HTTP GET request.
curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILEBLOCKLOCATIONS
The client receives a response with a [`BlockLocations` JSON Object](#Block_Locations_JSON_Schema):
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
{
"BlockLocations" :
{
"BlockLocation":
[
{
"cachedHosts" : [],
"corrupt" : false,
"hosts" : ["host"],
"length" : 134217728, // length of this block
"names" : ["host:ip"],
"offset" : 0, // offset of the block in the file
"storageIds" : ["storageid"],
"storageTypes" : ["DISK"], // enum {RAM_DISK, SSD, DISK, ARCHIVE}
"topologyPaths" : ["/default-rack/hostname:ip"]
}, {
"cachedHosts" : [],
"corrupt" : false,
"hosts" : ["host"],
"length" : 62599364,
"names" : ["host:ip"],
"offset" : 134217728,
"storageIds" : ["storageid"],
"storageTypes" : ["DISK"],
"topologyPaths" : ["/default-rack/hostname:ip"]
},
...
]
}
}
See also: [`offset`](#Offset), [`length`](#Length), [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileBlockLocations
Extended Attributes(XAttrs) Operations Extended Attributes(XAttrs) Operations
-------------------------------------- --------------------------------------
@ -2109,6 +2159,147 @@ A `BlockStoragePolicies` JSON object represents an array of `BlockStoragePolicy`
} }
} }
``` ```
#### BlockLocations JSON Schema
A `BlockLocations` JSON object represents an array of `BlockLocation` JSON objects.
```json
{
"name" : "BlockLocations",
"properties":
{
"BlockLocations":
{
"type" : "object",
"properties":
{
"BlockLocation":
{
"description": "An array of BlockLocation",
"type" : "array",
"items" : blockLocationProperties //See BlockLocation Properties
}
}
}
}
}
```
See also [`BlockLocation` Properties](#BlockLocation_Properties), [`GETFILEBLOCKLOCATIONS`](#Get_File_Block_Locations), [BlockLocation](../../api/org/apache/hadoop/fs/BlockLocation.html)
### BlockLocation JSON Schema
```json
{
"name" : "BlockLocation",
"properties":
{
"BlockLocation": blockLocationProperties //See BlockLocation Properties
}
}
```
See also [`BlockLocation` Properties](#BlockLocation_Properties), [`GETFILEBLOCKLOCATIONS`](#Get_File_Block_Locations), [BlockLocation](../../api/org/apache/hadoop/fs/BlockLocation.html)
#### BlockLocation Properties
JavaScript syntax is used to define `blockLocationProperties` so that it can be referred in both `BlockLocation` and `BlockLocations` JSON schemas.
```javascript
var blockLocationProperties =
{
"type" : "object",
"properties":
{
"cachedHosts":
{
"description": "Datanode hostnames with a cached replica",
"type" : "array",
"required" : "true",
"items" :
{
"description": "A datanode hostname",
"type" : "string"
}
},
"corrupt":
{
"description": "True if the block is corrupted",
"type" : "boolean",
"required" : "true"
},
"hosts":
{
"description": "Datanode hostnames store the block",
"type" : "array",
"required" : "true",
"items" :
{
"description": "A datanode hostname",
"type" : "string"
}
},
"length":
{
"description": "Length of the block",
"type" : "integer",
"required" : "true"
},
"names":
{
"description": "Datanode IP:xferPort for accessing the block",
"type" : "array",
"required" : "true",
"items" :
{
"description": "DatanodeIP:xferPort",
"type" : "string"
}
},
"offset":
{
"description": "Offset of the block in the file",
"type" : "integer",
"required" : "true"
},
"storageIds":
{
"description": "Storage ID of each replica",
"type" : "array",
"required" : "true",
"items" :
{
"description": "Storage ID",
"type" : "string"
}
},
"storageTypes":
{
"description": "Storage type of each replica",
"type" : "array",
"required" : "true",
"items" :
{
"description": "Storage type",
"enum" : ["RAM_DISK", "SSD", "DISK", "ARCHIVE"]
}
},
"topologyPaths":
{
"description": "Datanode addresses in network topology",
"type" : "array",
"required" : "true",
"items" :
{
"description": "/rack/host:ip",
"type" : "string"
}
}
}
};
```
HTTP Query Parameter Dictionary HTTP Query Parameter Dictionary
------------------------------- -------------------------------

View File

@ -29,6 +29,7 @@ import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketException; import java.net.SocketException;
@ -38,8 +39,16 @@ import java.net.URISyntaxException;
import java.net.URL; import java.net.URL;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random; import java.util.Random;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -66,7 +75,11 @@ import org.apache.hadoop.hdfs.TestDFSClientRetries;
import org.apache.hadoop.hdfs.TestFileCreation; import org.apache.hadoop.hdfs.TestFileCreation;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
@ -77,6 +90,8 @@ import org.apache.hadoop.hdfs.web.resources.LengthParam;
import org.apache.hadoop.hdfs.web.resources.NoRedirectParam; import org.apache.hadoop.hdfs.web.resources.NoRedirectParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam; import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.hdfs.web.resources.Param; import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.http.HttpServerFunctionalTest;
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
@ -92,8 +107,12 @@ import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox; import org.mockito.internal.util.reflection.Whitebox;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.MapType;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
@ -850,6 +869,76 @@ public class TestWebHDFS {
Assert.assertTrue(storageTypes != null && storageTypes.length > 0 && Assert.assertTrue(storageTypes != null && storageTypes.length > 0 &&
storageTypes[0] == StorageType.DISK); storageTypes[0] == StorageType.DISK);
} }
// Query webhdfs REST API to get block locations
InetSocketAddress addr = cluster.getNameNode().getHttpAddress();
// Case 1
// URL without length or offset parameters
URL url1 = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS");
LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url1);
String response1 = getResponse(url1, "GET");
LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response1);
// Parse BlockLocation array from json output using object mapper
BlockLocation[] locationArray1 = toBlockLocationArray(response1);
// Verify the result from rest call is same as file system api
verifyEquals(locations, locationArray1);
// Case 2
// URL contains length and offset parameters
URL url2 = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+ "&length=" + LENGTH + "&offset=" + OFFSET);
LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url2);
String response2 = getResponse(url2, "GET");
LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response2);
BlockLocation[] locationArray2 = toBlockLocationArray(response2);
verifyEquals(locations, locationArray2);
// Case 3
// URL contains length parameter but without offset parameters
URL url3 = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+ "&length=" + LENGTH);
LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url3);
String response3 = getResponse(url3, "GET");
LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response3);
BlockLocation[] locationArray3 = toBlockLocationArray(response3);
verifyEquals(locations, locationArray3);
// Case 4
// URL contains offset parameter but without length parameter
URL url4 = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+ "&offset=" + OFFSET);
LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url4);
String response4 = getResponse(url4, "GET");
LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response4);
BlockLocation[] locationArray4 = toBlockLocationArray(response4);
verifyEquals(locations, locationArray4);
// Case 5
// URL specifies offset exceeds the file length
URL url5 = new URL("http", addr.getHostString(), addr.getPort(),
WebHdfsFileSystem.PATH_PREFIX + "/foo?op=GETFILEBLOCKLOCATIONS"
+ "&offset=1200");
LOG.info("Sending GETFILEBLOCKLOCATIONS request " + url5);
String response5 = getResponse(url5, "GET");
LOG.info("The output of GETFILEBLOCKLOCATIONS request " + response5);
BlockLocation[] locationArray5 = toBlockLocationArray(response5);
// Expected an empty array of BlockLocation
verifyEquals(new BlockLocation[] {}, locationArray5);
} finally { } finally {
if (cluster != null) { if (cluster != null) {
cluster.shutdown(); cluster.shutdown();
@ -857,6 +946,66 @@ public class TestWebHDFS {
} }
} }
private BlockLocation[] toBlockLocationArray(String json)
throws IOException {
ObjectMapper mapper = new ObjectMapper();
MapType subType = mapper.getTypeFactory().constructMapType(
Map.class,
String.class,
BlockLocation[].class);
MapType rootType = mapper.getTypeFactory().constructMapType(
Map.class,
mapper.constructType(String.class),
mapper.constructType(subType));
Map<String, Map<String, BlockLocation[]>> jsonMap = mapper
.readValue(json, rootType);
Map<String, BlockLocation[]> locationMap = jsonMap
.get("BlockLocations");
BlockLocation[] locationArray = locationMap.get(
BlockLocation.class.getSimpleName());
return locationArray;
}
private void verifyEquals(BlockLocation[] locations1,
BlockLocation[] locations2) throws IOException {
for(int i=0; i<locations1.length; i++) {
BlockLocation location1 = locations1[i];
BlockLocation location2 = locations2[i];
Assert.assertEquals(location1.getLength(),
location2.getLength());
Assert.assertEquals(location1.getOffset(),
location2.getOffset());
Assert.assertArrayEquals(location1.getCachedHosts(),
location2.getCachedHosts());
Assert.assertArrayEquals(location1.getHosts(),
location2.getHosts());
Assert.assertArrayEquals(location1.getNames(),
location2.getNames());
Assert.assertArrayEquals(location1.getStorageIds(),
location2.getStorageIds());
Assert.assertArrayEquals(location1.getTopologyPaths(),
location2.getTopologyPaths());
Assert.assertArrayEquals(location1.getStorageTypes(),
location2.getStorageTypes());
}
}
private static String getResponse(URL url, String httpRequestType)
throws IOException {
HttpURLConnection conn = null;
try {
conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod(httpRequestType);
conn.setInstanceFollowRedirects(false);
return IOUtils.toString(conn.getInputStream());
} finally {
if(conn != null) {
conn.disconnect();
}
}
}
private WebHdfsFileSystem createWebHDFSAsTestUser(final Configuration conf, private WebHdfsFileSystem createWebHDFSAsTestUser(final Configuration conf,
final URI uri, final String userName) throws Exception { final URI uri, final String userName) throws Exception {
@ -1212,4 +1361,131 @@ public class TestWebHDFS {
} }
} }
} }
/**
* A mock class to handle the {@link WebHdfsFileSystem} client
* request. The format of the response depends on how many of
* times it gets called (1 to 3 times).
* <p>
* First time call it return a wrapped json response with a
* IllegalArgumentException
* <p>
* Second time call it return a valid GET_BLOCK_LOCATIONS
* json response
* <p>
* Third time call it return a wrapped json response with
* a random IOException
*
*/
public static class MockWebHdfsServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
private static int respondTimes = 0;
private static final String RANDOM_EXCEPTION_MSG =
"This is a random exception";
@Override
public void doGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
response.setHeader("Content-Type",
MediaType.APPLICATION_JSON);
String param = request.getParameter("op");
if(respondTimes == 0) {
Exception mockException = new IllegalArgumentException(
"Invalid value for webhdfs parameter \"op\". "
+ "" + "No enum constant " + param);
sendException(request, response, mockException);
} else if (respondTimes == 1) {
sendResponse(request, response);
} else if (respondTimes == 2) {
Exception mockException = new IOException(RANDOM_EXCEPTION_MSG);
sendException(request, response, mockException);
}
respondTimes++;
}
private void sendResponse(HttpServletRequest request,
HttpServletResponse response) throws IOException {
response.setStatus(HttpServletResponse.SC_OK);
// Construct a LocatedBlock for testing
DatanodeInfo d = DFSTestUtil.getLocalDatanodeInfo();
DatanodeInfo[] ds = new DatanodeInfo[1];
ds[0] = d;
ExtendedBlock b1 = new ExtendedBlock("bpid", 1, 121, 1);
LocatedBlock l1 = new LocatedBlock(b1, ds);
l1.setStartOffset(0);
l1.setCorrupt(false);
List<LocatedBlock> ls = Arrays.asList(l1);
LocatedBlocks locatedblocks =
new LocatedBlocks(10, false, ls, l1,
true, null, null);
try (PrintWriter pw = response.getWriter()) {
pw.write(JsonUtil.toJsonString(locatedblocks));
}
}
private void sendException(HttpServletRequest request,
HttpServletResponse response,
Exception mockException) throws IOException {
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
String errJs = JsonUtil.toJsonString(mockException);
try (PrintWriter pw = response.getWriter()) {
pw.write(errJs);
}
}
}
@Test
public void testGetFileBlockLocationsBackwardsCompatibility()
throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
final String pathSpec = WebHdfsFileSystem.PATH_PREFIX + "/*";
HttpServer2 http = null;
try {
http = HttpServerFunctionalTest.createTestServer(conf);
http.addServlet("test", pathSpec, MockWebHdfsServlet.class);
http.start();
// Write the address back to configuration so
// WebHdfsFileSystem could connect to the mock server
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
"localhost:" + http.getConnectorAddress(0).getPort());
final WebHdfsFileSystem webFS = WebHdfsTestUtil.getWebHdfsFileSystem(
conf, WebHdfsConstants.WEBHDFS_SCHEME);
WebHdfsFileSystem spyFs = spy(webFS);
BlockLocation[] locations = spyFs
.getFileBlockLocations(new Path("p"), 0, 100);
// Verify result
assertEquals(1, locations.length);
assertEquals(121, locations[0].getLength());
// Verify the fall back
// The function should be called exactly 2 times
// 1st time handles GETFILEBLOCKLOCATIONS and found it is not supported
// 2nd time fall back to handle GET_FILE_BLOCK_LOCATIONS
verify(spyFs, times(2)).getFileBlockLocations(any(),
any(), anyLong(), anyLong());
// Verify it doesn't erroneously fall back
// When server returns a different error, it should directly
// throw an exception.
try {
spyFs.getFileBlockLocations(new Path("p"), 0, 100);
} catch (Exception e) {
assertTrue(e instanceof IOException);
assertEquals(e.getMessage(), MockWebHdfsServlet.RANDOM_EXCEPTION_MSG);
// Totally this function has been called 3 times
verify(spyFs, times(3)).getFileBlockLocations(any(),
any(), anyLong(), anyLong());
}
} finally {
if(http != null) {
http.stop();
}
}
}
} }