HDFS-7358. Clients may get stuck waiting when using ByteArrayManager.
This commit is contained in:
parent
d0285d1ad6
commit
679684ab10
|
@ -166,6 +166,9 @@ Release 2.7.0 - UNRELEASED
|
|||
|
||||
HDFS-6938. Cleanup javac warnings in FSNamesystem (Charles Lamb via wheat9)
|
||||
|
||||
HDFS-7358. Clients may get stuck waiting when using ByteArrayManager.
|
||||
(szetszwo)
|
||||
|
||||
Release 2.6.0 - 2014-11-15
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -262,7 +262,9 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
maxChunks = chunksPerPkt;
|
||||
}
|
||||
|
||||
void writeData(byte[] inarray, int off, int len) {
|
||||
synchronized void writeData(byte[] inarray, int off, int len)
|
||||
throws ClosedChannelException {
|
||||
checkBuffer();
|
||||
if (dataPos + len > buf.length) {
|
||||
throw new BufferOverflowException();
|
||||
}
|
||||
|
@ -270,7 +272,9 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
dataPos += len;
|
||||
}
|
||||
|
||||
void writeChecksum(byte[] inarray, int off, int len) {
|
||||
synchronized void writeChecksum(byte[] inarray, int off, int len)
|
||||
throws ClosedChannelException {
|
||||
checkBuffer();
|
||||
if (len == 0) {
|
||||
return;
|
||||
}
|
||||
|
@ -284,7 +288,9 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
/**
|
||||
* Write the full packet, including the header, to the given output stream.
|
||||
*/
|
||||
void writeTo(DataOutputStream stm) throws IOException {
|
||||
synchronized void writeTo(DataOutputStream stm) throws IOException {
|
||||
checkBuffer();
|
||||
|
||||
final int dataLen = dataPos - dataStart;
|
||||
final int checksumLen = checksumPos - checksumStart;
|
||||
final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
|
||||
|
@ -326,7 +332,13 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
}
|
||||
}
|
||||
|
||||
private void releaseBuffer(ByteArrayManager bam) {
|
||||
private synchronized void checkBuffer() throws ClosedChannelException {
|
||||
if (buf == null) {
|
||||
throw new ClosedChannelException();
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void releaseBuffer(ByteArrayManager bam) {
|
||||
bam.release(buf);
|
||||
buf = null;
|
||||
}
|
||||
|
@ -712,7 +724,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
closeResponder(); // close and join
|
||||
closeStream();
|
||||
streamerClosed = true;
|
||||
closed = true;
|
||||
setClosed();
|
||||
synchronized (dataQueue) {
|
||||
dataQueue.notifyAll();
|
||||
}
|
||||
|
@ -1616,8 +1628,9 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
return sock;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void checkClosed() throws IOException {
|
||||
if (closed) {
|
||||
if (isClosed()) {
|
||||
IOException e = lastException.get();
|
||||
throw e != null ? e : new ClosedChannelException();
|
||||
}
|
||||
|
@ -1827,7 +1840,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
synchronized (dataQueue) {
|
||||
try {
|
||||
// If queue is full, then wait till we have enough space
|
||||
while (!closed && dataQueue.size() + ackQueue.size() > dfsClient.getConf().writeMaxPackets) {
|
||||
while (!isClosed() && dataQueue.size() + ackQueue.size() > dfsClient.getConf().writeMaxPackets) {
|
||||
try {
|
||||
dataQueue.wait();
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -2019,8 +2032,9 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
// So send an empty sync packet.
|
||||
currentPacket = createPacket(packetSize, chunksPerPacket,
|
||||
bytesCurBlock, currentSeqno++);
|
||||
} else {
|
||||
} else if (currentPacket != null) {
|
||||
// just discard the current packet since it is already been sent.
|
||||
currentPacket.releaseBuffer(byteArrayManager);
|
||||
currentPacket = null;
|
||||
}
|
||||
}
|
||||
|
@ -2077,7 +2091,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
} catch (IOException e) {
|
||||
DFSClient.LOG.warn("Error while syncing", e);
|
||||
synchronized (this) {
|
||||
if (!closed) {
|
||||
if (!isClosed()) {
|
||||
lastException.set(new IOException("IOException flush:" + e));
|
||||
closeThreads(true);
|
||||
}
|
||||
|
@ -2139,7 +2153,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
long begin = Time.monotonicNow();
|
||||
try {
|
||||
synchronized (dataQueue) {
|
||||
while (!closed) {
|
||||
while (!isClosed()) {
|
||||
checkClosed();
|
||||
if (lastAckedSeqno >= seqno) {
|
||||
break;
|
||||
|
@ -2172,7 +2186,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
* resources associated with this stream.
|
||||
*/
|
||||
synchronized void abort() throws IOException {
|
||||
if (closed) {
|
||||
if (isClosed()) {
|
||||
return;
|
||||
}
|
||||
streamer.setLastException(new IOException("Lease timeout of "
|
||||
|
@ -2181,6 +2195,25 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
dfsClient.endFileLease(fileId);
|
||||
}
|
||||
|
||||
boolean isClosed() {
|
||||
return closed;
|
||||
}
|
||||
|
||||
void setClosed() {
|
||||
closed = true;
|
||||
synchronized (dataQueue) {
|
||||
releaseBuffer(dataQueue, byteArrayManager);
|
||||
releaseBuffer(ackQueue, byteArrayManager);
|
||||
}
|
||||
}
|
||||
|
||||
private static void releaseBuffer(List<Packet> packets, ByteArrayManager bam) {
|
||||
for(Packet p : packets) {
|
||||
p.releaseBuffer(bam);
|
||||
}
|
||||
packets.clear();
|
||||
}
|
||||
|
||||
// shutdown datastreamer and responseprocessor threads.
|
||||
// interrupt datastreamer if force is true
|
||||
private void closeThreads(boolean force) throws IOException {
|
||||
|
@ -2195,7 +2228,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
} finally {
|
||||
streamer = null;
|
||||
s = null;
|
||||
closed = true;
|
||||
setClosed();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2205,7 +2238,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
*/
|
||||
@Override
|
||||
public synchronized void close() throws IOException {
|
||||
if (closed) {
|
||||
if (isClosed()) {
|
||||
IOException e = lastException.getAndSet(null);
|
||||
if (e == null)
|
||||
return;
|
||||
|
@ -2235,7 +2268,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
dfsClient.endFileLease(fileId);
|
||||
} catch (ClosedChannelException e) {
|
||||
} finally {
|
||||
closed = true;
|
||||
setClosed();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -200,12 +200,7 @@ public abstract class ByteArrayManager {
|
|||
debugMessage.get().append(", ").append(this);
|
||||
}
|
||||
|
||||
if (numAllocated == maxAllocated) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
debugMessage.get().append(", notifyAll");
|
||||
}
|
||||
notifyAll();
|
||||
}
|
||||
notify();
|
||||
numAllocated--;
|
||||
if (numAllocated < 0) {
|
||||
// it is possible to drop below 0 since
|
||||
|
@ -346,12 +341,13 @@ public abstract class ByteArrayManager {
|
|||
* the number of allocated arrays drops to below the capacity.
|
||||
*
|
||||
* The byte array allocated by this method must be returned for recycling
|
||||
* via the {@link ByteArrayManager#recycle(byte[])} method.
|
||||
* via the {@link Impl#release(byte[])} method.
|
||||
*
|
||||
* @return a byte array with length larger than or equal to the given length.
|
||||
*/
|
||||
@Override
|
||||
public byte[] newByteArray(final int arrayLength) throws InterruptedException {
|
||||
Preconditions.checkArgument(arrayLength >= 0);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
debugMessage.get().append("allocate(").append(arrayLength).append(")");
|
||||
}
|
||||
|
@ -375,6 +371,7 @@ public abstract class ByteArrayManager {
|
|||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
debugMessage.get().append(", return byte[").append(array.length).append("]");
|
||||
logDebugMessage();
|
||||
}
|
||||
return array;
|
||||
|
@ -384,7 +381,9 @@ public abstract class ByteArrayManager {
|
|||
* Recycle the given byte array.
|
||||
*
|
||||
* The byte array may or may not be allocated
|
||||
* by the {@link ByteArrayManager#allocate(int)} method.
|
||||
* by the {@link Impl#newByteArray(int)} method.
|
||||
*
|
||||
* This is a non-blocking call.
|
||||
*/
|
||||
@Override
|
||||
public int release(final byte[] array) {
|
||||
|
|
|
@ -340,11 +340,11 @@ public class TestHFlush {
|
|||
// If we made it past the hflush(), then that means that the ack made it back
|
||||
// from the pipeline before we got to the wait() call. In that case we should
|
||||
// still have interrupted status.
|
||||
assertTrue(Thread.currentThread().interrupted());
|
||||
assertTrue(Thread.interrupted());
|
||||
} catch (InterruptedIOException ie) {
|
||||
System.out.println("Got expected exception during flush");
|
||||
}
|
||||
assertFalse(Thread.currentThread().interrupted());
|
||||
assertFalse(Thread.interrupted());
|
||||
|
||||
// Try again to flush should succeed since we no longer have interrupt status
|
||||
stm.hflush();
|
||||
|
@ -362,11 +362,11 @@ public class TestHFlush {
|
|||
// If we made it past the close(), then that means that the ack made it back
|
||||
// from the pipeline before we got to the wait() call. In that case we should
|
||||
// still have interrupted status.
|
||||
assertTrue(Thread.currentThread().interrupted());
|
||||
assertTrue(Thread.interrupted());
|
||||
} catch (InterruptedIOException ioe) {
|
||||
System.out.println("Got expected exception during close");
|
||||
// If we got the exception, we shouldn't have interrupted status anymore.
|
||||
assertFalse(Thread.currentThread().interrupted());
|
||||
assertFalse(Thread.interrupted());
|
||||
|
||||
// Now do a successful close.
|
||||
stm.close();
|
||||
|
@ -374,7 +374,7 @@ public class TestHFlush {
|
|||
|
||||
|
||||
// verify that entire file is good
|
||||
AppendTestUtil.checkFullFile(fs, p, fileLen,
|
||||
AppendTestUtil.checkFullFile(fs, p, 4,
|
||||
fileContents, "Failed to deal with thread interruptions");
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
|
|
|
@ -27,6 +27,8 @@ import java.util.concurrent.Callable;
|
|||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -141,7 +143,7 @@ public class TestByteArrayManager {
|
|||
|
||||
{ // recycle half of the arrays
|
||||
for(int i = 0; i < countThreshold/2; i++) {
|
||||
recycler.submit(removeLast(allocator.futures));
|
||||
recycler.submit(removeLast(allocator.futures).get());
|
||||
}
|
||||
|
||||
for(Future<Integer> f : recycler.furtures) {
|
||||
|
@ -186,8 +188,8 @@ public class TestByteArrayManager {
|
|||
}
|
||||
|
||||
// recycle an array
|
||||
recycler.submit(removeLast(allocator.futures));
|
||||
Assert.assertEquals(1, removeLast(recycler.furtures).intValue());
|
||||
recycler.submit(removeLast(allocator.futures).get());
|
||||
Assert.assertEquals(1, removeLast(recycler.furtures).get().intValue());
|
||||
|
||||
// check if the thread is unblocked
|
||||
Thread.sleep(100);
|
||||
|
@ -207,11 +209,11 @@ public class TestByteArrayManager {
|
|||
}
|
||||
}
|
||||
|
||||
static <T> T removeLast(List<Future<T>> furtures) throws Exception {
|
||||
static <T> Future<T> removeLast(List<Future<T>> furtures) throws Exception {
|
||||
return remove(furtures, furtures.size() - 1);
|
||||
}
|
||||
static <T> T remove(List<Future<T>> furtures, int i) throws Exception {
|
||||
return furtures.isEmpty()? null: furtures.remove(i).get();
|
||||
static <T> Future<T> remove(List<Future<T>> furtures, int i) throws Exception {
|
||||
return furtures.isEmpty()? null: furtures.remove(i);
|
||||
}
|
||||
|
||||
static <T> void waitForAll(List<Future<T>> furtures) throws Exception {
|
||||
|
@ -320,12 +322,13 @@ public class TestByteArrayManager {
|
|||
final Runner[] runners = new Runner[Runner.NUM_RUNNERS];
|
||||
final Thread[] threads = new Thread[runners.length];
|
||||
|
||||
final int num = 1 << 8;
|
||||
final int num = 1 << 10;
|
||||
for(int i = 0; i < runners.length; i++) {
|
||||
runners[i] = new Runner(i, countThreshold, countLimit, pool, i, bam);
|
||||
threads[i] = runners[i].start(num);
|
||||
}
|
||||
|
||||
final List<Exception> exceptions = new ArrayList<Exception>();
|
||||
final Thread randomRecycler = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -336,10 +339,11 @@ public class TestByteArrayManager {
|
|||
runners[j].recycle();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
Assert.fail(this + " has " + e);
|
||||
exceptions.add(new Exception(this + " has an exception", e));
|
||||
}
|
||||
|
||||
if ((i & 0xFF) == 0) {
|
||||
LOG.info("randomRecycler sleep, i=" + i);
|
||||
sleepMs(100);
|
||||
}
|
||||
}
|
||||
|
@ -361,6 +365,7 @@ public class TestByteArrayManager {
|
|||
randomRecycler.start();
|
||||
|
||||
randomRecycler.join();
|
||||
Assert.assertTrue(exceptions.isEmpty());
|
||||
|
||||
Assert.assertNull(counters.get(0, false));
|
||||
for(int i = 1; i < runners.length; i++) {
|
||||
|
@ -392,7 +397,7 @@ public class TestByteArrayManager {
|
|||
}
|
||||
|
||||
static class Runner implements Runnable {
|
||||
static final int NUM_RUNNERS = 4;
|
||||
static final int NUM_RUNNERS = 5;
|
||||
|
||||
static int index2arrayLength(int index) {
|
||||
return ByteArrayManager.MIN_ARRAY_LENGTH << (index - 1);
|
||||
|
@ -453,16 +458,22 @@ public class TestByteArrayManager {
|
|||
return f;
|
||||
}
|
||||
|
||||
byte[] removeFirst() throws Exception {
|
||||
Future<byte[]> removeFirst() throws Exception {
|
||||
synchronized (arrays) {
|
||||
return remove(arrays, 0);
|
||||
}
|
||||
}
|
||||
|
||||
void recycle() throws Exception {
|
||||
final byte[] a = removeFirst();
|
||||
if (a != null) {
|
||||
recycle(a);
|
||||
final Future<byte[]> f = removeFirst();
|
||||
if (f != null) {
|
||||
printf("randomRecycler: ");
|
||||
try {
|
||||
recycle(f.get(10, TimeUnit.MILLISECONDS));
|
||||
} catch(TimeoutException e) {
|
||||
recycle(new byte[maxArrayLength]);
|
||||
printf("timeout, new byte[%d]\n", maxArrayLength);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -490,9 +501,9 @@ public class TestByteArrayManager {
|
|||
submitAllocate();
|
||||
} else {
|
||||
try {
|
||||
final byte[] a = removeFirst();
|
||||
if (a != null) {
|
||||
submitRecycle(a);
|
||||
final Future<byte[]> f = removeFirst();
|
||||
if (f != null) {
|
||||
submitRecycle(f.get());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
|
|
Loading…
Reference in New Issue