HDFS-11653. [READ] ProvidedReplica should return an InputStream that is bounded by its length

This commit is contained in:
Virajith Jalaparti 2017-05-04 12:43:48 -07:00 committed by Chris Douglas
parent d65df0f273
commit 1108cb7691
2 changed files with 167 additions and 1 deletions

View File

@ -22,6 +22,8 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.URI; import java.net.URI;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -98,7 +100,8 @@ public abstract class ProvidedReplica extends ReplicaInfo {
if (remoteFS != null) { if (remoteFS != null) {
FSDataInputStream ins = remoteFS.open(new Path(fileURI)); FSDataInputStream ins = remoteFS.open(new Path(fileURI));
ins.seek(fileOffset + seekOffset); ins.seek(fileOffset + seekOffset);
return new FSDataInputStream(ins); return new BoundedInputStream(
new FSDataInputStream(ins), getBlockDataLength());
} else { } else {
throw new IOException("Remote filesystem for provided replica " + this + throw new IOException("Remote filesystem for provided replica " + this +
" does not exist"); " does not exist");

View File

@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests the implementation of {@link ProvidedReplica}.
*/
public class TestProvidedReplicaImpl {
private static final Logger LOG =
LoggerFactory.getLogger(TestProvidedReplicaImpl.class);
private static final String BASE_DIR =
new FileSystemTestHelper().getTestRootDir();
private static final String FILE_NAME = "provided-test";
//length of the file that is associated with the provided blocks.
private static final long FILE_LEN = 128 * 1024 * 10L + 64 * 1024;
//length of each provided block.
private static final long BLK_LEN = 128 * 1024L;
private static List<ProvidedReplica> replicas;
private static void createFileIfNotExists(String baseDir) throws IOException {
File newFile = new File(baseDir, FILE_NAME);
newFile.getParentFile().mkdirs();
if(!newFile.exists()) {
newFile.createNewFile();
OutputStream writer = new FileOutputStream(newFile.getAbsolutePath());
//FILE_LEN is length in bytes.
byte[] bytes = new byte[1];
bytes[0] = (byte) 0;
for(int i=0; i< FILE_LEN; i++) {
writer.write(bytes);
}
writer.flush();
writer.close();
LOG.info("Created provided file " + newFile +
" of length " + newFile.length());
}
}
private static void createProvidedReplicas(Configuration conf) {
long numReplicas = (long) Math.ceil((double) FILE_LEN/BLK_LEN);
File providedFile = new File(BASE_DIR, FILE_NAME);
replicas = new ArrayList<ProvidedReplica>();
LOG.info("Creating " + numReplicas + " provided replicas");
for (int i=0; i<numReplicas; i++) {
long currentReplicaLength =
FILE_LEN >= (i+1)*BLK_LEN ? BLK_LEN : FILE_LEN - i*BLK_LEN;
replicas.add(
new FinalizedProvidedReplica(i, providedFile.toURI(), i*BLK_LEN,
currentReplicaLength, 0, null, conf));
}
}
@Before
public void setUp() throws IOException {
createFileIfNotExists(new File(BASE_DIR).getAbsolutePath());
createProvidedReplicas(new Configuration());
}
/**
* Checks if {@code ins} matches the provided file from offset
* {@code fileOffset} for length {@ dataLength}.
* @param file the local file
* @param ins input stream to compare against
* @param fileOffset offset
* @param dataLength length
* @throws IOException
*/
private void verifyReplicaContents(File file,
InputStream ins, long fileOffset, long dataLength)
throws IOException {
InputStream fileIns = new FileInputStream(file);
fileIns.skip(fileOffset);
try (ReadableByteChannel i =
Channels.newChannel(new BoundedInputStream(fileIns, dataLength))) {
try (ReadableByteChannel j = Channels.newChannel(ins)) {
ByteBuffer ib = ByteBuffer.allocate(4096);
ByteBuffer jb = ByteBuffer.allocate(4096);
while (true) {
int il = i.read(ib);
int jl = j.read(jb);
if (il < 0 || jl < 0) {
assertEquals(il, jl);
break;
}
ib.flip();
jb.flip();
int cmp = Math.min(ib.remaining(), jb.remaining());
for (int k = 0; k < cmp; ++k) {
assertEquals(ib.get(), jb.get());
}
ib.compact();
jb.compact();
}
}
}
}
@Test
public void testProvidedReplicaRead() throws IOException {
File providedFile = new File(BASE_DIR, FILE_NAME);
for(int i=0; i < replicas.size(); i++) {
ProvidedReplica replica = replicas.get(i);
//block data should exist!
assertTrue(replica.blockDataExists());
assertEquals(providedFile.toURI(), replica.getBlockURI());
verifyReplicaContents(providedFile, replica.getDataInputStream(0),
BLK_LEN*i, replica.getBlockDataLength());
}
LOG.info("All replica contents verified");
providedFile.delete();
//the block data should no longer be found!
for(int i=0; i < replicas.size(); i++) {
ProvidedReplica replica = replicas.get(i);
assertTrue(!replica.blockDataExists());
}
}
}