HADOOP-15320. Remove customized getFileBlockLocations for hadoop-azure and hadoop-azure-datalake. Contributed by Shanyu Zhao
(cherry picked from commit081c350188
) (cherry picked from commitac16e8f4d3
)
This commit is contained in:
parent
f5aa36e194
commit
cc0a791794
|
@ -47,7 +47,6 @@ import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
|
||||||
import org.apache.hadoop.fs.ContentSummary;
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
import org.apache.hadoop.fs.ContentSummary.Builder;
|
import org.apache.hadoop.fs.ContentSummary.Builder;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
|
@ -929,45 +928,6 @@ public class AdlFileSystem extends FileSystem {
|
||||||
return ADL_BLOCK_SIZE;
|
return ADL_BLOCK_SIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public BlockLocation[] getFileBlockLocations(final FileStatus status,
|
|
||||||
final long offset, final long length) throws IOException {
|
|
||||||
if (status == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((offset < 0) || (length < 0)) {
|
|
||||||
throw new IllegalArgumentException("Invalid start or len parameter");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (status.getLen() < offset) {
|
|
||||||
return new BlockLocation[0];
|
|
||||||
}
|
|
||||||
|
|
||||||
final String[] name = {"localhost"};
|
|
||||||
final String[] host = {"localhost"};
|
|
||||||
long blockSize = ADL_BLOCK_SIZE;
|
|
||||||
int numberOfLocations =
|
|
||||||
(int) (length / blockSize) + ((length % blockSize == 0) ? 0 : 1);
|
|
||||||
BlockLocation[] locations = new BlockLocation[numberOfLocations];
|
|
||||||
for (int i = 0; i < locations.length; i++) {
|
|
||||||
long currentOffset = offset + (i * blockSize);
|
|
||||||
long currentLength = Math.min(blockSize, offset + length - currentOffset);
|
|
||||||
locations[i] = new BlockLocation(name, host, currentOffset,
|
|
||||||
currentLength);
|
|
||||||
}
|
|
||||||
|
|
||||||
return locations;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public BlockLocation[] getFileBlockLocations(final Path p, final long offset,
|
|
||||||
final long length) throws IOException {
|
|
||||||
// read ops incremented in getFileStatus
|
|
||||||
FileStatus fileStatus = getFileStatus(p);
|
|
||||||
return getFileBlockLocations(fileStatus, offset, length);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get replication.
|
* Get replication.
|
||||||
*
|
*
|
||||||
|
|
|
@ -48,7 +48,6 @@ import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
|
||||||
import org.apache.hadoop.fs.BufferedFSInputStream;
|
import org.apache.hadoop.fs.BufferedFSInputStream;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
@ -725,10 +724,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
||||||
|
|
||||||
static final String AZURE_CHMOD_USERLIST_PROPERTY_DEFAULT_VALUE = "*";
|
static final String AZURE_CHMOD_USERLIST_PROPERTY_DEFAULT_VALUE = "*";
|
||||||
|
|
||||||
static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME =
|
|
||||||
"fs.azure.block.location.impersonatedhost";
|
|
||||||
private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT =
|
|
||||||
"localhost";
|
|
||||||
static final String AZURE_RINGBUFFER_CAPACITY_PROPERTY_NAME =
|
static final String AZURE_RINGBUFFER_CAPACITY_PROPERTY_NAME =
|
||||||
"fs.azure.ring.buffer.capacity";
|
"fs.azure.ring.buffer.capacity";
|
||||||
static final String AZURE_OUTPUT_STREAM_BUFFER_SIZE_PROPERTY_NAME =
|
static final String AZURE_OUTPUT_STREAM_BUFFER_SIZE_PROPERTY_NAME =
|
||||||
|
@ -3487,47 +3482,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Return an array containing hostnames, offset and size of
|
|
||||||
* portions of the given file. For WASB we'll just lie and give
|
|
||||||
* fake hosts to make sure we get many splits in MR jobs.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public BlockLocation[] getFileBlockLocations(FileStatus file,
|
|
||||||
long start, long len) throws IOException {
|
|
||||||
if (file == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((start < 0) || (len < 0)) {
|
|
||||||
throw new IllegalArgumentException("Invalid start or len parameter");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (file.getLen() < start) {
|
|
||||||
return new BlockLocation[0];
|
|
||||||
}
|
|
||||||
final String blobLocationHost = getConf().get(
|
|
||||||
AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
|
|
||||||
AZURE_BLOCK_LOCATION_HOST_DEFAULT);
|
|
||||||
final String[] name = { blobLocationHost };
|
|
||||||
final String[] host = { blobLocationHost };
|
|
||||||
long blockSize = file.getBlockSize();
|
|
||||||
if (blockSize <= 0) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
"The block size for the given file is not a positive number: "
|
|
||||||
+ blockSize);
|
|
||||||
}
|
|
||||||
int numberOfLocations = (int) (len / blockSize)
|
|
||||||
+ ((len % blockSize == 0) ? 0 : 1);
|
|
||||||
BlockLocation[] locations = new BlockLocation[numberOfLocations];
|
|
||||||
for (int i = 0; i < locations.length; i++) {
|
|
||||||
long currentOffset = start + (i * blockSize);
|
|
||||||
long currentLength = Math.min(blockSize, start + len - currentOffset);
|
|
||||||
locations[i] = new BlockLocation(name, host, currentOffset, currentLength);
|
|
||||||
}
|
|
||||||
return locations;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the working directory to the given directory.
|
* Set the working directory to the given directory.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -1,141 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.fs.azure;
|
|
||||||
|
|
||||||
import java.io.OutputStream;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test block location logic.
|
|
||||||
*/
|
|
||||||
public class TestNativeAzureFileSystemBlockLocations
|
|
||||||
extends AbstractWasbTestWithTimeout {
|
|
||||||
@Test
|
|
||||||
public void testNumberOfBlocks() throws Exception {
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
conf.set(NativeAzureFileSystem.AZURE_BLOCK_SIZE_PROPERTY_NAME, "500");
|
|
||||||
AzureBlobStorageTestAccount testAccount = AzureBlobStorageTestAccount
|
|
||||||
.createMock(conf);
|
|
||||||
FileSystem fs = testAccount.getFileSystem();
|
|
||||||
Path testFile = createTestFile(fs, 1200);
|
|
||||||
FileStatus stat = fs.getFileStatus(testFile);
|
|
||||||
assertEquals(500, stat.getBlockSize());
|
|
||||||
testAccount.cleanup();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBlockLocationsTypical() throws Exception {
|
|
||||||
BlockLocation[] locations = getBlockLocationsOutput(210, 50, 0, 210);
|
|
||||||
assertEquals(5, locations.length);
|
|
||||||
assertEquals("localhost", locations[0].getHosts()[0]);
|
|
||||||
assertEquals(50, locations[0].getLength());
|
|
||||||
assertEquals(10, locations[4].getLength());
|
|
||||||
assertEquals(100, locations[2].getOffset());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBlockLocationsEmptyFile() throws Exception {
|
|
||||||
BlockLocation[] locations = getBlockLocationsOutput(0, 50, 0, 0);
|
|
||||||
assertEquals(0, locations.length);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBlockLocationsSmallFile() throws Exception {
|
|
||||||
BlockLocation[] locations = getBlockLocationsOutput(1, 50, 0, 1);
|
|
||||||
assertEquals(1, locations.length);
|
|
||||||
assertEquals(1, locations[0].getLength());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBlockLocationsExactBlockSizeMultiple() throws Exception {
|
|
||||||
BlockLocation[] locations = getBlockLocationsOutput(200, 50, 0, 200);
|
|
||||||
assertEquals(4, locations.length);
|
|
||||||
assertEquals(150, locations[3].getOffset());
|
|
||||||
assertEquals(50, locations[3].getLength());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBlockLocationsSubsetOfFile() throws Exception {
|
|
||||||
BlockLocation[] locations = getBlockLocationsOutput(205, 10, 15, 35);
|
|
||||||
assertEquals(4, locations.length);
|
|
||||||
assertEquals(10, locations[0].getLength());
|
|
||||||
assertEquals(15, locations[0].getOffset());
|
|
||||||
assertEquals(5, locations[3].getLength());
|
|
||||||
assertEquals(45, locations[3].getOffset());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBlockLocationsOutOfRangeSubsetOfFile() throws Exception {
|
|
||||||
BlockLocation[] locations = getBlockLocationsOutput(205, 10, 300, 10);
|
|
||||||
assertEquals(0, locations.length);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBlockLocationsEmptySubsetOfFile() throws Exception {
|
|
||||||
BlockLocation[] locations = getBlockLocationsOutput(205, 10, 0, 0);
|
|
||||||
assertEquals(0, locations.length);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBlockLocationsDifferentLocationHost() throws Exception {
|
|
||||||
BlockLocation[] locations = getBlockLocationsOutput(100, 10, 0, 100,
|
|
||||||
"myblobhost");
|
|
||||||
assertEquals(10, locations.length);
|
|
||||||
assertEquals("myblobhost", locations[0].getHosts()[0]);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static BlockLocation[] getBlockLocationsOutput(int fileSize,
|
|
||||||
int blockSize, long start, long len) throws Exception {
|
|
||||||
return getBlockLocationsOutput(fileSize, blockSize, start, len, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static BlockLocation[] getBlockLocationsOutput(int fileSize,
|
|
||||||
int blockSize, long start, long len, String blockLocationHost)
|
|
||||||
throws Exception {
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
conf.set(NativeAzureFileSystem.AZURE_BLOCK_SIZE_PROPERTY_NAME, ""
|
|
||||||
+ blockSize);
|
|
||||||
if (blockLocationHost != null) {
|
|
||||||
conf.set(NativeAzureFileSystem.AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
|
|
||||||
blockLocationHost);
|
|
||||||
}
|
|
||||||
AzureBlobStorageTestAccount testAccount = AzureBlobStorageTestAccount
|
|
||||||
.createMock(conf);
|
|
||||||
FileSystem fs = testAccount.getFileSystem();
|
|
||||||
Path testFile = createTestFile(fs, fileSize);
|
|
||||||
FileStatus stat = fs.getFileStatus(testFile);
|
|
||||||
BlockLocation[] locations = fs.getFileBlockLocations(stat, start, len);
|
|
||||||
testAccount.cleanup();
|
|
||||||
return locations;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Path createTestFile(FileSystem fs, int size) throws Exception {
|
|
||||||
Path testFile = new Path("/testFile");
|
|
||||||
OutputStream outputStream = fs.create(testFile);
|
|
||||||
outputStream.write(new byte[size]);
|
|
||||||
outputStream.close();
|
|
||||||
return testFile;
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue