HADOOP-10681. Remove unnecessary synchronization from Snappy & Zlib codecs. Contributed by Gopal Vijayaraghavan.

(cherry picked from commit 8f9ab998e2)
This commit is contained in:
Arun C. Murthy 2014-10-05 07:38:21 -07:00
parent 1329d1e701
commit 7dd508b880
6 changed files with 145 additions and 52 deletions

View File

@ -293,6 +293,12 @@ Release 2.6.0 - UNRELEASED
HADOOP-11111 MiniKDC to use locale EN_US for case conversions. (stevel) HADOOP-11111 MiniKDC to use locale EN_US for case conversions. (stevel)
HADOOP-10731. Remove @date JavaDoc comment in ProgramDriver class (Henry
Saputra via aw)
HADOOP-10681. Remove unnecessary synchronization from Snappy & Zlib
codecs. (Gopal Vijayaraghavan via acmurthy)
BUG FIXES BUG FIXES
HADOOP-10781. Unportable getgrouplist() usage breaks FreeBSD (Dmitry HADOOP-10781. Unportable getgrouplist() usage breaks FreeBSD (Dmitry

View File

@ -100,7 +100,7 @@ public SnappyCompressor() {
* @param len Length * @param len Length
*/ */
@Override @Override
public synchronized void setInput(byte[] b, int off, int len) { public void setInput(byte[] b, int off, int len) {
if (b == null) { if (b == null) {
throw new NullPointerException(); throw new NullPointerException();
} }
@ -127,7 +127,7 @@ public synchronized void setInput(byte[] b, int off, int len) {
* aside to be loaded by this function while the compressed data are * aside to be loaded by this function while the compressed data are
* consumed. * consumed.
*/ */
synchronized void setInputFromSavedData() { void setInputFromSavedData() {
if (0 >= userBufLen) { if (0 >= userBufLen) {
return; return;
} }
@ -146,7 +146,7 @@ synchronized void setInputFromSavedData() {
* Does nothing. * Does nothing.
*/ */
@Override @Override
public synchronized void setDictionary(byte[] b, int off, int len) { public void setDictionary(byte[] b, int off, int len) {
// do nothing // do nothing
} }
@ -158,7 +158,7 @@ public synchronized void setDictionary(byte[] b, int off, int len) {
* #setInput() should be called in order to provide more input. * #setInput() should be called in order to provide more input.
*/ */
@Override @Override
public synchronized boolean needsInput() { public boolean needsInput() {
return !(compressedDirectBuf.remaining() > 0 return !(compressedDirectBuf.remaining() > 0
|| uncompressedDirectBuf.remaining() == 0 || userBufLen > 0); || uncompressedDirectBuf.remaining() == 0 || userBufLen > 0);
} }
@ -168,7 +168,7 @@ public synchronized boolean needsInput() {
* with the current contents of the input buffer. * with the current contents of the input buffer.
*/ */
@Override @Override
public synchronized void finish() { public void finish() {
finish = true; finish = true;
} }
@ -180,7 +180,7 @@ public synchronized void finish() {
* data output stream has been reached. * data output stream has been reached.
*/ */
@Override @Override
public synchronized boolean finished() { public boolean finished() {
// Check if all uncompressed data has been consumed // Check if all uncompressed data has been consumed
return (finish && finished && compressedDirectBuf.remaining() == 0); return (finish && finished && compressedDirectBuf.remaining() == 0);
} }
@ -197,7 +197,7 @@ public synchronized boolean finished() {
* @return The actual number of bytes of compressed data. * @return The actual number of bytes of compressed data.
*/ */
@Override @Override
public synchronized int compress(byte[] b, int off, int len) public int compress(byte[] b, int off, int len)
throws IOException { throws IOException {
if (b == null) { if (b == null) {
throw new NullPointerException(); throw new NullPointerException();
@ -250,7 +250,7 @@ public synchronized int compress(byte[] b, int off, int len)
* Resets compressor so that a new set of input data can be processed. * Resets compressor so that a new set of input data can be processed.
*/ */
@Override @Override
public synchronized void reset() { public void reset() {
finish = false; finish = false;
finished = false; finished = false;
uncompressedDirectBuf.clear(); uncompressedDirectBuf.clear();
@ -268,7 +268,7 @@ public synchronized void reset() {
* @param conf Configuration from which new setting are fetched * @param conf Configuration from which new setting are fetched
*/ */
@Override @Override
public synchronized void reinit(Configuration conf) { public void reinit(Configuration conf) {
reset(); reset();
} }
@ -276,7 +276,7 @@ public synchronized void reinit(Configuration conf) {
* Return number of bytes given to this compressor since last reset. * Return number of bytes given to this compressor since last reset.
*/ */
@Override @Override
public synchronized long getBytesRead() { public long getBytesRead() {
return bytesRead; return bytesRead;
} }
@ -284,7 +284,7 @@ public synchronized long getBytesRead() {
* Return number of bytes consumed by callers of compress since last reset. * Return number of bytes consumed by callers of compress since last reset.
*/ */
@Override @Override
public synchronized long getBytesWritten() { public long getBytesWritten() {
return bytesWritten; return bytesWritten;
} }
@ -292,7 +292,7 @@ public synchronized long getBytesWritten() {
* Closes the compressor and discards any unprocessed input. * Closes the compressor and discards any unprocessed input.
*/ */
@Override @Override
public synchronized void end() { public void end() {
} }
private native static void initIDs(); private native static void initIDs();

View File

@ -103,7 +103,7 @@ public SnappyDecompressor() {
* @param len Length * @param len Length
*/ */
@Override @Override
public synchronized void setInput(byte[] b, int off, int len) { public void setInput(byte[] b, int off, int len) {
if (b == null) { if (b == null) {
throw new NullPointerException(); throw new NullPointerException();
} }
@ -127,7 +127,7 @@ public synchronized void setInput(byte[] b, int off, int len) {
* aside to be loaded by this function while the compressed data are * aside to be loaded by this function while the compressed data are
* consumed. * consumed.
*/ */
synchronized void setInputFromSavedData() { void setInputFromSavedData() {
compressedDirectBufLen = Math.min(userBufLen, directBufferSize); compressedDirectBufLen = Math.min(userBufLen, directBufferSize);
// Reinitialize snappy's input direct buffer // Reinitialize snappy's input direct buffer
@ -144,7 +144,7 @@ synchronized void setInputFromSavedData() {
* Does nothing. * Does nothing.
*/ */
@Override @Override
public synchronized void setDictionary(byte[] b, int off, int len) { public void setDictionary(byte[] b, int off, int len) {
// do nothing // do nothing
} }
@ -158,7 +158,7 @@ public synchronized void setDictionary(byte[] b, int off, int len) {
* order to provide more input. * order to provide more input.
*/ */
@Override @Override
public synchronized boolean needsInput() { public boolean needsInput() {
// Consume remaining compressed data? // Consume remaining compressed data?
if (uncompressedDirectBuf.remaining() > 0) { if (uncompressedDirectBuf.remaining() > 0) {
return false; return false;
@ -183,7 +183,7 @@ public synchronized boolean needsInput() {
* @return <code>false</code>. * @return <code>false</code>.
*/ */
@Override @Override
public synchronized boolean needsDictionary() { public boolean needsDictionary() {
return false; return false;
} }
@ -195,7 +195,7 @@ public synchronized boolean needsDictionary() {
* data output stream has been reached. * data output stream has been reached.
*/ */
@Override @Override
public synchronized boolean finished() { public boolean finished() {
return (finished && uncompressedDirectBuf.remaining() == 0); return (finished && uncompressedDirectBuf.remaining() == 0);
} }
@ -212,7 +212,7 @@ public synchronized boolean finished() {
* @throws IOException * @throws IOException
*/ */
@Override @Override
public synchronized int decompress(byte[] b, int off, int len) public int decompress(byte[] b, int off, int len)
throws IOException { throws IOException {
if (b == null) { if (b == null) {
throw new NullPointerException(); throw new NullPointerException();
@ -257,13 +257,13 @@ public synchronized int decompress(byte[] b, int off, int len)
* @return <code>0</code>. * @return <code>0</code>.
*/ */
@Override @Override
public synchronized int getRemaining() { public int getRemaining() {
// Never use this function in BlockDecompressorStream. // Never use this function in BlockDecompressorStream.
return 0; return 0;
} }
@Override @Override
public synchronized void reset() { public void reset() {
finished = false; finished = false;
compressedDirectBufLen = 0; compressedDirectBufLen = 0;
uncompressedDirectBuf.limit(directBufferSize); uncompressedDirectBuf.limit(directBufferSize);
@ -276,7 +276,7 @@ public synchronized void reset() {
* input data can be processed. * input data can be processed.
*/ */
@Override @Override
public synchronized void end() { public void end() {
// do nothing // do nothing
} }
@ -333,7 +333,7 @@ public void reset() {
private boolean endOfInput; private boolean endOfInput;
@Override @Override
public synchronized void decompress(ByteBuffer src, ByteBuffer dst) public void decompress(ByteBuffer src, ByteBuffer dst)
throws IOException { throws IOException {
assert dst.isDirect() : "dst.isDirect()"; assert dst.isDirect() : "dst.isDirect()";
assert src.isDirect() : "src.isDirect()"; assert src.isDirect() : "src.isDirect()";
@ -343,13 +343,13 @@ public synchronized void decompress(ByteBuffer src, ByteBuffer dst)
} }
@Override @Override
public synchronized void setDictionary(byte[] b, int off, int len) { public void setDictionary(byte[] b, int off, int len) {
throw new UnsupportedOperationException( throw new UnsupportedOperationException(
"byte[] arrays are not supported for DirectDecompressor"); "byte[] arrays are not supported for DirectDecompressor");
} }
@Override @Override
public synchronized int decompress(byte[] b, int off, int len) { public int decompress(byte[] b, int off, int len) {
throw new UnsupportedOperationException( throw new UnsupportedOperationException(
"byte[] arrays are not supported for DirectDecompressor"); "byte[] arrays are not supported for DirectDecompressor");
} }

View File

@ -243,7 +243,7 @@ public ZlibCompressor(CompressionLevel level, CompressionStrategy strategy,
* @param conf Configuration storing new settings * @param conf Configuration storing new settings
*/ */
@Override @Override
public synchronized void reinit(Configuration conf) { public void reinit(Configuration conf) {
reset(); reset();
if (conf == null) { if (conf == null) {
return; return;
@ -260,7 +260,7 @@ public synchronized void reinit(Configuration conf) {
} }
@Override @Override
public synchronized void setInput(byte[] b, int off, int len) { public void setInput(byte[] b, int off, int len) {
if (b== null) { if (b== null) {
throw new NullPointerException(); throw new NullPointerException();
} }
@ -280,7 +280,7 @@ public synchronized void setInput(byte[] b, int off, int len) {
} }
//copy enough data from userBuf to uncompressedDirectBuf //copy enough data from userBuf to uncompressedDirectBuf
synchronized void setInputFromSavedData() { void setInputFromSavedData() {
int len = Math.min(userBufLen, uncompressedDirectBuf.remaining()); int len = Math.min(userBufLen, uncompressedDirectBuf.remaining());
((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff, len); ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff, len);
userBufLen -= len; userBufLen -= len;
@ -289,7 +289,7 @@ synchronized void setInputFromSavedData() {
} }
@Override @Override
public synchronized void setDictionary(byte[] b, int off, int len) { public void setDictionary(byte[] b, int off, int len) {
if (stream == 0 || b == null) { if (stream == 0 || b == null) {
throw new NullPointerException(); throw new NullPointerException();
} }
@ -300,7 +300,7 @@ public synchronized void setDictionary(byte[] b, int off, int len) {
} }
@Override @Override
public synchronized boolean needsInput() { public boolean needsInput() {
// Consume remaining compressed data? // Consume remaining compressed data?
if (compressedDirectBuf.remaining() > 0) { if (compressedDirectBuf.remaining() > 0) {
return false; return false;
@ -329,19 +329,19 @@ public synchronized boolean needsInput() {
} }
@Override @Override
public synchronized void finish() { public void finish() {
finish = true; finish = true;
} }
@Override @Override
public synchronized boolean finished() { public boolean finished() {
// Check if 'zlib' says its 'finished' and // Check if 'zlib' says its 'finished' and
// all compressed data has been consumed // all compressed data has been consumed
return (finished && compressedDirectBuf.remaining() == 0); return (finished && compressedDirectBuf.remaining() == 0);
} }
@Override @Override
public synchronized int compress(byte[] b, int off, int len) public int compress(byte[] b, int off, int len)
throws IOException { throws IOException {
if (b == null) { if (b == null) {
throw new NullPointerException(); throw new NullPointerException();
@ -392,7 +392,7 @@ public synchronized int compress(byte[] b, int off, int len)
* @return the total (non-negative) number of compressed bytes output so far * @return the total (non-negative) number of compressed bytes output so far
*/ */
@Override @Override
public synchronized long getBytesWritten() { public long getBytesWritten() {
checkStream(); checkStream();
return getBytesWritten(stream); return getBytesWritten(stream);
} }
@ -403,13 +403,13 @@ public synchronized long getBytesWritten() {
* @return the total (non-negative) number of uncompressed bytes input so far * @return the total (non-negative) number of uncompressed bytes input so far
*/ */
@Override @Override
public synchronized long getBytesRead() { public long getBytesRead() {
checkStream(); checkStream();
return getBytesRead(stream); return getBytesRead(stream);
} }
@Override @Override
public synchronized void reset() { public void reset() {
checkStream(); checkStream();
reset(stream); reset(stream);
finish = false; finish = false;
@ -423,7 +423,7 @@ public synchronized void reset() {
} }
@Override @Override
public synchronized void end() { public void end() {
if (stream != 0) { if (stream != 0) {
end(stream); end(stream);
stream = 0; stream = 0;

View File

@ -120,7 +120,7 @@ public ZlibDecompressor() {
} }
@Override @Override
public synchronized void setInput(byte[] b, int off, int len) { public void setInput(byte[] b, int off, int len) {
if (b == null) { if (b == null) {
throw new NullPointerException(); throw new NullPointerException();
} }
@ -139,7 +139,7 @@ public synchronized void setInput(byte[] b, int off, int len) {
uncompressedDirectBuf.position(directBufferSize); uncompressedDirectBuf.position(directBufferSize);
} }
synchronized void setInputFromSavedData() { void setInputFromSavedData() {
compressedDirectBufOff = 0; compressedDirectBufOff = 0;
compressedDirectBufLen = userBufLen; compressedDirectBufLen = userBufLen;
if (compressedDirectBufLen > directBufferSize) { if (compressedDirectBufLen > directBufferSize) {
@ -157,7 +157,7 @@ synchronized void setInputFromSavedData() {
} }
@Override @Override
public synchronized void setDictionary(byte[] b, int off, int len) { public void setDictionary(byte[] b, int off, int len) {
if (stream == 0 || b == null) { if (stream == 0 || b == null) {
throw new NullPointerException(); throw new NullPointerException();
} }
@ -169,7 +169,7 @@ public synchronized void setDictionary(byte[] b, int off, int len) {
} }
@Override @Override
public synchronized boolean needsInput() { public boolean needsInput() {
// Consume remaining compressed data? // Consume remaining compressed data?
if (uncompressedDirectBuf.remaining() > 0) { if (uncompressedDirectBuf.remaining() > 0) {
return false; return false;
@ -189,19 +189,19 @@ public synchronized boolean needsInput() {
} }
@Override @Override
public synchronized boolean needsDictionary() { public boolean needsDictionary() {
return needDict; return needDict;
} }
@Override @Override
public synchronized boolean finished() { public boolean finished() {
// Check if 'zlib' says it's 'finished' and // Check if 'zlib' says it's 'finished' and
// all compressed data has been consumed // all compressed data has been consumed
return (finished && uncompressedDirectBuf.remaining() == 0); return (finished && uncompressedDirectBuf.remaining() == 0);
} }
@Override @Override
public synchronized int decompress(byte[] b, int off, int len) public int decompress(byte[] b, int off, int len)
throws IOException { throws IOException {
if (b == null) { if (b == null) {
throw new NullPointerException(); throw new NullPointerException();
@ -240,7 +240,7 @@ public synchronized int decompress(byte[] b, int off, int len)
* *
* @return the total (non-negative) number of uncompressed bytes output so far * @return the total (non-negative) number of uncompressed bytes output so far
*/ */
public synchronized long getBytesWritten() { public long getBytesWritten() {
checkStream(); checkStream();
return getBytesWritten(stream); return getBytesWritten(stream);
} }
@ -250,7 +250,7 @@ public synchronized long getBytesWritten() {
* *
* @return the total (non-negative) number of compressed bytes input so far * @return the total (non-negative) number of compressed bytes input so far
*/ */
public synchronized long getBytesRead() { public long getBytesRead() {
checkStream(); checkStream();
return getBytesRead(stream); return getBytesRead(stream);
} }
@ -263,7 +263,7 @@ public synchronized long getBytesRead() {
* @return the total (non-negative) number of unprocessed bytes in input * @return the total (non-negative) number of unprocessed bytes in input
*/ */
@Override @Override
public synchronized int getRemaining() { public int getRemaining() {
checkStream(); checkStream();
return userBufLen + getRemaining(stream); // userBuf + compressedDirectBuf return userBufLen + getRemaining(stream); // userBuf + compressedDirectBuf
} }
@ -272,7 +272,7 @@ public synchronized int getRemaining() {
* Resets everything including the input buffers (user and direct).</p> * Resets everything including the input buffers (user and direct).</p>
*/ */
@Override @Override
public synchronized void reset() { public void reset() {
checkStream(); checkStream();
reset(stream); reset(stream);
finished = false; finished = false;
@ -284,7 +284,7 @@ public synchronized void reset() {
} }
@Override @Override
public synchronized void end() { public void end() {
if (stream != 0) { if (stream != 0) {
end(stream); end(stream);
stream = 0; stream = 0;
@ -372,7 +372,7 @@ public void reset() {
private boolean endOfInput; private boolean endOfInput;
@Override @Override
public synchronized void decompress(ByteBuffer src, ByteBuffer dst) public void decompress(ByteBuffer src, ByteBuffer dst)
throws IOException { throws IOException {
assert dst.isDirect() : "dst.isDirect()"; assert dst.isDirect() : "dst.isDirect()";
assert src.isDirect() : "src.isDirect()"; assert src.isDirect() : "src.isDirect()";
@ -382,13 +382,13 @@ public synchronized void decompress(ByteBuffer src, ByteBuffer dst)
} }
@Override @Override
public synchronized void setDictionary(byte[] b, int off, int len) { public void setDictionary(byte[] b, int off, int len) {
throw new UnsupportedOperationException( throw new UnsupportedOperationException(
"byte[] arrays are not supported for DirectDecompressor"); "byte[] arrays are not supported for DirectDecompressor");
} }
@Override @Override
public synchronized int decompress(byte[] b, int off, int len) { public int decompress(byte[] b, int off, int len) {
throw new UnsupportedOperationException( throw new UnsupportedOperationException(
"byte[] arrays are not supported for DirectDecompressor"); "byte[] arrays are not supported for DirectDecompressor");
} }

View File

@ -19,6 +19,18 @@
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -67,4 +79,79 @@ public void testDecompressorPoolCounts() {
assertEquals(LEASE_COUNT_ERR, 0, assertEquals(LEASE_COUNT_ERR, 0,
CodecPool.getLeasedDecompressorsCount(codec)); CodecPool.getLeasedDecompressorsCount(codec));
} }
@Test(timeout = 1000)
public void testMultiThreadedCompressorPool() throws InterruptedException {
final int iterations = 4;
ExecutorService threadpool = Executors.newFixedThreadPool(3);
final LinkedBlockingDeque<Compressor> queue = new LinkedBlockingDeque<Compressor>(
2 * iterations);
Callable<Boolean> consumer = new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
Compressor c = queue.take();
CodecPool.returnCompressor(c);
return c != null;
}
};
Callable<Boolean> producer = new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
Compressor c = CodecPool.getCompressor(codec);
queue.put(c);
return c != null;
}
};
for (int i = 0; i < iterations; i++) {
threadpool.submit(consumer);
threadpool.submit(producer);
}
// wait for completion
threadpool.shutdown();
threadpool.awaitTermination(1000, TimeUnit.SECONDS);
assertEquals(LEASE_COUNT_ERR, 0, CodecPool.getLeasedCompressorsCount(codec));
}
@Test(timeout = 1000)
public void testMultiThreadedDecompressorPool() throws InterruptedException {
final int iterations = 4;
ExecutorService threadpool = Executors.newFixedThreadPool(3);
final LinkedBlockingDeque<Decompressor> queue = new LinkedBlockingDeque<Decompressor>(
2 * iterations);
Callable<Boolean> consumer = new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
Decompressor dc = queue.take();
CodecPool.returnDecompressor(dc);
return dc != null;
}
};
Callable<Boolean> producer = new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
Decompressor c = CodecPool.getDecompressor(codec);
queue.put(c);
return c != null;
}
};
for (int i = 0; i < iterations; i++) {
threadpool.submit(consumer);
threadpool.submit(producer);
}
// wait for completion
threadpool.shutdown();
threadpool.awaitTermination(1000, TimeUnit.SECONDS);
assertEquals(LEASE_COUNT_ERR, 0,
CodecPool.getLeasedDecompressorsCount(codec));
}
} }