HADOOP-17156. ABFS: Release the byte buffers held by input streams in close() (#3285)

Contributed By: Mukund Thakur
This commit is contained in:
Mukund Thakur 2021-09-07 15:13:36 +05:30
parent 09e8e5c5cb
commit 3b1c594355
4 changed files with 250 additions and 12 deletions

View File

@ -555,6 +555,7 @@
<exclude>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</exclude> <exclude>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</exclude>
<exclude>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</exclude> <exclude>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</exclude>
<exclude>**/azurebfs/ITestSmallWriteOptimization.java</exclude> <exclude>**/azurebfs/ITestSmallWriteOptimization.java</exclude>
<exclude>**/azurebfs/services/ITestReadBufferManager.java</exclude>
</excludes> </excludes>
</configuration> </configuration>
@ -595,6 +596,7 @@
<include>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</include> <include>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</include>
<include>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</include> <include>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</include>
<include>**/azurebfs/ITestSmallWriteOptimization.java</include> <include>**/azurebfs/ITestSmallWriteOptimization.java</include>
<include>**/azurebfs/services/ITestReadBufferManager.java</include>
</includes> </includes>
</configuration> </configuration>
</execution> </execution>

View File

@ -668,9 +668,10 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
LOG.debug("Closing {}", this);
closed = true; closed = true;
buffer = null; // de-reference the buffer so it can be GC'ed sooner buffer = null; // de-reference the buffer so it can be GC'ed sooner
LOG.debug("Closing {}", this); ReadBufferManager.getBufferManager().purgeBuffersForStream(this);
} }
/** /**

View File

@ -24,7 +24,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.Stack; import java.util.Stack;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -452,18 +454,23 @@ final class ReadBufferManager {
buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead); buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead);
} }
synchronized (this) { synchronized (this) {
inProgressList.remove(buffer); // If this buffer has already been purged during
if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { // close of InputStream then we don't update the lists.
buffer.setStatus(ReadBufferStatus.AVAILABLE); if (inProgressList.contains(buffer)) {
buffer.setLength(bytesActuallyRead); inProgressList.remove(buffer);
} else { if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
freeList.push(buffer.getBufferindex()); buffer.setStatus(ReadBufferStatus.AVAILABLE);
// buffer will be deleted as per the eviction policy. buffer.setLength(bytesActuallyRead);
} else {
freeList.push(buffer.getBufferindex());
// buffer will be deleted as per the eviction policy.
}
// completed list also contains FAILED read buffers
// for sending exception message to clients.
buffer.setStatus(result);
buffer.setTimeStamp(currentTimeMillis());
completedReadList.add(buffer);
} }
buffer.setStatus(result);
buffer.setTimeStamp(currentTimeMillis());
completedReadList.add(buffer);
} }
//outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
@ -498,11 +505,67 @@ final class ReadBufferManager {
return completedReadList.size(); return completedReadList.size();
} }
@VisibleForTesting
public synchronized List<ReadBuffer> getCompletedReadListCopy() {
return new ArrayList<>(completedReadList);
}
@VisibleForTesting
public synchronized List<Integer> getFreeListCopy() {
return new ArrayList<>(freeList);
}
@VisibleForTesting
public synchronized List<ReadBuffer> getReadAheadQueueCopy() {
return new ArrayList<>(readAheadQueue);
}
@VisibleForTesting
public synchronized List<ReadBuffer> getInProgressCopiedList() {
return new ArrayList<>(inProgressList);
}
@VisibleForTesting @VisibleForTesting
void callTryEvict() { void callTryEvict() {
tryEvict(); tryEvict();
} }
/**
* Purging the buffers associated with an {@link AbfsInputStream}
* from {@link ReadBufferManager} when stream is closed.
* @param stream input stream.
*/
public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
purgeList(stream, completedReadList);
purgeList(stream, inProgressList);
}
/**
* Method to remove buffers associated with a {@link AbfsInputStream}
* when its close method is called.
* NOTE: This method is not threadsafe and must be called inside a
* synchronised block. See caller.
* @param stream associated input stream.
* @param list list of buffers like {@link this#completedReadList}
* or {@link this#inProgressList}.
*/
private void purgeList(AbfsInputStream stream, LinkedList<ReadBuffer> list) {
for (Iterator<ReadBuffer> it = list.iterator(); it.hasNext();) {
ReadBuffer readBuffer = it.next();
if (readBuffer.getStream() == stream) {
it.remove();
// As failed ReadBuffers (bufferIndex = -1) are already pushed to free
// list in doneReading method, we will skip adding those here again.
if (readBuffer.getBufferindex() != -1) {
freeList.push(readBuffer.getBufferindex());
}
}
}
}
/** /**
* Test method that can clean up the current state of readAhead buffers and * Test method that can clean up the current state of readAhead buffers and
* the lists. Will also trigger a fresh init. * the lists. Will also trigger a fresh init.

View File

@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.io.IOUtils;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_BLOCK_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
public class ITestReadBufferManager extends AbstractAbfsIntegrationTest {
public ITestReadBufferManager() throws Exception {
}
@Test
public void testPurgeBufferManagerForParallelStreams() throws Exception {
describe("Testing purging of buffers from ReadBufferManager for "
+ "parallel input streams");
final int numBuffers = 16;
final LinkedList<Integer> freeList = new LinkedList<>();
for (int i=0; i < numBuffers; i++) {
freeList.add(i);
}
ExecutorService executorService = Executors.newFixedThreadPool(4);
AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
try {
for (int i = 0; i < 4; i++) {
final String fileName = methodName.getMethodName() + i;
executorService.submit((Callable<Void>) () -> {
byte[] fileContent = getRandomBytesArray(ONE_MB);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
try (FSDataInputStream iStream = fs.open(testFilePath)) {
iStream.read();
}
return null;
});
}
} finally {
executorService.shutdown();
}
ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy());
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
Assertions.assertThat(bufferManager.getFreeListCopy())
.describedAs("After closing all streams free list contents should match with " + freeList)
.hasSize(numBuffers)
.containsExactlyInAnyOrderElementsOf(freeList);
}
private void assertListEmpty(String listName, List<ReadBuffer> list) {
Assertions.assertThat(list)
.describedAs("After closing all streams %s should be empty", listName)
.hasSize(0);
}
@Test
public void testPurgeBufferManagerForSequentialStream() throws Exception {
describe("Testing purging of buffers in ReadBufferManager for "
+ "sequential input streams");
AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
final String fileName = methodName.getMethodName();
byte[] fileContent = getRandomBytesArray(ONE_MB);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
AbfsInputStream iStream1 = null;
// stream1 will be closed right away.
try {
iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
// Just reading one byte will trigger all read ahead calls.
iStream1.read();
} finally {
IOUtils.closeStream(iStream1);
}
ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
AbfsInputStream iStream2 = null;
try {
iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
iStream2.read();
// After closing stream1, none of the buffers associated with stream1 should be present.
assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream1);
assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream1);
assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1);
} finally {
// closing the stream later.
IOUtils.closeStream(iStream2);
}
// After closing stream2, none of the buffers associated with stream2 should be present.
assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream2);
assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream2);
assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2);
// After closing both the streams, all lists should be empty.
assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy());
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
}
private void assertListDoesnotContainBuffersForIstream(List<ReadBuffer> list,
AbfsInputStream inputStream) {
for (ReadBuffer buffer : list) {
Assertions.assertThat(buffer.getStream())
.describedAs("Buffers associated with closed input streams shouldn't be present")
.isNotEqualTo(inputStream);
}
}
private AzureBlobFileSystem getABFSWithReadAheadConfig() throws Exception {
Configuration conf = getRawConfiguration();
conf.setLong(FS_AZURE_READ_AHEAD_QUEUE_DEPTH, 8);
conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
conf.setInt(FS_AZURE_READ_AHEAD_BLOCK_SIZE, MIN_BUFFER_SIZE);
return (AzureBlobFileSystem) FileSystem.newInstance(conf);
}
protected byte[] getRandomBytesArray(int length) {
final byte[] b = new byte[length];
new Random().nextBytes(b);
return b;
}
protected Path createFileWithContent(FileSystem fs, String fileName,
byte[] fileContent) throws IOException {
Path testFilePath = path(fileName);
try (FSDataOutputStream oStream = fs.create(testFilePath)) {
oStream.write(fileContent);
oStream.flush();
}
return testFilePath;
}
}