MAPREDUCE-5918. LineRecordReader can return the same decompressor to CodecPool multiple times (Sergey Murylev via raviprak)
This commit is contained in:
parent beb184ac58
commit 45692effe4
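Before the diffs, a minimal sketch of the failure mode this commit fixes (illustrative Java, not part of the patch; FakeDecompressor is a hypothetical stand-in for a pooled Decompressor): with the old List-backed pool, returning the same instance twice stores two references to one object, so two later borrowers can end up sharing a single decompressor, and the lease count is decremented twice. A Set-backed pool rejects the duplicate, and the boolean result of Set.add tells the caller whether the return actually happened.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a pooled (de)compressor; illustration only.
class FakeDecompressor {
}

public class DoubleReturnDemo {
  public static void main(String[] args) {
    FakeDecompressor d = new FakeDecompressor();

    // Old behavior: a List happily stores duplicates, so returning the
    // same instance twice leaves two references to one object in the pool.
    List<FakeDecompressor> listPool = new ArrayList<FakeDecompressor>();
    listPool.add(d);
    listPool.add(d);
    System.out.println(listPool.get(0) == listPool.get(1)); // true: shared

    // New behavior: a Set rejects the duplicate, and the boolean result
    // of add() tells the caller whether the return actually happened.
    Set<FakeDecompressor> setPool = new HashSet<FakeDecompressor>();
    System.out.println(setPool.add(d)); // true  -> first return counted
    System.out.println(setPool.add(d)); // false -> duplicate ignored
  }
}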
org/apache/hadoop/io/compress/CodecPool.java

@@ -17,9 +17,9 @@
  */
 package org.apache.hadoop.io.compress;
 
-import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Set;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -47,15 +47,15 @@ public class CodecPool {
    * A global compressor pool used to save the expensive
    * construction/destruction of (possibly native) decompression codecs.
    */
-  private static final Map<Class<Compressor>, List<Compressor>> compressorPool =
-      new HashMap<Class<Compressor>, List<Compressor>>();
+  private static final Map<Class<Compressor>, Set<Compressor>> compressorPool =
+      new HashMap<Class<Compressor>, Set<Compressor>>();
 
   /**
    * A global decompressor pool used to save the expensive
    * construction/destruction of (possibly native) decompression codecs.
    */
-  private static final Map<Class<Decompressor>, List<Decompressor>> decompressorPool =
-      new HashMap<Class<Decompressor>, List<Decompressor>>();
+  private static final Map<Class<Decompressor>, Set<Decompressor>> decompressorPool =
+      new HashMap<Class<Decompressor>, Set<Decompressor>>();
 
   private static <T> LoadingCache<Class<T>, AtomicInteger> createCache(
       Class<T> klass) {
@@ -80,20 +80,21 @@ public class CodecPool {
   private static final LoadingCache<Class<Decompressor>, AtomicInteger> decompressorCounts =
       createCache(Decompressor.class);
 
-  private static <T> T borrow(Map<Class<T>, List<T>> pool,
+  private static <T> T borrow(Map<Class<T>, Set<T>> pool,
       Class<? extends T> codecClass) {
     T codec = null;
 
     // Check if an appropriate codec is available
-    List<T> codecList;
+    Set<T> codecSet;
     synchronized (pool) {
-      codecList = pool.get(codecClass);
+      codecSet = pool.get(codecClass);
     }
 
-    if (codecList != null) {
-      synchronized (codecList) {
-        if (!codecList.isEmpty()) {
-          codec = codecList.remove(codecList.size() - 1);
+    if (codecSet != null) {
+      synchronized (codecSet) {
+        if (!codecSet.isEmpty()) {
+          codec = codecSet.iterator().next();
+          codecSet.remove(codec);
         }
       }
     }
@@ -101,22 +102,23 @@ public class CodecPool {
     return codec;
   }
 
-  private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
+  private static <T> boolean payback(Map<Class<T>, Set<T>> pool, T codec) {
     if (codec != null) {
       Class<T> codecClass = ReflectionUtils.getClass(codec);
-      List<T> codecList;
+      Set<T> codecSet;
       synchronized (pool) {
-        codecList = pool.get(codecClass);
-        if (codecList == null) {
-          codecList = new ArrayList<T>();
-          pool.put(codecClass, codecList);
+        codecSet = pool.get(codecClass);
+        if (codecSet == null) {
+          codecSet = new HashSet<T>();
+          pool.put(codecClass, codecSet);
         }
       }
 
-      synchronized (codecList) {
-        codecList.add(codec);
+      synchronized (codecSet) {
+        return codecSet.add(codec);
       }
     }
+    return false;
   }
 
   @SuppressWarnings("unchecked")
@@ -200,8 +202,9 @@ public class CodecPool {
       return;
     }
     compressor.reset();
-    payback(compressorPool, compressor);
-    updateLeaseCount(compressorCounts, compressor, -1);
+    if (payback(compressorPool, compressor)) {
+      updateLeaseCount(compressorCounts, compressor, -1);
+    }
   }
 
   /**
@@ -219,8 +222,9 @@ public class CodecPool {
      return;
    }
    decompressor.reset();
-    payback(decompressorPool, decompressor);
-    updateLeaseCount(decompressorCounts, decompressor, -1);
+    if (payback(decompressorPool, decompressor)) {
+      updateLeaseCount(decompressorCounts, decompressor, -1);
+    }
   }
 
   /**
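A hedged sketch of what the change means at a call site (the CodecPool methods used are the ones visible in the diffs; the double-return scenario mirrors the new tests below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Decompressor;

public class IdempotentReturnDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    BZip2Codec codec = new BZip2Codec();
    codec.setConf(conf);

    Decompressor d = CodecPool.getDecompressor(codec);
    CodecPool.returnDecompressor(d); // pooled; lease count drops to 0
    CodecPool.returnDecompressor(d); // duplicate: Set.add() returns false,
                                     // so the lease count is left alone
    System.out.println(CodecPool.getLeasedDecompressorsCount(codec)); // 0

    // Before this patch, the second return pushed a duplicate reference
    // into the List-backed pool and decremented the lease count twice.
  }
}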
org/apache/hadoop/io/compress/TestCodecPool.java

@@ -19,15 +19,9 @@ package org.apache.hadoop.io.compress;
 
 import static org.junit.Assert.assertEquals;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 
@@ -35,6 +29,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.HashSet;
+import java.util.Set;
+
 public class TestCodecPool {
   private final String LEASE_COUNT_ERR =
       "Incorrect number of leased (de)compressors";
@@ -61,6 +58,25 @@ public class TestCodecPool {
     CodecPool.returnCompressor(comp1);
     assertEquals(LEASE_COUNT_ERR, 0,
         CodecPool.getLeasedCompressorsCount(codec));
+
+    CodecPool.returnCompressor(comp1);
+    assertEquals(LEASE_COUNT_ERR, 0,
+        CodecPool.getLeasedCompressorsCount(codec));
+  }
+
+  @Test(timeout = 1000)
+  public void testCompressorNotReturnSameInstance() {
+    Compressor comp = CodecPool.getCompressor(codec);
+    CodecPool.returnCompressor(comp);
+    CodecPool.returnCompressor(comp);
+    Set<Compressor> compressors = new HashSet<Compressor>();
+    for (int i = 0; i < 10; ++i) {
+      compressors.add(CodecPool.getCompressor(codec));
+    }
+    assertEquals(10, compressors.size());
+    for (Compressor compressor : compressors) {
+      CodecPool.returnCompressor(compressor);
+    }
   }
 
   @Test(timeout = 1000)
@@ -78,6 +94,10 @@ public class TestCodecPool {
     CodecPool.returnDecompressor(decomp1);
     assertEquals(LEASE_COUNT_ERR, 0,
         CodecPool.getLeasedDecompressorsCount(codec));
+
+    CodecPool.returnDecompressor(decomp1);
+    assertEquals(LEASE_COUNT_ERR, 0,
+        CodecPool.getLeasedCompressorsCount(codec));
   }
 
   @Test(timeout = 1000)
@@ -154,4 +174,19 @@ public class TestCodecPool {
     assertEquals(LEASE_COUNT_ERR, 0,
         CodecPool.getLeasedDecompressorsCount(codec));
   }
+
+  @Test(timeout = 1000)
+  public void testDecompressorNotReturnSameInstance() {
+    Decompressor decomp = CodecPool.getDecompressor(codec);
+    CodecPool.returnDecompressor(decomp);
+    CodecPool.returnDecompressor(decomp);
+    Set<Decompressor> decompressors = new HashSet<Decompressor>();
+    for (int i = 0; i < 10; ++i) {
+      decompressors.add(CodecPool.getDecompressor(codec));
+    }
+    assertEquals(10, decompressors.size());
+    for (Decompressor decompressor : decompressors) {
+      CodecPool.returnDecompressor(decompressor);
+    }
+  }
 }
hadoop-mapreduce-project/CHANGES.txt

@@ -12,6 +12,9 @@ Release 2.7.0 - UNRELEASED
 
   BUG FIXES
 
+    MAPREDUCE-5918. LineRecordReader can return the same decompressor to CodecPool
+    multiple times (Sergey Murylev via raviprak)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES
org/apache/hadoop/mapred/LineRecordReader.java

@@ -284,6 +284,7 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
     } finally {
       if (decompressor != null) {
         CodecPool.returnDecompressor(decompressor);
+        decompressor = null;
       }
     }
   }
org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java

@@ -232,6 +232,7 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
     } finally {
       if (decompressor != null) {
         CodecPool.returnDecompressor(decompressor);
+        decompressor = null;
       }
     }
   }
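Both readers apply the same guard: close() may run more than once, so the decompressor field is nulled immediately after the first return. A minimal sketch of this idempotent-close pattern (hypothetical class; only the pattern is taken from the two diffs above):

import java.io.Closeable;
import java.io.IOException;

// Hypothetical holder; stands in for LineRecordReader's close() logic.
class PooledResourceHolder implements Closeable {
  private Object decompressor = new Object(); // stand-in for a Decompressor

  @Override
  public void close() throws IOException {
    try {
      // ... close the underlying stream here ...
    } finally {
      if (decompressor != null) {
        returnToPool(decompressor);
        decompressor = null; // a second close() now skips the return
      }
    }
  }

  private void returnToPool(Object d) {
    // stand-in for CodecPool.returnDecompressor(d)
  }
}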
org/apache/hadoop/mapred/TestLineRecordReader.java

@@ -27,12 +27,17 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.Decompressor;
 import org.junit.Test;
 
 public class TestLineRecordReader {
@@ -225,4 +230,36 @@ public class TestLineRecordReader {
 
     assertTrue("BOM is not skipped", skipBOM);
   }
+
+  @Test
+  public void testMultipleClose() throws IOException {
+    URL testFileUrl = getClass().getClassLoader().
+        getResource("recordSpanningMultipleSplits.txt.bz2");
+    assertNotNull("Cannot find recordSpanningMultipleSplits.txt.bz2",
+        testFileUrl);
+    File testFile = new File(testFileUrl.getFile());
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    long testFileSize = testFile.length();
+    Configuration conf = new Configuration();
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+    FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
+        (String[])null);
+
+    LineRecordReader reader = new LineRecordReader(conf, split);
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    //noinspection StatementWithEmptyBody
+    while (reader.next(key, value)) ;
+    reader.close();
+    reader.close();
+
+    BZip2Codec codec = new BZip2Codec();
+    codec.setConf(conf);
+    Set<Decompressor> decompressors = new HashSet<Decompressor>();
+    for (int i = 0; i < 10; ++i) {
+      decompressors.add(CodecPool.getDecompressor(codec));
+    }
+    assertEquals(10, decompressors.size());
+  }
 }
org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java

@@ -27,10 +27,15 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
@@ -231,4 +236,37 @@ public class TestLineRecordReader {
 
     assertTrue("BOM is not skipped", skipBOM);
   }
+
+  @Test
+  public void testMultipleClose() throws IOException {
+    URL testFileUrl = getClass().getClassLoader().
+        getResource("recordSpanningMultipleSplits.txt.bz2");
+    assertNotNull("Cannot find recordSpanningMultipleSplits.txt.bz2",
+        testFileUrl);
+    File testFile = new File(testFileUrl.getFile());
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    long testFileSize = testFile.length();
+    Configuration conf = new Configuration();
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+
+    // read the data and check whether BOM is skipped
+    FileSplit split = new FileSplit(testFilePath, 0, testFileSize, null);
+    LineRecordReader reader = new LineRecordReader();
+    reader.initialize(split, context);
+
+    //noinspection StatementWithEmptyBody
+    while (reader.nextKeyValue()) ;
+    reader.close();
+    reader.close();
+
+    BZip2Codec codec = new BZip2Codec();
+    codec.setConf(conf);
+    Set<Decompressor> decompressors = new HashSet<Decompressor>();
+    for (int i = 0; i < 10; ++i) {
+      decompressors.add(CodecPool.getDecompressor(codec));
+    }
+    assertEquals(10, decompressors.size());
+  }
 }