HDFS-7358. Clients may get stuck waiting when using ByteArrayManager.

Tsz-Wo Nicholas Sze 2014-11-13 12:28:44 -08:00
parent aee68b67f3
commit 394ba94c5d
5 changed files with 89 additions and 43 deletions
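In outline: DFSOutputStream packets borrow their byte arrays from a shared ByteArrayManager, which caps the number of arrays handed out and makes newByteArray() wait until the count drops below that capacity. Before this patch, a packet could be discarded without returning its array (when the stream was closed, or when an hflush dropped an already-sent packet), so the count never came down and other clients could wait in newByteArray() forever. The patch releases the buffer of every queued packet when the stream is closed and wakes a waiter on every release. A minimal sketch of that allocate/release discipline, using a made-up CountedAllocator rather than the real ByteArrayManager code:

class CountedAllocator {
  private final int limit;       // capacity for outstanding arrays
  private int outstanding = 0;

  CountedAllocator(int limit) { this.limit = limit; }

  synchronized byte[] allocate(int length) throws InterruptedException {
    while (outstanding >= limit) {
      wait();                    // HDFS-7358: callers were parked here forever
    }
    outstanding++;
    return new byte[length];
  }

  synchronized void release(byte[] array) {
    outstanding--;
    notify();                    // every returned array wakes one waiter
  }
}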


@@ -421,6 +421,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-6938. Cleanup javac warnings in FSNamesystem (Charles Lamb via wheat9)
+    HDFS-7358. Clients may get stuck waiting when using ByteArrayManager.
+    (szetszwo)
 Release 2.6.0 - 2014-11-15
   INCOMPATIBLE CHANGES


@@ -262,7 +262,9 @@ public class DFSOutputStream extends FSOutputSummer
       maxChunks = chunksPerPkt;
     }
-    void writeData(byte[] inarray, int off, int len) {
+    synchronized void writeData(byte[] inarray, int off, int len)
+        throws ClosedChannelException {
+      checkBuffer();
       if (dataPos + len > buf.length) {
         throw new BufferOverflowException();
       }
@@ -270,7 +272,9 @@ public class DFSOutputStream extends FSOutputSummer
       dataPos += len;
     }
-    void writeChecksum(byte[] inarray, int off, int len) {
+    synchronized void writeChecksum(byte[] inarray, int off, int len)
+        throws ClosedChannelException {
+      checkBuffer();
       if (len == 0) {
         return;
       }
@@ -284,7 +288,9 @@ public class DFSOutputStream extends FSOutputSummer
     /**
      * Write the full packet, including the header, to the given output stream.
      */
-    void writeTo(DataOutputStream stm) throws IOException {
+    synchronized void writeTo(DataOutputStream stm) throws IOException {
+      checkBuffer();
       final int dataLen = dataPos - dataStart;
       final int checksumLen = checksumPos - checksumStart;
       final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
@@ -326,7 +332,13 @@ public class DFSOutputStream extends FSOutputSummer
       }
     }
-    private void releaseBuffer(ByteArrayManager bam) {
+    private synchronized void checkBuffer() throws ClosedChannelException {
+      if (buf == null) {
+        throw new ClosedChannelException();
+      }
+    }
+    private synchronized void releaseBuffer(ByteArrayManager bam) {
       bam.release(buf);
       buf = null;
     }
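The Packet write methods above are now synchronized and begin with checkBuffer(), so a packet whose array has already been handed back to the ByteArrayManager fails fast with ClosedChannelException instead of touching a null or recycled buffer. A stripped-down sketch of the same guard (class and pool names invented for illustration, not the Hadoop classes):

import java.nio.channels.ClosedChannelException;

class GuardedPacket {
  interface Pool { int release(byte[] array); }  // stand-in for ByteArrayManager

  private byte[] buf;                            // null once released
  private int dataPos = 0;

  GuardedPacket(byte[] buf) { this.buf = buf; }

  private synchronized void checkBuffer() throws ClosedChannelException {
    if (buf == null) {
      throw new ClosedChannelException();        // buffer already recycled
    }
  }

  synchronized void writeData(byte[] src, int off, int len)
      throws ClosedChannelException {
    checkBuffer();                               // fail fast after release
    System.arraycopy(src, off, buf, dataPos, len);
    dataPos += len;
  }

  synchronized void releaseBuffer(Pool pool) {
    pool.release(buf);
    buf = null;                                  // later writes now throw
  }
}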
@@ -712,7 +724,7 @@ public class DFSOutputStream extends FSOutputSummer
       closeResponder();       // close and join
       closeStream();
       streamerClosed = true;
-      closed = true;
+      setClosed();
       synchronized (dataQueue) {
         dataQueue.notifyAll();
       }
@@ -1616,8 +1628,9 @@ public class DFSOutputStream extends FSOutputSummer
     return sock;
   }
+  @Override
   protected void checkClosed() throws IOException {
-    if (closed) {
+    if (isClosed()) {
       IOException e = lastException.get();
       throw e != null ? e : new ClosedChannelException();
     }
@@ -1827,7 +1840,7 @@ public class DFSOutputStream extends FSOutputSummer
     synchronized (dataQueue) {
       try {
         // If queue is full, then wait till we have enough space
-        while (!closed && dataQueue.size() + ackQueue.size() > dfsClient.getConf().writeMaxPackets) {
+        while (!isClosed() && dataQueue.size() + ackQueue.size() > dfsClient.getConf().writeMaxPackets) {
           try {
             dataQueue.wait();
          } catch (InterruptedException e) {
@@ -2013,8 +2026,9 @@ public class DFSOutputStream extends FSOutputSummer
           // So send an empty sync packet.
           currentPacket = createPacket(packetSize, chunksPerPacket,
               bytesCurBlock, currentSeqno++);
-        } else {
+        } else if (currentPacket != null) {
           // just discard the current packet since it is already been sent.
+          currentPacket.releaseBuffer(byteArrayManager);
           currentPacket = null;
         }
       }
@@ -2071,7 +2085,7 @@ public class DFSOutputStream extends FSOutputSummer
     } catch (IOException e) {
       DFSClient.LOG.warn("Error while syncing", e);
       synchronized (this) {
-        if (!closed) {
+        if (!isClosed()) {
           lastException.set(new IOException("IOException flush:" + e));
           closeThreads(true);
         }
@@ -2133,7 +2147,7 @@ public class DFSOutputStream extends FSOutputSummer
     long begin = Time.monotonicNow();
     try {
       synchronized (dataQueue) {
-        while (!closed) {
+        while (!isClosed()) {
           checkClosed();
           if (lastAckedSeqno >= seqno) {
             break;
@@ -2166,7 +2180,7 @@ public class DFSOutputStream extends FSOutputSummer
    * resources associated with this stream.
    */
   synchronized void abort() throws IOException {
-    if (closed) {
+    if (isClosed()) {
       return;
     }
     streamer.setLastException(new IOException("Lease timeout of "
@@ -2175,6 +2189,25 @@ public class DFSOutputStream extends FSOutputSummer
     dfsClient.endFileLease(fileId);
   }
+  boolean isClosed() {
+    return closed;
+  }
+  void setClosed() {
+    closed = true;
+    synchronized (dataQueue) {
+      releaseBuffer(dataQueue, byteArrayManager);
+      releaseBuffer(ackQueue, byteArrayManager);
+    }
+  }
+  private static void releaseBuffer(List<Packet> packets, ByteArrayManager bam) {
+    for(Packet p : packets) {
+      p.releaseBuffer(bam);
+    }
+    packets.clear();
+  }
   // shutdown datastreamer and responseprocessor threads.
   // interrupt datastreamer if force is true
   private void closeThreads(boolean force) throws IOException {
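setClosed() above is the core of the fix: whenever the stream stops, normally or on error, every packet still sitting in dataQueue or ackQueue returns its byte array to the ByteArrayManager, so the allocation count drops and writers blocked in newByteArray() can proceed. The same drain step, sketched against the hypothetical CountedAllocator from the first sketch (illustrative only):

// Drain-on-close: give every queued buffer back so the pool's outstanding
// count drops and blocked allocators wake up.
static void releaseAll(java.util.List<byte[]> queued, CountedAllocator pool) {
  for (byte[] buf : queued) {
    pool.release(buf);     // each release wakes one waiting allocator
  }
  queued.clear();          // the drained packets must not be written again
}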
@@ -2189,7 +2222,7 @@ public class DFSOutputStream extends FSOutputSummer
     } finally {
       streamer = null;
       s = null;
-      closed = true;
+      setClosed();
     }
   }
@@ -2199,7 +2232,7 @@ public class DFSOutputStream extends FSOutputSummer
    */
   @Override
   public synchronized void close() throws IOException {
-    if (closed) {
+    if (isClosed()) {
       IOException e = lastException.getAndSet(null);
       if (e == null)
         return;
@@ -2229,7 +2262,7 @@ public class DFSOutputStream extends FSOutputSummer
       dfsClient.endFileLease(fileId);
     } catch (ClosedChannelException e) {
     } finally {
-      closed = true;
+      setClosed();
     }
   }


@@ -200,12 +200,7 @@ public abstract class ByteArrayManager {
         debugMessage.get().append(", ").append(this);
       }
-      if (numAllocated == maxAllocated) {
-        if (LOG.isDebugEnabled()) {
-          debugMessage.get().append(", notifyAll");
-        }
-        notifyAll();
-      }
+      notify();
       numAllocated--;
       if (numAllocated < 0) {
         // it is possible to drop below 0 since
@@ -346,12 +341,13 @@ public abstract class ByteArrayManager {
      * the number of allocated arrays drops to below the capacity.
      *
      * The byte array allocated by this method must be returned for recycling
-     * via the {@link ByteArrayManager#recycle(byte[])} method.
+     * via the {@link Impl#release(byte[])} method.
      *
      * @return a byte array with length larger than or equal to the given length.
      */
     @Override
     public byte[] newByteArray(final int arrayLength) throws InterruptedException {
+      Preconditions.checkArgument(arrayLength >= 0);
       if (LOG.isDebugEnabled()) {
         debugMessage.get().append("allocate(").append(arrayLength).append(")");
       }
@@ -375,6 +371,7 @@ public abstract class ByteArrayManager {
       }
       if (LOG.isDebugEnabled()) {
         debugMessage.get().append(", return byte[").append(array.length).append("]");
+        logDebugMessage();
       }
       return array;
@@ -384,7 +381,9 @@ public abstract class ByteArrayManager {
      * Recycle the given byte array.
      *
      * The byte array may or may not be allocated
-     * by the {@link ByteArrayManager#allocate(int)} method.
+     * by the {@link Impl#newByteArray(int)} method.
+     *
+     * This is a non-blocking call.
      */
     @Override
     public int release(final byte[] array) {
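The javadoc above spells out the contract: newByteArray() may block until the allocation count falls below its capacity, and release() never blocks. A hedged usage sketch of that contract (bam and packetSize stand for a ByteArrayManager instance and a packet size; the try/finally pairing is the caller's responsibility, not something the patch adds):

// Every array obtained from newByteArray() must eventually reach release(),
// even on error paths; otherwise other callers can wait in newByteArray() forever.
byte[] buf = bam.newByteArray(packetSize);   // may wait for capacity
try {
  // ... fill the buffer and send the packet ...
} finally {
  bam.release(buf);                          // non-blocking, wakes a waiter
}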


@@ -340,11 +340,11 @@ public class TestHFlush {
         // If we made it past the hflush(), then that means that the ack made it back
         // from the pipeline before we got to the wait() call. In that case we should
         // still have interrupted status.
-        assertTrue(Thread.currentThread().interrupted());
+        assertTrue(Thread.interrupted());
       } catch (InterruptedIOException ie) {
         System.out.println("Got expected exception during flush");
       }
-      assertFalse(Thread.currentThread().interrupted());
+      assertFalse(Thread.interrupted());
       // Try again to flush should succeed since we no longer have interrupt status
       stm.hflush();
@@ -362,11 +362,11 @@ public class TestHFlush {
         // If we made it past the close(), then that means that the ack made it back
         // from the pipeline before we got to the wait() call. In that case we should
         // still have interrupted status.
-        assertTrue(Thread.currentThread().interrupted());
+        assertTrue(Thread.interrupted());
       } catch (InterruptedIOException ioe) {
         System.out.println("Got expected exception during close");
         // If we got the exception, we shouldn't have interrupted status anymore.
-        assertFalse(Thread.currentThread().interrupted());
+        assertFalse(Thread.interrupted());
         // Now do a successful close.
         stm.close();
@@ -374,7 +374,7 @@ public class TestHFlush {
       // verify that entire file is good
-      AppendTestUtil.checkFullFile(fs, p, fileLen,
+      AppendTestUtil.checkFullFile(fs, p, 4,
           fileContents, "Failed to deal with thread interruptions");
     } finally {
       cluster.shutdown();
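The TestHFlush change is cosmetic but worth noting: Thread.currentThread().interrupted() actually invokes the static Thread.interrupted(), which tests and clears the current thread's interrupt flag, so writing it as an instance call is misleading. A small illustration of the difference between the two query methods (not part of the patch; run with assertions enabled):

Thread.currentThread().interrupt();                // set the flag
assert Thread.currentThread().isInterrupted();     // isInterrupted() only reads it
assert Thread.interrupted();                       // interrupted() reads it and clears it
assert !Thread.currentThread().isInterrupted();    // flag is gone now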


@@ -27,6 +27,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.logging.Log;
@@ -141,7 +143,7 @@ public class TestByteArrayManager {
     { // recycle half of the arrays
       for(int i = 0; i < countThreshold/2; i++) {
-        recycler.submit(removeLast(allocator.futures));
+        recycler.submit(removeLast(allocator.futures).get());
       }
       for(Future<Integer> f : recycler.furtures) {
@@ -186,8 +188,8 @@ public class TestByteArrayManager {
     }
     // recycle an array
-    recycler.submit(removeLast(allocator.futures));
-    Assert.assertEquals(1, removeLast(recycler.furtures).intValue());
+    recycler.submit(removeLast(allocator.futures).get());
+    Assert.assertEquals(1, removeLast(recycler.furtures).get().intValue());
     // check if the thread is unblocked
     Thread.sleep(100);
@@ -207,11 +209,11 @@ public class TestByteArrayManager {
     }
   }
-  static <T> T removeLast(List<Future<T>> furtures) throws Exception {
+  static <T> Future<T> removeLast(List<Future<T>> furtures) throws Exception {
     return remove(furtures, furtures.size() - 1);
   }
-  static <T> T remove(List<Future<T>> furtures, int i) throws Exception {
-    return furtures.isEmpty()? null: furtures.remove(i).get();
+  static <T> Future<T> remove(List<Future<T>> furtures, int i) throws Exception {
+    return furtures.isEmpty()? null: furtures.remove(i);
   }
   static <T> void waitForAll(List<Future<T>> furtures) throws Exception {
@@ -320,12 +322,13 @@ public class TestByteArrayManager {
     final Runner[] runners = new Runner[Runner.NUM_RUNNERS];
     final Thread[] threads = new Thread[runners.length];
-    final int num = 1 << 8;
+    final int num = 1 << 10;
     for(int i = 0; i < runners.length; i++) {
       runners[i] = new Runner(i, countThreshold, countLimit, pool, i, bam);
       threads[i] = runners[i].start(num);
     }
+    final List<Exception> exceptions = new ArrayList<Exception>();
     final Thread randomRecycler = new Thread() {
       @Override
       public void run() {
@@ -336,10 +339,11 @@ public class TestByteArrayManager {
             runners[j].recycle();
           } catch (Exception e) {
             e.printStackTrace();
-            Assert.fail(this + " has " + e);
+            exceptions.add(new Exception(this + " has an exception", e));
           }
           if ((i & 0xFF) == 0) {
+            LOG.info("randomRecycler sleep, i=" + i);
             sleepMs(100);
           }
         }
@@ -361,6 +365,7 @@ public class TestByteArrayManager {
     randomRecycler.start();
     randomRecycler.join();
+    Assert.assertTrue(exceptions.isEmpty());
     Assert.assertNull(counters.get(0, false));
     for(int i = 1; i < runners.length; i++) {
@@ -392,7 +397,7 @@ public class TestByteArrayManager {
   }
   static class Runner implements Runnable {
-    static final int NUM_RUNNERS = 4;
+    static final int NUM_RUNNERS = 5;
     static int index2arrayLength(int index) {
       return ByteArrayManager.MIN_ARRAY_LENGTH << (index - 1);
@@ -453,16 +458,22 @@ public class TestByteArrayManager {
       return f;
     }
-    byte[] removeFirst() throws Exception {
+    Future<byte[]> removeFirst() throws Exception {
       synchronized (arrays) {
         return remove(arrays, 0);
       }
     }
     void recycle() throws Exception {
-      final byte[] a = removeFirst();
-      if (a != null) {
-        recycle(a);
+      final Future<byte[]> f = removeFirst();
+      if (f != null) {
+        printf("randomRecycler: ");
+        try {
+          recycle(f.get(10, TimeUnit.MILLISECONDS));
+        } catch(TimeoutException e) {
+          recycle(new byte[maxArrayLength]);
+          printf("timeout, new byte[%d]\n", maxArrayLength);
+        }
       }
     }
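In recycle(), the test now keeps the Future for a pending allocation and waits at most 10 ms for it; if the allocation has not completed, presumably because it is itself blocked waiting for capacity, the recycler falls back to recycling a fresh array so it keeps making progress. The timed-get pattern in isolation (helper name invented; relies on java.util.concurrent.Future, TimeUnit and TimeoutException, as imported above):

static byte[] getOrFallback(Future<byte[]> f, int fallbackLength) throws Exception {
  try {
    return f.get(10, TimeUnit.MILLISECONDS);   // brief, bounded wait
  } catch (TimeoutException e) {
    return new byte[fallbackLength];           // keep the recycler moving
  }
}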
@@ -490,9 +501,9 @@ public class TestByteArrayManager {
           submitAllocate();
         } else {
           try {
-            final byte[] a = removeFirst();
-            if (a != null) {
-              submitRecycle(a);
+            final Future<byte[]> f = removeFirst();
+            if (f != null) {
+              submitRecycle(f.get());
             }
           } catch (Exception e) {
             e.printStackTrace();