diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 94c521d5033..e0b35a5d32e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -166,6 +166,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-6938. Cleanup javac warnings in FSNamesystem (Charles Lamb via
     wheat9)
 
+    HDFS-7358. Clients may get stuck waiting when using ByteArrayManager.
+    (szetszwo)
+
 Release 2.6.0 - 2014-11-15
 
   INCOMPATIBLE CHANGES
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 92dbc8ebed7..8f18afe719f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -262,7 +262,9 @@ public class DFSOutputStream extends FSOutputSummer
       maxChunks = chunksPerPkt;
     }
 
-    void writeData(byte[] inarray, int off, int len) {
+    synchronized void writeData(byte[] inarray, int off, int len)
+        throws ClosedChannelException {
+      checkBuffer();
       if (dataPos + len > buf.length) {
         throw new BufferOverflowException();
       }
@@ -270,7 +272,9 @@ public class DFSOutputStream extends FSOutputSummer
       dataPos += len;
     }
 
-    void writeChecksum(byte[] inarray, int off, int len) {
+    synchronized void writeChecksum(byte[] inarray, int off, int len)
+        throws ClosedChannelException {
+      checkBuffer();
       if (len == 0) {
         return;
       }
@@ -284,7 +288,9 @@ public class DFSOutputStream extends FSOutputSummer
     /**
      * Write the full packet, including the header, to the given output stream.
      */
-    void writeTo(DataOutputStream stm) throws IOException {
+    synchronized void writeTo(DataOutputStream stm) throws IOException {
+      checkBuffer();
+
       final int dataLen = dataPos - dataStart;
       final int checksumLen = checksumPos - checksumStart;
       final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
@@ -326,7 +332,13 @@ public class DFSOutputStream extends FSOutputSummer
       }
     }
 
-    private void releaseBuffer(ByteArrayManager bam) {
+    private synchronized void checkBuffer() throws ClosedChannelException {
+      if (buf == null) {
+        throw new ClosedChannelException();
+      }
+    }
+
+    private synchronized void releaseBuffer(ByteArrayManager bam) {
       bam.release(buf);
       buf = null;
     }
@@ -712,7 +724,7 @@ public class DFSOutputStream extends FSOutputSummer
       closeResponder();       // close and join
       closeStream();
       streamerClosed = true;
-      closed = true;
+      setClosed();
       synchronized (dataQueue) {
         dataQueue.notifyAll();
       }
@@ -1616,8 +1628,9 @@ public class DFSOutputStream extends FSOutputSummer
     return sock;
   }
 
+  @Override
   protected void checkClosed() throws IOException {
-    if (closed) {
+    if (isClosed()) {
       IOException e = lastException.get();
       throw e != null ? e : new ClosedChannelException();
     }
@@ -1827,7 +1840,7 @@ public class DFSOutputStream extends FSOutputSummer
     synchronized (dataQueue) {
       try {
         // If queue is full, then wait till we have enough space
-        while (!closed && dataQueue.size() + ackQueue.size() > dfsClient.getConf().writeMaxPackets) {
+        while (!isClosed() && dataQueue.size() + ackQueue.size() > dfsClient.getConf().writeMaxPackets) {
           try {
             dataQueue.wait();
           } catch (InterruptedException e) {
@@ -2019,8 +2032,9 @@ public class DFSOutputStream extends FSOutputSummer
           // So send an empty sync packet.
           currentPacket = createPacket(packetSize, chunksPerPacket,
               bytesCurBlock, currentSeqno++);
-        } else {
+        } else if (currentPacket != null) {
           // just discard the current packet since it is already been sent.
+          currentPacket.releaseBuffer(byteArrayManager);
           currentPacket = null;
         }
       }
@@ -2077,7 +2091,7 @@
     } catch (IOException e) {
       DFSClient.LOG.warn("Error while syncing", e);
       synchronized (this) {
-        if (!closed) {
+        if (!isClosed()) {
           lastException.set(new IOException("IOException flush:" + e));
           closeThreads(true);
         }
@@ -2139,7 +2153,7 @@
     long begin = Time.monotonicNow();
     try {
       synchronized (dataQueue) {
-        while (!closed) {
+        while (!isClosed()) {
           checkClosed();
           if (lastAckedSeqno >= seqno) {
             break;
@@ -2172,7 +2186,7 @@
    * resources associated with this stream.
    */
   synchronized void abort() throws IOException {
-    if (closed) {
+    if (isClosed()) {
       return;
     }
     streamer.setLastException(new IOException("Lease timeout of "
@@ -2181,6 +2195,25 @@
     dfsClient.endFileLease(fileId);
   }
 
+  boolean isClosed() {
+    return closed;
+  }
+
+  void setClosed() {
+    closed = true;
+    synchronized (dataQueue) {
+      releaseBuffer(dataQueue, byteArrayManager);
+      releaseBuffer(ackQueue, byteArrayManager);
+    }
+  }
+
+  private static void releaseBuffer(List<Packet> packets, ByteArrayManager bam) {
+    for(Packet p : packets) {
+      p.releaseBuffer(bam);
+    }
+    packets.clear();
+  }
+
   // shutdown datastreamer and responseprocessor threads.
   // interrupt datastreamer if force is true
   private void closeThreads(boolean force) throws IOException {
@@ -2195,7 +2228,7 @@
     } finally {
       streamer = null;
       s = null;
-      closed = true;
+      setClosed();
     }
   }
 
@@ -2205,7 +2238,7 @@
    */
   @Override
   public synchronized void close() throws IOException {
-    if (closed) {
+    if (isClosed()) {
       IOException e = lastException.getAndSet(null);
       if (e == null)
         return;
@@ -2235,7 +2268,7 @@
       dfsClient.endFileLease(fileId);
     } catch (ClosedChannelException e) {
     } finally {
-      closed = true;
+      setClosed();
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java
index 4751e72800e..ea5e39d0ea5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java
@@ -200,12 +200,7 @@ public abstract class ByteArrayManager {
         debugMessage.get().append(", ").append(this);
       }
 
-      if (numAllocated == maxAllocated) {
-        if (LOG.isDebugEnabled()) {
-          debugMessage.get().append(", notifyAll");
-        }
-        notifyAll();
-      }
+      notify();
       numAllocated--;
       if (numAllocated < 0) {
         // it is possible to drop below 0 since
@@ -346,12 +341,13 @@ public abstract class ByteArrayManager {
      * the number of allocated arrays drops to below the capacity.
      *
      * The byte array allocated by this method must be returned for recycling
-     * via the {@link ByteArrayManager#recycle(byte[])} method.
+     * via the {@link Impl#release(byte[])} method.
      *
      * @return a byte array with length larger than or equal to the given length.
      */
     @Override
     public byte[] newByteArray(final int arrayLength) throws InterruptedException {
+      Preconditions.checkArgument(arrayLength >= 0);
       if (LOG.isDebugEnabled()) {
         debugMessage.get().append("allocate(").append(arrayLength).append(")");
       }
@@ -375,6 +371,7 @@
       }
 
       if (LOG.isDebugEnabled()) {
+        debugMessage.get().append(", return byte[").append(array.length).append("]");
         logDebugMessage();
       }
       return array;
@@ -384,7 +381,9 @@
      * Recycle the given byte array.
      *
      * The byte array may or may not be allocated
-     * by the {@link ByteArrayManager#allocate(int)} method.
+     * by the {@link Impl#newByteArray(int)} method.
+     *
+     * This is a non-blocking call.
      */
     @Override
     public int release(final byte[] array) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
index 7b4d2bb9911..9ada95f91af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
@@ -340,11 +340,11 @@ public class TestHFlush {
         // If we made it past the hflush(), then that means that the ack made it back
         // from the pipeline before we got to the wait() call. In that case we should
         // still have interrupted status.
-        assertTrue(Thread.currentThread().interrupted());
+        assertTrue(Thread.interrupted());
       } catch (InterruptedIOException ie) {
         System.out.println("Got expected exception during flush");
       }
-      assertFalse(Thread.currentThread().interrupted());
+      assertFalse(Thread.interrupted());
 
       // Try again to flush should succeed since we no longer have interrupt status
       stm.hflush();
@@ -362,11 +362,11 @@ public class TestHFlush {
         // If we made it past the close(), then that means that the ack made it back
         // from the pipeline before we got to the wait() call. In that case we should
         // still have interrupted status.
-        assertTrue(Thread.currentThread().interrupted());
+        assertTrue(Thread.interrupted());
       } catch (InterruptedIOException ioe) {
         System.out.println("Got expected exception during close");
         // If we got the exception, we shouldn't have interrupted status anymore.
-        assertFalse(Thread.currentThread().interrupted());
+        assertFalse(Thread.interrupted());
 
         // Now do a successful close.
         stm.close();
@@ -374,7 +374,7 @@
 
       // verify that entire file is good
-      AppendTestUtil.checkFullFile(fs, p, fileLen,
+      AppendTestUtil.checkFullFile(fs, p, 4,
           fileContents, "Failed to deal with thread interruptions");
     } finally {
       cluster.shutdown();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java
index 289617a6b92..77a68c6775a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java
@@ -27,6 +27,8 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -141,7 +143,7 @@ public class TestByteArrayManager {
     { // recycle half of the arrays
       for(int i = 0; i < countThreshold/2; i++) {
-        recycler.submit(removeLast(allocator.futures));
+        recycler.submit(removeLast(allocator.futures).get());
       }
 
       for(Future<Integer> f : recycler.furtures) {
@@ -186,8 +188,8 @@ public class TestByteArrayManager {
     }
 
     // recycle an array
-    recycler.submit(removeLast(allocator.futures));
-    Assert.assertEquals(1, removeLast(recycler.furtures).intValue());
+    recycler.submit(removeLast(allocator.futures).get());
+    Assert.assertEquals(1, removeLast(recycler.furtures).get().intValue());
 
     // check if the thread is unblocked
     Thread.sleep(100);
@@ -207,11 +209,11 @@ public class TestByteArrayManager {
     }
   }
 
-  static <T> T removeLast(List<Future<T>> furtures) throws Exception {
+  static <T> Future<T> removeLast(List<Future<T>> furtures) throws Exception {
     return remove(furtures, furtures.size() - 1);
   }
-  static <T> T remove(List<Future<T>> furtures, int i) throws Exception {
-    return furtures.isEmpty()? null: furtures.remove(i).get();
+  static <T> Future<T> remove(List<Future<T>> furtures, int i) throws Exception {
+    return furtures.isEmpty()? null: furtures.remove(i);
   }
 
   static <T> void waitForAll(List<Future<T>> furtures) throws Exception {
@@ -320,12 +322,13 @@ public class TestByteArrayManager {
     final Runner[] runners = new Runner[Runner.NUM_RUNNERS];
     final Thread[] threads = new Thread[runners.length];
 
-    final int num = 1 << 8;
+    final int num = 1 << 10;
     for(int i = 0; i < runners.length; i++) {
       runners[i] = new Runner(i, countThreshold, countLimit, pool, i, bam);
       threads[i] = runners[i].start(num);
     }
 
+    final List<Exception> exceptions = new ArrayList<Exception>();
     final Thread randomRecycler = new Thread() {
       @Override
       public void run() {
@@ -336,10 +339,11 @@
             runners[j].recycle();
           } catch (Exception e) {
             e.printStackTrace();
-            Assert.fail(this + " has " + e);
+            exceptions.add(new Exception(this + " has an exception", e));
           }
 
           if ((i & 0xFF) == 0) {
+            LOG.info("randomRecycler sleep, i=" + i);
             sleepMs(100);
           }
         }
@@ -361,6 +365,7 @@
     randomRecycler.start();
 
     randomRecycler.join();
+    Assert.assertTrue(exceptions.isEmpty());
 
     Assert.assertNull(counters.get(0, false));
     for(int i = 1; i < runners.length; i++) {
@@ -392,7 +397,7 @@
   }
 
   static class Runner implements Runnable {
-    static final int NUM_RUNNERS = 4;
+    static final int NUM_RUNNERS = 5;
 
     static int index2arrayLength(int index) {
       return ByteArrayManager.MIN_ARRAY_LENGTH << (index - 1);
@@ -453,16 +458,22 @@
       return f;
     }
 
-    byte[] removeFirst() throws Exception {
+    Future<byte[]> removeFirst() throws Exception {
       synchronized (arrays) {
         return remove(arrays, 0);
       }
     }
 
     void recycle() throws Exception {
-      final byte[] a = removeFirst();
-      if (a != null) {
-        recycle(a);
+      final Future<byte[]> f = removeFirst();
+      if (f != null) {
+        printf("randomRecycler: ");
+        try {
+          recycle(f.get(10, TimeUnit.MILLISECONDS));
+        } catch(TimeoutException e) {
+          recycle(new byte[maxArrayLength]);
+          printf("timeout, new byte[%d]\n", maxArrayLength);
+        }
       }
     }
@@ -490,9 +501,9 @@
           submitAllocate();
         } else {
           try {
-            final byte[] a = removeFirst();
-            if (a != null) {
-              submitRecycle(a);
+            final Future<byte[]> f = removeFirst();
+            if (f != null) {
+              submitRecycle(f.get());
             }
           } catch (Exception e) {
             e.printStackTrace();
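
The sketch below is not part of the patch; it is a minimal, stand-alone illustration (the names ArrayPool and PooledPacket are hypothetical, not HDFS classes) of the locking pattern that the Packet methods and setClosed() above rely on: buffer use and buffer release are serialized on the packet's monitor, every accessor re-checks the buffer first, and the pooled array is returned exactly once, so a writer racing with close() gets a ClosedChannelException instead of writing into a recycled array or waiting forever for the pool.

    // Illustrative sketch only; ArrayPool and PooledPacket are made-up names,
    // not the ByteArrayManager or DFSOutputStream.Packet implementations.
    import java.nio.BufferOverflowException;
    import java.nio.channels.ClosedChannelException;
    import java.util.ArrayDeque;
    import java.util.Queue;

    class ArrayPool {
      private final Queue<byte[]> free = new ArrayDeque<byte[]>();

      synchronized byte[] allocate(int length) {
        final byte[] cached = free.poll();
        // Reuse a cached array when it is large enough; otherwise allocate fresh.
        return cached != null && cached.length >= length ? cached : new byte[length];
      }

      synchronized void release(byte[] array) {
        free.offer(array);          // array becomes visible to other writers again
      }
    }

    class PooledPacket {
      private final ArrayPool pool;
      private byte[] buf;           // null once the buffer has been released
      private int pos;

      PooledPacket(ArrayPool pool, int packetSize) {
        this.pool = pool;
        this.buf = pool.allocate(packetSize);
      }

      private synchronized void checkBuffer() throws ClosedChannelException {
        if (buf == null) {          // released by close(); fail fast, do not block
          throw new ClosedChannelException();
        }
      }

      synchronized void writeData(byte[] src, int off, int len)
          throws ClosedChannelException {
        checkBuffer();              // never write into an array already recycled
        if (pos + len > buf.length) {
          throw new BufferOverflowException();
        }
        System.arraycopy(src, off, buf, pos, len);
        pos += len;
      }

      synchronized void releaseBuffer() {
        if (buf != null) {          // idempotent: return the array exactly once
          pool.release(buf);
          buf = null;
        }
      }
    }

Releasing the queued packet buffers inside setClosed() is what lets threads blocked in ByteArrayManager.newByteArray() make progress again, which lines up with the change above from a conditional notifyAll() to an unconditional notify() for every recycled array.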