HADOOP-18321.Fix when to read an additional record from a BZip2 text file split (#4521)
* HADOOP-18321.Fix when to read an additional record from a BZip2 text file split Co-authored-by: Ashutosh Gupta <ashugpt@amazon.com> and Reviewed by Akira Ajisaka.
This commit is contained in:
parent
161b1fac2e
commit
a432925f74
|
@ -24,6 +24,7 @@ import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.VisibleForTesting;
|
||||||
import org.apache.hadoop.conf.Configurable;
|
import org.apache.hadoop.conf.Configurable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
@ -255,10 +256,7 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
|
||||||
|
|
||||||
private void writeStreamHeader() throws IOException {
|
private void writeStreamHeader() throws IOException {
|
||||||
if (super.out != null) {
|
if (super.out != null) {
|
||||||
// The compressed bzip2 stream should start with the
|
writeHeader(out);
|
||||||
// identifying characters BZ. Caller of CBZip2OutputStream
|
|
||||||
// i.e. this class must write these characters.
|
|
||||||
out.write(HEADER.getBytes(StandardCharsets.UTF_8));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -547,4 +545,11 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
|
||||||
|
|
||||||
}// end of BZip2CompressionInputStream
|
}// end of BZip2CompressionInputStream
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public static void writeHeader(OutputStream out) throws IOException {
|
||||||
|
// The compressed bzip2 stream should start with the
|
||||||
|
// identifying characters BZ. Caller of CBZip2OutputStream
|
||||||
|
// i.e. this class must write these characters.
|
||||||
|
out.write(HEADER.getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import java.io.BufferedInputStream;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.VisibleForTesting;
|
||||||
import org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE;
|
import org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE;
|
||||||
|
|
||||||
|
|
||||||
|
@ -312,13 +313,24 @@ public class CBZip2InputStream extends InputStream implements BZip2Constants {
|
||||||
}
|
}
|
||||||
} else if (readMode == READ_MODE.BYBLOCK) {
|
} else if (readMode == READ_MODE.BYBLOCK) {
|
||||||
this.currentState = STATE.NO_PROCESS_STATE;
|
this.currentState = STATE.NO_PROCESS_STATE;
|
||||||
skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER,DELIMITER_BIT_LENGTH);
|
skipResult = skipToNextBlockMarker();
|
||||||
if(!skipDecompression){
|
if(!skipDecompression){
|
||||||
changeStateToProcessABlock();
|
changeStateToProcessABlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Skips bytes in the stream until the start marker of a block is reached
|
||||||
|
* or end of stream is reached. Used for testing purposes to identify the
|
||||||
|
* start offsets of blocks.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
boolean skipToNextBlockMarker() throws IOException {
|
||||||
|
return skipToNextMarker(
|
||||||
|
CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the number of bytes between the current stream position
|
* Returns the number of bytes between the current stream position
|
||||||
* and the immediate next BZip2 block marker.
|
* and the immediate next BZip2 block marker.
|
||||||
|
@ -428,7 +440,7 @@ public class CBZip2InputStream extends InputStream implements BZip2Constants {
|
||||||
//report 'end of block' or 'end of stream'
|
//report 'end of block' or 'end of stream'
|
||||||
result = b;
|
result = b;
|
||||||
|
|
||||||
skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH);
|
skipResult = skipToNextBlockMarker();
|
||||||
|
|
||||||
changeStateToProcessABlock();
|
changeStateToProcessABlock();
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ package org.apache.hadoop.io.compress.bzip2;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.VisibleForTesting;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -781,8 +782,7 @@ public class CBZip2OutputStream extends OutputStream implements BZip2Constants {
|
||||||
inUse[i] = false;
|
inUse[i] = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* 20 is just a paranoia constant */
|
this.allowableBlockSize = getAllowableBlockSize(this.blockSize100k);
|
||||||
this.allowableBlockSize = (this.blockSize100k * BZip2Constants.baseBlockSize) - 20;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void endBlock() throws IOException {
|
private void endBlock() throws IOException {
|
||||||
|
@ -2093,4 +2093,9 @@ public class CBZip2OutputStream extends OutputStream implements BZip2Constants {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static int getAllowableBlockSize(int blockSize100k) {
|
||||||
|
/* 20 is just a paranoia constant */
|
||||||
|
return (blockSize100k * BZip2Constants.baseBlockSize) - 20;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,114 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.io.compress.bzip2;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream.MIN_BLOCKSIZE;
|
||||||
|
import static org.apache.hadoop.util.Preconditions.checkArgument;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.io.compress.BZip2Codec;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A writer that simplifies creating BZip2 compressed text data for testing
|
||||||
|
* purposes.
|
||||||
|
*/
|
||||||
|
public final class BZip2TextFileWriter implements Closeable {
|
||||||
|
|
||||||
|
// Use minimum block size to reduce amount of data to require to be written
|
||||||
|
// to CBZip2OutputStream before a new block is created.
|
||||||
|
private static final int BLOCK_SIZE_100K = MIN_BLOCKSIZE;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The amount of bytes of run-length encoded data that needs to be written
|
||||||
|
* to this writer in order for the next byte written starts a new BZip2 block.
|
||||||
|
*/
|
||||||
|
public static final int BLOCK_SIZE =
|
||||||
|
// The + 1 is needed because of how CBZip2OutputStream checks whether the
|
||||||
|
// last offset written is less than allowable block size. Because the last
|
||||||
|
// offset is one less of the amount of bytes written to the block, we need
|
||||||
|
// to write an extra byte to trigger writing a new block.
|
||||||
|
CBZip2OutputStream.getAllowableBlockSize(BLOCK_SIZE_100K) + 1;
|
||||||
|
|
||||||
|
private final CBZip2OutputStream out;
|
||||||
|
|
||||||
|
public BZip2TextFileWriter(Path path, Configuration conf) throws IOException {
|
||||||
|
this(path.getFileSystem(conf).create(path));
|
||||||
|
}
|
||||||
|
|
||||||
|
public BZip2TextFileWriter(OutputStream rawOut) throws IOException {
|
||||||
|
try {
|
||||||
|
BZip2Codec.writeHeader(rawOut);
|
||||||
|
out = new CBZip2OutputStream(rawOut, BLOCK_SIZE_100K);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
rawOut.close();
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void writeManyRecords(int totalSize, int numRecords, byte[] delimiter)
|
||||||
|
throws IOException {
|
||||||
|
checkArgument(numRecords > 0);
|
||||||
|
checkArgument(delimiter.length > 0);
|
||||||
|
|
||||||
|
int minRecordSize = totalSize / numRecords;
|
||||||
|
checkArgument(minRecordSize >= delimiter.length);
|
||||||
|
|
||||||
|
int lastRecordExtraSize = totalSize % numRecords;
|
||||||
|
|
||||||
|
for (int i = 0; i < numRecords - 1; i++) {
|
||||||
|
writeRecord(minRecordSize, delimiter);
|
||||||
|
}
|
||||||
|
writeRecord(minRecordSize + lastRecordExtraSize, delimiter);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void writeRecord(int totalSize, byte[] delimiter) throws IOException {
|
||||||
|
checkArgument(delimiter.length > 0);
|
||||||
|
checkArgument(totalSize >= delimiter.length);
|
||||||
|
|
||||||
|
int contentSize = totalSize - delimiter.length;
|
||||||
|
for (int i = 0; i < contentSize; i++) {
|
||||||
|
// Alternate between characters so that internals of CBZip2OutputStream
|
||||||
|
// cannot condensed the written bytes using run-length encoding. This
|
||||||
|
// allows the caller to use #BLOCK_SIZE in order to know whether the next
|
||||||
|
// write will end just before the end of the current block, or exceed it,
|
||||||
|
// and by how much.
|
||||||
|
out.write(i % 2 == 0 ? 'a' : 'b');
|
||||||
|
}
|
||||||
|
write(delimiter);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void write(String bytes) throws IOException {
|
||||||
|
write(bytes.getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void write(byte[] bytes) throws IOException {
|
||||||
|
out.write(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,67 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.io.compress.bzip2;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE.BYBLOCK;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
public final class BZip2Utils {
|
||||||
|
|
||||||
|
private BZip2Utils() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the start offsets of blocks that follow the first block in the
|
||||||
|
* BZip2 compressed file at the given path. The first offset corresponds to
|
||||||
|
* the first byte containing the BZip2 block marker of the second block. The
|
||||||
|
* i-th offset corresponds to the block marker of the (i + 1)-th block.
|
||||||
|
*/
|
||||||
|
public static List<Long> getNextBlockMarkerOffsets(
|
||||||
|
Path path, Configuration conf) throws IOException {
|
||||||
|
FileSystem fs = path.getFileSystem(conf);
|
||||||
|
try (InputStream fileIn = fs.open(path)) {
|
||||||
|
return getNextBlockMarkerOffsets(fileIn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the start offsets of blocks that follow the first block in the
|
||||||
|
* BZip2 compressed input stream. The first offset corresponds to
|
||||||
|
* the first byte containing the BZip2 block marker of the second block. The
|
||||||
|
* i-th offset corresponds to the block marker of the (i + 1)-th block.
|
||||||
|
*/
|
||||||
|
public static List<Long> getNextBlockMarkerOffsets(InputStream rawIn)
|
||||||
|
throws IOException {
|
||||||
|
try (CBZip2InputStream in = new CBZip2InputStream(rawIn, BYBLOCK)) {
|
||||||
|
ArrayList<Long> offsets = new ArrayList<>();
|
||||||
|
while (in.skipToNextBlockMarker()) {
|
||||||
|
offsets.add(in.getProcessedByteCount());
|
||||||
|
}
|
||||||
|
return offsets;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,91 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.io.compress.bzip2;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter.BLOCK_SIZE;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public final class TestBZip2TextFileWriter {
|
||||||
|
|
||||||
|
private static final byte[] DELIMITER = new byte[] {'\0'};
|
||||||
|
|
||||||
|
private ByteArrayOutputStream rawOut;
|
||||||
|
private BZip2TextFileWriter writer;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
rawOut = new ByteArrayOutputStream();
|
||||||
|
writer = new BZip2TextFileWriter(rawOut);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
rawOut = null;
|
||||||
|
writer.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void writingSingleBlockSizeOfData() throws Exception {
|
||||||
|
writer.writeRecord(BLOCK_SIZE, DELIMITER);
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
List<Long> nextBlocks = getNextBlockMarkerOffsets();
|
||||||
|
assertEquals(0, nextBlocks.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void justExceedingBeyondBlockSize() throws Exception {
|
||||||
|
writer.writeRecord(BLOCK_SIZE + 1, DELIMITER);
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
List<Long> nextBlocks = getNextBlockMarkerOffsets();
|
||||||
|
assertEquals(1, nextBlocks.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void writingTwoBlockSizesOfData() throws Exception {
|
||||||
|
writer.writeRecord(2 * BLOCK_SIZE, DELIMITER);
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
List<Long> nextBlocks = getNextBlockMarkerOffsets();
|
||||||
|
assertEquals(1, nextBlocks.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void justExceedingBeyondTwoBlocks() throws Exception {
|
||||||
|
writer.writeRecord(2 * BLOCK_SIZE + 1, DELIMITER);
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
List<Long> nextBlocks = getNextBlockMarkerOffsets();
|
||||||
|
assertEquals(2, nextBlocks.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<Long> getNextBlockMarkerOffsets() throws IOException {
|
||||||
|
ByteArrayInputStream in = new ByteArrayInputStream(rawOut.toByteArray());
|
||||||
|
return BZip2Utils.getNextBlockMarkerOffsets(in);
|
||||||
|
}
|
||||||
|
}
|
|
@ -127,6 +127,8 @@ public class CompressedSplitLineReader extends SplitLineReader {
|
||||||
@Override
|
@Override
|
||||||
protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
|
protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
boolean alreadyReadAfterSplit = didReadAfterSplit();
|
||||||
|
|
||||||
int bytesRead = in.read(buffer);
|
int bytesRead = in.read(buffer);
|
||||||
|
|
||||||
// If the split ended in the middle of a record delimiter then we need
|
// If the split ended in the middle of a record delimiter then we need
|
||||||
|
@ -135,7 +137,9 @@ public class CompressedSplitLineReader extends SplitLineReader {
|
||||||
// However if using the default delimiter and the next character is a
|
// However if using the default delimiter and the next character is a
|
||||||
// linefeed then next split will treat it as a delimiter all by itself
|
// linefeed then next split will treat it as a delimiter all by itself
|
||||||
// and the additional record read should not be performed.
|
// and the additional record read should not be performed.
|
||||||
if (inDelimiter && bytesRead > 0) {
|
boolean justReadAfterSplit = !alreadyReadAfterSplit && didReadAfterSplit();
|
||||||
|
|
||||||
|
if (justReadAfterSplit && inDelimiter && bytesRead > 0) {
|
||||||
if (usingCRLF) {
|
if (usingCRLF) {
|
||||||
needAdditionalRecord = (buffer[0] != '\n');
|
needAdditionalRecord = (buffer[0] != '\n');
|
||||||
} else {
|
} else {
|
||||||
|
@ -152,7 +156,7 @@ public class CompressedSplitLineReader extends SplitLineReader {
|
||||||
if (!finished) {
|
if (!finished) {
|
||||||
// only allow at most one more record to be read after the stream
|
// only allow at most one more record to be read after the stream
|
||||||
// reports the split ended
|
// reports the split ended
|
||||||
if (scin.getPos() > scin.getAdjustedEnd()) {
|
if (didReadAfterSplit()) {
|
||||||
finished = true;
|
finished = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -170,4 +174,8 @@ public class CompressedSplitLineReader extends SplitLineReader {
|
||||||
protected void unsetNeedAdditionalRecordAfterSplit() {
|
protected void unsetNeedAdditionalRecordAfterSplit() {
|
||||||
needAdditionalRecord = false;
|
needAdditionalRecord = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean didReadAfterSplit() throws IOException {
|
||||||
|
return scin.getPos() > scin.getAdjustedEnd();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,54 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.io.LongWritable;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.input.BaseLineRecordReaderHelper;
|
||||||
|
|
||||||
|
public final class LineRecordReaderHelper extends
|
||||||
|
BaseLineRecordReaderHelper {
|
||||||
|
|
||||||
|
public LineRecordReaderHelper(Path filePath, Configuration conf) {
|
||||||
|
super(filePath, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long countRecords(long start, long length) throws IOException {
|
||||||
|
try (LineRecordReader reader = newReader(start, length)) {
|
||||||
|
LongWritable key = new LongWritable();
|
||||||
|
Text value = new Text();
|
||||||
|
|
||||||
|
long numRecords = 0L;
|
||||||
|
while (reader.next(key, value)) {
|
||||||
|
numRecords++;
|
||||||
|
}
|
||||||
|
return numRecords;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private LineRecordReader newReader(long start, long length)
|
||||||
|
throws IOException {
|
||||||
|
FileSplit split = new FileSplit(getFilePath(), start, length, (String[]) null);
|
||||||
|
return new LineRecordReader(getConf(), split, getRecordDelimiterBytes());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,31 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.input.BaseLineRecordReaderHelper;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.input.BaseTestLineRecordReaderBZip2;
|
||||||
|
|
||||||
|
public final class TestLineRecordReaderBZip2 extends
|
||||||
|
BaseTestLineRecordReaderBZip2 {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected BaseLineRecordReaderHelper newReader(Path file) {
|
||||||
|
return new LineRecordReaderHelper(file, getConf());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,58 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.mapreduce.lib.input;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
public abstract class BaseLineRecordReaderHelper {
|
||||||
|
|
||||||
|
private final Configuration conf;
|
||||||
|
private final Path filePath;
|
||||||
|
private final byte[] recordDelimiterBytes;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public BaseLineRecordReaderHelper(Path filePath, Configuration conf) {
|
||||||
|
this.conf = conf;
|
||||||
|
this.filePath = filePath;
|
||||||
|
|
||||||
|
conf.setInt(LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
|
||||||
|
|
||||||
|
String delimiter = conf.get("textinputformat.record.delimiter");
|
||||||
|
this.recordDelimiterBytes =
|
||||||
|
null != delimiter ? delimiter.getBytes(StandardCharsets.UTF_8) : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract long countRecords(long start, long length) throws IOException;
|
||||||
|
|
||||||
|
public Configuration getConf() {
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Path getFilePath() {
|
||||||
|
return filePath;
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] getRecordDelimiterBytes() {
|
||||||
|
return recordDelimiterBytes;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,375 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.mapreduce.lib.input;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
|
||||||
|
import static org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter.BLOCK_SIZE;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.io.compress.bzip2.BZip2Utils;
|
||||||
|
import org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public abstract class BaseTestLineRecordReaderBZip2 {
|
||||||
|
|
||||||
|
// LF stands for line feed
|
||||||
|
private static final byte[] LF = new byte[] {'\n'};
|
||||||
|
// CR stands for cartridge return
|
||||||
|
private static final byte[] CR = new byte[] {'\r'};
|
||||||
|
private static final byte[] CR_LF = new byte[] {'\r', '\n'};
|
||||||
|
|
||||||
|
private Configuration conf;
|
||||||
|
private FileSystem fs;
|
||||||
|
private Path tempFile;
|
||||||
|
|
||||||
|
public Configuration getConf() {
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FileSystem getFs() {
|
||||||
|
return fs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Path getTempFile() {
|
||||||
|
return tempFile;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
conf = new Configuration();
|
||||||
|
|
||||||
|
Path workDir = new Path(
|
||||||
|
System.getProperty("test.build.data", "target"),
|
||||||
|
"data/" + getClass().getSimpleName());
|
||||||
|
|
||||||
|
fs = workDir.getFileSystem(conf);
|
||||||
|
|
||||||
|
Path inputDir = new Path(workDir, "input");
|
||||||
|
tempFile = new Path(inputDir, "test.txt.bz2");
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
fs.delete(tempFile, /* recursive */ false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void firstBlockEndsWithLF() throws Exception {
|
||||||
|
try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
|
||||||
|
writer.writeManyRecords(BLOCK_SIZE, 1000, LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
}
|
||||||
|
assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void firstBlockEndsWithLFSecondBlockStartsWithLF() throws Exception {
|
||||||
|
try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
|
||||||
|
writer.writeManyRecords(BLOCK_SIZE, 1000, LF);
|
||||||
|
// Write 254 empty rows terminating at LF, as those records will get
|
||||||
|
// rolled into the first block record due to run-length encoding, the
|
||||||
|
// 255th LF character will trigger a run to be written to the block. We
|
||||||
|
// only need 254 LF characters since the last byte written by prior
|
||||||
|
// writeManyRecords call is already a LF.
|
||||||
|
writer.writeManyRecords(254, 254, LF);
|
||||||
|
|
||||||
|
// This LF character should be the first byte of the second block, but
|
||||||
|
// if splitting at blocks, the first split will read this record as the
|
||||||
|
// additional record.
|
||||||
|
writer.writeRecord(1, LF);
|
||||||
|
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
}
|
||||||
|
assertRecordCountsPerSplit(tempFile, new long[] {1255, 2});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void firstBlockEndsWithLFSecondBlockStartsWithCR() throws Exception {
|
||||||
|
try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
|
||||||
|
writer.writeManyRecords(BLOCK_SIZE, 1000, LF);
|
||||||
|
writer.writeRecord(1, CR);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
}
|
||||||
|
assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void firstBlockEndsWithCRLF() throws Exception {
|
||||||
|
try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
|
||||||
|
writer.writeManyRecords(BLOCK_SIZE, 1000, CR_LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
}
|
||||||
|
assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void lastRecordContentSpanAcrossBlocks()
|
||||||
|
throws Exception {
|
||||||
|
try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
|
||||||
|
writer.writeManyRecords(BLOCK_SIZE - 50, 999, LF);
|
||||||
|
writer.writeRecord(100, LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
}
|
||||||
|
assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void lastRecordOfBlockHasItsLFInNextBlock() throws Exception {
|
||||||
|
try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
|
||||||
|
writer.writeManyRecords(BLOCK_SIZE - 50, 999, LF);
|
||||||
|
// The LF character is the first byte of the second block
|
||||||
|
writer.writeRecord(51, LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
}
|
||||||
|
assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void lastRecordOfFirstBlockHasItsCRLFInSecondBlock() throws Exception {
|
||||||
|
try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
|
||||||
|
writer.writeManyRecords(BLOCK_SIZE - 50, 999, LF);
|
||||||
|
// Both CR + LF characters are the first two bytes of second block
|
||||||
|
writer.writeRecord(52, CR_LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
}
|
||||||
|
assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void lastRecordOfFirstBlockHasItsCRLFPartlyInSecondBlock()
|
||||||
|
throws Exception {
|
||||||
|
try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
|
||||||
|
writer.writeManyRecords(BLOCK_SIZE - 50, 999, LF);
|
||||||
|
// The CR character is the last byte of the first block and the LF is
|
||||||
|
// the firs byte of the second block
|
||||||
|
writer.writeRecord(51, CR_LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
}
|
||||||
|
assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void lastByteInFirstBlockIsCRFirstByteInSecondBlockIsNotLF()
|
||||||
|
throws Exception {
|
||||||
|
try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
|
||||||
|
writer.writeManyRecords(BLOCK_SIZE, 1000, CR);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
writer.writeRecord(10, LF);
|
||||||
|
}
|
||||||
|
assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void usingCRDelimiterWithSmallestBufferSize() throws Exception {
|
||||||
|
// Forces calling LineReader#fillBuffer for ever byte read
|
||||||
|
conf.set(IO_FILE_BUFFER_SIZE_KEY, "1");
|
||||||
|
|
||||||
|
try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
|
||||||
|
writer.writeManyRecords(BLOCK_SIZE - 50, 999, CR);
|
||||||
|
writer.writeRecord(100, CR);
|
||||||
|
writer.writeRecord(10, CR);
|
||||||
|
writer.writeRecord(10, CR);
|
||||||
|
writer.writeRecord(10, CR);
|
||||||
|
}
|
||||||
|
assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void delimitedByCRSpanningThreeBlocks() throws Exception {
|
||||||
|
try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
|
||||||
|
writer.writeRecord(3 * BLOCK_SIZE, CR);
|
||||||
|
writer.writeRecord(3 * BLOCK_SIZE, CR);
|
||||||
|
writer.writeRecord(3 * BLOCK_SIZE, CR);
|
||||||
|
}
|
||||||
|
assertRecordCountsPerSplit(tempFile,
|
||||||
|
new long[] {1, 0, 1, 0, 0, 1, 0, 0, 0});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void customDelimiterLastThreeBytesInBlockAreDelimiter()
|
||||||
|
throws Exception {
|
||||||
|
byte[] delimiter = new byte[] {'e', 'n', 'd'};
|
||||||
|
setDelimiter(delimiter);
|
||||||
|
|
||||||
|
try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
|
||||||
|
writer.writeManyRecords(BLOCK_SIZE, 1000, delimiter);
|
||||||
|
writer.writeRecord(10, delimiter);
|
||||||
|
writer.writeRecord(10, delimiter);
|
||||||
|
writer.writeRecord(10, delimiter);
|
||||||
|
}
|
||||||
|
assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void customDelimiterDelimiterSpansAcrossBlocks()
|
||||||
|
throws Exception {
|
||||||
|
byte[] delimiter = new byte[] {'e', 'n', 'd'};
|
||||||
|
setDelimiter(delimiter);
|
||||||
|
|
||||||
|
try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
|
||||||
|
writer.writeManyRecords(BLOCK_SIZE - 50, 999, delimiter);
|
||||||
|
writer.writeRecord(52, delimiter);
|
||||||
|
writer.writeRecord(10, delimiter);
|
||||||
|
writer.writeRecord(10, delimiter);
|
||||||
|
writer.writeRecord(10, delimiter);
|
||||||
|
}
|
||||||
|
assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void customDelimiterLastRecordDelimiterStartsAtNextBlockStart()
|
||||||
|
throws Exception {
|
||||||
|
byte[] delimiter = new byte[] {'e', 'n', 'd'};
|
||||||
|
setDelimiter(delimiter);
|
||||||
|
|
||||||
|
try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
|
||||||
|
writer.writeManyRecords(BLOCK_SIZE - 50, 999, delimiter);
|
||||||
|
writer.writeRecord(53, delimiter);
|
||||||
|
writer.writeRecord(10, delimiter);
|
||||||
|
writer.writeRecord(10, delimiter);
|
||||||
|
writer.writeRecord(10, delimiter);
|
||||||
|
}
|
||||||
|
assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void customDelimiterLastBlockBytesShareCommonPrefixWithDelimiter()
|
||||||
|
throws Exception {
|
||||||
|
byte[] delimiter = new byte[] {'e', 'n', 'd'};
|
||||||
|
setDelimiter(delimiter);
|
||||||
|
|
||||||
|
try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
|
||||||
|
writer.writeManyRecords(BLOCK_SIZE - 4, 999, delimiter);
|
||||||
|
// The first 4 bytes, "an e", will be the last 4 bytes of the first block,
|
||||||
|
// the last byte being 'e' which matches the first character of the
|
||||||
|
// delimiter "end". The first byte of the next block also matches the
|
||||||
|
// second byte of the delimiter "n"; however the next character "c" does
|
||||||
|
// not match the last character of the delimiter. Thus an additional
|
||||||
|
// record should not be read for the split that reads the first block.
|
||||||
|
// The split that reads the second block will just discard
|
||||||
|
// "nchanting tale coming to an end".
|
||||||
|
writer.write("an enchanting tale coming to an end");
|
||||||
|
writer.writeRecord(10, delimiter);
|
||||||
|
writer.writeRecord(10, delimiter);
|
||||||
|
writer.writeRecord(10, delimiter);
|
||||||
|
}
|
||||||
|
assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract BaseLineRecordReaderHelper newReader(Path file);
|
||||||
|
|
||||||
|
private void assertRecordCountsPerSplit(
|
||||||
|
Path path, long[] countsIfSplitAtBlocks) throws IOException {
|
||||||
|
RecordCountAssert countAssert =
|
||||||
|
new RecordCountAssert(path, countsIfSplitAtBlocks);
|
||||||
|
countAssert.assertSingleSplit();
|
||||||
|
countAssert.assertSplittingAtBlocks();
|
||||||
|
countAssert.assertSplittingJustAfterSecondBlockStarts();
|
||||||
|
}
|
||||||
|
|
||||||
|
private class RecordCountAssert {
|
||||||
|
|
||||||
|
private final BaseLineRecordReaderHelper reader;
|
||||||
|
private final long numBlocks;
|
||||||
|
private final long[] countsIfSplitAtBlocks;
|
||||||
|
private final long fileSize;
|
||||||
|
private final long totalRecords;
|
||||||
|
private final List<Long> nextBlockOffsets;
|
||||||
|
|
||||||
|
RecordCountAssert(
|
||||||
|
Path path, long[] countsIfSplitAtBlocks) throws IOException {
|
||||||
|
this.reader = newReader(path);
|
||||||
|
this.countsIfSplitAtBlocks = countsIfSplitAtBlocks;
|
||||||
|
this.fileSize = getFileSize(path);
|
||||||
|
this.totalRecords = Arrays.stream(countsIfSplitAtBlocks).sum();
|
||||||
|
this.numBlocks = countsIfSplitAtBlocks.length;
|
||||||
|
this.nextBlockOffsets = BZip2Utils.getNextBlockMarkerOffsets(path, conf);
|
||||||
|
|
||||||
|
assertEquals(numBlocks, nextBlockOffsets.size() + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertSingleSplit() throws IOException {
|
||||||
|
assertEquals(totalRecords, reader.countRecords(0, fileSize));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertSplittingAtBlocks() throws IOException {
|
||||||
|
for (int i = 0; i < numBlocks; i++) {
|
||||||
|
long start = i == 0 ? 0 : nextBlockOffsets.get(i - 1);
|
||||||
|
long end = i == numBlocks - 1 ? fileSize : nextBlockOffsets.get(i);
|
||||||
|
long length = end - start;
|
||||||
|
|
||||||
|
String message = "At i=" + i;
|
||||||
|
long expectedCount = countsIfSplitAtBlocks[i];
|
||||||
|
assertEquals(
|
||||||
|
message, expectedCount, reader.countRecords(start, length));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertSplittingJustAfterSecondBlockStarts()
|
||||||
|
throws IOException {
|
||||||
|
if (numBlocks <= 1) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
long recordsInFirstTwoBlocks =
|
||||||
|
countsIfSplitAtBlocks[0] + countsIfSplitAtBlocks[1];
|
||||||
|
long remainingRecords = totalRecords - recordsInFirstTwoBlocks;
|
||||||
|
|
||||||
|
long firstSplitSize = nextBlockOffsets.get(0) + 1;
|
||||||
|
assertEquals(
|
||||||
|
recordsInFirstTwoBlocks,
|
||||||
|
reader.countRecords(0, firstSplitSize));
|
||||||
|
assertEquals(
|
||||||
|
remainingRecords,
|
||||||
|
reader.countRecords(firstSplitSize, fileSize - firstSplitSize));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private long getFileSize(Path path) throws IOException {
|
||||||
|
return fs.getFileStatus(path).getLen();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setDelimiter(byte[] delimiter) {
|
||||||
|
conf.set("textinputformat.record.delimiter",
|
||||||
|
new String(delimiter, StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,62 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.mapreduce.lib.input;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
|
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
||||||
|
|
||||||
|
public final class LineRecordReaderHelper extends
|
||||||
|
BaseLineRecordReaderHelper {
|
||||||
|
|
||||||
|
public LineRecordReaderHelper(Path filePath, Configuration conf) {
|
||||||
|
super(filePath, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long countRecords(long start, long length) throws IOException {
|
||||||
|
try (LineRecordReader reader = newReader(start, length)) {
|
||||||
|
long numRecords = 0L;
|
||||||
|
while (reader.nextKeyValue()) {
|
||||||
|
numRecords++;
|
||||||
|
}
|
||||||
|
return numRecords;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private LineRecordReader newReader(long start, long length)
|
||||||
|
throws IOException {
|
||||||
|
FileSplit split = new FileSplit(getFilePath(), start, length, null);
|
||||||
|
|
||||||
|
TaskAttemptContext context =
|
||||||
|
new TaskAttemptContextImpl(getConf(), new TaskAttemptID());
|
||||||
|
|
||||||
|
LineRecordReader reader = new LineRecordReader(getRecordDelimiterBytes());
|
||||||
|
try {
|
||||||
|
reader.initialize(split, context);
|
||||||
|
return reader;
|
||||||
|
} catch (Throwable e) {
|
||||||
|
reader.close();
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.mapreduce.lib.input;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
public final class TestLineRecordReaderBZip2
|
||||||
|
extends BaseTestLineRecordReaderBZip2 {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected BaseLineRecordReaderHelper newReader(Path file) {
|
||||||
|
return new LineRecordReaderHelper(file, getConf());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue