diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractNativeRawDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractNativeRawDecoder.java index e84574709f7..cb71a80d5f5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractNativeRawDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractNativeRawDecoder.java @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Abstract native raw decoder for all native coders to extend with. @@ -34,36 +35,46 @@ abstract class AbstractNativeRawDecoder extends RawErasureDecoder { public static Logger LOG = LoggerFactory.getLogger(AbstractNativeRawDecoder.class); + // Protect ISA-L coder data structure in native layer from being accessed and + // updated concurrently by the init, release and decode functions. + protected final ReentrantReadWriteLock decoderLock = + new ReentrantReadWriteLock(); + public AbstractNativeRawDecoder(ErasureCoderOptions coderOptions) { super(coderOptions); } @Override - protected synchronized void doDecode(ByteBufferDecodingState decodingState) + protected void doDecode(ByteBufferDecodingState decodingState) throws IOException { - if (nativeCoder == 0) { - throw new IOException(String.format("%s closed", - getClass().getSimpleName())); - } - int[] inputOffsets = new int[decodingState.inputs.length]; - int[] outputOffsets = new int[decodingState.outputs.length]; - - ByteBuffer buffer; - for (int i = 0; i < decodingState.inputs.length; ++i) { - buffer = decodingState.inputs[i]; - if (buffer != null) { - inputOffsets[i] = buffer.position(); + decoderLock.readLock().lock(); + try { + if (nativeCoder == 0) { + throw new IOException(String.format("%s closed", + getClass().getSimpleName())); } - } + int[] inputOffsets = new int[decodingState.inputs.length]; + int[] outputOffsets = new int[decodingState.outputs.length]; - for (int i = 0; i < decodingState.outputs.length; ++i) { - buffer = decodingState.outputs[i]; - outputOffsets[i] = buffer.position(); - } + ByteBuffer buffer; + for (int i = 0; i < decodingState.inputs.length; ++i) { + buffer = decodingState.inputs[i]; + if (buffer != null) { + inputOffsets[i] = buffer.position(); + } + } - performDecodeImpl(decodingState.inputs, inputOffsets, - decodingState.decodeLength, decodingState.erasedIndexes, - decodingState.outputs, outputOffsets); + for (int i = 0; i < decodingState.outputs.length; ++i) { + buffer = decodingState.outputs[i]; + outputOffsets[i] = buffer.position(); + } + + performDecodeImpl(decodingState.inputs, inputOffsets, + decodingState.decodeLength, decodingState.erasedIndexes, + decodingState.outputs, outputOffsets); + } finally { + decoderLock.readLock().unlock(); + } } protected abstract void performDecodeImpl(ByteBuffer[] inputs, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractNativeRawEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractNativeRawEncoder.java index cab53839b34..44d89c2a1c0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractNativeRawEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractNativeRawEncoder.java @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Abstract native raw encoder for all native coders to extend with. @@ -34,34 +35,44 @@ abstract class AbstractNativeRawEncoder extends RawErasureEncoder { public static Logger LOG = LoggerFactory.getLogger(AbstractNativeRawEncoder.class); + // Protect ISA-L coder data structure in native layer from being accessed and + // updated concurrently by the init, release and encode functions. + protected final ReentrantReadWriteLock encoderLock = + new ReentrantReadWriteLock(); + public AbstractNativeRawEncoder(ErasureCoderOptions coderOptions) { super(coderOptions); } @Override - protected synchronized void doEncode(ByteBufferEncodingState encodingState) + protected void doEncode(ByteBufferEncodingState encodingState) throws IOException { - if (nativeCoder == 0) { - throw new IOException(String.format("%s closed", - getClass().getSimpleName())); - } - int[] inputOffsets = new int[encodingState.inputs.length]; - int[] outputOffsets = new int[encodingState.outputs.length]; - int dataLen = encodingState.inputs[0].remaining(); + encoderLock.readLock().lock(); + try { + if (nativeCoder == 0) { + throw new IOException(String.format("%s closed", + getClass().getSimpleName())); + } + int[] inputOffsets = new int[encodingState.inputs.length]; + int[] outputOffsets = new int[encodingState.outputs.length]; + int dataLen = encodingState.inputs[0].remaining(); - ByteBuffer buffer; - for (int i = 0; i < encodingState.inputs.length; ++i) { - buffer = encodingState.inputs[i]; - inputOffsets[i] = buffer.position(); - } + ByteBuffer buffer; + for (int i = 0; i < encodingState.inputs.length; ++i) { + buffer = encodingState.inputs[i]; + inputOffsets[i] = buffer.position(); + } - for (int i = 0; i < encodingState.outputs.length; ++i) { - buffer = encodingState.outputs[i]; - outputOffsets[i] = buffer.position(); - } + for (int i = 0; i < encodingState.outputs.length; ++i) { + buffer = encodingState.outputs[i]; + outputOffsets[i] = buffer.position(); + } - performEncodeImpl(encodingState.inputs, inputOffsets, dataLen, - encodingState.outputs, outputOffsets); + performEncodeImpl(encodingState.inputs, inputOffsets, dataLen, + encodingState.outputs, outputOffsets); + } finally { + encoderLock.readLock().unlock(); + } } protected abstract void performEncodeImpl( diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/NativeRSRawDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/NativeRSRawDecoder.java index 85722223039..dc2c33a6bf6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/NativeRSRawDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/NativeRSRawDecoder.java @@ -36,19 +36,30 @@ public class NativeRSRawDecoder extends AbstractNativeRawDecoder { public NativeRSRawDecoder(ErasureCoderOptions coderOptions) { super(coderOptions); - initImpl(coderOptions.getNumDataUnits(), coderOptions.getNumParityUnits()); + decoderLock.writeLock().lock(); + try { + initImpl(coderOptions.getNumDataUnits(), + coderOptions.getNumParityUnits()); + } finally { + decoderLock.writeLock().unlock(); + } } @Override - protected synchronized void performDecodeImpl( + protected void performDecodeImpl( ByteBuffer[] inputs, int[] inputOffsets, int dataLen, int[] erased, ByteBuffer[] outputs, int[] outputOffsets) throws IOException { decodeImpl(inputs, inputOffsets, dataLen, erased, outputs, outputOffsets); } @Override - public synchronized void release() { - destroyImpl(); + public void release() { + decoderLock.writeLock().lock(); + try { + destroyImpl(); + } finally { + decoderLock.writeLock().unlock(); + } } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/NativeRSRawEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/NativeRSRawEncoder.java index 754ec884102..ad06927ffe3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/NativeRSRawEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/NativeRSRawEncoder.java @@ -36,19 +36,30 @@ public class NativeRSRawEncoder extends AbstractNativeRawEncoder { public NativeRSRawEncoder(ErasureCoderOptions coderOptions) { super(coderOptions); - initImpl(coderOptions.getNumDataUnits(), coderOptions.getNumParityUnits()); + encoderLock.writeLock().lock(); + try { + initImpl(coderOptions.getNumDataUnits(), + coderOptions.getNumParityUnits()); + } finally { + encoderLock.writeLock().unlock(); + } } @Override - protected synchronized void performEncodeImpl( + protected void performEncodeImpl( ByteBuffer[] inputs, int[] inputOffsets, int dataLen, ByteBuffer[] outputs, int[] outputOffsets) throws IOException { encodeImpl(inputs, inputOffsets, dataLen, outputs, outputOffsets); } @Override - public synchronized void release() { - destroyImpl(); + public void release() { + encoderLock.writeLock().lock(); + try { + destroyImpl(); + } finally { + encoderLock.writeLock().unlock(); + } } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/NativeXORRawDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/NativeXORRawDecoder.java index 17630424985..dd708eb53e1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/NativeXORRawDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/NativeXORRawDecoder.java @@ -36,19 +36,30 @@ public class NativeXORRawDecoder extends AbstractNativeRawDecoder { public NativeXORRawDecoder(ErasureCoderOptions coderOptions) { super(coderOptions); - initImpl(coderOptions.getNumDataUnits(), coderOptions.getNumParityUnits()); + decoderLock.writeLock().lock(); + try { + initImpl(coderOptions.getNumDataUnits(), + coderOptions.getNumParityUnits()); + } finally { + decoderLock.writeLock().unlock(); + } } @Override - protected synchronized void performDecodeImpl( + protected void performDecodeImpl( ByteBuffer[] inputs, int[] inputOffsets, int dataLen, int[] erased, ByteBuffer[] outputs, int[] outputOffsets) throws IOException { decodeImpl(inputs, inputOffsets, dataLen, erased, outputs, outputOffsets); } @Override - public synchronized void release() { - destroyImpl(); + public void release() { + decoderLock.writeLock().lock(); + try { + destroyImpl(); + } finally { + decoderLock.writeLock().unlock(); + } } private native void initImpl(int numDataUnits, int numParityUnits); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/NativeXORRawEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/NativeXORRawEncoder.java index 7f4265b2fa5..66b0a1bff7d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/NativeXORRawEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/NativeXORRawEncoder.java @@ -36,19 +36,30 @@ public class NativeXORRawEncoder extends AbstractNativeRawEncoder { public NativeXORRawEncoder(ErasureCoderOptions coderOptions) { super(coderOptions); - initImpl(coderOptions.getNumDataUnits(), coderOptions.getNumParityUnits()); + encoderLock.writeLock().lock(); + try { + initImpl(coderOptions.getNumDataUnits(), + coderOptions.getNumParityUnits()); + } finally { + encoderLock.writeLock().unlock(); + } } @Override - protected synchronized void performEncodeImpl( + protected void performEncodeImpl( ByteBuffer[] inputs, int[] inputOffsets, int dataLen, ByteBuffer[] outputs, int[] outputOffsets) throws IOException { encodeImpl(inputs, inputOffsets, dataLen, outputs, outputOffsets); } @Override - public synchronized void release() { - destroyImpl(); + public void release() { + encoderLock.writeLock().lock(); + try { + destroyImpl(); + } finally { + encoderLock.writeLock().unlock(); + } } private native void initImpl(int numDataUnits, int numParityUnits); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderBenchmark.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderBenchmark.java index c005e77cb3a..df8c54b9cdd 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderBenchmark.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureCoderBenchmark.java @@ -230,6 +230,12 @@ public final class RawErasureCoderBenchmark { throw e; } finally { executor.shutdown(); + if (encoder != null) { + encoder.release(); + } + if (decoder != null) { + decoder.release(); + } } }